From 85dbc042d96abd49a57870dcf82faf4f8a7696c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 24 Dec 2024 14:47:01 +0000 Subject: [PATCH] no retries, handle retry timeout ourselves --- flow/workflows/cdc_flow.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index adcc82835a..4526cf2b4f 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -100,6 +100,8 @@ func processCDCFlowConfigUpdate( } if flowConfigUpdate.NumberOfSyncs > 0 { state.SyncFlowOptions.NumberOfSyncs = flowConfigUpdate.NumberOfSyncs + } else if flowConfigUpdate.NumberOfSyncs < 0 { + state.SyncFlowOptions.NumberOfSyncs = 0 } if flowConfigUpdate.UpdatedEnv != nil { maps.Copy(cfg.Env, flowConfigUpdate.UpdatedEnv) @@ -478,9 +480,7 @@ func CDCFlowWorkflow( StartToCloseTimeout: 365 * 24 * time.Hour, HeartbeatTimeout: time.Minute, WaitForCancellation: true, - RetryPolicy: &temporal.RetryPolicy{ - InitialInterval: 30 * time.Second, - }, + RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, }) syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions) @@ -500,6 +500,7 @@ func CDCFlowWorkflow( } else { logger.Error("error in sync flow", slog.Any("error", err)) } + _ = workflow.Sleep(ctx, 30*time.Second) } else { logger.Info("sync finished") }