Skip to content

Commit

Permalink
drop replication slot when overwriting slot_name in snapshot_names
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Oct 22, 2024
1 parent d41c292 commit 55b781c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 6 deletions.
3 changes: 2 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,8 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
if config.ParentMirrorName != "" {
_, snapshotName, _, err = shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.ParentMirrorName)
if err != nil {
return nil, err
a.Alerter.LogFlowError(ctx, "[GetQRepPartitions] "+config.FlowJobName, err)
return nil, fmt.Errorf("[GetQRepPartitions] failed to LoadSnapshotNameFromCatalog: %w", err)
}
}

Expand Down
6 changes: 4 additions & 2 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,8 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
if config.ParentMirrorName != "" {
_, snapshotName, _, err = shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.ParentMirrorName)
if err != nil {
return err
a.Alerter.LogFlowError(ctx, "[replicateQRepPartition] "+config.FlowJobName, err)
return fmt.Errorf("[replicateQRepPartition] failed to LoadSnapshotNameFromCatalog: %w", err)
}
}

Expand Down Expand Up @@ -518,7 +519,8 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
var err error
_, snapshotName, _, err = shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.ParentMirrorName)
if err != nil {
return err
a.Alerter.LogFlowError(ctx, "[replicateXminPartition] "+config.FlowJobName, err)
return fmt.Errorf("[replicateXminPartition] failed to LoadSnapshotNameFromCatalog: %w", err)
}
}

Expand Down
28 changes: 27 additions & 1 deletion flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"

Expand Down Expand Up @@ -82,7 +83,29 @@ func (a *SnapshotActivity) SetupReplication(
logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName))
}

// TODO if record already exists, need to remove slot
for {
var slotName string
if err := a.CatalogPool.QueryRow(
ctx,
"select slot_name from snapshot_names where flow_name = $1",
config.FlowJobName,
).Scan(&slotName); err == nil && slotName != "" {
if err := conn.ExecuteCommand(
ctx,
"select pg_drop_replication_slot($1)",
slotName,
); err != nil && !shared.IsSQLStateError(err, pgerrcode.UndefinedObject) {
if shared.IsSQLStateError(err, pgerrcode.ObjectInUse) {
a.Alerter.LogFlowError(ctx, "[SetupReplication] "+config.FlowJobName, err)
time.Sleep(time.Second * 15)
continue
}
return fmt.Errorf("failed to drop slot from previous run: %w", err)
}
}
break
}

if _, err := a.CatalogPool.Exec(ctx,
`insert into snapshot_names (flow_name, slot_name, snapshot_name, supports_tid_scan) values ($1, $2, $3, $4)
on conflict (flow_name) do update set slot_name = $2, snapshot_name = $3, supports_tid_scan = $4`,
Expand Down Expand Up @@ -134,6 +157,9 @@ func (a *SnapshotActivity) LoadSupportsTidScan(
flowJobName string,
) (bool, error) {
_, _, supportsTidScan, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, flowJobName)
if err != nil {
a.Alerter.LogFlowError(ctx, "[LoadSupportsTidScan] "+flowJobName, err)
}
return supportsTidScan, err
}

Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,8 @@ func (c *PostgresConnector) checkIfTableExistsWithTx(
return result.Bool, nil
}

func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string) error {
_, err := c.conn.Exec(ctx, command)
func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string, args ...any) error {
_, err := c.conn.Exec(ctx, command, args...)
return err
}

Expand Down

0 comments on commit 55b781c

Please sign in to comment.