Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dl/translation/scheduling: API for scheduling translations #24921

Merged
merged 2 commits into from
Feb 4, 2025

Conversation

bharathv
Copy link
Contributor

  • adds API for a cooperative scheduler for scheduling datalake translations. Scheduling policy is pluggable and a very basic implementation is included for testing
  • Adds a test fixture for testing scheduler and also works as a simulator for simulating various translation workloads. It works with a mock translator implementation that closely resembles an actual partition translator (also gives a feel for further partition translator changes).

Notes:

  • actual scheduling policy with tests will be a separate change.
  • partition translator port to the new interface will be a separate change.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.3.x
  • v24.2.x
  • v24.1.x

Release Notes

  • none

@bharathv
Copy link
Contributor Author

still a draft because the patch needs a little more polish and cleanup, just pushing it out so its easy to discuss the changes tomorrow (so folks can go through it before the meeting).

Copy link
Contributor

@rockwotj rockwotj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just went through the headers for now. Will go through the rest of the implementation later this morning

/**
* Hooks for the components to notify the scheduler about events.
*/
class state_change_notifier {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need a virtual destructor?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question. I think a note about the intended ownership semantics would be helpful. I see polymorphic non-owning references at various interfaces, but at a glance it's not clear to me where the actual scheduler object will sit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleaned it up a bit more, lmk if it still not clear.

src/v/datalake/translation/scheduling.h Outdated Show resolved Hide resolved
* Used to force a flush of in flight translation if the scheduler chooses
* to. Upon this request, the translation should stop the inflight
* translation and release all the reserved resources to allow other
* translations to make progress.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also document what backoff is for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was experimenting a few designs for the scheduler and one of the ideas i had was to propagate a backoff from the scheduler to the translator so it marks itself ready at a later time, but I didn't end up liking the idea very much, forgot to remove the param. In this "cooperative" world the translator can figure that "backoff" part itself depending on factors like batching / how much data is available to translate / lag etc, and making that decision inside the scheduling policy only makes the scheduler more stateful and complicated I think.

Also, the scheduling policy can simulate the backoff by keeping the task in the waiting queue for a longer period if it wishes to, but that logic will be baked in the policy implementation.

src/v/datalake/translation/scheduling.h Outdated Show resolved Hide resolved
src/v/datalake/translation/scheduling.h Outdated Show resolved Hide resolved
src/v/datalake/translation/scheduling.h Outdated Show resolved Hide resolved
src/v/datalake/translation/scheduling.h Outdated Show resolved Hide resolved
Copy link
Contributor

@andrwng andrwng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks pretty nice! Just posting my main takeaway from the meeting: certain things weren't initially clear, like whether there's expected to be a relationship between commit deadline and time quota (i.e. the case where a user-facing deadline is 24h), or whose responsibility it is to track translation completion from an application's perspective (i.e. the case where there is a lot for the partition to translate but we only finish part of it in one run of the translator).

I think in either case, the answer is that this interface isn't necessarily opinionated about that kind of thing, though I think you mentioned extensions of the interface to address the latter.

// deposited back.
size_t _total_memory_blocks;
ssx::semaphore _available_memory_blocks;
size_t _memory_block_size;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could be const?

state_change_notifier& notifier)
: _total_memory_blocks(
static_cast<size_t>(total_memory / memory_block_size))
, _available_memory_blocks{_total_memory_blocks, "dl/translation/memory"}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious what the rationale is for making the semaphore count blocks instead of bytes (but still reserving in the unit of blocks). For debugging, it seems like debugging with bytes would be one fewer hurdle to understand what's going on

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no particular rationale, just to reserve in the units of 1, as you say we can modify it to reserve multiple units at once and just track memory, will do.

