From 4790d1dca82a1dfc1d83fe53ca2e9427282f5fd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 17 Jan 2025 03:19:50 +0000 Subject: [PATCH] Remove slot signal Splitting out from mysql --- flow/activities/flowable.go | 55 +++++++++++++++--------- flow/activities/snapshot_activity.go | 38 +++++++---------- flow/connectors/core.go | 5 ++- flow/connectors/postgres/client.go | 57 +++++++++---------------- flow/connectors/postgres/postgres.go | 19 ++++----- flow/connectors/postgres/slot_signal.go | 24 ----------- flow/e2e/postgres/qrep_flow_pg_test.go | 16 +++---- flow/model/model.go | 8 ++++ flow/workflows/cdc_flow.go | 9 +--- flow/workflows/qrep_flow.go | 24 ++++------- flow/workflows/setup_flow.go | 25 ++++++----- flow/workflows/snapshot_flow.go | 34 +++++++-------- 12 files changed, 135 insertions(+), 179 deletions(-) delete mode 100644 flow/connectors/postgres/slot_signal.go diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 48c10cc698..c1cda5de77 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -31,8 +31,7 @@ import ( "github.com/PeerDB-io/peerdb/flow/shared" ) -// CheckConnectionResult is the result of a CheckConnection call. -type CheckConnectionResult struct { +type CheckMetadataTablesResult struct { NeedsSetupMetadataTables bool } @@ -54,18 +53,36 @@ type StreamCloser interface { func (a *FlowableActivity) CheckConnection( ctx context.Context, config *protos.SetupInput, -) (*CheckConnectionResult, error) { +) error { ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) - dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, config.Env, a.CatalogPool, config.PeerName) + conn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, config.Env, a.CatalogPool, config.PeerName) + if err != nil { + if errors.Is(err, errors.ErrUnsupported) { + return nil + } + a.Alerter.LogFlowError(ctx, config.FlowName, err) + return fmt.Errorf("failed to get connector: %w", err) + } + defer connectors.CloseConnector(ctx, conn) + + return conn.ConnectionActive(ctx) +} + +func (a *FlowableActivity) CheckMetadataTables( + ctx context.Context, + config *protos.SetupInput, +) (*CheckMetadataTablesResult, error) { + ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) + conn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, config.Env, a.CatalogPool, config.PeerName) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowName, err) return nil, fmt.Errorf("failed to get connector: %w", err) } - defer connectors.CloseConnector(ctx, dstConn) + defer connectors.CloseConnector(ctx, conn) - needsSetup := dstConn.NeedsSetupMetadataTables(ctx) + needsSetup := conn.NeedsSetupMetadataTables(ctx) - return &CheckConnectionResult{ + return &CheckMetadataTablesResult{ NeedsSetupMetadataTables: needsSetup, }, nil } @@ -447,8 +464,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, runUUID string, ) (*protos.QRepParitionResult, error) { ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) - err := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, nil, config.ParentMirrorName) - if err != nil { + if err := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, nil, config.ParentMirrorName); err != nil { return nil, err } srcConn, err := connectors.GetByNameAs[connectors.QRepPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName) @@ -467,15 +483,14 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, return nil, fmt.Errorf("failed to get partitions from source: %w", err) } if len(partitions) > 0 { - err = monitoring.InitializeQRepRun( + if err := monitoring.InitializeQRepRun( ctx, a.CatalogPool, config, runUUID, partitions, config.ParentMirrorName, - ) - if err != nil { + ); err != nil { return nil, err } } @@ -932,15 +947,15 @@ func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *prot } defer connectors.CloseConnector(ctx, srcConn) - err = srcConn.AddTablesToPublication(ctx, &protos.AddTablesToPublicationInput{ + if err := srcConn.AddTablesToPublication(ctx, &protos.AddTablesToPublicationInput{ FlowJobName: cfg.FlowJobName, PublicationName: cfg.PublicationName, AdditionalTables: additionalTableMappings, - }) - if err != nil { + }); err != nil { a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) + return err } - return err + return nil } func (a *FlowableActivity) RemoveTablesFromPublication( @@ -955,15 +970,15 @@ func (a *FlowableActivity) RemoveTablesFromPublication( } defer connectors.CloseConnector(ctx, srcConn) - err = srcConn.RemoveTablesFromPublication(ctx, &protos.RemoveTablesFromPublicationInput{ + if err := srcConn.RemoveTablesFromPublication(ctx, &protos.RemoveTablesFromPublicationInput{ FlowJobName: cfg.FlowJobName, PublicationName: cfg.PublicationName, TablesToRemove: removedTablesMapping, - }) - if err != nil { + }); err != nil { a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) + return err } - return err + return nil } func (a *FlowableActivity) RemoveTablesFromRawTable( diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 3df7926ea6..3312eae63a 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -2,7 +2,6 @@ package activities import ( "context" - "errors" "fmt" "log/slog" "sync" @@ -13,14 +12,13 @@ import ( "github.com/PeerDB-io/peerdb/flow/alerting" "github.com/PeerDB-io/peerdb/flow/connectors" - connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres" "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/shared" ) type SlotSnapshotState struct { - connector connectors.CDCPullConnector - signal connpostgres.SlotSignal + connector connectors.CDCPullConnectorCore + slotConn interface{ Close(context.Context) error } snapshotName string } @@ -43,7 +41,9 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s defer a.SnapshotStatesMutex.Unlock() if s, ok := a.SlotSnapshotStates[flowJobName]; ok { - close(s.signal.CloneComplete) + if s.slotConn != nil { + s.slotConn.Close(ctx) + } connectors.CloseConnector(ctx, s.connector) delete(a.SlotSnapshotStates, flowJobName) } @@ -61,30 +61,22 @@ func (a *SnapshotActivity) SetupReplication( a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job") - conn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, nil, a.CatalogPool, config.PeerName) + conn, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, nil, a.CatalogPool, config.PeerName) if err != nil { - if errors.Is(err, errors.ErrUnsupported) { - logger.Info("setup replication is no-op for non-postgres source") - return nil, nil - } return nil, fmt.Errorf("failed to get connector: %w", err) } - closeConnectionForError := func(err error) { + logger.Info("waiting for slot to be created...") + slotInfo, err := conn.SetupReplication(ctx, config) + + if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) // it is important to close the connection here as it is not closed in CloseSlotKeepAlive connectors.CloseConnector(ctx, conn) - } - - slotSignal := connpostgres.NewSlotSignal() - go conn.SetupReplication(ctx, slotSignal, config) - - logger.Info("waiting for slot to be created...") - slotInfo := <-slotSignal.SlotCreated - - if slotInfo.Err != nil { - closeConnectionForError(slotInfo.Err) - return nil, fmt.Errorf("slot error: %w", slotInfo.Err) + return nil, fmt.Errorf("slot error: %w", err) + } else if slotInfo.Conn == nil && slotInfo.SlotName == "" { + logger.Info("replication setup without slot") + return nil, nil } else { logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName)) } @@ -93,7 +85,7 @@ func (a *SnapshotActivity) SetupReplication( defer a.SnapshotStatesMutex.Unlock() a.SlotSnapshotStates[config.FlowJobName] = SlotSnapshotState{ - signal: slotSignal, + slotConn: slotInfo.Conn, snapshotName: slotInfo.SnapshotName, connector: conn, } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 0e1081dec5..4f52732b09 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -44,7 +44,7 @@ type ValidationConnector interface { type GetTableSchemaConnector interface { Connector - // GetTableSchema returns the schema of a table in terms of QValueKind. + // GetTableSchema returns the schema of a table in terms of type system. GetTableSchema( ctx context.Context, env map[string]string, @@ -67,6 +67,9 @@ type CDCPullConnectorCore interface { // `any` from ExportSnapshot passed here when done, allowing transaction to commit FinishExport(any) error + // Setup replication in prep for initial copy + SetupReplication(context.Context, *protos.SetupReplicationInput) (model.SetupReplicationResult, error) + // Methods related to retrieving and pushing records for this connector as a source and destination. SetupReplConn(context.Context) error diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 1d7d7d589f..d6bfba1e0a 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -215,7 +215,7 @@ func (c *PostgresConnector) getNullableColumns(ctx context.Context, relID uint32 func (c *PostgresConnector) tableExists(ctx context.Context, schemaTable *utils.SchemaTable) (bool, error) { var exists pgtype.Bool - err := c.conn.QueryRow(ctx, + if err := c.conn.QueryRow(ctx, `SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = $1 @@ -223,8 +223,7 @@ func (c *PostgresConnector) tableExists(ctx context.Context, schemaTable *utils. )`, schemaTable.Schema, schemaTable.Table, - ).Scan(&exists) - if err != nil { + ).Scan(&exists); err != nil { return false, fmt.Errorf("error checking if table exists: %w", err) } @@ -356,13 +355,12 @@ func (c *PostgresConnector) CreatePublication( // createSlotAndPublication creates the replication slot and publication. func (c *PostgresConnector) createSlotAndPublication( ctx context.Context, - signal SlotSignal, s SlotCheckResult, slot string, publication string, tableNameMapping map[string]model.NameAndExclude, doInitialCopy bool, -) { +) (model.SetupReplicationResult, error) { // iterate through source tables and create publication, // expecting tablenames to be schema qualified if !s.PublicationExists { @@ -370,16 +368,12 @@ func (c *PostgresConnector) createSlotAndPublication( for srcTableName := range tableNameMapping { parsedSrcTableName, err := utils.ParseSchemaTable(srcTableName) if err != nil { - signal.SlotCreated <- SlotCreationResult{ - Err: fmt.Errorf("[publication-creation] source table identifier %s is invalid", srcTableName), - } - return + return model.SetupReplicationResult{}, fmt.Errorf("[publication-creation] source table identifier %s is invalid", srcTableName) } srcTableNames = append(srcTableNames, parsedSrcTableName.String()) } if err := c.CreatePublication(ctx, srcTableNames, publication); err != nil { - signal.SlotCreated <- SlotCreationResult{Err: err} - return + return model.SetupReplicationResult{}, err } } @@ -387,22 +381,20 @@ func (c *PostgresConnector) createSlotAndPublication( if !s.SlotExists { conn, err := c.CreateReplConn(ctx) if err != nil { - signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error acquiring connection: %w", err)} - return + return model.SetupReplicationResult{}, fmt.Errorf("[slot] error acquiring connection: %w", err) } - defer conn.Close(ctx) c.logger.Warn(fmt.Sprintf("Creating replication slot '%s'", slot)) // THIS IS NOT IN A TX! if _, err := conn.Exec(ctx, "SET idle_in_transaction_session_timeout=0"); err != nil { - signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error setting idle_in_transaction_session_timeout: %w", err)} - return + conn.Close(ctx) + return model.SetupReplicationResult{}, fmt.Errorf("[slot] error setting idle_in_transaction_session_timeout: %w", err) } if _, err := conn.Exec(ctx, "SET lock_timeout=0"); err != nil { - signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error setting lock_timeout: %w", err)} - return + conn.Close(ctx) + return model.SetupReplicationResult{}, fmt.Errorf("[slot] error setting lock_timeout: %w", err) } opts := pglogrepl.CreateReplicationSlotOptions{ @@ -411,39 +403,30 @@ func (c *PostgresConnector) createSlotAndPublication( } res, err := pglogrepl.CreateReplicationSlot(ctx, conn.PgConn(), slot, "pgoutput", opts) if err != nil { - signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error creating replication slot: %w", err)} - return + conn.Close(ctx) + return model.SetupReplicationResult{}, fmt.Errorf("[slot] error creating replication slot: %w", err) } pgversion, err := c.MajorVersion(ctx) if err != nil { - signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error getting PG version: %w", err)} - return + conn.Close(ctx) + return model.SetupReplicationResult{}, fmt.Errorf("[slot] error getting PG version: %w", err) } c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot)) - slotDetails := SlotCreationResult{ + return model.SetupReplicationResult{ + Conn: conn, SlotName: res.SlotName, SnapshotName: res.SnapshotName, - Err: nil, SupportsTIDScans: pgversion >= shared.POSTGRES_13, - } - signal.SlotCreated <- slotDetails - c.logger.Info("Waiting for clone to complete") - <-signal.CloneComplete - c.logger.Info("Clone complete") + }, nil } else { c.logger.Info(fmt.Sprintf("Replication slot '%s' already exists", slot)) - slotDetails := SlotCreationResult{ - SlotName: slot, - SnapshotName: "", - Err: nil, - SupportsTIDScans: false, - } + var err error if doInitialCopy { - slotDetails.Err = ErrSlotAlreadyExists + err = ErrSlotAlreadyExists } - signal.SlotCreated <- slotDetails + return model.SetupReplicationResult{SlotName: slot}, err } } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 1df5970f2b..3296db0b58 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -125,7 +125,7 @@ func (c *PostgresConnector) fetchCustomTypeMapping(ctx context.Context) (map[uin } func (c *PostgresConnector) CreateReplConn(ctx context.Context) (*pgx.Conn, error) { - // create a separate connection pool for non-replication queries as replication connections cannot + // create a separate connection for non-replication queries as replication connections cannot // be used for extended query protocol, i.e. prepared statements replConfig, err := pgx.ParseConfig(c.connStr) if err != nil { @@ -1092,13 +1092,13 @@ func (c *PostgresConnector) FinishExport(tx any) error { return pgtx.Commit(timeout) } -// SetupReplication sets up replication for the source connector. -func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSignal, req *protos.SetupReplicationInput) { +// SetupReplication sets up replication for the source connector +func (c *PostgresConnector) SetupReplication( + ctx context.Context, + req *protos.SetupReplicationInput, +) (model.SetupReplicationResult, error) { if !shared.IsValidReplicationName(req.FlowJobName) { - signal.SlotCreated <- SlotCreationResult{ - Err: fmt.Errorf("invalid flow job name: `%s`, it should be ^[a-z_][a-z0-9_]*$", req.FlowJobName), - } - return + return model.SetupReplicationResult{}, fmt.Errorf("invalid flow job name: `%s`, it should be ^[a-z_][a-z0-9_]*$", req.FlowJobName) } // Slotname would be the job name prefixed with "peerflow_slot_" @@ -1115,8 +1115,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig // Check if the replication slot and publication exist exists, err := c.checkSlotAndPublication(ctx, slotName, publicationName) if err != nil { - signal.SlotCreated <- SlotCreationResult{Err: err} - return + return model.SetupReplicationResult{}, err } tableNameMapping := make(map[string]model.NameAndExclude, len(req.TableNameMapping)) @@ -1127,7 +1126,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig } } // Create the replication slot and publication - c.createSlotAndPublication(ctx, signal, exists, slotName, publicationName, tableNameMapping, req.DoInitialSnapshot) + return c.createSlotAndPublication(ctx, exists, slotName, publicationName, tableNameMapping, req.DoInitialSnapshot) } func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error { diff --git a/flow/connectors/postgres/slot_signal.go b/flow/connectors/postgres/slot_signal.go deleted file mode 100644 index bd5bcbbd52..0000000000 --- a/flow/connectors/postgres/slot_signal.go +++ /dev/null @@ -1,24 +0,0 @@ -package connpostgres - -type SlotCreationResult struct { - Err error - SlotName string - SnapshotName string - SupportsTIDScans bool -} - -// This struct contains two signals. -// 1. SlotCreated - this can be waited on to ensure that the slot has been created. -// 2. CloneComplete - which can be waited on to ensure that the clone has completed. -type SlotSignal struct { - SlotCreated chan SlotCreationResult - CloneComplete chan struct{} -} - -// NewSlotSignal returns a new SlotSignal. -func NewSlotSignal() SlotSignal { - return SlotSignal{ - SlotCreated: make(chan SlotCreationResult, 1), - CloneComplete: make(chan struct{}), - } -} diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 875e8a0fd6..a9df50fbbd 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -12,7 +12,6 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/stretchr/testify/require" - connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres" "github.com/PeerDB-io/peerdb/flow/e2e" "github.com/PeerDB-io/peerdb/flow/e2eshared" "github.com/PeerDB-io/peerdb/flow/generated/protos" @@ -159,17 +158,14 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { }, } - signal := connpostgres.NewSlotSignal() - go s.conn.SetupReplication(context.Background(), signal, setupReplicationInput) - s.t.Log("waiting for slot creation to complete: " + flowJobName) - slotInfo := <-signal.SlotCreated - s.t.Logf("slot creation complete: %v. Signaling clone complete in 2 seconds", slotInfo) - time.Sleep(2 * time.Second) - close(signal.CloneComplete) + slotInfo, err := s.conn.SetupReplication(context.Background(), setupReplicationInput) + require.NoError(s.t, err) - require.NoError(s.t, slotInfo.Err) - s.t.Logf("successfully setup replication: %s", flowJobName) + s.t.Logf("slot creation complete: %v", slotInfo) + if slotInfo.Conn != nil { + require.NoError(s.t, slotInfo.Conn.Close(context.Background())) + } } func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { diff --git a/flow/model/model.go b/flow/model/model.go index f5fdb54fab..bbfbc357bc 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -1,6 +1,7 @@ package model import ( + "context" "crypto/sha256" "fmt" "sync/atomic" @@ -180,3 +181,10 @@ type SyncCompositeResponse struct { SyncResponse *SyncResponse NeedsNormalize bool } + +type SetupReplicationResult struct { + Conn interface{ Close(context.Context) error } + SlotName string + SnapshotName string + SupportsTIDScans bool +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 497e2d4bb8..7615f0fe63 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -58,8 +58,7 @@ func GetSideEffect[T any](ctx workflow.Context, f func(workflow.Context) T) T { }) var result T - err := sideEffect.Get(&result) - if err != nil { + if err := sideEffect.Get(&result); err != nil { panic(err) } return result @@ -429,11 +428,7 @@ func CDCFlowWorkflow( WaitForCancellation: true, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) - snapshotFlowFuture := workflow.ExecuteChildWorkflow( - snapshotFlowCtx, - SnapshotFlowWorkflow, - cfg, - ) + snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil { logger.Error("snapshot flow failed", slog.Any("error", err)) return state, fmt.Errorf("failed to execute snapshot workflow: %w", err) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 87555371a7..72e83a3f20 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -187,9 +187,8 @@ func (q *QRepFlowExecution) getPartitions( }, }) - partitionsFuture := workflow.ExecuteActivity(ctx, flowable.GetQRepPartitions, q.config, last, q.runUUID) var partitions *protos.QRepParitionResult - if err := partitionsFuture.Get(ctx, &partitions); err != nil { + if err := workflow.ExecuteActivity(ctx, flowable.GetQRepPartitions, q.config, last, q.runUUID).Get(ctx, &partitions); err != nil { return nil, fmt.Errorf("failed to fetch partitions to replicate: %w", err) } @@ -213,8 +212,7 @@ func (q *QRepPartitionFlowExecution) replicatePartitions(ctx workflow.Context, }, }) - msg := fmt.Sprintf("replicating partition batch - %d", partitions.BatchId) - q.logger.Info(msg) + q.logger.Info("replicating partition batch", slog.Int64("BatchID", int64(partitions.BatchId))) if err := workflow.ExecuteActivity(ctx, flowable.ReplicateQRepPartitions, q.config, partitions, q.runUUID).Get(ctx, nil); err != nil { return fmt.Errorf("failed to replicate partition: %w", err) @@ -225,8 +223,7 @@ func (q *QRepPartitionFlowExecution) replicatePartitions(ctx workflow.Context, // getPartitionWorkflowID returns the child workflow ID for a new sync flow. func (q *QRepFlowExecution) getPartitionWorkflowID(ctx workflow.Context) string { - id := GetUUID(ctx) - return fmt.Sprintf("qrep-part-%s-%s", q.config.FlowJobName, id) + return fmt.Sprintf("qrep-part-%s-%s", q.config.FlowJobName, GetUUID(ctx)) } // startChildWorkflow starts a single child workflow. @@ -268,11 +265,10 @@ func (q *QRepFlowExecution) processPartitions( partitionWorkflows := make([]workflow.Future, 0, len(batches)) for i, parts := range batches { - batch := &protos.QRepPartitionBatch{ + future := q.startChildWorkflow(ctx, &protos.QRepPartitionBatch{ Partitions: parts, BatchId: int32(i + 1), - } - future := q.startChildWorkflow(ctx, batch) + }) partitionWorkflows = append(partitionWorkflows, future) } @@ -424,18 +420,16 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta func setWorkflowQueries(ctx workflow.Context, state *protos.QRepFlowState) error { // Support a Query for the current state of the qrep flow. - err := workflow.SetQueryHandler(ctx, shared.QRepFlowStateQuery, func() (*protos.QRepFlowState, error) { + if err := workflow.SetQueryHandler(ctx, shared.QRepFlowStateQuery, func() (*protos.QRepFlowState, error) { return state, nil - }) - if err != nil { + }); err != nil { return fmt.Errorf("failed to set `%s` query handler: %w", shared.QRepFlowStateQuery, err) } // Support a Query for the current status of the qrep flow. - err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (protos.FlowStatus, error) { + if err := workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (protos.FlowStatus, error) { return state.CurrentFlowStatus, nil - }) - if err != nil { + }); err != nil { return fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err) } diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 5c666ac537..752ce7ec77 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -66,20 +66,18 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( PeerName: config.SourceName, FlowName: config.FlowJobName, }) - var srcConnStatus activities.CheckConnectionResult - if err := srcConnStatusFuture.Get(checkCtx, &srcConnStatus); err != nil { - return fmt.Errorf("failed to check source peer connection: %w", err) - } - dstSetupInput := &protos.SetupInput{ Env: config.Env, PeerName: config.DestinationName, FlowName: config.FlowJobName, } + destConnStatusFuture := workflow.ExecuteLocalActivity(checkCtx, flowable.CheckMetadataTables, dstSetupInput) + if err := srcConnStatusFuture.Get(checkCtx, nil); err != nil { + return fmt.Errorf("failed to check source peer connection: %w", err) + } // then check the destination peer connection - destConnStatusFuture := workflow.ExecuteLocalActivity(checkCtx, flowable.CheckConnection, dstSetupInput) - var destConnStatus activities.CheckConnectionResult + var destConnStatus activities.CheckMetadataTablesResult if err := destConnStatusFuture.Get(checkCtx, &destConnStatus); err != nil { return fmt.Errorf("failed to check destination peer connection: %w", err) } @@ -94,8 +92,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( InitialInterval: 1 * time.Minute, }, }) - fDst := workflow.ExecuteActivity(setupCtx, flowable.SetupMetadataTables, dstSetupInput) - if err := fDst.Get(setupCtx, nil); err != nil { + if err := workflow.ExecuteActivity(setupCtx, flowable.SetupMetadataTables, dstSetupInput).Get(setupCtx, nil); err != nil { return fmt.Errorf("failed to setup destination peer metadata tables: %w", err) } } else { @@ -119,15 +116,12 @@ func (s *SetupFlowExecution) ensurePullability( InitialInterval: 1 * time.Minute, }, }) - srcTableIdNameMapping := make(map[uint32]string) - - srcTblIdentifiers := slices.Sorted(maps.Keys(s.tableNameMapping)) // create EnsurePullabilityInput for the srcTableName ensurePullabilityInput := &protos.EnsurePullabilityBatchInput{ PeerName: config.SourceName, FlowJobName: s.cdcFlowName, - SourceTableIdentifiers: srcTblIdentifiers, + SourceTableIdentifiers: slices.Sorted(maps.Keys(s.tableNameMapping)), CheckConstraints: checkConstraints, } @@ -138,8 +132,13 @@ func (s *SetupFlowExecution) ensurePullability( return nil, fmt.Errorf("failed to ensure pullability for tables: %w", err) } + if ensurePullabilityOutput == nil { + return nil, nil + } + sortedTableNames := slices.Sorted(maps.Keys(ensurePullabilityOutput.TableIdentifierMapping)) + srcTableIdNameMapping := make(map[uint32]string, len(sortedTableNames)) for _, tableName := range sortedTableNames { tableIdentifier := ensurePullabilityOutput.TableIdentifierMapping[tableName] srcTableIdNameMapping[tableIdentifier.RelId] = tableName diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 908a78ba79..74728c1f61 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -32,7 +32,6 @@ type SnapshotFlowExecution struct { logger log.Logger } -// ensurePullability ensures that the source peer is pullable. func (s *SnapshotFlowExecution) setupReplication( ctx workflow.Context, ) (*protos.SetupReplicationOutput, error) { @@ -62,8 +61,7 @@ func (s *SnapshotFlowExecution) setupReplication( } res := &protos.SetupReplicationOutput{} - setupReplicationFuture := workflow.ExecuteActivity(ctx, snapshot.SetupReplication, setupReplicationInput) - if err := setupReplicationFuture.Get(ctx, &res); err != nil { + if err := workflow.ExecuteActivity(ctx, snapshot.SetupReplication, setupReplicationInput).Get(ctx, &res); err != nil { return nil, fmt.Errorf("failed to setup replication on source peer: %w", err) } @@ -257,8 +255,7 @@ func (s *SnapshotFlowExecution) cloneTables( if v.PartitionKey == "" { v.PartitionKey = defaultPartitionCol } - err := s.cloneTable(ctx, boundSelector, snapshotName, v) - if err != nil { + if err := s.cloneTable(ctx, boundSelector, snapshotName, v); err != nil { s.logger.Error("failed to start clone child workflow", slog.Any("error", err)) continue } @@ -318,31 +315,30 @@ func SnapshotFlowWorkflow( numTablesInParallel := int(max(config.SnapshotNumTablesInParallel, 1)) - if !config.DoInitialSnapshot { - _, err := se.setupReplication(ctx) - if err != nil { - return fmt.Errorf("failed to setup replication: %w", err) - } - - if err := se.closeSlotKeepAlive(ctx); err != nil { - return fmt.Errorf("failed to close slot keep alive: %w", err) - } - - return nil - } - sessionOpts := &workflow.SessionOptions{ CreationTimeout: 5 * time.Minute, ExecutionTimeout: time.Hour * 24 * 365 * 100, // 100 years HeartbeatTimeout: time.Hour, } - sessionCtx, err := workflow.CreateSession(ctx, sessionOpts) if err != nil { return fmt.Errorf("failed to create session: %w", err) } defer workflow.CompleteSession(sessionCtx) + if !config.DoInitialSnapshot { + _, err := se.setupReplication(sessionCtx) + if err != nil { + return fmt.Errorf("failed to setup replication: %w", err) + } + + if err := se.closeSlotKeepAlive(sessionCtx); err != nil { + return fmt.Errorf("failed to close slot keep alive: %w", err) + } + + return nil + } + if config.InitialSnapshotOnly { sessionInfo := workflow.GetSessionInfo(sessionCtx)