From 0023451aeca04ebc6f5274c5724583914517e3c5 Mon Sep 17 00:00:00 2001
From: Zach Musgrave
Date: Mon, 30 Sep 2024 15:05:37 -0700
Subject: [PATCH] Don't count failure to send standby messages as terminal failures during replication

---
 server/logrepl/replication.go | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/server/logrepl/replication.go b/server/logrepl/replication.go
index d57b1eade9..8b9eedf1eb 100755
--- a/server/logrepl/replication.go
+++ b/server/logrepl/replication.go
@@ -207,9 +207,11 @@ func (r *LogicalReplicator) StartReplication(slotName string) error {
 	}()
 
 	connErrCnt := 0
-	handleErrWithRetry := func(err error) error {
+	handleErrWithRetry := func(err error, incrementErrorCount bool) error {
 		if err != nil {
-			connErrCnt++
+			if incrementErrorCount {
+				connErrCnt++
+			}
 			if connErrCnt < maxConsecutiveFailures {
 				log.Printf("Error: %v. Retrying", err)
 				if primaryConn != nil {
@@ -234,7 +236,7 @@ func (r *LogicalReplicator) StartReplication(slotName string) error {
 			WALApplyPosition: state.lastReceivedLSN + 1,
 		})
 		if err != nil {
-			return handleErrWithRetry(err)
+			return handleErrWithRetry(err, false)
 		}
 
 		log.Printf("Sent Standby status message with WALWritePosition = %s, WALApplyPosition = %s\n", state.lastWrittenLSN+1, state.lastReceivedLSN+1)
@@ -265,8 +267,8 @@ func (r *LogicalReplicator) StartReplication(slotName string) error {
 			if err != nil {
 				// unlike other error cases, back off a little here, since we're likely to just get the same error again
 				// on initial replication establishment
-				time.Sleep(100 * time.Millisecond)
-				return handleErrWithRetry(err)
+				time.Sleep(3 * time.Second)
+				return handleErrWithRetry(err, true)
 			}
 		}
 
@@ -304,7 +306,7 @@ func (r *LogicalReplicator) StartReplication(slotName string) error {
 			if pgconn.Timeout(msgAndErr.err) {
 				return nil
 			} else {
-				return handleErrWithRetry(msgAndErr.err)
+				return handleErrWithRetry(msgAndErr.err, true)
 			}
 		}
 
@@ -346,7 +348,7 @@ func (r *LogicalReplicator) StartReplication(slotName string) error {
 		committed, err := r.processMessage(xld, state)
 		if err != nil {
 			// TODO: do we need more than one handler, one for each connection?
-			return handleErrWithRetry(err)
+			return handleErrWithRetry(err, true)
 		}
 
 		// TODO: we have a two-phase commit race here: if the WAL file update doesn't happen before the process crashes,
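
For context, here is a minimal standalone sketch of the retry pattern the patch introduces: the error handler takes an incrementErrorCount flag so transient failures (such as a failed standby status message) are retried without counting toward the consecutive-failure cutoff. Only handleErrWithRetry's shape, the incrementErrorCount flag, and the maxConsecutiveFailures name come from the diff; everything else (the value of the constant, the caller, the simulated errors) is illustrative, not the repository's code.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// Name taken from the diff; the value here is an assumption for illustration.
const maxConsecutiveFailures = 10

func main() {
	connErrCnt := 0

	// Mirrors the patched closure: callers that hit transient conditions pass
	// incrementErrorCount=false, so only genuine connection errors can push
	// connErrCnt past the limit and make replication give up.
	handleErrWithRetry := func(err error, incrementErrorCount bool) error {
		if err != nil {
			if incrementErrorCount {
				connErrCnt++
			}
			if connErrCnt < maxConsecutiveFailures {
				log.Printf("Error: %v. Retrying", err)
				return nil // retry: caller re-establishes the connection and continues
			}
		}
		return err // too many consecutive failures: treat as terminal
	}

	// A failed standby status send is retried without advancing the counter.
	fmt.Println(handleErrWithRetry(errors.New("standby status send failed"), false))

	// Repeated connection errors eventually become terminal.
	var last error
	for i := 0; i < maxConsecutiveFailures; i++ {
		last = handleErrWithRetry(errors.New("connection reset"), true)
	}
	fmt.Println(last)
}
```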