ss::future<> scheduler::main() {
vlog(datalake_log.trace, "Starting scheduling loop");
while (!_state.as.abort_requested()) {
auto holder = _state.gate.hold();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: generally should this holder be outside the loop? I seems like it may not be problematic in this case, but it still feels off to access _state without the gate held

return;
}
vlog(datalake_log.trace, "Marking the translator ready: {}", it->second);
it->second._running_hook.unlink();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we guaranteed we're either in the running or waiting lists? Or is this just a no-op in that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we guaranteed we're either in the running or waiting lists?

Yes thats the invariant that is supposed to be held, will try to cleanup this a bit more, I'm planning to add an explicit "translation_ended" callback to make the flow more explicit and the state changes easier to reason about.

also, unlink() is a noop if not linked.


class translator_state {
public:
translator_state() = default;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: curious why is this needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed. I think I added it to use [] operator on translators map that requires a default c'tor. I switched to using insert(pair()) to avoid it.

Comment on lines +100 to +95
// A translator that overshoots deadline and requires explict force flushing
// from the scheduler.
class delaying_translator : public mock_translator {
public:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also makes sense to add a slow flushing translator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I was thinking about it.. I think it mocks the case when we try to stream-to-s3 ?

Comment on lines 81 to 77
model::ntp _ntp;
clock::duration _max_target_lag;
size_t _translation_tput_bytes_per_sec;
// throughput spread evently among writers
// to simulate partitioning.
size_t _num_writers;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: const?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. clang-tidy complains about const members, fyi

Comment on lines 236 to 239
vassert(
!translators.contains(id),
"Duplicate translator registration: {}",
translator->id());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a precaution, maybe we should return a bool instead of crashing? I guess this is relying on the datalake manager to only ever add one at a time, but that's a pretty distant invariant to rely on

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, was just using it for testing, will propagate an error in such a case.

Copy link
Contributor Author

@bharathv bharathv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just flushing out high level comments, I have a few more ideas I wanted to implement, will update the PR soon.

Looks pretty nice! Just posting my main takeaway from the meeting: certain things weren't initially clear, like whether there's expected to be a relationship between commit deadline and time quota (i.e. the case where a user-facing deadline is 24h), or whose responsibility it is to track translation completion from an application's perspective (i.e. the case where there is a lot for the partition to translate but we only finish part of it in one run of the translator).

I think in either case, the answer is that this interface isn't necessarily opinionated about that kind of thing, though I think you mentioned extensions of the interface to address the latter.

You're correct. My intention was to decouple quotas/time slices from the target delay. Will reword the docs to make it clear.

The scheduling policy should still consider the target delay (how close the translator is to the next delay) and dynamically adjust resource allocation (e.g., increase time slices) accordingly. This flexibility should be built into the policy itself IMO.

Another potential enhancement is to allow for dynamic extension of time slices without requiring translation interruption or flushing. For instance, if a single translator is allocated a 1min time quota and no other tasks are waiting, the time slice could be automatically extended. This would enable better batching opportunities (especially for row group sizes) by avoiding unnecessary interruptions. As you put it correctly, these can be qualified as extensions to the API.

src/v/datalake/translation/scheduling.h Outdated Show resolved Hide resolved
* Used to force a flush of in flight translation if the scheduler chooses
* to. Upon this request, the translation should stop the inflight
* translation and release all the reserved resources to allow other
* translations to make progress.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was experimenting a few designs for the scheduler and one of the ideas i had was to propagate a backoff from the scheduler to the translator so it marks itself ready at a later time, but I didn't end up liking the idea very much, forgot to remove the param. In this "cooperative" world the translator can figure that "backoff" part itself depending on factors like batching / how much data is available to translate / lag etc, and making that decision inside the scheduling policy only makes the scheduler more stateful and complicated I think.

Also, the scheduling policy can simulate the backoff by keeping the task in the waiting queue for a longer period if it wishes to, but that logic will be baked in the policy implementation.

src/v/datalake/translation/scheduling.h Outdated Show resolved Hide resolved
state_change_notifier& notifier)
: _total_memory_blocks(
static_cast<size_t>(total_memory / memory_block_size))
, _available_memory_blocks{_total_memory_blocks, "dl/translation/memory"}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no particular rationale, just to reserve in the units of 1, as you say we can modify it to reserve multiple units at once and just track memory, will do.

return;
}
vlog(datalake_log.trace, "Marking the translator ready: {}", it->second);
it->second._running_hook.unlink();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we guaranteed we're either in the running or waiting lists?

Yes thats the invariant that is supposed to be held, will try to cleanup this a bit more, I'm planning to add an explicit "translation_ended" callback to make the flow more explicit and the state changes easier to reason about.

also, unlink() is a noop if not linked.

Comment on lines 236 to 239
vassert(
!translators.contains(id),
"Duplicate translator registration: {}",
translator->id());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, was just using it for testing, will propagate an error in such a case.

Comment on lines +100 to +95
// A translator that overshoots deadline and requires explict force flushing
// from the scheduler.
class delaying_translator : public mock_translator {
public:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I was thinking about it.. I think it mocks the case when we try to stream-to-s3 ?

@bharathv
Copy link
Contributor Author

/dt

@vbotbuildovich
Copy link
Collaborator

vbotbuildovich commented Jan 27, 2025

CI test results

test results on build#61209
test_id test_kind job_url test_status passed
gtest_raft_rpunit.gtest_raft_rpunit unit https://buildkite.com/redpanda/redpanda/builds/61209#0194a6c7-cc6d-4f20-9c8d-fde0aa0aa354 FLAKY 1/2
test results on build#61250
test_id test_kind job_url test_status passed
kafka_server_rpfixture.kafka_server_rpfixture unit https://buildkite.com/redpanda/redpanda/builds/61250#0194a9ca-94c5-4a93-9e8e-7709413cd701 FLAKY 1/2
rptest.tests.compaction_recovery_test.CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade ducktape https://buildkite.com/redpanda/redpanda/builds/61250#0194aa0e-933b-46f7-9062-b684caca9490 FLAKY 1/2
test results on build#61283
test_id test_kind job_url test_status passed
gtest_raft_rpunit.gtest_raft_rpunit unit https://buildkite.com/redpanda/redpanda/builds/61283#0194ac02-ff37-4b50-b899-cf6cbfca37c8 FLAKY 1/2
rptest.tests.compaction_recovery_test.CompactionRecoveryTest.test_index_recovery ducktape https://buildkite.com/redpanda/redpanda/builds/61283#0194ac2d-0d5e-48cd-96e3-7e354249d253 FLAKY 1/3
rptest.tests.scaling_up_test.ScalingUpTest.test_scaling_up_with_recovered_topic ducktape https://buildkite.com/redpanda/redpanda/builds/61283#0194ac2d-0d5d-43da-83ba-23bda2f1e8a2 FLAKY 1/2
rptest.tests.topic_delete_test.TopicDeleteCloudStorageTest.topic_delete_installed_snapshots_test ducktape https://buildkite.com/redpanda/redpanda/builds/61283#0194ac2d-0d5c-4795-9741-1795cf32f907 FLAKY 1/2
test results on build#61528
test_id test_kind job_url test_status passed
rptest.tests.cloud_storage_timing_stress_test.CloudStorageTimingStressTest.test_cloud_storage_with_partition_moves.cleanup_policy=compact.delete ducktape https://buildkite.com/redpanda/redpanda/builds/61528#0194cd72-b932-4204-acf0-3a5bf1d4a3e5 FLAKY 1/2
rptest.tests.datalake.compaction_test.CompactionGapsTest.test_translation_no_gaps.cloud_storage_type=CloudStorageType.S3 ducktape https://buildkite.com/redpanda/redpanda/builds/61528#0194cd72-b931-46bf-b5df-02a7671d25b4 FLAKY 1/2

Copy link
Contributor

@rockwotj rockwotj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay did a full pass. I like it! I think there are a few invariants that I'm a little worried about enforcing over time as we extend this and at the very least need some documentation.

I would also love to see the test adding some assertions in the test to ensure we're not over scheduling memory blocks. Maybe we can have an additional loop there that reads the numbers of used blocks and ensures that it's consistent between what is assigned to translators and what the scheduler thinks is the current state? Also maybe verifying the translator_state is consistent with the reality. Since this test is the flavor of "ensuring no invariants are broken" if there are additional property checks we can verify while we're here.

src/v/datalake/translation/scheduling.cc Show resolved Hide resolved
src/v/datalake/translation/scheduling.cc Show resolved Hide resolved
vassert(state._running_hook.is_linked(), "Translator not in running state");
vassert(!state._stop_in_progress, "Duplicate stop request");
vlog(datalake_log.debug, "stopping translator: {}", state);
state._translator->stop_translation();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should either document in the API contract here that there are no exceptions are allowed or handle it here like we do above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point done, I meant to do the latter actually, added some test coverage here with some randomness in exceptional_translator

void notify_done(const translator_id&) override;
void notify_memory_exhausted() override;

ss::future<> main();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be private since it's called in the ctor? Or should we make an explicit start for duality with stop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be private.. made it public unintentionally, moved.

co_return false;
}
_state.translators[id] = translator_state{std::move(translator)};
co_await _state.translators[id]._translator->start(*this, *_mem_tracker);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to worry about concurrent start/start_translation/stop, etc with translators? I think we might want to document what guarantee (if any) we provide here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to, at least thats the intention and hence I added the fuzz test for verification, just inspecting the trace logs from the fuzz test I think it has a decent coverage of many failure scenarios.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to

Are you saying that:

  1. We won't ever have concurrent start/stop calls

OR

  1. It's OK to have concurrent start/stop calls - the implementation will handle it fine.

Copy link
Contributor Author

@bharathv bharathv Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant (2), we can have concurrent start/stop calls and it should be handled by the implementation. I updated the docs for the methods with the expectations. Fixed a tiny test bug in that process.

src/v/datalake/translation/scheduling.cc Outdated Show resolved Hide resolved
[end_time] { return clock::now() >= end_time; },
[&]() { return remove_one(); });

