dl/translation/scheduling: API for scheduling translations #24921
Still a draft because the patch needs a little more polish and cleanup; just pushing it out so it's easy to discuss the changes tomorrow (so folks can go through it before the meeting).
Just went through the headers for now. Will go through the rest of the implementation later this morning
/**
 * Hooks for the components to notify the scheduler about events.
 */
class state_change_notifier {
does this need a virtual destructor?
Same question. I think a note about the intended ownership semantics would be helpful. I see polymorphic non-owning references at various interfaces, but at a glance it's not clear to me where the actual scheduler object will sit.
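For context on the destructor question: a virtual destructor only matters if an object is ever destroyed through a pointer to the interface. Below is a minimal sketch, assuming components only ever borrow the notifier by reference. The names `notifier_iface`, `toy_scheduler`, and `component` are hypothetical and not from the patch. A protected non-virtual destructor documents the non-owning intent and makes deletion through the interface a compile error.

```cpp
#include <cassert>

// Hypothetical stand-in for an interface like state_change_notifier.
class notifier_iface {
public:
    virtual void notify_ready(int translator_id) = 0;

protected:
    // Protected and non-virtual: holders of a notifier_iface& cannot
    // destroy the object through the interface, so no virtual dtor is needed.
    ~notifier_iface() = default;
};

// Hypothetical concrete implementation, owned wherever the real
// scheduler object would live.
class toy_scheduler final : public notifier_iface {
public:
    void notify_ready(int translator_id) override {
        last_ready = translator_id;
    }
    int last_ready = -1;
};

// A component that only borrows the notifier.
class component {
public:
    explicit component(notifier_iface& n)
      : _notifier(n) {}
    void became_ready(int id) { _notifier.notify_ready(id); }

private:
    notifier_iface& _notifier; // non-owning reference
};
```

If the interface were ever held in an owning pointer (for example `std::unique_ptr<notifier_iface>`), a public virtual destructor would be the right choice instead.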
Cleaned it up a bit more, lmk if it's still not clear.
 * Used to force a flush of in flight translation if the scheduler chooses
 * to. Upon this request, the translation should stop the inflight
 * translation and release all the reserved resources to allow other
 * translations to make progress.
Can you also document what backoff is for?
I was experimenting with a few designs for the scheduler, and one of the ideas I had was to propagate a backoff from the scheduler to the translator so it marks itself ready at a later time. I didn't end up liking the idea very much and forgot to remove the param. In this "cooperative" world the translator can figure out that "backoff" part itself depending on factors like batching, how much data is available to translate, lag, etc. Making that decision inside the scheduling policy only makes the scheduler more stateful and complicated, I think.
Also, the scheduling policy can simulate the backoff by keeping the task in the waiting queue for a longer period if it wishes to, but that logic will be baked in the policy implementation.
Looks pretty nice! Just posting my main takeaway from the meeting: certain things weren't initially clear, like whether there's expected to be a relationship between commit deadline and time quota (i.e. the case where a user-facing deadline is 24h), or whose responsibility it is to track translation completion from an application's perspective (i.e. the case where there is a lot for the partition to translate but we only finish part of it in one run of the translator).
I think in either case, the answer is that this interface isn't necessarily opinionated about that kind of thing, though I think you mentioned extensions of the interface to address the latter.
    // deposited back.
    size_t _total_memory_blocks;
    ssx::semaphore _available_memory_blocks;
    size_t _memory_block_size;
nit: could be const?
  state_change_notifier& notifier)
  : _total_memory_blocks(
      static_cast<size_t>(total_memory / memory_block_size))
  , _available_memory_blocks{_total_memory_blocks, "dl/translation/memory"}
Just curious what the rationale is for making the semaphore count blocks instead of bytes (while still reserving in units of blocks). For debugging, it seems like counting bytes would be one fewer hurdle to understanding what's going on.
No particular rationale, just to reserve in units of 1. As you say, we can modify it to reserve multiple units at once and just track memory. Will do.
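A rough sketch of the byte-denominated variant discussed in this thread, assuming a plain synchronous counter in place of `ssx::semaphore`. The class `byte_budget` and its methods are hypothetical and only illustrate the accounting: reservations are still made in whole blocks, but the counter reads directly in bytes.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical byte-denominated budget; not the tracker from the patch.
// Precondition: block_size > 0.
class byte_budget {
public:
    byte_budget(std::size_t total_bytes, std::size_t block_size)
      : _block_size(block_size)
      // Round the budget down to a whole number of blocks.
      , _available_bytes((total_bytes / block_size) * block_size) {}

    // Reserve `blocks` whole blocks. Returns false and changes nothing
    // if the remaining budget is too small.
    bool try_reserve_blocks(std::size_t blocks) {
        const std::size_t bytes = blocks * _block_size;
        if (bytes > _available_bytes) {
            return false;
        }
        _available_bytes -= bytes;
        return true;
    }

    // Deposit previously reserved blocks back into the budget.
    void release_blocks(std::size_t blocks) {
        _available_bytes += blocks * _block_size;
    }

    std::size_t available_bytes() const { return _available_bytes; }

private:
    std::size_t _block_size;
    std::size_t _available_bytes;
};
```

With this shape, a log line or debugger view of `available_bytes()` needs no multiplication by the block size to interpret.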
ss::future<> scheduler::main() {
    vlog(datalake_log.trace, "Starting scheduling loop");
    while (!_state.as.abort_requested()) {
        auto holder = _state.gate.hold();
nit: generally, should this holder be outside the loop? It seems like it may not be problematic in this case, but it still feels off to access _state without the gate held.
        return;
    }
    vlog(datalake_log.trace, "Marking the translator ready: {}", it->second);
    it->second._running_hook.unlink();
Are we guaranteed we're either in the running or waiting lists? Or is this just a no-op in that case?
Are we guaranteed we're either in the running or waiting lists?
Yes, that's the invariant that is supposed to hold. I will try to clean this up a bit more; I'm planning to add an explicit "translation_ended" callback to make the flow more explicit and the state changes easier to reason about.
Also, unlink() is a no-op if not linked.
class translator_state {
public:
    translator_state() = default;
nit: curious why is this needed?
Removed. I think I added it to use [] operator on translators map that requires a default c'tor. I switched to using insert(pair()) to avoid it.
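To illustrate the point about `operator[]`: on the standard map containers it has to default-construct the mapped value when the key is missing, so a value type without a default constructor needs `insert` or an `emplace` variant. A minimal sketch using `std::map::try_emplace` (a close cousin of the `insert(pair())` approach mentioned above); `entry`, `entry_map`, and `add_entry` are hypothetical names.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical value type with no default constructor.
struct entry {
    explicit entry(std::unique_ptr<std::string> p)
      : payload(std::move(p)) {}
    std::unique_ptr<std::string> payload;
};

using entry_map = std::map<int, entry>;

// Returns true if a new entry was inserted, false if the id already existed.
inline bool add_entry(entry_map& m, int id, std::string name) {
    // `m[id] = entry{...};` would not compile here: operator[] requires
    // entry to be default-constructible.
    return m.try_emplace(id, std::make_unique<std::string>(std::move(name)))
      .second;
}
```

A side benefit is that insertion and the duplicate check happen in a single lookup.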
// A translator that overshoots deadline and requires explicit force flushing
// from the scheduler.
class delaying_translator : public mock_translator {
public:
Maybe also makes sense to add a slow flushing translator?
Yes, I was thinking about it. I think it mocks the case where we try to stream to S3?
    model::ntp _ntp;
    clock::duration _max_target_lag;
    size_t _translation_tput_bytes_per_sec;
    // throughput spread evenly among writers
    // to simulate partitioning.
    size_t _num_writers;
nit: const?
Added. clang-tidy complains about const members, fyi
    vassert(
      !translators.contains(id),
      "Duplicate translator registration: {}",
      translator->id());
Just a precaution, maybe we should return a bool instead of crashing? I guess this is relying on the datalake manager to only ever add one at a time, but that's a pretty distant invariant to rely on
yes, was just using it for testing, will propagate an error in such a case.
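A small sketch of what propagating an error (rather than asserting) on duplicate registration could look like. The `register_result` enum and `toy_registry` class are hypothetical; the real scheduler's types and return values may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_set>

// Hypothetical result type for registration.
enum class register_result { ok, duplicate };

// Hypothetical registry keyed by a translator id string.
class toy_registry {
public:
    // Reports a duplicate to the caller instead of crashing the process.
    register_result add(const std::string& id) {
        const bool inserted = _ids.insert(id).second;
        return inserted ? register_result::ok : register_result::duplicate;
    }

    // Returns true if the id was registered and has now been removed.
    bool remove(const std::string& id) { return _ids.erase(id) > 0; }

    std::size_t size() const { return _ids.size(); }

private:
    std::unordered_set<std::string> _ids;
};
```

The caller (for example the component that adds translators) can then decide whether a duplicate is a bug worth logging loudly or a benign retry.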
Just flushing out high level comments, I have a few more ideas I wanted to implement, will update the PR soon.
Looks pretty nice! Just posting my main takeaway from the meeting: certain things weren't initially clear, like whether there's expected to be a relationship between commit deadline and time quota (i.e. the case where a user-facing deadline is 24h), or whose responsibility it is to track translation completion from an application's perspective (i.e. the case where there is a lot for the partition to translate but we only finish part of it in one run of the translator).
I think in either case, the answer is that this interface isn't necessarily opinionated about that kind of thing, though I think you mentioned extensions of the interface to address the latter.
You're correct. My intention was to decouple quotas/time slices from the target delay. Will reword the docs to make it clear.
The scheduling policy should still consider the target delay (how close the translator is to the next delay) and dynamically adjust resource allocation (e.g., increase time slices) accordingly. This flexibility should be built into the policy itself IMO.
Another potential enhancement is to allow for dynamic extension of time slices without requiring translation interruption or flushing. For instance, if a single translator is allocated a 1min time quota and no other tasks are waiting, the time slice could be automatically extended. This would enable better batching opportunities (especially for row group sizes) by avoiding unnecessary interruptions. As you put it correctly, these can be qualified as extensions to the API.
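The time slice extension idea could be sketched roughly as follows. This is only an illustration under simple assumptions: the helper `next_extension` is hypothetical, it looks only at the number of waiting translators, and it ignores the target delay that a real policy would also weigh.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>

using slice_duration = std::chrono::seconds;

// Hypothetical policy helper: called when a running translator's time
// slice expires. If nothing else is waiting, grant one more quota instead
// of interrupting the translation; otherwise grant no extension.
inline slice_duration
next_extension(std::size_t waiting_translators, slice_duration quota) {
    return waiting_translators == 0 ? quota : slice_duration::zero();
}
```

A zero extension would then correspond to the scheduler requesting a stop so that waiting translators can make progress.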
Okay did a full pass. I like it! I think there are a few invariants that I'm a little worried about enforcing over time as we extend this and at the very least need some documentation.
I would also love to see the test add some assertions to ensure we're not over-scheduling memory blocks. Maybe we can have an additional loop there that reads the number of used blocks and ensures that it's consistent between what is assigned to translators and what the scheduler thinks is the current state? Also maybe verify that the translator_state is consistent with reality. Since this test is of the "ensuring no invariants are broken" flavor, it would be good to add any additional property checks we can verify while we're here.
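One way the suggested consistency check could look, as a sketch only. The struct `toy_accounting` and the function `blocks_consistent` are hypothetical; a real test would read these numbers from the scheduler fixture and the mock translators.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical snapshot of memory accounting taken during the test.
struct toy_accounting {
    std::size_t total_blocks;
    // What the scheduler's tracker believes is still free.
    std::size_t available_blocks;
    // What each translator reports it currently holds.
    std::vector<std::size_t> reserved_per_translator;
};

// Invariant: blocks held by translators plus free blocks equals the total,
// and the free count never exceeds the total (no over-scheduling).
inline bool blocks_consistent(const toy_accounting& a) {
    const std::size_t held = std::accumulate(
      a.reserved_per_translator.begin(),
      a.reserved_per_translator.end(),
      std::size_t{0});
    return a.available_blocks <= a.total_blocks
           && held + a.available_blocks == a.total_blocks;
}
```

A background loop in the fuzz test could take such a snapshot periodically and assert the invariant while translators are added and removed.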
    vassert(state._running_hook.is_linked(), "Translator not in running state");
    vassert(!state._stop_in_progress, "Duplicate stop request");
    vlog(datalake_log.debug, "stopping translator: {}", state);
    state._translator->stop_translation();
We should either document in the API contract here that no exceptions are allowed, or handle it here like we do above.
Good point, done. I meant to do the latter actually; added some test coverage here with some randomness in exceptional_translator.
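For illustration, handling a throwing stop hook at the call site might look like the sketch below. It assumes a synchronous hook passed as a `std::function`; `stop_guarded` is a hypothetical helper, not the code added in the patch.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>

// Hypothetical wrapper: invokes a translator's stop hook and converts any
// exception into a boolean result, so a misbehaving translator cannot
// take down the scheduling loop.
inline bool stop_guarded(const std::function<void()>& stop_hook) noexcept {
    try {
        stop_hook();
        return true;
    } catch (...) {
        // A real implementation would log the failure here.
        return false;
    }
}
```

The alternative raised in the review, documenting the hook as non-throwing, would move this burden onto every translator implementation instead.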
    void notify_done(const translator_id&) override;
    void notify_memory_exhausted() override;

    ss::future<> main();
Should this be private since it's called in the ctor? Or should we make an explicit start for duality with stop?
I think this should be private.. made it public unintentionally, moved.
        co_return false;
    }
    _state.translators[id] = translator_state{std::move(translator)};
    co_await _state.translators[id]._translator->start(*this, *_mem_tracker);
Do we need to worry about concurrent start/start_translation/stop, etc with translators? I think we might want to document what guarantee (if any) we provide here.
I don't think we need to; at least that's the intention, and hence I added the fuzz test for verification. Just inspecting the trace logs from the fuzz test, I think it has decent coverage of many failure scenarios.
I don't think we need to
Are you saying that:
1. We won't ever have concurrent start/stop calls
OR
2. It's OK to have concurrent start/stop calls - the implementation will handle it fine.
I meant (2), we can have concurrent start/stop calls and it should be handled by the implementation. I updated the docs for the methods with the expectations. Fixed a tiny test bug in that process.
      [end_time] { return clock::now() >= end_time; },
      [&]() { return remove_one(); });

    co_await ss::when_all_succeed(std::move(add_f), std::move(remove_f));
This is a great fuzz test, but is there anything we can actually assert? For example making sure we adhere to the policy and no resource is oversubscribed?
I added some basic validation.. I was thinking we could add more interesting validations along with the actual scheduling policy patch.
will push shortly.
9fc2618 to f613938
LGTM thanks!
This pretty much LGTM. I do think a name change would really help keep the abstractions straight
 */
struct scheduler_state {
    void start_translation(translator_state&, clock::duration time_slice);
    void stop_translation(translator_state&);
Might just be me, but reading through this I kept finding it slightly confusing to follow because we refer to a couple of things as state, and it's easy to conflate x_state for x. Taking a step back, what do you think about the following renames:
- scheduler_state -> executor: tracks the registered translations and carries out the execution of translation/stop requests
- translation_state -> translation_executable: a hook into the executor, as indicated by the "execut-" part of the name
Then the differences between scheduler and scheduler_state become clearer IMO: the scheduler repeatedly evaluates the policy and listens for signals in the system, while the scheduler_state/executor just performs tasks at the request of the scheduler.
done, renamed.
    });
}

ss::future<bool>
consider returning an error code
 * (via stop_translation) and the translator should stop the inflight
 * translation and release the resources.
 */
class translator {
Just to clarify, the translator is an abstraction representing a long living executor of the translation that may be asked to execute a piece of work via start_translation?
Correct. There is a reference implementation in the mock translator in the next commit, which is how I envision it, although it's an implementation detail.
I addressed the comments, moved around the code a bit more and added some instrumentation to prep it for the next PR (scheduling policy change), not much logically changed.
Adds the API for a cooperative scheduler for scheduling partition translators. It includes a pluggable scheduling policy, and a basic implementation is provided for testing.
Notes:
* The actual partition translator implementing the translator interface comes in a separate patch (or patches).
* The actual scheduling policy (with more tests) is a separate patch; it needs more heuristics and testing of various pathological scenarios.
Adds a scheduler fixture with a mock translator implementation to simulate scenarios for scheduling testing.