add NewBatchFuture API #1426
Conversation
I think I need to think about it a bit more 🤔
Broad feelings though are in inline comments, and:
- I think this is a good candidate for (finally) creating a `go.uber.org/cadence/x/` experimental folder, for early-stage things like this. We can "promote" it to `/workflow` when we're sure we're fully happy with it, and have a couple of user feedbacks.
- Overall makes sense and I see the use. We have a lot of problems with people doing stuff like this, and it'll almost certainly help.
- The current setup, where everything is enqueued synchronously and then `.Execute(ctx)` runs the closures async, seems probably best 👍 In particular it avoids making closures "maybe sync, maybe async", which is the source of a lot of logic bugs in my experience.
- We should make sure this is documented very clearly tho.

So mostly I like it. Might need a little tweaking / a couple discussion rounds with people, but it seems worth building (rough usage sketch below).
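To make the shape concrete, here is a rough usage sketch of the API as discussed in this thread. The package path (`go.uber.org/cadence/x/batch`), the `NewBatchFuture(ctx, batchSize, factories)` signature, and the user-facing `workflow` types are assumptions drawn from the review, not the final API:

```go
// batchActivity stands in for whatever the user fans out over (assumed signature).
func batchActivity(row int) (int, error) { return row, nil }

func fanoutWorkflow(ctx workflow.Context, rows []int) error {
	// One factory per item; factories are invoked lazily, so at most
	// batchSize futures are in flight at any time.
	factories := make([]func(ctx workflow.Context) workflow.Future, 0, len(rows))
	for _, row := range rows {
		row := row // capture per iteration (pre-Go 1.22 loop semantics)
		factories = append(factories, func(ctx workflow.Context) workflow.Future {
			aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
				ScheduleToStartTimeout: 10 * time.Second,
				StartToCloseTimeout:    10 * time.Second,
			})
			return workflow.ExecuteActivity(aCtx, batchActivity, row)
		})
	}

	// Assumed call shape: cap concurrency at 25 in-flight futures.
	bf, err := batch.NewBatchFuture(ctx, 25, factories)
	if err != nil {
		return err
	}
	// Blocks until the whole batch completes; per-item errors are merged
	// (see the multierr docs discussion below).
	return bf.Get(ctx, nil)
}
```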
internal/executor/executor.go
Outdated
```go
type batchExecutor struct {
	isRunning bool
	factories []func(ctx internal.Context) internal.Future
	valuePtrs []interface{}
```
to stick it in here and not just chat: tbh I think I'd prefer to avoid trying to write out results.
Trying to write to an out-var internally risks decoding errors that you can't retry, and you have to decide what to do with the error.
I'm not quite sure what the API should be, but I think we might be better off just exposing a list of futures / giving access to futures. Then they can `.Get` on them like normal, and we could even keep both successful and error cases in the same list, so correlating indexes to success-or-error is easier.
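For illustration, a minimal sketch of that alternative; `Futures()` is a hypothetical accessor, not anything in this PR:

```go
// Hypothetical: one future per enqueued item, in enqueue order, so index i
// in this slice corresponds to index i in the input data.
futures := executor.Futures()
results := make([]int, len(futures))
for i, f := range futures {
	if err := f.Get(ctx, &results[i]); err != nil {
		// item i failed; the caller decides how to handle it (collect,
		// retry the decode, etc.) instead of the executor guessing.
		continue
	}
}
```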
On second thought, this won't work. We rely on `futures.Get()` to ensure we can safely enqueue the next one (hence the `buffered.Receive(ctx, nil)` to release one token). Letting users access the futures would mean we need to keep track of future status periodically.
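For readers following along, this is roughly the token mechanism being described, sketched with Cadence's buffered channel; the names (`buffered`, `factories`, `batchSize`) are illustrative:

```go
// A buffered channel acts as a semaphore: Send blocks once batchSize
// tokens are outstanding, so at most batchSize futures run at a time.
buffered := workflow.NewBufferedChannel(ctx, batchSize)
for _, factory := range factories {
	factory := factory
	buffered.Send(ctx, nil) // acquire a token (blocks at the limit)
	f := factory(ctx)
	workflow.Go(ctx, func(ctx workflow.Context) {
		f.Get(ctx, nil)            // wait for this future to settle
		buffered.Receive(ctx, nil) // release the token for the next one
	})
}
```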
This requires users to preallocate an array, and they only get access to results once the whole batch completes executing.
No strong opinion, but another alternative is to have the user pass a callback to process each result. Maybe they will just do some kind of map-reduce. No need to require them to allocate upfront and only get access at the end.
e.g.
```go
sum := 0
var errs error
for _, row := range myData {
	row := row // capture per iteration
	executor.AddFuture(
		func(ctx internal.Context) internal.Future {
			aCtx := internal.WithActivityOptions(ctx, internal.ActivityOptions{
				ScheduleToStartTimeout: time.Second * 10,
				StartToCloseTimeout:    time.Second * 10,
			})
			return internal.ExecuteActivity(aCtx, batchActivity, row)
		},
		func(result any, err error) { // process result
			if err != nil {
				errs = multierr.Append(errs, err)
				return
			}
			sum += result.(int)
		},
	)
}
```
> On second thought, this won't work. We rely on `futures.Get()` to ensure we can safely enqueue the next one (hence the `buffered.Receive(ctx, nil)` to release one token). Letting users access the futures would mean we need to keep track of future status periodically.
I don't think it'd change anything; you can `.Get` multiple times on futures - just use a `nil` out-var internally because the value doesn't matter. They can `.Get(ctx, &usefulOutVar)` any time, totally unaffected by this other `.Get(ctx, nil)` call.
Or do you mean something else?
To be clear, I'm definitely not trying to claim "this should definitely return a list of futures", more of a "this might be more flexible, has it been considered". It'd likely be more complicated to use, and that might be a net-worse result. I'm just a bit reluctant to hide futures without a good reason, since they're a useful primitive in many cases.
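Concretely, the point about repeated `.Get` calls is this (sketch only):

```go
f := workflow.ExecuteActivity(aCtx, batchActivity, row)

// Internal bookkeeping: block until the future settles, discard the value.
_ = f.Get(ctx, nil)

// User code: a later Get on the same future returns the same settled
// result, completely unaffected by the earlier Get(ctx, nil).
var out int
err := f.Get(ctx, &out)
```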
internal/batch/batch_future.go
Outdated
```go
type BatchFuture interface {
	internal.Future
	GetFutures() []internal.Future
}
```
it's probably fine either way, but it's a bit odd to have this both internal and public, and the internal one is essentially unused.
maybe just make `batchFutureImpl` public, and keep (and document) the public one? it's in `internal/`, so it can't be used or referenced externally - should be functionally the same as keeping it private here.
that said, I don't believe there's any downside to this interface-duplication. so I think it's fine to leave it like it is.
Should I use an alias then? It still doesn't feel right to expose `batchFutureImpl` directly.
(also IRL but) no, I'm just thinking:
- delete this interface, nothing uses it as an interface (no other impls)
- make `batchFutureImpl` public so the linter doesn't complain / it is legitimately annoying to return a private type, and this is still in an internal package so it's still hidden from users (roughly as sketched below)
- keep the public interface, it's useful / matches our other future APIs, even if we should perhaps prefer returning a `*BatchFuture` struct for API-stability reasons. but I doubt this'll need to change, the interface is extremely straightforward.
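Something like the following arrangement, with assumed names (`BatchFutureImpl` is exported for the linter's sake but stays unreachable from outside the module because it lives under `internal/`):

```go
// internal/batch/batch_future.go (sketch)
package batch

// BatchFutureImpl is public within the module, hidden from users by internal/.
type BatchFutureImpl struct {
	futures []internal.Future
	// ...
}

func (b *BatchFutureImpl) Get(ctx internal.Context, valuePtr interface{}) error {
	/* ... */
	return nil
}

func (b *BatchFutureImpl) GetFutures() []internal.Future { return b.futures }

// x/batch.go then keeps the single public BatchFuture interface, which
// NewBatchFuture returns.
```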
Sure. Sorry, I pushed the wrong commit earlier, which overrode my change.
```go
select {
case <-ctx.Done():
	return taskID, fmt.Errorf("batch activity %d failed: %w", taskID, ctx.Err())
case <-time.After(time.Duration(rand.Int63n(100))*time.Millisecond + 900*time.Millisecond):
```
probably worth making this a lot faster tbh, currently the tests take ~22s which seems a bit excessive.
the rest of the slowness in here is probably mostly due to trying to measure wall-clock time instead of simulated time, but I think simulated time might be more relevant - there's `env.Now()` for measuring simulated time. switching to that and e.g. mocking activities with `.After(rand*time.Second)` would make these essentially instant, but would still give you the ability to ensure "runs concurrently within batch bounds"
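A rough sketch of what that could look like with the test environment's simulated clock. The mock delay and assertion shape are assumptions; `batchWorkflow`/`batchWorkflowInput` come from this PR's tests:

```go
env := testSuite.NewTestWorkflowEnvironment()

// Mock the activity with a simulated delay instead of really sleeping.
env.OnActivity(batchActivity, mock.Anything, mock.Anything).
	After(time.Second).
	Return(0, nil)

start := env.Now()
env.ExecuteWorkflow(batchWorkflow, batchWorkflowInput{
	Concurrency: 5,
	TotalSize:   50,
})

// 50 tasks of ~1s at concurrency 5 should show ~10s of simulated workflow
// time, while the test itself finishes almost instantly in wall-clock time.
elapsed := env.Now().Sub(start)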
added a TODO
internal/batch/batch_future_test.go
Outdated
```go
totalExpectedTime := time.Second * time.Duration(1+totalSize/concurrency)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
	defer wg.Done()
	env.ExecuteWorkflow(batchWorkflow, batchWorkflowInput{
		Concurrency: concurrency,
		TotalSize:   totalSize,
	})
}()

time.Sleep(totalExpectedTime / 2)
env.CancelWorkflow()
```
I'm a little surprised this works, but neat.
if you want to try fully simulating it, this is basically:

```go
env.RegisterDelayedCallback(func() {
	env.CancelWorkflow()
}, totalExpectedTime/2)
env.ExecuteWorkflow(...)
```
My approach does have a race condition. This is better, and I've changed to this one.
Force-pushed from b4e85bb to 6230f67
some small bits of polish but otherwise I think it looks good.
tests... are kinda nice in that they're "real" and demonstrate things working the way they should + cancellation timing + etc, but 22s is quite a lot to pay for that. might be worth rewriting with fully simulated time, it should be pretty easy to achieve.
(and maybe keeping some small ones non-mocked to show that they both work, because it's a fairly good stress test for the test suite, and we don't have many of those. but seems fine if that's too big for some reason)
So some minor polish and docs, and otherwise I think this is good to go. Tests would be nice / interesting / a useful exercise to redo with simulated time, but 22s isn't fatally slow or anything. They seem to do a good job showing that the behavior is correct, and that's by far the most important part, so we can do that later or something if you want.
Force-pushed from 6230f67 to 3713ed4
x/batch.go
Outdated
```go
//
// Any errors encountered are merged with go.uber.org/multierr, so single errors are
// exposed normally, but multiple ones are bundled in the same way as errors.Join.
// For consistency when checking individual errors, consider using `multierr.Errors(err)` in all cases.
```
```diff
- // For consistency when checking individual errors, consider using `multierr.Errors(err)` in all cases.
+ // For consistency when checking individual errors, consider using `multierr.Errors(err)` in all cases,
+ // or `GetFutures()[i].Get(ctx, nil)` to get the original errors at each index.
```
^ seems worth adding?
sure
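For instance, consuming the merged errors could look like this (a sketch: `bf` is the returned BatchFuture, and the logger usage assumes Cadence's standard zap-based workflow logger):

```go
if err := bf.Get(ctx, nil); err != nil {
	// multierr.Errors returns a one-element slice for a single error and
	// the full list for a merged one, so this handles both uniformly.
	for _, e := range multierr.Errors(err) {
		workflow.GetLogger(ctx).Error("batch item failed", zap.Error(e))
	}
}

// Or, to correlate failures with input indexes:
for i, f := range bf.GetFutures() {
	if err := f.Get(ctx, nil); err != nil {
		workflow.GetLogger(ctx).Error("batch item failed", zap.Int("index", i), zap.Error(err))
	}
}
```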
x/batch.go
Outdated
```go
// When NewBatchFuture is called, futures created by the factories will be started sequentially until the concurrency limit (batchSize) is reached.
// The remaining futures will be queued and started as previous futures complete, maintaining the specified concurrency level.
```
```diff
- // When NewBatchFuture is called, futures created by the factories will be started sequentially until the concurrency limit (batchSize) is reached.
- // The remaining futures will be queued and started as previous futures complete, maintaining the specified concurrency level.
+ // When NewBatchFuture is called, futures created by the factories will be started concurrently until the concurrency limit (batchSize) is reached.
+ // The remaining factories will be queued and started as previous futures complete, maintaining the specified concurrency level.
```
to try to blend all the discussion together, since "started concurrently" is I think important to be accurate about.
we could change it to sequential, but I think concurrent is probably strictly more useful, since it'll more-reasonably let you do blocking things in the factory.
small comment on the NewBatchFuture doc, but otherwise LGTM and ready to go 👍
Why?
The fanout pattern is common in user workflows. However, it's only possible to fan out <100 futures concurrently; anything more than that can cause the history service to get stuck processing.
This BatchFuture caps the number of concurrent futures and thus promotes correct usage.
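For contrast, the unbounded pattern this replaces looks roughly like the following (illustrative only); every future starts immediately, so a large input means hundreds of concurrent activities:

```go
// Naive fanout: all futures start at once, with no cap on concurrency.
aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
	ScheduleToStartTimeout: 10 * time.Second,
	StartToCloseTimeout:    10 * time.Second,
})
futures := make([]workflow.Future, 0, len(rows))
for _, row := range rows {
	futures = append(futures, workflow.ExecuteActivity(aCtx, batchActivity, row))
}
for _, f := range futures {
	_ = f.Get(ctx, nil) // with thousands of rows, history is already overloaded
}
```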
How did you test it?
Unit test in the test environment.
Detailed Description
- NewBatchFuture API for batch use cases
- BatchFuture interface
Impact Analysis
Testing Plan
Rollout Plan