
add NewBatchFuture API #1426


Open. Wants to merge 16 commits into master.

Conversation

@shijiesheng (Member) commented Jun 11, 2025

Why?

The fanout pattern is common in user workflows. However, it's only feasible to fan out fewer than ~100 futures concurrently; anything more than that can cause the history service to get stuck processing the workflow.

This BatchFuture caps the number of concurrent futures and thus promotes correct usage.

How did you test it?

Unit tests in the test environment

Detailed Description

  • Added NewBatchFuture API for batch use cases
  • Added BatchFuture interface
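A minimal usage sketch of the new API from a workflow. This assumes the experimental x/ package location discussed in the review below; the NewBatchFuture(ctx, concurrency, factories) signature and processRowActivity are illustrative, not the PR's final code:

import (
	"context"
	"time"

	"go.uber.org/cadence/workflow"
	"go.uber.org/cadence/x" // assumed experimental package location, per the review discussion
)

// processRowActivity is a hypothetical activity used only for this sketch.
func processRowActivity(ctx context.Context, row string) (string, error) {
	return row, nil
}

func fanoutWorkflow(ctx workflow.Context, rows []string) error {
	// One factory per unit of work; only up to the concurrency limit run at once.
	factories := make([]func(workflow.Context) workflow.Future, 0, len(rows))
	for _, row := range rows {
		row := row
		factories = append(factories, func(ctx workflow.Context) workflow.Future {
			aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
				ScheduleToStartTimeout: 10 * time.Second,
				StartToCloseTimeout:    10 * time.Second,
			})
			return workflow.ExecuteActivity(aCtx, processRowActivity, row)
		})
	}

	// Illustrative call: at most 10 futures are in flight at any moment.
	batch, err := x.NewBatchFuture(ctx, 10, factories)
	if err != nil {
		return err
	}
	// Wait for the whole batch; per-item results stay reachable via batch.GetFutures().
	return batch.Get(ctx, nil)
}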

Impact Analysis

  • Backward Compatibility: yes
  • Forward Compatibility: yes

Testing Plan

  • Unit Tests: Yes
  • Persistence Tests: Not related
  • Integration Tests: No
  • Compatibility Tests: No; this change only adds a new API

Rollout Plan

  • What is the rollout plan? deploy new version
  • Does the order of deployment matter? No
  • Is it safe to rollback? Does the order of rollback matter? Yes
  • Is there a kill switch to mitigate the impact immediately? No

@shijiesheng changed the title from "add BatchExecutor API" to "add NewBatchExecutor API" on Jun 11, 2025
@Groxx (Member) left a comment

I think I need to think about it a bit more 🤔

Broad feelings though are in inline comments, and:

  • I think this is a good candidate for (finally) creating a go.uber.org/cadence/x/ experimental folder, for early-stage things like this. We can "promote" it to /workflow when we're sure we're fully happy with it and have had a couple rounds of user feedback.
  • Overall makes sense and I see the use, we have a lot of problems with people doing stuff like this and it'll almost certainly help.
  • The current setup where everything is enqueued synchronously, and then .Execute(ctx) runs the closures async, seems probably best 👍
    • In particular it avoids making closures "maybe sync, maybe async" which is the source of a lot of logic bugs in my experience.
    • We should make sure this is documented very clearly tho.

So mostly I like it. Might need a little tweaking / a couple discussion rounds with people, but it seems worth building.

type batchExecutor struct {
	isRunning bool
	factories []func(ctx internal.Context) internal.Future
	valuePtrs []interface{}
Member

to stick it in here and not just chat:
tbh I think I'd prefer to avoid trying to write out results.
trying to write to an out-var internally risks decoding errors that you can't retry, and you have to decide what to do with the error.

I'm not quite sure what the api should be, but I think we might be better off just exposing a list of futures / giving access to futures. then they can .Get on them like normal, and we could even keep both successful and error cases in the same list, so correlating indexes to success-or-error is easier.

Member Author

On second thought, this won't work. We rely on futures.Get() to ensure we can safely enqueue the next one (hence buffered.Receive(ctx, nil) to release one token). Letting users access the futures would mean we need to track each future's status periodically.
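For illustration, a hedged sketch of the token mechanism being described here, in the spirit of the snippets above: a buffered workflow channel used as a semaphore (names are illustrative, not the PR's actual code):

// Capacity of the buffered channel is the concurrency limit.
buffered := workflow.NewBufferedChannel(ctx, batchSize)

workflow.Go(ctx, func(ctx workflow.Context) {
	for _, factory := range factories {
		factory := factory
		buffered.Send(ctx, nil) // acquire a token; blocks while batchSize futures are in flight
		f := factory(ctx)
		workflow.Go(ctx, func(ctx workflow.Context) {
			f.Get(ctx, nil)            // wait for completion only; the decoded value is discarded
			buffered.Receive(ctx, nil) // release one token so the next factory can start
		})
	}
})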

Member

This requires users to preallocate an array, and they only get access to results once the whole batch finishes executing.
No strong opinion, but another alternative is to have the user pass a callback to process each result. Maybe they will just do some kind of map-reduce. Then there is no need to require them to allocate upfront and only get access at the end.
e.g.


sum := 0
var errs error
for _, row := range myData {
  executor.AddFuture(
    func(ctx internal.Context) internal.Future {
      aCtx := internal.WithActivityOptions(ctx, internal.ActivityOptions{
        ScheduleToStartTimeout: time.Second * 10,
        StartToCloseTimeout:    time.Second * 10,
      })
      return internal.ExecuteActivity(aCtx, batchActivity, row)
    },
    func(result any, err error) { // process result
      if err != nil {
        errs = multierr.Append(errs, err)
        return
      }
      sum += result.(int)
    },
  )
}

@Groxx (Member) commented Jun 20, 2025

> On second thought, this won't work. We rely on futures.Get() to ensure we can safely enqueue the next one (hence buffered.Receive(ctx, nil) to release one token). Letting users access the futures would mean we need to track each future's status periodically.

I don't think it'd change anything, you can .Get multiple times on futures - just use a nil out-var internally because the value doesn't matter. They can .Get(ctx, &usefulOutVar) any time, totally unaffected by this other .Get(ctx, nil) call.

Or do you mean something else?


To be clear, I'm definitely not trying to claim "this should definitely return a list of futures", more of a "this might be more flexible, has it been considered". It'd likely be more complicated to use and that might be a net-worse result. I'm just a bit reluctant to hide futures without a good reason, since they're a useful primitive in many cases.
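A small sketch of the point being made, since Get can be called more than once on the same future (names are illustrative):

// Internally: wait for completion only and ignore the value (this is the token-release path).
_ = f.Get(ctx, nil)

// Later, in user code: the same future still resolves into a useful out-var,
// and returns the original error for this item if it failed.
var result int
if err := f.Get(ctx, &result); err != nil {
	// handle the per-item error here
}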

@shijiesheng changed the title from "add NewBatchExecutor API" to "add NewBatchFuture API" on Jun 19, 2025

codecov bot commented Jun 20, 2025

Codecov Report

Attention: Patch coverage is 90.10989% with 9 lines in your changes missing coverage. Please review.

Project coverage is 82.17%. Comparing base (221cc4e) to head (43a3c39).

Files with missing lines | Patch % | Lines
internal/batch/batch_future.go | 90.10% | 8 Missing and 1 partial ⚠️

Files with missing lines | Coverage Δ
internal/batch/batch_future.go | 90.10% <90.10%> (ø)

... and 1 file with indirect coverage changes



Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 221cc4e...43a3c39.


Comment on lines 12 to 15
type BatchFuture interface {
	internal.Future
	GetFutures() []internal.Future
}
@Groxx (Member) commented Jun 25, 2025

it's probably fine either way, but it's a bit odd to have this both internal and public, and the internal one is essentially unused.

maybe just make batchFutureImpl public, and keep (and document) the public one? it's in internal/, it can't be used or referenced externally, should be functionally the same as keeping it private here.

that said, I don't believe there's any downside to this interface-duplication. so I think it's fine to leave it like it is.

Member Author

Should I use an alias then? It still doesn't feel right to expose batchFutureImpl directly.

@Groxx (Member) commented Jun 25, 2025

(also IRL but) no, I'm just thinking:

  • delete this interface, nothing uses it as an interface (no other impls)
  • make batchFutureImpl public so the linter doesn't complain / it is legitimately annoying to return a private type, and this is still in an internal package so it's still hidden from users
  • keep the public interface, it's useful / matches our other future APIs even if we should perhaps prefer returning a *BatchFuture struct for API-stability reasons. but I doubt this'll need to change, the interface is extremely straightforward.

Member Author

Sure. Sorry, I pushed the wrong commit earlier that overrode my change.

select {
case <-ctx.Done():
	return taskID, fmt.Errorf("batch activity %d failed: %w", taskID, ctx.Err())
case <-time.After(time.Duration(rand.Int63n(100))*time.Millisecond + 900*time.Millisecond):
@Groxx (Member) commented Jun 25, 2025

probably worth making this a lot faster tbh, currently the tests take ~22s which seems a bit excessive.

the rest of the slowness in here is probably mostly due to trying to measure wall-clock time instead of simulated time, but I think simulated time might be more relevant - there's env.Now() for measuring simulated time. Switching to that and e.g. mocking activities with .After(rand*time.Second) would make these essentially instant, but would still give you the ability to ensure "runs concurrently within batch bounds".
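A hedged sketch of that suggestion, assuming the testsuite's TestWorkflowEnvironment plus a testify-style activity mock, and reusing the batchWorkflow / batchActivity / batchWorkflowInput names from the test under discussion (argument matchers and sizes are illustrative):

env.RegisterWorkflow(batchWorkflow)
env.RegisterActivity(batchActivity)

// Delay each mocked activity by one second of simulated time instead of sleeping for real.
env.OnActivity(batchActivity, mock.Anything, mock.Anything).
	After(time.Second).
	Return(0, nil)

start := env.Now()
env.ExecuteWorkflow(batchWorkflow, batchWorkflowInput{Concurrency: 5, TotalSize: 50})
elapsed := env.Now().Sub(start)

require.NoError(t, env.GetWorkflowError())
// 50 one-second tasks at concurrency 5 should take roughly 10s of simulated time,
// which still demonstrates "runs concurrently within batch bounds" at near-zero wall-clock cost.
require.True(t, elapsed < 15*time.Second, "expected concurrent execution, got %v", elapsed)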

Member Author

added a TODO

Comment on lines 124 to 136
totalExpectedTime := time.Second * time.Duration(1+totalSize/concurrency)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
	defer wg.Done()
	env.ExecuteWorkflow(batchWorkflow, batchWorkflowInput{
		Concurrency: concurrency,
		TotalSize:   totalSize,
	})
}()

time.Sleep(totalExpectedTime / 2)
env.CancelWorkflow()
Member

I'm a little surprised this works, but neat.

if you want to try fully simulating it, this is basically

env.RegisterDelayedCallback(func() {
	env.CancelWorkflow()
}, totalExpectedTime/2)
env.ExecuteWorkflow(...)

Member Author

My approach does have a race condition. This is better and I've changed to this one.

@Groxx (Member) left a comment

some small bits of polish but otherwise I think it looks good.

tests... are kinda nice that it's "real" and demonstrates things working the way they should + cancellation timing + etc, but 22s is quite a lot to pay for that. might be worth rewriting with fully simulated time, it should be pretty easy to achieve.

(and maybe keeping some small ones non-mocked to show that they both work, because it's a fairly good stress test for the test suite, and we don't have many of those. but seems fine if that's too big for some reason)


So some minor polish and docs, and otherwise I think this is good to go. Tests would be nice / interesting / a useful exercise to redo with simulated time, but 22s isn't fatally slow or anything. They seem to do a good job showing that the behavior is correct, and that's by far the most important part, so we can do that later or something if you want.

x/batch.go Outdated
//
// Any errors encountered are merged with go.uber.org/multierr, so single errors are
// exposed normally, but multiple ones are bundled in the same way as errors.Join.
// For consistency when checking individual errors, consider using `multierr.Errors(err)` in all cases.
Member

Suggested change
- // For consistency when checking individual errors, consider using `multierr.Errors(err)` in all cases.
+ // For consistency when checking individual errors, consider using `multierr.Errors(err)` in all cases,
+ // or `GetFutures()[i].Get(ctx, nil)` to get the original errors at each index.

^ seems worth adding?

Member Author

sure
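For callers, a hedged sketch of what that suggested doc text implies, using the aggregated error plus the per-index futures (the error-handling shape and logger usage are illustrative):

if err := batch.Get(ctx, nil); err != nil {
	// multierr.Errors returns a one-element slice for a single error, so this loop
	// behaves the same whether one or many items failed.
	for _, e := range multierr.Errors(err) {
		workflow.GetLogger(ctx).Error("batch item failed", zap.Error(e))
	}

	// Or correlate each failure to its index via the per-future Get.
	for i, f := range batch.GetFutures() {
		if e := f.Get(ctx, nil); e != nil {
			workflow.GetLogger(ctx).Error("batch item failed", zap.Int("index", i), zap.Error(e))
		}
	}
}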

x/batch.go Outdated
Comment on lines 36 to 37
// When NewBatchFuture is called, futures created by the factories will be started sequentially until the concurrency limit (batchSize) is reached.
// The remaining futures will be queued and started as previous futures complete, maintaining the specified concurrency level.
Member

Suggested change
- // When NewBatchFuture is called, futures created by the factories will be started sequentially until the concurrency limit (batchSize) is reached.
- // The remaining futures will be queued and started as previous futures complete, maintaining the specified concurrency level.
+ // When NewBatchFuture is called, futures created by the factories will be started concurrently until the concurrency limit (batchSize) is reached.
+ // The remaining factories will be queued and started as previous futures complete, maintaining the specified concurrency level.

to try to blend all the discussion together, since "started concurrently" is I think important to be accurate about.

we could change it to sequential, but I think concurrent is probably strictly more useful, since it'll more-reasonably let you do blocking things in the factory.

@Groxx (Member) left a comment

small comment on the NewBatchFuture doc, but otherwise LGTM and ready to go 👍