co_await ss::when_all_succeed(std::move(add_f), std::move(remove_f));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great fuzz test, but is there anything we can actually assert? For example making sure we adhere to the policy and no resource is oversubscribed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some basic validation.. I was thinking we could add more interesting validations along with the actual scheduling policy patch.

Copy link
Contributor Author

@bharathv bharathv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will push shortly.


class translator_state {
public:
translator_state() = default;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed. I think I added it to use [] operator on translators map that requires a default c'tor. I switched to using insert(pair()) to avoid it.

void notify_done(const translator_id&) override;
void notify_memory_exhausted() override;

ss::future<> main();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be private.. made it public unintentionally, moved.

src/v/datalake/translation/scheduling.cc Show resolved Hide resolved
src/v/datalake/translation/scheduling.cc Show resolved Hide resolved
vassert(state._running_hook.is_linked(), "Translator not in running state");
vassert(!state._stop_in_progress, "Duplicate stop request");
vlog(datalake_log.debug, "stopping translator: {}", state);
state._translator->stop_translation();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point done, I meant to do the latter actually, added some test coverage here with some randomness in exceptional_translator

co_return false;
}
_state.translators[id] = translator_state{std::move(translator)};
co_await _state.translators[id]._translator->start(*this, *_mem_tracker);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to, at least thats the intention and hence I added the fuzz test for verification, just inspecting the trace logs from the fuzz test I think it has a decent coverage of many failure scenarios.

