dl/translation: scheduling policy for translation lag #25016

Merged
merged 3 commits into from
Feb 6, 2025

Contributor

@bharathv bharathv commented Feb 4, 2025

A scheduling policy that strives to meet each translator's target lag deadline while remaining fair, so that translators
with a small lag do not starve translators with a large lag. The policy uses a share-based heuristic to guarantee a degree of
fairness proportional to the allotted shares.
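
One way to read the share-based fairness described above is as a lottery over groups: each non-empty group holds tickets equal to its shares, and a ticket decides which group is served next. The following is a minimal sketch under that assumption, not the policy implemented in this PR. The group names and the 50 and 30 share values come from the snippets quoted later in the review; the share values for `unfulfilled_quota` and `other`, the `group_shares` struct, `example_groups`, and `pick_group` are hypothetical.

```cpp
#include <array>
#include <cassert>
#include <optional>

// Group names mirror the translator_group values quoted in the review.
enum class translator_group { expired, about_to_expire, unfulfilled_quota, other };

struct group_shares {
    translator_group group;
    long shares;         // relative weight of the group (assumed non-negative)
    bool has_candidates; // empty groups hold no tickets
};

// 50 and 30 appear in the PR; 15 and 5 are made-up values for illustration.
inline std::array<group_shares, 4> example_groups() {
    return {{
      {translator_group::expired, 50, true},
      {translator_group::about_to_expire, 30, true},
      {translator_group::unfulfilled_quota, 15, true},
      {translator_group::other, 5, true},
    }};
}

// Maps a non-negative ticket onto one of the non-empty groups, so that each
// group owns a contiguous range of tickets as wide as its shares.
inline std::optional<translator_group>
pick_group(const std::array<group_shares, 4>& groups, long ticket) {
    assert(ticket >= 0);
    long total = 0;
    for (const auto& g : groups) {
        if (g.has_candidates) {
            total += g.shares;
        }
    }
    if (total <= 0) {
        return std::nullopt; // nothing to schedule
    }
    ticket %= total;
    for (const auto& g : groups) {
        if (!g.has_candidates) {
            continue;
        }
        if (ticket < g.shares) {
            return g.group;
        }
        ticket -= g.shares;
    }
    return std::nullopt; // not reached when the total is positive
}
```

With uniformly random tickets and these example numbers, the expired group would be picked about half of the time, while `other` would still receive roughly 5% of the picks, so no non-empty group is starved outright.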

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.3.x
  • v24.2.x
  • v24.1.x

Release Notes

  • none

Contributor Author

bharathv commented Feb 4, 2025

/dt

Collaborator

vbotbuildovich commented Feb 4, 2025

CI test results

test results on build#61544

| test_id | test_kind | job_url | test_status | passed |
| --- | --- | --- | --- | --- |
| datalake_translation_tests_rpunit.datalake_translation_tests_rpunit | unit | https://buildkite.com/redpanda/redpanda/builds/61544#0194cf20-a1e4-4fd8-81b4-ccb6a395b925 | FAIL | 0/2 |
| gtest_raft_rpunit.gtest_raft_rpunit | unit | https://buildkite.com/redpanda/redpanda/builds/61544#0194cf20-a1e3-428a-904f-186259ad53a6 | FLAKY | 1/2 |
| rptest.tests.compaction_recovery_test.CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade | ducktape | https://buildkite.com/redpanda/redpanda/builds/61544#0194cf6a-13a6-4d87-90ae-41c9013bdbca | FLAKY | 1/2 |
| rptest.tests.datalake.compaction_test.CompactionGapsTest.test_translation_no_gaps.cloud_storage_type=CloudStorageType.S3.catalog_type=CatalogType.REST_HADOOP | ducktape | https://buildkite.com/redpanda/redpanda/builds/61544#0194cf6a-13a7-4762-86ef-a76b7f3f3c2b | FLAKY | 1/2 |
| rptest.tests.datalake.compaction_test.CompactionGapsTest.test_translation_no_gaps.cloud_storage_type=CloudStorageType.S3.catalog_type=CatalogType.REST_JDBC | ducktape | https://buildkite.com/redpanda/redpanda/builds/61544#0194cf6a-13a8-43a0-986d-cb5936b671a3 | FLAKY | 1/5 |
| rptest.tests.partition_movement_test.SIPartitionMovementTest.test_shadow_indexing.num_to_upgrade=2.cloud_storage_type=CloudStorageType.ABS | ducktape | https://buildkite.com/redpanda/redpanda/builds/61544#0194cf6a-13a8-43a0-986d-cb5936b671a3 | FLAKY | 1/2 |
| rptest.tests.scaling_up_test.ScalingUpTest.test_scaling_up_with_recovered_topic | ducktape | https://buildkite.com/redpanda/redpanda/builds/61544#0194cf6a-13a6-4d87-90ae-41c9013bdbca | FLAKY | 1/3 |

test results on build#61582

| test_id | test_kind | job_url | test_status | passed |
| --- | --- | --- | --- | --- |
| rptest.tests.cloud_storage_timing_stress_test.CloudStorageTimingStressTest.test_cloud_storage_with_partition_moves.cleanup_policy=compact.delete | ducktape | https://buildkite.com/redpanda/redpanda/builds/61582#0194d316-ddba-491b-b0c5-ff442bbb1e0d | FLAKY | 1/2 |
| rptest.tests.compaction_recovery_test.CompactionRecoveryTest.test_index_recovery | ducktape | https://buildkite.com/redpanda/redpanda/builds/61582#0194d316-ddbb-492a-b82a-15a0d4e32bd5 | FLAKY | 1/3 |
| rptest.tests.compaction_recovery_test.CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade | ducktape | https://buildkite.com/redpanda/redpanda/builds/61582#0194d316-ddb9-4fd2-bb9a-98bf9a732778 | FLAKY | 1/2 |
| rptest.tests.datalake.compaction_test.CompactionGapsTest.test_translation_no_gaps.cloud_storage_type=CloudStorageType.S3.catalog_type=CatalogType.REST_JDBC | ducktape | https://buildkite.com/redpanda/redpanda/builds/61582#0194d316-ddba-491b-b0c5-ff442bbb1e0d | FLAKY | 1/4 |

test results on build#61625

| test_id | test_kind | job_url | test_status | passed |
| --- | --- | --- | --- | --- |
| partition_balancer_planner_test_rpunit.partition_balancer_planner_test_rpunit | unit | https://buildkite.com/redpanda/redpanda/builds/61625#0194d721-cfd3-4a22-8caa-8b52686ebca7 | FLAKY | 1/2 |
| rptest.tests.datalake.compaction_test.CompactionGapsTest.test_translation_no_gaps.cloud_storage_type=CloudStorageType.S3.catalog_type=CatalogType.REST_HADOOP | ducktape | https://buildkite.com/redpanda/redpanda/builds/61625#0194d76a-f03c-413e-b11f-09a455fb72ce | FLAKY | 1/2 |
| rptest.tests.datalake.compaction_test.CompactionGapsTest.test_translation_no_gaps.cloud_storage_type=CloudStorageType.S3.catalog_type=CatalogType.REST_JDBC | ducktape | https://buildkite.com/redpanda/redpanda/builds/61625#0194d76a-f039-4c34-9212-f015ed475786 | FLAKY | 1/2 |
| rptest.tests.partition_movement_test.SIPartitionMovementTest.test_shadow_indexing.num_to_upgrade=0.cloud_storage_type=CloudStorageType.ABS | ducktape | https://buildkite.com/redpanda/redpanda/builds/61625#0194d76a-f03b-4e32-8cb8-7a9afab37153 | FLAKY | 1/2 |

test results on build#61680

| test_id | test_kind | job_url | test_status | passed |
| --- | --- | --- | --- | --- |
| gtest_raft_rpunit.gtest_raft_rpunit | unit | https://buildkite.com/redpanda/redpanda/builds/61680#0194dc29-bb4d-43e3-9191-9d3992db5a72 | FLAKY | 1/2 |
| rptest.tests.compaction_recovery_test.CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade | ducktape | https://buildkite.com/redpanda/redpanda/builds/61680#0194dc72-df59-4fa3-ba73-4aa9b652306e | FLAKY | 1/2 |
| rptest.tests.partition_move_interruption_test.PartitionMoveInterruption.test_cancelling_partition_move_x_core.replication_factor=3.unclean_abort=True.recovery=no_recovery.compacted=False | ducktape | https://buildkite.com/redpanda/redpanda/builds/61680#0194dc6d-b1dc-452c-abaa-9275421335d3 | FLAKY | 1/2 |

@bharathv bharathv force-pushed the ifff33 branch 2 times, most recently from 54d13e8 to 1ccd58c Compare February 4, 2025 16:36
Contributor Author

bharathv commented Feb 4, 2025

/dt

@bharathv bharathv marked this pull request as ready for review February 4, 2025 20:45
rockwotj previously approved these changes Feb 4, 2025
Contributor

@rockwotj rockwotj left a comment

LGTM, a couple of nits is all.

on_resource_exhaustion(executor&, const reservations_tracker&) override;

private:
// Minium expected time slice allotment share of the total target lag.
Contributor

Suggested change
- // Minium expected time slice allotment share of the total target lag.
+ // Minimum expected time slice allotment share of the total target lag.

// Check {@link minimum_allotment_coeff}
unfulfilled_quota,
// Cannot be classified into one of the following groups
random,
Contributor

Is "other" a better name than "random"?

Contributor Author

yep, renamed.

static constexpr long default_about_to_expire_group_shares = 30;
static constexpr long default_expired_group_shares = 50;

absl::flat_hash_map<translator_group, long> _group_to_shares = {
Contributor

Should this be static?


executor.as.check();

while (!prioritized.empty()) {
Contributor

If the above loop needs to be reactor-friendly, doesn't this one need to be too?

Contributor Author

I don't think this loop is expensive in most cases, because we start the first available translator and break out of the loop. I added a yield just in case.

oleiman previously approved these changes Feb 4, 2025
Member

@oleiman oleiman left a comment

lgtm. few tiny nits and questions

Contributor Author

@bharathv bharathv left a comment

Thanks for the quick reviews.

oleiman previously approved these changes Feb 5, 2025
Member

@oleiman oleiman left a comment

lgtm

candidates.push_back(
{.id = id,
.group = translator_group::other,
.weight = random_generators::get_int<long>()});
Contributor

Not sure, but I wonder if we need to bound the random number, given we're going to be comparing against something relatively bounded (a time duration)

Contributor Author

> given we're going to be comparing against something relatively bounded (a time duration)

Not sure what you mean by this exactly. This weight is used for ordering within the group, so we are just comparing amongst these random numbers. (I'm OK with clamping it with a bound, but I just want to make sure we are on the same page.)

Contributor

Ah, oops, thanks for clarifying. I had missed that we don't compare across groups.
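
The point settled in this thread can be illustrated with a small sketch: if candidates are ordered by group first and by weight only within a group, an unbounded random weight in one group can never move a candidate ahead of another group. The enum order, the "smaller weight first" rule, and the `candidate` and `prioritize` names are assumptions for illustration; the PR's actual comparator is not shown in this conversation.

```cpp
#include <algorithm>
#include <cassert>
#include <tuple>
#include <vector>

// Assumed priority order: a smaller enum value is scheduled first.
enum class translator_group { expired, about_to_expire, unfulfilled_quota, other };

struct candidate {
    int id;
    translator_group group;
    long weight; // only meaningful relative to candidates in the same group
};

// Orders by group first; the weight only breaks ties inside a group.
inline void prioritize(std::vector<candidate>& candidates) {
    std::sort(
      candidates.begin(),
      candidates.end(),
      [](const candidate& a, const candidate& b) {
          return std::tie(a.group, a.weight) < std::tie(b.group, b.weight);
      });
}
```

Because the group is the leading key, a candidate in `other` with an extremely small or large random weight still sorts after every candidate in `expired`.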

Comment on lines 315 to 317
while (mem_tracker.memory_exhausted() && !executor.as.abort_requested()) {
co_await ss::sleep_abortable(polling_interval, executor.as);
}
Contributor

Are we guaranteed to make progress within the policy here? I'm wondering whether something like this could end up happening:

  1. we have 10/10 blocks reserved
  2. t_1 attempts to reserve a block, which notifies the scheduler about memory exhaustion and waits
  3. since the scheduler and policy run in a single loop, we eventually get to this line after asynchronously stopping t_big, which has one block
  4. t_1 immediately takes the reservation since it's been waiting on the semaphore
  5. this loop doesn't exit because memory is still exhausted now that t_1 has taken the reservation, until the translators naturally finish
  6. ?? maybe other translators try to reserve, but the scheduler is stuck in this loop, so no translator can make progress?

Contributor Author

Right, I think this case is possible if all the translators are really high throughput. As for progress, the wait time is bounded here (meaning we are not stuck in this loop forever) because each translator will naturally release memory as its time slice finishes. I have an idea to improve the behavior here; let me push it in the next rev.

Contributor

Hmm, I think I'm missing something. What bounds the wait time? It doesn't look like this while loop ever exits if we get to step 6 (it'd be a livelock), given that neither default_reservation_tracker::reserve_memory() nor this while loop has a timeout.

Contributor Author

The loop exits because mem_tracker.memory_exhausted() returns false eventually when the inflight translations exceed their time quota and release their resources (see mock_translator impl).

Contributor

Could we get into a scenario where all translators are waiting on the reservation semaphore with no deadline, while this loop is also waiting for memory to be freed with no deadline?

Contributor Author

I don't think that's possible, because there is a deadline: the time_slice allotted to every translator. The translator has to release its resources once that time slice elapses, no matter what, which keeps the wait here bounded.

Contributor

Ahh I think you're right. I missed that the abort source used by the reservation tracker is the translator abort source, not the executor's. Sorry for the noise!
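
The argument that settles this thread is that every in-flight translator must release its reservation once its time slice elapses, so the polling loop cannot wait longer than the longest remaining slice, rounded up to the polling interval. The sketch below simulates that with a fake clock in plain C++. It is not Seastar code: `mock_tracker`, `inflight`, and `wait_for_memory` are hypothetical stand-ins, and the simulation ignores abort sources and any new reservations made while waiting.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <vector>

using ms = std::chrono::milliseconds;

// A translator that holds memory blocks until its time slice ends.
struct inflight {
    ms slice_deadline;
    int blocks;
};

struct mock_tracker {
    int capacity;
    std::vector<inflight> holders;

    int reserved() const {
        int total = 0;
        for (const auto& h : holders) {
            total += h.blocks;
        }
        return total;
    }

    bool memory_exhausted() const { return reserved() >= capacity; }

    // Translators whose time slice has elapsed give their blocks back.
    void release_expired(ms now) {
        holders.erase(
          std::remove_if(
            holders.begin(),
            holders.end(),
            [now](const inflight& h) { return h.slice_deadline <= now; }),
          holders.end());
    }
};

// Polls until memory is available and returns the simulated time at exit.
// The loop terminates because every holder has a finite deadline.
inline ms wait_for_memory(mock_tracker& tracker, ms polling_interval) {
    assert(tracker.capacity > 0 && polling_interval > ms{0});
    ms now{0};
    while (tracker.memory_exhausted()) {
        now += polling_interval; // stands in for the sleep in the real loop
        tracker.release_expired(now);
    }
    return now;
}
```

In this model the wait ends at the first polling tick at or after the deadline of the translator whose release brings usage below capacity, which is the bounded behavior described in the replies above.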

Contributor Author

@bharathv bharathv left a comment

have a clarifying question on one of the comments, will push the next rev once that is addressed.

@bharathv bharathv requested review from andrwng and oleiman February 6, 2025 16:46
@bharathv bharathv enabled auto-merge February 6, 2025 18:16
@bharathv bharathv merged commit 317787e into redpanda-data:dev Feb 6, 2025
17 checks passed
@bharathv bharathv deleted the ifff33 branch February 6, 2025 20:25