Skip to content

Commit

Permalink
normalize: slow down retries (#2449)
Browse files Browse the repository at this point in the history
instead of every 30s, wait 1m, then 2m, then 4m, then 5m indefinitely
  • Loading branch information
serprex authored Jan 16, 2025
1 parent 0f851b4 commit 6d06907
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 53 deletions.
52 changes: 0 additions & 52 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,58 +424,6 @@ func (a *FlowableActivity) syncPg(
connectors.CDCSyncPgConnector.SyncPg)
}

// startNormalize runs record normalization for the given sync batch on the
// destination connector identified by config.DestinationName. Destinations
// that do not support normalization (connector lookup returns
// errors.ErrUnsupported) only have the batch end time recorded in the
// monitoring catalog. Returns a wrapped error on any failure; normalize
// errors are additionally reported through the Alerter.
func (a *FlowableActivity) startNormalize(
	ctx context.Context,
	config *protos.FlowConnectionConfigs,
	batchID int64,
) error {
	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
	logger := activity.GetLogger(ctx)

	dstConn, err := connectors.GetByNameAs[connectors.CDCNormalizeConnector](
		ctx,
		config.Env,
		a.CatalogPool,
		config.DestinationName,
	)
	if errors.Is(err, errors.ErrUnsupported) {
		// Destination has no normalize step; just close out the batch in monitoring.
		return monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID)
	} else if err != nil {
		return fmt.Errorf("failed to get normalize connector: %w", err)
	}
	defer connectors.CloseConnector(ctx, dstConn)

	tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName)
	if err != nil {
		return fmt.Errorf("failed to get table name schema mapping: %w", err)
	}

	logger.Info("Normalizing batch",
		slog.Int64("SyncBatchID", batchID))
	res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
		FlowJobName:            config.FlowJobName,
		Env:                    config.Env,
		TableNameSchemaMapping: tableNameSchemaMapping,
		TableMappings:          config.TableMappings,
		SoftDeleteColName:      config.SoftDeleteColName,
		SyncedAtColName:        config.SyncedAtColName,
		SyncBatchID:            batchID,
	})
	if err != nil {
		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
		// fix: message previously read "failed to normalized records"
		return fmt.Errorf("failed to normalize records: %w", err)
	}
	// NOTE(review): the batch end time is updated here only for Postgres
	// destinations — presumably other destinations record it elsewhere; confirm.
	if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg {
		if err := monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil {
			return fmt.Errorf("failed to update end time for cdc batch: %w", err)
		}
	}

	logger.Info("normalized batches", slog.Int64("StartBatchID", res.StartBatchID), slog.Int64("EndBatchID", res.EndBatchID))

	return nil
}

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error {
conn, err := connectors.GetByNameAs[connectors.QRepSyncConnector](ctx, config.Env, a.CatalogPool, config.DestinationName)
Expand Down
57 changes: 56 additions & 1 deletion flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package activities

import (
"context"
"errors"
"fmt"
"log/slog"
"sync/atomic"
Expand Down Expand Up @@ -624,6 +625,58 @@ func (a *FlowableActivity) maintainReplConn(
}
}

// startNormalize runs record normalization for the given sync batch on the
// destination connector identified by config.DestinationName. Destinations
// that do not support normalization (connector lookup returns
// errors.ErrUnsupported) only have the batch end time recorded in the
// monitoring catalog. Returns a wrapped error on any failure; normalize
// errors are additionally reported through the Alerter.
func (a *FlowableActivity) startNormalize(
	ctx context.Context,
	config *protos.FlowConnectionConfigs,
	batchID int64,
) error {
	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
	logger := activity.GetLogger(ctx)

	dstConn, err := connectors.GetByNameAs[connectors.CDCNormalizeConnector](
		ctx,
		config.Env,
		a.CatalogPool,
		config.DestinationName,
	)
	if errors.Is(err, errors.ErrUnsupported) {
		// Destination has no normalize step; just close out the batch in monitoring.
		return monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID)
	} else if err != nil {
		return fmt.Errorf("failed to get normalize connector: %w", err)
	}
	defer connectors.CloseConnector(ctx, dstConn)

	tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName)
	if err != nil {
		return fmt.Errorf("failed to get table name schema mapping: %w", err)
	}

	logger.Info("Normalizing batch",
		slog.Int64("SyncBatchID", batchID))
	res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
		FlowJobName:            config.FlowJobName,
		Env:                    config.Env,
		TableNameSchemaMapping: tableNameSchemaMapping,
		TableMappings:          config.TableMappings,
		SoftDeleteColName:      config.SoftDeleteColName,
		SyncedAtColName:        config.SyncedAtColName,
		SyncBatchID:            batchID,
	})
	if err != nil {
		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
		// fix: message previously read "failed to normalized records"
		return fmt.Errorf("failed to normalize records: %w", err)
	}
	// NOTE(review): the batch end time is updated here only for Postgres
	// destinations — presumably other destinations record it elsewhere; confirm.
	if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg {
		if err := monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil {
			return fmt.Errorf("failed to update end time for cdc batch: %w", err)
		}
	}

	logger.Info("normalized batches", slog.Int64("StartBatchID", res.StartBatchID), slog.Int64("EndBatchID", res.EndBatchID))

	return nil
}

// Suitable to be run as goroutine
func (a *FlowableActivity) normalizeLoop(
ctx context.Context,
Expand All @@ -641,6 +694,7 @@ func (a *FlowableActivity) normalizeLoop(
select {
case req := <-normalizeRequests:
normalizeWaiting.Store(false)
retryInterval := time.Minute
retryLoop:
for {
normalizingBatchID.Store(req.BatchID)
Expand All @@ -657,7 +711,8 @@ func (a *FlowableActivity) normalizeLoop(
logger.Info("[normalize-loop] context closed before retry")
return
default:
time.Sleep(30 * time.Second)
time.Sleep(retryInterval)
retryInterval = min(retryInterval*2, 5*time.Minute)
continue retryLoop
}
}
Expand Down

0 comments on commit 6d06907

Please sign in to comment.