From 5a3b99b7d5e81b8810ffe72ed7f671788c5a4393 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Tue, 23 Apr 2024 08:22:59 -0600 Subject: [PATCH] fix race condition --- backends/postgres/postgres_backend.go | 32 ++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 860b590..ca78ca1 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -67,6 +67,7 @@ var ( // DefaultConnectionTimeout defines the default amount of time that Neoq waits for connections to become available. DefaultConnectionTimeout = 30 * time.Second txCtxVarKey contextKey + reconnectWaitTime = 5 * time.Second shutdownJobID = "-1" // job ID announced when triggering a shutdown shutdownAnnouncementAllowance = 100 // ms ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings") @@ -210,16 +211,23 @@ func (p *PgBackend) listenerManager(ctx context.Context) { p.logger.Error("listener connection is down, and unable to reconnect", slog.Any("error", err)) continue } + p.listenerConnMu.Lock() p.listenerConn = lc - for queue := range p.handlers { + p.mu.Lock() + handlers := p.handlers + p.mu.Unlock() + + for queue := range handlers { _, err = p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, queue)) if err != nil { p.logger.Error("unable to listen on queue", slog.Any("error", err), slog.String("queue", queue)) } } + p.listenerConnMu.Unlock() + p.logger.Debug("worker database connection established") case newQueue := <-p.newQueues: p.logger.Debug("configure new handler", "queue", newQueue) setup_listeners: @@ -232,6 +240,16 @@ func (p *PgBackend) listenerManager(ctx context.Context) { default: } + p.listenerConnMu.Lock() + lc := p.listenerConn + p.listenerConnMu.Unlock() + if lc == nil || lc.IsClosed() { + p.logger.Error("worker database connection closed and will attempt to reconnect periodically. jobs are not being processed") + p.listenConnDown <- true + time.Sleep(reconnectWaitTime) + continue + } + p.listenerConnMu.Lock() // note: 'LISTEN, channel' is idempotent _, err = p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, newQueue)) @@ -965,13 +983,17 @@ func (p *PgBackend) listen(ctx context.Context) (c chan *pgconn.Notification, er continue } - if p.listenerConn.IsClosed() { - p.logger.Error("worker connection closed and will attempt to reconnect periodically. jobs are not being processed") + p.logger.Error("failed to wait for notification", slog.Any("error", waitErr)) + + p.listenerConnMu.Lock() + lc := p.listenerConn + p.listenerConnMu.Unlock() + if lc == nil || lc.IsClosed() { + p.logger.Error("worker database connection closed and will attempt to reconnect periodically. jobs are not being processed") p.listenConnDown <- true - time.Sleep(5 * time.Second) + time.Sleep(reconnectWaitTime) } - p.logger.Error("failed to wait for notification", slog.Any("error", waitErr)) continue }