Skip to content

Commit

Permalink
fix: suggestion
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Jan 9, 2025
1 parent 96e0673 commit a0d0ae2
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,16 +383,17 @@ func PullCdcRecords[Items model.Items](

pkmRequiresResponse := false
waitingForCommit := false

lastEmptyBatchPkmSentTime := time.Now()
for {
if pkmRequiresResponse {
if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() &&
time.Since(standByLastLogged) >= 1*time.Minute {
time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute {
metadata := connmetadata.NewPostgresMetadataFromCatalog(logger, p.catalogPool)
if err := metadata.SetLastOffset(ctx, req.FlowJobName, int64(clientXLogPos)); err != nil {
return err
}
req.ConsumedOffset.Store(int64(clientXLogPos))
lastEmptyBatchPkmSentTime = time.Now()
}

if err := sendStandbyAfterReplLock("pkm-response"); err != nil {
Expand Down

0 comments on commit a0d0ae2

Please sign in to comment.