diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 3df7926ea..809eaef77 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "go.temporal.io/sdk/activity" @@ -20,7 +21,7 @@ import ( type SlotSnapshotState struct { connector connectors.CDCPullConnector - signal connpostgres.SlotSignal + slotConn *pgx.Conn snapshotName string } @@ -43,7 +44,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s defer a.SnapshotStatesMutex.Unlock() if s, ok := a.SlotSnapshotStates[flowJobName]; ok { - close(s.signal.CloneComplete) + s.slotConn.Close(ctx) connectors.CloseConnector(ctx, s.connector) delete(a.SlotSnapshotStates, flowJobName) } @@ -76,15 +77,12 @@ func (a *SnapshotActivity) SetupReplication( connectors.CloseConnector(ctx, conn) } - slotSignal := connpostgres.NewSlotSignal() - go conn.SetupReplication(ctx, slotSignal, config) - logger.Info("waiting for slot to be created...") - slotInfo := <-slotSignal.SlotCreated + slotInfo, err := conn.SetupReplication(ctx, config) - if slotInfo.Err != nil { - closeConnectionForError(slotInfo.Err) - return nil, fmt.Errorf("slot error: %w", slotInfo.Err) + if err != nil { + closeConnectionForError(err) + return nil, fmt.Errorf("slot error: %w", err) } else { logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName)) } @@ -93,7 +91,7 @@ func (a *SnapshotActivity) SetupReplication( defer a.SnapshotStatesMutex.Unlock() a.SlotSnapshotStates[config.FlowJobName] = SlotSnapshotState{ - signal: slotSignal, + slotConn: slotInfo.Conn, snapshotName: slotInfo.SnapshotName, connector: conn, } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index cfda1d2a0..ce993ecd0 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -356,13 +356,12 @@ func (c *PostgresConnector) CreatePublication( // createSlotAndPublication creates the replication slot and publication. func (c *PostgresConnector) createSlotAndPublication( ctx context.Context, - signal SlotSignal, s SlotCheckResult, slot string, publication string, tableNameMapping map[string]model.NameAndExclude, doInitialCopy bool, -) { +) (SlotCreationResult, error) { // iterate through source tables and create publication, // expecting tablenames to be schema qualified if !s.PublicationExists { @@ -370,16 +369,12 @@ func (c *PostgresConnector) createSlotAndPublication( for srcTableName := range tableNameMapping { parsedSrcTableName, err := utils.ParseSchemaTable(srcTableName) if err != nil { - signal.SlotCreated <- SlotCreationResult{ - Err: fmt.Errorf("[publication-creation] source table identifier %s is invalid", srcTableName), - } - return + return SlotCreationResult{}, fmt.Errorf("[publication-creation] source table identifier %s is invalid", srcTableName) } srcTableNames = append(srcTableNames, parsedSrcTableName.String()) } if err := c.CreatePublication(ctx, srcTableNames, publication); err != nil { - signal.SlotCreated <- SlotCreationResult{Err: err} - return + return SlotCreationResult{}, err } } @@ -387,22 +382,20 @@ func (c *PostgresConnector) createSlotAndPublication( if !s.SlotExists { conn, err := c.CreateReplConn(ctx) if err != nil { - signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error acquiring connection: %w", err)} - return + return SlotCreationResult{}, fmt.Errorf("[slot] error acquiring connection: %w", err) } - defer conn.Close(ctx) c.logger.Warn(fmt.Sprintf("Creating replication slot '%s'", slot)) // THIS IS NOT IN A TX! if _, err := conn.Exec(ctx, "SET idle_in_transaction_session_timeout=0"); err != nil { - signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error setting idle_in_transaction_session_timeout: %w", err)} - return + conn.Close(ctx) + return SlotCreationResult{}, fmt.Errorf("[slot] error setting idle_in_transaction_session_timeout: %w", err) } if _, err := conn.Exec(ctx, "SET lock_timeout=0"); err != nil { - signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error setting lock_timeout: %w", err)} - return + conn.Close(ctx) + return SlotCreationResult{}, fmt.Errorf("[slot] error setting lock_timeout: %w", err) } opts := pglogrepl.CreateReplicationSlotOptions{ @@ -411,39 +404,33 @@ func (c *PostgresConnector) createSlotAndPublication( } res, err := pglogrepl.CreateReplicationSlot(ctx, conn.PgConn(), slot, "pgoutput", opts) if err != nil { - signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error creating replication slot: %w", err)} - return + conn.Close(ctx) + return SlotCreationResult{}, fmt.Errorf("[slot] error creating replication slot: %w", err) } pgversion, err := c.MajorVersion(ctx) if err != nil { - signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error getting PG version: %w", err)} - return + conn.Close(ctx) + return SlotCreationResult{}, fmt.Errorf("[slot] error getting PG version: %w", err) } c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot)) - slotDetails := SlotCreationResult{ + return SlotCreationResult{ SlotName: res.SlotName, SnapshotName: res.SnapshotName, - Err: nil, SupportsTIDScans: pgversion >= shared.POSTGRES_13, - } - signal.SlotCreated <- slotDetails - c.logger.Info("Waiting for clone to complete") - <-signal.CloneComplete - c.logger.Info("Clone complete") + }, nil } else { c.logger.Info(fmt.Sprintf("Replication slot '%s' already exists", slot)) - slotDetails := SlotCreationResult{ + var err error + if doInitialCopy { + err = ErrSlotAlreadyExists + } + return SlotCreationResult{ SlotName: slot, SnapshotName: "", - Err: nil, SupportsTIDScans: false, - } - if doInitialCopy { - slotDetails.Err = ErrSlotAlreadyExists - } - signal.SlotCreated <- slotDetails + }, err } } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 24cfcff9f..17a146d97 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -57,6 +57,13 @@ type ReplState struct { LastOffset atomic.Int64 } +type SlotCreationResult struct { + SlotName string + SnapshotName string + SupportsTIDScans bool + Conn *pgx.Conn +} + func NewPostgresConnector(ctx context.Context, env map[string]string, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) { logger := shared.LoggerFromCtx(ctx) flowNameInApplicationName, err := peerdbenv.PeerDBApplicationNamePerMirrorName(ctx, nil) @@ -1088,13 +1095,13 @@ func (c *PostgresConnector) FinishExport(tx any) error { return pgtx.Commit(timeout) } -// SetupReplication sets up replication for the source connector. -func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSignal, req *protos.SetupReplicationInput) { +// SetupReplication sets up replication for the source connector +func (c *PostgresConnector) SetupReplication( + ctx context.Context, + req *protos.SetupReplicationInput, +) (SlotCreationResult, error) { if !shared.IsValidReplicationName(req.FlowJobName) { - signal.SlotCreated <- SlotCreationResult{ - Err: fmt.Errorf("invalid flow job name: `%s`, it should be ^[a-z_][a-z0-9_]*$", req.FlowJobName), - } - return + return SlotCreationResult{}, fmt.Errorf("invalid flow job name: `%s`, it should be ^[a-z_][a-z0-9_]*$", req.FlowJobName) } // Slotname would be the job name prefixed with "peerflow_slot_" @@ -1111,8 +1118,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig // Check if the replication slot and publication exist exists, err := c.checkSlotAndPublication(ctx, slotName, publicationName) if err != nil { - signal.SlotCreated <- SlotCreationResult{Err: err} - return + return SlotCreationResult{}, err } tableNameMapping := make(map[string]model.NameAndExclude, len(req.TableNameMapping)) @@ -1123,7 +1129,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig } } // Create the replication slot and publication - c.createSlotAndPublication(ctx, signal, exists, slotName, publicationName, tableNameMapping, req.DoInitialSnapshot) + return c.createSlotAndPublication(ctx, exists, slotName, publicationName, tableNameMapping, req.DoInitialSnapshot) } func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error { diff --git a/flow/connectors/postgres/slot_signal.go b/flow/connectors/postgres/slot_signal.go deleted file mode 100644 index bd5bcbbd5..000000000 --- a/flow/connectors/postgres/slot_signal.go +++ /dev/null @@ -1,24 +0,0 @@ -package connpostgres - -type SlotCreationResult struct { - Err error - SlotName string - SnapshotName string - SupportsTIDScans bool -} - -// This struct contains two signals. -// 1. SlotCreated - this can be waited on to ensure that the slot has been created. -// 2. CloneComplete - which can be waited on to ensure that the clone has completed. -type SlotSignal struct { - SlotCreated chan SlotCreationResult - CloneComplete chan struct{} -} - -// NewSlotSignal returns a new SlotSignal. -func NewSlotSignal() SlotSignal { - return SlotSignal{ - SlotCreated: make(chan SlotCreationResult, 1), - CloneComplete: make(chan struct{}), - } -} diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 875e8a0fd..a9951a7aa 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -12,7 +12,6 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/stretchr/testify/require" - connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres" "github.com/PeerDB-io/peerdb/flow/e2e" "github.com/PeerDB-io/peerdb/flow/e2eshared" "github.com/PeerDB-io/peerdb/flow/generated/protos" @@ -159,16 +158,13 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { }, } - signal := connpostgres.NewSlotSignal() - go s.conn.SetupReplication(context.Background(), signal, setupReplicationInput) - s.t.Log("waiting for slot creation to complete: " + flowJobName) - slotInfo := <-signal.SlotCreated - s.t.Logf("slot creation complete: %v. Signaling clone complete in 2 seconds", slotInfo) - time.Sleep(2 * time.Second) - close(signal.CloneComplete) + slotInfo, err := s.conn.SetupReplication(context.Background(), setupReplicationInput) + require.NoError(s.t, err) + + s.t.Logf("slot creation complete: %v", slotInfo) + slotInfo.Conn.Close(nil) - require.NoError(s.t, slotInfo.Err) s.t.Logf("successfully setup replication: %s", flowJobName) }