diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index c8bdbc5..ca78ca1 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -67,6 +67,7 @@ var ( // DefaultConnectionTimeout defines the default amount of time that Neoq waits for connections to become available. DefaultConnectionTimeout = 30 * time.Second txCtxVarKey contextKey + reconnectWaitTime = 5 * time.Second // pause taken after signalling that the listener connection is down shutdownJobID = "-1" // job ID announced when triggering a shutdown shutdownAnnouncementAllowance = 100 // ms ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings") @@ -88,6 +89,7 @@ type PgBackend struct { newQueues chan string // a channel that indicates that new queues are ready to be processed readyQueues chan string // a channel that indicates which queues are ready to have jobs processed. listenCancelCh chan context.CancelFunc // cancellation channel for the listenerConn's WaitForNotification call. 
+ listenConnDown chan bool // listenConnDown signals listenerManager that the listener connection is down and must be (re)established listenerConn *pgx.Conn // dedicated connection that LISTENs for jobs across all queues listenerConnMu *sync.RWMutex // listenerConnMu protects the listener connection from concurrent access logger logging.Logger // backend-wide logger @@ -135,6 +137,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err listenerConnMu: &sync.RWMutex{}, mu: &sync.RWMutex{}, listenCancelCh: make(chan context.CancelFunc, 1), + listenConnDown: make(chan bool), // unbuffered: a send blocks until a receiver (listenerManager) takes it } // Set all options @@ -183,14 +186,10 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err } } - p.listenerConn, err = p.newListenerConn(ctx) - if err != nil { - p.logger.Error("unable to initialize listener connection", slog.Any("error", err)) - return nil, fmt.Errorf("unable to create neoq listener connection: %w", err) - } - // monitor handlers for changes and LISTEN when new queues are added - go p.newQueueMonitor(ctx) + go p.listenerManager(ctx) + + p.listenConnDown <- true // ask listenerManager to establish the initial listener connection p.cron.Start() @@ -199,12 +198,36 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err return pb, nil } -// newQueueMonitor monitors for new queues and instruct's the listener connection to LISTEN for jobs on them -func (p *PgBackend) newQueueMonitor(ctx context.Context) { +// listenerManager manages the LISTENer connection: on a listenConnDown signal it opens a new listener connection and re-issues LISTEN for every queue in p.handlers; on a newQueues message it issues LISTEN for that queue. NOTE(review): the newQueues path also sends on listenConnDown when the connection is nil or closed; that send runs on this goroutine, which is the only receiver visible here, and the channel is unbuffered -- it looks like it would block forever. Confirm. +func (p *PgBackend) listenerManager(ctx context.Context) { + var err error for { select { case <-ctx.Done(): return + case <-p.listenConnDown: + lc, err := p.newListenerConn(ctx) + if err != nil { + p.logger.Error("listener connection is down, and unable to reconnect", slog.Any("error", err)) + continue // NOTE(review): no retry is scheduled here; another listenConnDown signal appears to be needed before reconnecting again -- confirm + } + + p.listenerConnMu.Lock() + p.listenerConn = lc + p.mu.Lock() + handlers := p.handlers // NOTE(review): only the reference is copied under p.mu; the range below runs after Unlock -- confirm this cannot race with handler registration + p.mu.Unlock() + + for queue := range handlers { + _, err = p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, queue)) + if err != nil { + 
p.logger.Error("unable to listen on queue", slog.Any("error", err), slog.String("queue", queue)) + } + } + + p.listenerConnMu.Unlock() + + p.logger.Debug("worker database connection established") case newQueue := <-p.newQueues: p.logger.Debug("configure new handler", "queue", newQueue) setup_listeners: @@ -217,9 +240,19 @@ func (p *PgBackend) newQueueMonitor(ctx context.Context) { default: } + p.listenerConnMu.Lock() + lc := p.listenerConn + p.listenerConnMu.Unlock() + if lc == nil || lc.IsClosed() { + p.logger.Error("worker database connection closed and will attempt to reconnect periodically. jobs are not being processed") + p.listenConnDown <- true + time.Sleep(reconnectWaitTime) + continue + } + p.listenerConnMu.Lock() // note: 'LISTEN, channel' is idempotent - _, err := p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, newQueue)) + _, err = p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, newQueue)) p.listenerConnMu.Unlock() if err != nil { err = fmt.Errorf("unable to configure listener connection: %w", err) @@ -951,6 +984,16 @@ func (p *PgBackend) listen(ctx context.Context) (c chan *pgconn.Notification, er } p.logger.Error("failed to wait for notification", slog.Any("error", waitErr)) + + p.listenerConnMu.Lock() + lc := p.listenerConn + p.listenerConnMu.Unlock() + if lc == nil || lc.IsClosed() { + p.logger.Error("worker database connection closed and will attempt to reconnect periodically. jobs are not being processed") + p.listenConnDown <- true + time.Sleep(reconnectWaitTime) + } + continue }