Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
no need for param
Browse files Browse the repository at this point in the history
serprex committed Jan 17, 2025

Verified

This commit was signed with the committer’s verified signature.
pbzweihander Kangwook Lee (이강욱)
1 parent 7e0c7dd commit ebf67f1
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
@@ -133,13 +133,13 @@ func (p *PostgresMetadata) SetLastOffset(ctx context.Context, jobName string, of
p.logger.Debug("updating last offset", slog.String("offsetID", pglogrepl.LSN(offset.ID).String()), slog.String("offsetText", offset.Text))
if _, err := p.pool.Exec(ctx, `
INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, last_text, sync_batch_id)
VALUES ($1, $2, $3, $4)
VALUES ($1, $2, $3, 0)
ON CONFLICT (job_name)
DO UPDATE SET
last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
last_text = excluded.last_text,
updated_at = NOW()
`, jobName, offset.ID, offset.Text, 0); err != nil {
`, jobName, offset.ID, offset.Text); err != nil {
p.logger.Error("failed to update last offset", "error", err)
return err
}

0 comments on commit ebf67f1

Please sign in to comment.