src/v/datalake/translation/scheduling.cc Outdated Show resolved Hide resolved
Comment on lines 81 to 77
model::ntp _ntp;
clock::duration _max_target_lag;
size_t _translation_tput_bytes_per_sec;
// throughput spread evently among writers
// to simulate partitioning.
size_t _num_writers;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. clang-tidy complains about const members, fyi

[end_time] { return clock::now() >= end_time; },
[&]() { return remove_one(); });

co_await ss::when_all_succeed(std::move(add_f), std::move(remove_f));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some basic validation.. I was thinking we could add more interesting validations along with the actual scheduling policy patch.

/**
* Hooks for the components to notify the scheduler about events.
*/
class state_change_notifier {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleaned it up a bit more, lmk if it still not clear.

@bharathv bharathv force-pushed the if2 branch 2 times, most recently from 9fc2618 to f613938 Compare January 27, 2025 22:01
@bharathv bharathv requested review from andrwng and rockwotj January 27, 2025 22:02
@bharathv bharathv marked this pull request as ready for review January 27, 2025 22:02
rockwotj
rockwotj previously approved these changes Jan 28, 2025
Copy link
Contributor

@rockwotj rockwotj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM thanks!

Copy link
Contributor

@andrwng andrwng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pretty much LGTM. I do think a name change would really help keep the abstractions straight

src/v/datalake/translation/scheduling.h Outdated Show resolved Hide resolved
Comment on lines 246 to 249
*/
struct scheduler_state {
void start_translation(translator_state&, clock::duration time_slice);
void stop_translation(translator_state&);
Copy link
Contributor

@andrwng andrwng Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might just be me, but reading through this I kept finding it slightly confusing to follow because we refer to a couple things as state and it's easy to conflate x_state for x. Taking a step back, what do you think about the following renames:

