From a0d0ae2c2626d21eefd097742a9437e599ae384d Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 10 Jan 2025 04:33:14 +0530 Subject: [PATCH] fix: suggestion --- flow/connectors/postgres/cdc.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 7d3e43c27..be03a41ed 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -383,16 +383,17 @@ func PullCdcRecords[Items model.Items]( pkmRequiresResponse := false waitingForCommit := false - + lastEmptyBatchPkmSentTime := time.Now() for { if pkmRequiresResponse { if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() && - time.Since(standByLastLogged) >= 1*time.Minute { + time.Since(lastEmptyBatchPkmSentTime) >= 1*time.Minute { metadata := connmetadata.NewPostgresMetadataFromCatalog(logger, p.catalogPool) if err := metadata.SetLastOffset(ctx, req.FlowJobName, int64(clientXLogPos)); err != nil { return err } req.ConsumedOffset.Store(int64(clientXLogPos)) + lastEmptyBatchPkmSentTime = time.Now() } if err := sendStandbyAfterReplLock("pkm-response"); err != nil {