Merge pull request #780 from dolthub/zachmu/replication-bug
Don't count failure to send standby messages as terminal failures dur…
zachmu authored Sep 30, 2024
2 parents 7b37e01 + 0023451 commit e2f1105
Showing 1 changed file with 9 additions and 7 deletions.
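
The change threads an incrementErrorCount flag through the shared retry handler so that a failed attempt to send a standby status message is retried without advancing the consecutive-failure counter, while stream and message-processing errors still count toward it; the backoff on initial replication establishment also grows from 100ms to 3s. Below is a minimal, self-contained sketch of that counter pattern, assuming a simplified handler and an illustrative maxConsecutiveFailures value rather than the package's real plumbing:

package main

import (
	"errors"
	"fmt"
)

// maxConsecutiveFailures is an assumed threshold for illustration; the real
// value lives in the logrepl package.
const maxConsecutiveFailures = 10

// retryHandler mirrors the pattern in the diff: every error is retried until
// the counter crosses the threshold, but only errors flagged with
// incrementErrorCount move the counter. Standby status send failures pass
// false, so they never become terminal on their own.
func retryHandler() func(err error, incrementErrorCount bool) error {
	connErrCnt := 0
	return func(err error, incrementErrorCount bool) error {
		if err == nil {
			return nil
		}
		if incrementErrorCount {
			connErrCnt++
		}
		if connErrCnt < maxConsecutiveFailures {
			fmt.Printf("Error: %v. Retrying\n", err)
			return nil // caller loops and retries
		}
		return err // too many consecutive failures: give up
	}
}

func main() {
	handle := retryHandler()
	// A standby status send failure retries without counting toward the limit.
	_ = handle(errors.New("send standby status failed"), false)
	// A replication stream error counts toward the limit.
	_ = handle(errors.New("stream read failed"), true)
}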
16 changes: 9 additions & 7 deletions server/logrepl/replication.go
@@ -207,9 +207,11 @@ func (r *LogicalReplicator) StartReplication(slotName string) error {
 	}()
 
 	connErrCnt := 0
-	handleErrWithRetry := func(err error) error {
+	handleErrWithRetry := func(err error, incrementErrorCount bool) error {
 		if err != nil {
-			connErrCnt++
+			if incrementErrorCount {
+				connErrCnt++
+			}
 			if connErrCnt < maxConsecutiveFailures {
 				log.Printf("Error: %v. Retrying", err)
 				if primaryConn != nil {
@@ -234,7 +236,7 @@ func (r *LogicalReplicator) StartReplication(slotName string) error {
 			WALApplyPosition: state.lastReceivedLSN + 1,
 		})
 		if err != nil {
-			return handleErrWithRetry(err)
+			return handleErrWithRetry(err, false)
 		}
 
 		log.Printf("Sent Standby status message with WALWritePosition = %s, WALApplyPosition = %s\n", state.lastWrittenLSN+1, state.lastReceivedLSN+1)
@@ -265,8 +267,8 @@ func (r *LogicalReplicator) StartReplication(slotName string) error {
 			if err != nil {
 				// unlike other error cases, back off a little here, since we're likely to just get the same error again
 				// on initial replication establishment
-				time.Sleep(100 * time.Millisecond)
-				return handleErrWithRetry(err)
+				time.Sleep(3 * time.Second)
+				return handleErrWithRetry(err, true)
 			}
 		}
 
@@ -304,7 +306,7 @@ func (r *LogicalReplicator) StartReplication(slotName string) error {
 			if pgconn.Timeout(msgAndErr.err) {
 				return nil
 			} else {
-				return handleErrWithRetry(msgAndErr.err)
+				return handleErrWithRetry(msgAndErr.err, true)
 			}
 		}
 
@@ -346,7 +348,7 @@ func (r *LogicalReplicator) StartReplication(slotName string) error {
 		committed, err := r.processMessage(xld, state)
 		if err != nil {
 			// TODO: do we need more than one handler, one for each connection?
-			return handleErrWithRetry(err)
+			return handleErrWithRetry(err, true)
 		}
 
 		// TODO: we have a two-phase commit race here: if the WAL file update doesn't happen before the process crashes,