Skip to content

Commit

Permalink
updates mu name, adds comment
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei committed Jul 19, 2024
1 parent 47a8ba5 commit c30fc70
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions worker/pkg/workflows/datasync/activities/sync/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ func (a *Activity) getTunnelManagerByRunId(wfId, runId string) (connectiontunnel
}

var (
benthosStreamMu sync.Mutex
// Hack that locks the instanced bento stream builder build step that causes data races if done in parallel
streamBuilderMu sync.Mutex
)

// Temporal activity that runs benthos and syncs a source connection to one or more destination connections
Expand Down Expand Up @@ -242,7 +243,7 @@ func (a *Activity) Sync(ctx context.Context, req *SyncRequest, metadata *SyncMet
envKeyMap["TEMPORAL_WORKFLOW_ID"] = info.WorkflowExecution.ID
envKeyMap["TEMPORAL_RUN_ID"] = info.WorkflowExecution.RunID

benthosStreamMu.Lock()
streamBuilderMu.Lock()
streambldr := benthosenv.NewStreamBuilder()
// would ideally use the activity logger here but can't convert it into a slog.
streambldr.SetLogger(slogger.With(
Expand All @@ -254,12 +255,12 @@ func (a *Activity) Sync(ctx context.Context, req *SyncRequest, metadata *SyncMet

err = streambldr.SetYAML(req.BenthosConfig)
if err != nil {
benthosStreamMu.Unlock()
streamBuilderMu.Unlock()

Check warning on line 258 in worker/pkg/workflows/datasync/activities/sync/activity.go

View check run for this annotation

Codecov / codecov/patch

worker/pkg/workflows/datasync/activities/sync/activity.go#L258

Added line #L258 was not covered by tests
return nil, fmt.Errorf("unable to convert benthos config to yaml for stream builder: %w", err)
}

stream, err := a.benthosStreamManager.NewBenthosStreamFromBuilder(streambldr)
benthosStreamMu.Unlock()
streamBuilderMu.Unlock()
if err != nil {
return nil, fmt.Errorf("unable to build benthos config: %w", err)
}
Expand Down

0 comments on commit c30fc70

Please sign in to comment.