  • scheduler_state -> executor: tracks the registered translations and carries out the execution of translation/stop requests
  • translation_state -> translation_executable: a hook into the executor, as indicated by the "execut-" part of the name

Then the differences between scheduler and scheduler_state become clearer IMO: the scheduler repeatedly evaluates the policy and listens for signals in the system, while the scheduler_state/executor just perform tasks at the request of the scheduler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, renamed.

src/v/datalake/translation/scheduling.cc Outdated Show resolved Hide resolved
src/v/datalake/translation/scheduling.cc Outdated Show resolved Hide resolved
});
}

ss::future<bool>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider returning an error code

* (via stop_translation) and the translator should stop the inflight
* translation and release the resources.
*/
class translator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify, the translator is an abstraction representing a long living executor of the translation that may be asked to execute a piece of work via start_translation ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, there is a reference implementation in mock translator in the next commit which is how I envision it to be although its an implementation detail.

Copy link
Contributor Author

@bharathv bharathv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I addressed the comments, moved around the code a bit more and added some instrumentation to prep it for the next PR (scheduling policy change), not much logically changed.

src/v/datalake/translation/scheduling.cc Outdated Show resolved Hide resolved
src/v/datalake/translation/scheduling.cc Outdated Show resolved Hide resolved
* (via stop_translation) and the translator should stop the inflight
* translation and release the resources.
*/
class translator {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, there is a reference implementation in mock translator in the next commit which is how I envision it to be although its an implementation detail.

src/v/datalake/translation/scheduling.h Outdated Show resolved Hide resolved
Comment on lines 246 to 249
*/
struct scheduler_state {
void start_translation(translator_state&, clock::duration time_slice);
void stop_translation(translator_state&);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, renamed.

andrwng
andrwng previously approved these changes Feb 3, 2025
src/v/datalake/translation/scheduling.cc Outdated Show resolved Hide resolved
src/v/datalake/translation/scheduling.h Outdated Show resolved Hide resolved
Adds the API for a cooperative scheduler for scheduling partition
translators. It includes a pluggable scheduling policy and a basic
implementation is provided for testing.

Notes:

* Actual partition translator implementation implementing the translator
  interface comes a separate patch (or patches)
* Actual scheduling policy (with more tests) is a separate patch, that
  needs more heuristics and testing various pathological scenarios.
Adds a scheduler fixture with a mock translator implementation to
simulate scenarios for scheduling testing.
@bharathv bharathv merged commit c60ffdc into redpanda-data:dev Feb 4, 2025
17 checks passed
@bharathv bharathv deleted the if2 branch February 4, 2025 00:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants