Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Automatically reconnect when the PG listener connection fails #128

Merged
merged 1 commit into from
Apr 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 53 additions & 10 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var (
// DefaultConnectionTimeout defines the default amount of time that Neoq waits for connections to become available.
DefaultConnectionTimeout = 30 * time.Second
txCtxVarKey contextKey
reconnectWaitTime = 5 * time.Second
shutdownJobID = "-1" // job ID announced when triggering a shutdown
shutdownAnnouncementAllowance = 100 // ms
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
Expand All @@ -88,6 +89,7 @@ type PgBackend struct {
newQueues chan string // a channel that indicates that new queues are ready to be processed
readyQueues chan string // a channel that indicates which queues are ready to have jobs processed.
listenCancelCh chan context.CancelFunc // cancellation channel for the listenerConn's WaitForNotification call.
listenConnDown chan bool // listenConnDown indicates that the listener connection is down
listenerConn *pgx.Conn // dedicated connection that LISTENs for jobs across all queues
listenerConnMu *sync.RWMutex // listenerConnMu protects the listener connection from concurrent access
logger logging.Logger // backend-wide logger
Expand Down Expand Up @@ -135,6 +137,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
listenerConnMu: &sync.RWMutex{},
mu: &sync.RWMutex{},
listenCancelCh: make(chan context.CancelFunc, 1),
listenConnDown: make(chan bool),
}

// Set all options
Expand Down Expand Up @@ -183,14 +186,10 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
}
}

p.listenerConn, err = p.newListenerConn(ctx)
if err != nil {
p.logger.Error("unable to initialize listener connection", slog.Any("error", err))
return nil, fmt.Errorf("unable to create neoq listener connection: %w", err)
}

// monitor handlers for changes and LISTEN when new queues are added
go p.newQueueMonitor(ctx)
go p.listenerManager(ctx)

p.listenConnDown <- true

p.cron.Start()

Expand All @@ -199,12 +198,36 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
return pb, nil
}

// newQueueMonitor monitors for new queues and instructs the listener connection to LISTEN for jobs on them
func (p *PgBackend) newQueueMonitor(ctx context.Context) {
// listenerManager manages the LISTENer connection and adds queues to it
func (p *PgBackend) listenerManager(ctx context.Context) {
var err error
for {
select {
case <-ctx.Done():
return
case <-p.listenConnDown:
lc, err := p.newListenerConn(ctx)
if err != nil {
p.logger.Error("listener connection is down, and unable to reconnect", slog.Any("error", err))
continue
}

p.listenerConnMu.Lock()
p.listenerConn = lc
p.mu.Lock()
handlers := p.handlers
p.mu.Unlock()

for queue := range handlers {
_, err = p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, queue))
if err != nil {
p.logger.Error("unable to listen on queue", slog.Any("error", err), slog.String("queue", queue))
}
}

p.listenerConnMu.Unlock()

p.logger.Debug("worker database connection established")
case newQueue := <-p.newQueues:
p.logger.Debug("configure new handler", "queue", newQueue)
setup_listeners:
Expand All @@ -217,9 +240,19 @@ func (p *PgBackend) newQueueMonitor(ctx context.Context) {
default:
}

p.listenerConnMu.Lock()
lc := p.listenerConn
p.listenerConnMu.Unlock()
if lc == nil || lc.IsClosed() {
p.logger.Error("worker database connection closed and will attempt to reconnect periodically. jobs are not being processed")
p.listenConnDown <- true
time.Sleep(reconnectWaitTime)
continue
}

p.listenerConnMu.Lock()
// note: 'LISTEN, channel' is idempotent
_, err := p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, newQueue))
_, err = p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, newQueue))
p.listenerConnMu.Unlock()
if err != nil {
err = fmt.Errorf("unable to configure listener connection: %w", err)
Expand Down Expand Up @@ -951,6 +984,16 @@ func (p *PgBackend) listen(ctx context.Context) (c chan *pgconn.Notification, er
}

p.logger.Error("failed to wait for notification", slog.Any("error", waitErr))

p.listenerConnMu.Lock()
lc := p.listenerConn
p.listenerConnMu.Unlock()
if lc == nil || lc.IsClosed() {
p.logger.Error("worker database connection closed and will attempt to reconnect periodically. jobs are not being processed")
p.listenConnDown <- true
time.Sleep(reconnectWaitTime)
}

continue
}

Expand Down
Loading