diff --git a/worker/pkg/workflows/datasync/activities/sync/activity.go b/worker/pkg/workflows/datasync/activities/sync/activity.go index dddd7a38ac..1387116b62 100644 --- a/worker/pkg/workflows/datasync/activities/sync/activity.go +++ b/worker/pkg/workflows/datasync/activities/sync/activity.go @@ -106,7 +106,8 @@ func (a *Activity) getTunnelManagerByRunId(wfId, runId string) (connectiontunnel } var ( - benthosStreamMu sync.Mutex + // Hack that locks the instanced bento stream builder build step that causes data races if done in parallel + streamBuilderMu sync.Mutex ) // Temporal activity that runs benthos and syncs a source connection to one or more destination connections @@ -242,7 +243,7 @@ func (a *Activity) Sync(ctx context.Context, req *SyncRequest, metadata *SyncMet envKeyMap["TEMPORAL_WORKFLOW_ID"] = info.WorkflowExecution.ID envKeyMap["TEMPORAL_RUN_ID"] = info.WorkflowExecution.RunID - benthosStreamMu.Lock() + streamBuilderMu.Lock() streambldr := benthosenv.NewStreamBuilder() // would ideally use the activity logger here but can't convert it into a slog. streambldr.SetLogger(slogger.With( @@ -254,12 +255,12 @@ func (a *Activity) Sync(ctx context.Context, req *SyncRequest, metadata *SyncMet err = streambldr.SetYAML(req.BenthosConfig) if err != nil { - benthosStreamMu.Unlock() + streamBuilderMu.Unlock() return nil, fmt.Errorf("unable to convert benthos config to yaml for stream builder: %w", err) } stream, err := a.benthosStreamManager.NewBenthosStreamFromBuilder(streambldr) - benthosStreamMu.Unlock() + streamBuilderMu.Unlock() if err != nil { return nil, fmt.Errorf("unable to build benthos config: %w", err) }