Skip to content

Commit

Permalink
fix race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro committed Apr 23, 2024
1 parent 867b43c commit 5a3b99b
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var (
// DefaultConnectionTimeout defines the default amount of time that Neoq waits for connections to become available.
DefaultConnectionTimeout = 30 * time.Second
txCtxVarKey contextKey
reconnectWaitTime = 5 * time.Second
shutdownJobID = "-1" // job ID announced when triggering a shutdown
shutdownAnnouncementAllowance = 100 // ms
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
Expand Down Expand Up @@ -210,16 +211,23 @@ func (p *PgBackend) listenerManager(ctx context.Context) {
p.logger.Error("listener connection is down, and unable to reconnect", slog.Any("error", err))
continue
}

p.listenerConnMu.Lock()
p.listenerConn = lc
for queue := range p.handlers {
p.mu.Lock()
handlers := p.handlers
p.mu.Unlock()

for queue := range handlers {
_, err = p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, queue))
if err != nil {
p.logger.Error("unable to listen on queue", slog.Any("error", err), slog.String("queue", queue))
}
}

p.listenerConnMu.Unlock()

p.logger.Debug("worker database connection established")
case newQueue := <-p.newQueues:
p.logger.Debug("configure new handler", "queue", newQueue)
setup_listeners:
Expand All @@ -232,6 +240,16 @@ func (p *PgBackend) listenerManager(ctx context.Context) {
default:
}

p.listenerConnMu.Lock()
lc := p.listenerConn
p.listenerConnMu.Unlock()
if lc == nil || lc.IsClosed() {
p.logger.Error("worker database connection closed and will attempt to reconnect periodically. jobs are not being processed")
p.listenConnDown <- true
time.Sleep(reconnectWaitTime)
continue
}

p.listenerConnMu.Lock()
// note: 'LISTEN, channel' is idempotent
_, err = p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, newQueue))
Expand Down Expand Up @@ -965,13 +983,17 @@ func (p *PgBackend) listen(ctx context.Context) (c chan *pgconn.Notification, er
continue
}

if p.listenerConn.IsClosed() {
p.logger.Error("worker connection closed and will attempt to reconnect periodically. jobs are not being processed")
p.logger.Error("failed to wait for notification", slog.Any("error", waitErr))

p.listenerConnMu.Lock()
lc := p.listenerConn
p.listenerConnMu.Unlock()
if lc == nil || lc.IsClosed() {
p.logger.Error("worker database connection closed and will attempt to reconnect periodically. jobs are not being processed")
p.listenConnDown <- true
time.Sleep(5 * time.Second)
time.Sleep(reconnectWaitTime)
}

p.logger.Error("failed to wait for notification", slog.Any("error", waitErr))
continue
}

Expand Down

0 comments on commit 5a3b99b

Please sign in to comment.