diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 15b9e6edf..206d4e392 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -493,7 +493,8 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, if config.ParentMirrorName != "" { _, snapshotName, _, err = shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.ParentMirrorName) if err != nil { - return nil, err + a.Alerter.LogFlowError(ctx, "[GetQRepPartitions] "+config.FlowJobName, err) + return nil, fmt.Errorf("[GetQRepPartitions] failed to LoadSnapshotNameFromCatalog: %w", err) } } diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index ed31578c8..2e6842b5b 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -428,7 +428,8 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn if config.ParentMirrorName != "" { _, snapshotName, _, err = shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.ParentMirrorName) if err != nil { - return err + a.Alerter.LogFlowError(ctx, "[replicateQRepPartition] "+config.FlowJobName, err) + return fmt.Errorf("[replicateQRepPartition] failed to LoadSnapshotNameFromCatalog: %w", err) } } @@ -518,7 +519,8 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn var err error _, snapshotName, _, err = shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.ParentMirrorName) if err != nil { - return err + a.Alerter.LogFlowError(ctx, "[replicateXminPartition] "+config.FlowJobName, err) + return fmt.Errorf("[replicateXminPartition] failed to LoadSnapshotNameFromCatalog: %w", err) } } diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 9a9338fc6..a789c3e9a 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5/pgxpool" "go.temporal.io/sdk/activity" @@ -82,7 +83,29 @@ func (a *SnapshotActivity) SetupReplication( logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName)) } - // TODO if record already exists, need to remove slot + for { + var slotName string + if err := a.CatalogPool.QueryRow( + ctx, + "select slot_name from snapshot_names where flow_name = $1", + config.FlowJobName, + ).Scan(&slotName); err == nil && slotName != "" { + if err := conn.ExecuteCommand( + ctx, + "select pg_drop_replication_slot($1)", + slotName, + ); err != nil && !shared.IsSQLStateError(err, pgerrcode.UndefinedObject) { + if shared.IsSQLStateError(err, pgerrcode.ObjectInUse) { + a.Alerter.LogFlowError(ctx, "[SetupReplication] "+config.FlowJobName, err) + time.Sleep(time.Second * 15) + continue + } + return fmt.Errorf("failed to drop slot from previous run: %w", err) + } + } + break + } + if _, err := a.CatalogPool.Exec(ctx, `insert into snapshot_names (flow_name, slot_name, snapshot_name, supports_tid_scan) values ($1, $2, $3, $4) on conflict (flow_name) do update set slot_name = $2, snapshot_name = $3, supports_tid_scan = $4`, @@ -134,6 +157,9 @@ func (a *SnapshotActivity) LoadSupportsTidScan( flowJobName string, ) (bool, error) { _, _, supportsTidScan, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, flowJobName) + if err != nil { + a.Alerter.LogFlowError(ctx, "[LoadSupportsTidScan] "+flowJobName, err) + } return supportsTidScan, err } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 0d163f173..c164b5a62 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -668,8 +668,8 @@ func (c *PostgresConnector) checkIfTableExistsWithTx( return result.Bool, nil } -func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string) error { - _, err := c.conn.Exec(ctx, command) +func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string, args ...any) error { + _, err := c.conn.Exec(ctx, command, args...) return err }