77 "github.com/jackc/pgx/v5"
88 "github.com/jackc/pgx/v5/pgconn"
99 "github.com/jackc/pgx/v5/pgxpool"
10+ "github.com/jackc/pgxlisten"
1011 "gitlab.com/tozd/go/errors"
12+ "gitlab.com/tozd/go/x"
1113 "gitlab.com/tozd/identifier"
1214
1315 internal "gitlab.com/peerdb/peerdb/internal/store"
@@ -52,26 +54,40 @@ type Coordinator[Data, BeginMetadata, EndMetadata, OperationMetadata any] struct
5254 // for the session are deleted and session is ended.
5355 EndCallback func (ctx context.Context , session identifier.Identifier , metadata EndMetadata ) (EndMetadata , errors.E )
5456
57+ // AppendedSize is the size of the channel to which operations are send when they are appended.
58+ //
59+ // Set to a negative value to disable creating the channel.
60+ AppendedSize int `exhaustruct:"optional"`
61+
5562 // A channel to which operations are send when they are appended.
63+ // Operations are sent in the order in which they were appended to the database.
64+ //
65+ // Channel is created by the listener when started and recreated on reconnection.
66+ Appended x.RecreatableChannel [AppendedOperation ] `exhaustruct:"optional"`
67+
68+ // EndedSize is the size of the channel to which sessions are send when they end.
5669 //
57- // The order in which they are sent is not necessary the order in which
58- // they were appended. You should not rely on the order.
59- Appended chan <- AppendedOperation
70+ // Set to a negative value to disable creating the channel.
71+ EndedSize int `exhaustruct:"optional"`
6072
6173 // A channel to which sessions are send when they end.
74+ // Sessions are sent in the order in which they ended in the database.
6275 //
63- // The order in which they are sent is not necessary the order in which
64- // they ended. You should not rely on the order.
65- Ended chan <- identifier.Identifier
76+ // Channel is created by the listener when started and recreated on reconnection.
77+ Ended x.RecreatableChannel [identifier.Identifier ] `exhaustruct:"optional"`
6678
67- dbpool * pgxpool.Pool
79+ dbpool * pgxpool.Pool
80+ appended chan <- AppendedOperation
81+ ended chan <- identifier.Identifier
6882}
6983
7084// Init initializes the Coordinator.
7185//
7286// It creates and configures the PostgreSQL tables, indices, and
7387// stored procedures if they do not already exist.
74- func (c * Coordinator [Data , BeginMetadata , EndMetadata , OperationMetadata ]) Init (ctx context.Context , dbpool * pgxpool.Pool ) errors.E {
88+ //
89+ // A non-nil listener is required when the Appended or Ended channel is set.
90+ func (c * Coordinator [Data , BeginMetadata , EndMetadata , OperationMetadata ]) Init (ctx context.Context , dbpool * pgxpool.Pool , listener * pgxlisten.Listener ) errors.E {
7591 if c .dbpool != nil {
7692 return errors .New ("already initialized" )
7793 }
@@ -111,6 +127,7 @@ func (c *Coordinator[Data, BeginMetadata, EndMetadata, OperationMetadata]) Init(
111127 END IF;
112128 DELETE FROM "` + c .Prefix + `Operations" WHERE "session"=_session;
113129 UPDATE "` + c .Prefix + `Sessions" SET "endMetadata"=_metadata WHERE "session"=_session;
130+ PERFORM pg_notify('` + c .Prefix + `EndedSession', json_build_object('session', _session)::text);
114131 END;
115132 $$;
116133
@@ -135,6 +152,7 @@ func (c *Coordinator[Data, BeginMetadata, EndMetadata, OperationMetadata]) Init(
135152 IF NOT FOUND THEN
136153 RAISE EXCEPTION 'conflict' USING ERRCODE='` + errorCodeConflict + `';
137154 END IF;
155+ PERFORM pg_notify('` + c .Prefix + `AppendedOperation', json_build_object('session', _session, 'operation', _operation)::text);
138156 RETURN _operation;
139157 END;
140158 $$;
@@ -144,7 +162,7 @@ func (c *Coordinator[Data, BeginMetadata, EndMetadata, OperationMetadata]) Init(
144162 }
145163
146164 return nil
147- }, nil )
165+ })
148166 if errE != nil {
149167 if pgError , ok := errors.AsType [* pgconn.PgError ](errE ); ok {
150168 switch pgError .Code {
@@ -164,6 +182,15 @@ func (c *Coordinator[Data, BeginMetadata, EndMetadata, OperationMetadata]) Init(
164182
165183 c .dbpool = dbpool
166184
185+ if listener != nil {
186+ if c .AppendedSize >= 0 {
187+ listener .Handle (c .Prefix + "AppendedOperation" , c )
188+ }
189+ if c .EndedSize >= 0 {
190+ listener .Handle (c .Prefix + "EndedSession" , c )
191+ }
192+ }
193+
167194 return nil
168195}
169196
@@ -178,7 +205,7 @@ func (c *Coordinator[Data, BeginMetadata, EndMetadata, OperationMetadata]) Begin
178205 errE := internal .RetryTransaction (ctx , c .dbpool , pgx .ReadWrite , func (ctx context.Context , tx pgx.Tx ) errors.E {
179206 _ , err := tx .Exec (ctx , `INSERT INTO "` + c .Prefix + `Sessions" VALUES ($1, $2, NULL)` , arguments ... )
180207 return internal .WithPgxError (err )
181- }, nil )
208+ })
182209 if errE != nil {
183210 return identifier.Identifier {}, errE
184211 }
@@ -222,11 +249,8 @@ func (c *Coordinator[Data, BeginMetadata, EndMetadata, OperationMetadata]) End(
222249 return errE
223250 }
224251 return nil
225- }, func () {
226- if c .Ended != nil {
227- c .Ended <- session
228- }
229252 })
253+
230254 if errE != nil {
231255 errors .Details (errE )["session" ] = session .String ()
232256 }
@@ -267,14 +291,8 @@ func (c *Coordinator[Data, BeginMetadata, EndMetadata, OperationMetadata]) Appen
267291 return errE
268292 }
269293 return nil
270- }, func () {
271- if c .Appended != nil {
272- c .Appended <- AppendedOperation {
273- Session : session ,
274- Operation : operation ,
275- }
276- }
277294 })
295+
278296 if errE != nil {
279297 errors .Details (errE )["session" ] = session .String ()
280298 }
@@ -340,7 +358,7 @@ func (c *Coordinator[Data, BeginMetadata, EndMetadata, OperationMetadata]) List(
340358 // There is nothing wrong with having no operations.
341359 }
342360 return nil
343- }, nil )
361+ })
344362 if errE != nil {
345363 details := errors .Details (errE )
346364 details ["session" ] = session .String ()
@@ -393,7 +411,7 @@ func (c *Coordinator[Data, BeginMetadata, EndMetadata, OperationMetadata]) GetDa
393411 return errE
394412 }
395413 return nil
396- }, nil )
414+ })
397415 if errE != nil {
398416 details := errors .Details (errE )
399417 details ["session" ] = session .String ()
@@ -440,7 +458,7 @@ func (c *Coordinator[Data, BeginMetadata, EndMetadata, OperationMetadata]) GetMe
440458 return errE
441459 }
442460 return nil
443- }, nil )
461+ })
444462 if errE != nil {
445463 details := errors .Details (errE )
446464 details ["session" ] = session .String ()
@@ -477,10 +495,90 @@ func (c *Coordinator[Data, BeginMetadata, EndMetadata, OperationMetadata]) Get(
477495 return errE
478496 }
479497 return nil
480- }, nil )
498+ })
481499 if errE != nil {
482500 details := errors .Details (errE )
483501 details ["session" ] = session .String ()
484502 }
485503 return beginMetadata , endMetadata , errE
486504}
505+
506+ // HandleNotification implements pgxlisten.Handler interface.
507+ func (c * Coordinator [Data , BeginMetadata , EndMetadata , OperationMetadata ]) HandleNotification (
508+ ctx context.Context , notification * pgconn.Notification , conn * pgx.Conn ,
509+ ) error {
510+ switch notification .Channel {
511+ case c .Prefix + "AppendedOperation" :
512+ return c .handleAppendedOperation (ctx , notification , conn )
513+ case c .Prefix + "EndedSession" :
514+ return c .handleEndedSession (ctx , notification , conn )
515+ default :
516+ errE := errors .New ("unknown notification channel" )
517+ errors .Details (errE )["channel" ] = notification .Channel
518+ return errE
519+ }
520+ }
521+
522+ // HandleBacklog implements pgxlisten.BacklogHandler interface.
523+ //
524+ // It recreates channels to signal to their consumers that notifications might have been
525+ // missed and that they should take corrective actions, if possible.
526+ func (c * Coordinator [Data , BeginMetadata , EndMetadata , OperationMetadata ]) HandleBacklog (
527+ _ context.Context , channel string , _ * pgx.Conn ,
528+ ) error {
529+ switch channel {
530+ case c .Prefix + "AppendedOperation" :
531+ // AppendedSize should be >= 0 here unless it was changed after initialization which is not allowed.
532+ c .appended = c .Appended .Recreate (c .AppendedSize )
533+ case c .Prefix + "EndedSession" :
534+ // EndedSize should be >= 0 here unless it was changed after initialization which is not allowed.
535+ c .ended = c .Ended .Recreate (c .EndedSize )
536+ default :
537+ errE := errors .New ("unknown notification channel" )
538+ errors .Details (errE )["channel" ] = channel
539+ return errE
540+ }
541+ return nil
542+ }
543+
544+ // handleAppendedOperation handles AppendedOperation notifications and forwards
545+ // the operation to the Appended channel.
546+ func (c * Coordinator [Data , BeginMetadata , EndMetadata , OperationMetadata ]) handleAppendedOperation (
547+ ctx context.Context , notification * pgconn.Notification , _ * pgx.Conn ,
548+ ) error {
549+ var payload struct {
550+ Session string `json:"session"`
551+ Operation int64 `json:"operation"`
552+ }
553+ errE := x .UnmarshalWithoutUnknownFields ([]byte (notification .Payload ), & payload )
554+ if errE != nil {
555+ return errE
556+ }
557+ select {
558+ case c .appended <- AppendedOperation {
559+ Session : identifier .String (payload .Session ),
560+ Operation : payload .Operation ,
561+ }:
562+ case <- ctx .Done ():
563+ }
564+ return nil
565+ }
566+
567+ // handleEndedSession handles EndedSession notifications and forwards
568+ // the session identifier to the Ended channel.
569+ func (c * Coordinator [Data , BeginMetadata , EndMetadata , OperationMetadata ]) handleEndedSession (
570+ ctx context.Context , notification * pgconn.Notification , _ * pgx.Conn ,
571+ ) error {
572+ var payload struct {
573+ Session string `json:"session"`
574+ }
575+ errE := x .UnmarshalWithoutUnknownFields ([]byte (notification .Payload ), & payload )
576+ if errE != nil {
577+ return errE
578+ }
579+ select {
580+ case c .ended <- identifier .String (payload .Session ):
581+ case <- ctx .Done ():
582+ }
583+ return nil
584+ }
0 commit comments