diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index fd042db0b..48c10cc69 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -424,58 +424,6 @@ func (a *FlowableActivity) syncPg( connectors.CDCSyncPgConnector.SyncPg) } -func (a *FlowableActivity) startNormalize( - ctx context.Context, - config *protos.FlowConnectionConfigs, - batchID int64, -) error { - ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) - logger := activity.GetLogger(ctx) - - dstConn, err := connectors.GetByNameAs[connectors.CDCNormalizeConnector]( - ctx, - config.Env, - a.CatalogPool, - config.DestinationName, - ) - if errors.Is(err, errors.ErrUnsupported) { - return monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID) - } else if err != nil { - return fmt.Errorf("failed to get normalize connector: %w", err) - } - defer connectors.CloseConnector(ctx, dstConn) - - tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName) - if err != nil { - return fmt.Errorf("failed to get table name schema mapping: %w", err) - } - - logger.Info("Normalizing batch", - slog.Int64("SyncBatchID", batchID)) - res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{ - FlowJobName: config.FlowJobName, - Env: config.Env, - TableNameSchemaMapping: tableNameSchemaMapping, - TableMappings: config.TableMappings, - SoftDeleteColName: config.SoftDeleteColName, - SyncedAtColName: config.SyncedAtColName, - SyncBatchID: batchID, - }) - if err != nil { - a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return fmt.Errorf("failed to normalized records: %w", err) - } - if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg { - if err := monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil { - return fmt.Errorf("failed to update end time for cdc batch: %w", err) - } - } - - logger.Info("normalized batches", slog.Int64("StartBatchID", res.StartBatchID), slog.Int64("EndBatchID", res.EndBatchID)) - - return nil -} - // SetupQRepMetadataTables sets up the metadata tables for QReplication. func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error { conn, err := connectors.GetByNameAs[connectors.QRepSyncConnector](ctx, config.Env, a.CatalogPool, config.DestinationName) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index c2ce678c5..1261a7e29 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -3,6 +3,7 @@ package activities import ( "context" + "errors" "fmt" "log/slog" "sync/atomic" @@ -624,6 +625,58 @@ func (a *FlowableActivity) maintainReplConn( } } +func (a *FlowableActivity) startNormalize( + ctx context.Context, + config *protos.FlowConnectionConfigs, + batchID int64, +) error { + ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) + logger := activity.GetLogger(ctx) + + dstConn, err := connectors.GetByNameAs[connectors.CDCNormalizeConnector]( + ctx, + config.Env, + a.CatalogPool, + config.DestinationName, + ) + if errors.Is(err, errors.ErrUnsupported) { + return monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID) + } else if err != nil { + return fmt.Errorf("failed to get normalize connector: %w", err) + } + defer connectors.CloseConnector(ctx, dstConn) + + tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName) + if err != nil { + return fmt.Errorf("failed to get table name schema mapping: %w", err) + } + + logger.Info("Normalizing batch", + slog.Int64("SyncBatchID", batchID)) + res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{ + FlowJobName: config.FlowJobName, + Env: config.Env, + TableNameSchemaMapping: tableNameSchemaMapping, + TableMappings: config.TableMappings, + SoftDeleteColName: config.SoftDeleteColName, + SyncedAtColName: config.SyncedAtColName, + SyncBatchID: batchID, + }) + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return fmt.Errorf("failed to normalized records: %w", err) + } + if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg { + if err := monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil { + return fmt.Errorf("failed to update end time for cdc batch: %w", err) + } + } + + logger.Info("normalized batches", slog.Int64("StartBatchID", res.StartBatchID), slog.Int64("EndBatchID", res.EndBatchID)) + + return nil +} + // Suitable to be run as goroutine func (a *FlowableActivity) normalizeLoop( ctx context.Context, @@ -641,6 +694,7 @@ func (a *FlowableActivity) normalizeLoop( select { case req := <-normalizeRequests: normalizeWaiting.Store(false) + retryInterval := time.Minute retryLoop: for { normalizingBatchID.Store(req.BatchID) @@ -657,7 +711,8 @@ func (a *FlowableActivity) normalizeLoop( logger.Info("[normalize-loop] context closed before retry") return default: - time.Sleep(30 * time.Second) + time.Sleep(retryInterval) + retryInterval = min(retryInterval*2, 5*time.Minute) continue retryLoop } }