dl/translation: scheduling policy for translation lag #25016

Merged · 3 commits · Feb 6, 2025
1 change: 1 addition & 0 deletions src/v/datalake/translation/BUILD
@@ -52,6 +52,7 @@ redpanda_cc_library(
"//src/v/ssx:future_util",
"//src/v/ssx:semaphore",
"//src/v/utils:to_string",
"@abseil-cpp//absl/container:flat_hash_map",
"@seastar",
],
)
2 changes: 1 addition & 1 deletion src/v/datalake/translation/scheduling.cc
@@ -428,7 +428,7 @@ ss::future<> scheduler::main() {
// temporary default until a proper scheduling policy is implemented.
std::unique_ptr<scheduling_policy> scheduling_policy::make_default(
size_t max_concurrent_translators, clock::duration translation_time_quota) {
return std::make_unique<simple_fcfs_scheduling_policy>(
return std::make_unique<fair_scheduling_policy>(
max_concurrent_translators, translation_time_quota);
}

254 changes: 254 additions & 0 deletions src/v/datalake/translation/scheduling_policies.cc
@@ -63,4 +63,258 @@ ss::future<> simple_fcfs_scheduling_policy::on_resource_exhaustion(
}
}

fair_scheduling_policy::fair_scheduling_policy(
size_t max_concurrent_translators, clock::duration translation_time_quota)
: _max_concurrent_translations(max_concurrent_translators)
, _translation_time_quota(translation_time_quota) {
vlog(
datalake_log.info,
"created fair_scheduling_policy policy with {} translators "
"and {} time quota",
max_concurrent_translators,
std::chrono::duration_cast<std::chrono::milliseconds>(
translation_time_quota));
initialize_group_shares();
}

std::ostream&
operator<<(std::ostream& os, fair_scheduling_policy::translator_group group) {
switch (group) {
case fair_scheduling_policy::translator_group::other:
return os << "translator_group::other";
case fair_scheduling_policy::translator_group::unfulfilled_quota:
return os << "translator_group::unfulfilled_quota";
case fair_scheduling_policy::translator_group::about_to_expire:
return os << "translator_group::about_to_expire";
case fair_scheduling_policy::translator_group::expired:
return os << "translator_group::expired";
}
}

fair_scheduling_policy::group_shares_t fair_scheduling_policy::_group_to_shares;
fair_scheduling_policy::group_intervals_t
fair_scheduling_policy::_group_intervals;

void fair_scheduling_policy::initialize_group_shares() {
_group_to_shares = {
{other, fair_scheduling_policy::default_other_shares},
{unfulfilled_quota,
fair_scheduling_policy::default_unfulfilled_group_shares},
{about_to_expire,
fair_scheduling_policy::default_about_to_expire_group_shares},
{expired, fair_scheduling_policy::default_expired_group_shares}};

const double total_shares = std::accumulate(
std::begin(_group_to_shares),
std::end(_group_to_shares),
0,
[](const double so_far, const auto& p) { return so_far + p.second; });

vassert(
total_shares > 0,
"invalid share assignment for translation groups, total shares should be "
"> 0");

double previous_end = 0.0;
for (const auto& [group, shares] : _group_to_shares) {
double end = previous_end + shares / total_shares;
_group_intervals[group] = std::make_pair(previous_end, end);
previous_end = end;
}
}

fair_scheduling_policy::translator_group
fair_scheduling_policy::choose_random_translator_group() const {
auto random = random_generators::get_real<double>(0, 1);
auto it = std::ranges::find_if(
_group_intervals, [&random](const auto& entry) {
auto interval_begin = entry.second.first;
auto interval_end = entry.second.second;
return interval_begin <= random && random <= interval_end;
});
vassert(
it != _group_intervals.end(),
"Invalid share assignment for translation groups, no group found for {}",
random);
return it->first;
}
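
For intuition, a standalone sketch of the shares-to-interval construction and uniform draw implemented above, assuming the default shares (5 / 15 / 30 / 50); the group names and printout are illustrative:

```cpp
// Standalone sketch: build cumulative intervals over [0, 1] from group
// shares, then map a uniform draw to a group. A group with share s is
// picked with probability s / total.
#include <cstdio>
#include <random>
#include <utility>
#include <vector>

int main() {
    const std::vector<std::pair<const char*, long>> shares = {
      {"other", 5},
      {"unfulfilled_quota", 15},
      {"about_to_expire", 30},
      {"expired", 50}};

    double total = 0.0;
    for (const auto& [name, s] : shares) {
        total += static_cast<double>(s);
    }

    // cumulative intervals: other -> [0, 0.05], unfulfilled -> [0.05, 0.20],
    // about_to_expire -> [0.20, 0.50], expired -> [0.50, 1.00]
    std::vector<std::pair<const char*, std::pair<double, double>>> intervals;
    double prev = 0.0;
    for (const auto& [name, s] : shares) {
        const double end = prev + static_cast<double>(s) / total;
        intervals.emplace_back(name, std::make_pair(prev, end));
        prev = end;
    }

    std::mt19937_64 rng{std::random_device{}()};
    const double r = std::uniform_real_distribution<double>(0.0, 1.0)(rng);
    for (const auto& [name, iv] : intervals) {
        if (iv.first <= r && r <= iv.second) {
            std::printf("draw %.3f -> %s\n", r, name);
            break;
        }
    }
}
```

Note that the real implementation keeps groups in an absl::flat_hash_map, so the interval order is unspecified; the interval widths still partition [0, 1], so each group's probability remains share / total.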

ss::future<> fair_scheduling_policy::schedule_one_translation(
executor& executor, const reservations_tracker& mem_tracker) {
// Wait until an empty slot frees up.
while (!executor.as.abort_requested() && !executor.waiting.empty()
&& !mem_tracker.memory_exhausted()
&& executor.running.size() >= _max_concurrent_translations) {
co_await ss::sleep_abortable(polling_interval, executor.as);
}

if (executor.as.abort_requested() || mem_tracker.memory_exhausted()) {
co_return;
}

auto prioritized_group = choose_random_translator_group();
vlog(
datalake_log.trace,
"prioritizing translator group of type: {}",
prioritized_group);

struct entry {
translator_id id;
translator_group group;
// weight within the group, used to sort entries within
// a single group.
long weight{0};
};
// A comparator to sort {@link entry} values along two dimensions:
// 1. group - if the group is prioritized, it is bubbled to the top relative
// to other groups (since we want to pick the prioritized group among all
// the available translators)
// 2. weight - the second dimension of sorting among all translators within
// a group
//
// This scheme lets us always pick a translator even when the prioritized
// group is empty; in that case we pick from the group with the highest
// shares (breaking ties by weight).
auto max_heap_cmp = [&](const entry& lhs, const entry& rhs) {
auto both_prioritized = lhs.group == prioritized_group
&& rhs.group == prioritized_group;
if (both_prioritized) {
return lhs.weight < rhs.weight;
}
// Either one of them is prioritized
if (lhs.group == prioritized_group || rhs.group == prioritized_group) {
return rhs.group == prioritized_group;
}
// neither of them are prioritized, just sort based on
// shares if they are different, else weight within the same share
// group.
auto lhs_shares = _group_to_shares[lhs.group];
auto rhs_shares = _group_to_shares[rhs.group];
return lhs_shares == rhs_shares ? lhs.weight < rhs.weight
: lhs_shares < rhs_shares;
};

// Make a snapshot of the state to be scheduled without any scheduling
// points. The expectation is that the waiting queue is small enough not to
// cause any reactor stalls.

// This loop classifies each translator into a group {@link
// translator_group} depending on the translator's current status. A
// snapshot of all the classified translators is then sorted using the
// comparator, prioritizing the group randomly picked above.

chunked_vector<entry> candidates;
for (const auto& translator : executor.waiting) {
auto status = translator.status();
auto duration_to_expire = status.next_checkpoint_deadline
- clock::now();
auto& id = translator.translator_ptr()->id();
if (duration_to_expire < clock::duration{0}) {
// The translator furthest past its deadline (most expired) is
// given the highest weight.
candidates.push_back(
{.id = id,
.group = translator_group::expired,
.weight = -duration_to_expire.count()});
} else if (
duration_to_expire < about_to_expire_window * status.target_lag) {
// Translator with the nearest expiry time is prioritized higher.
candidates.push_back(
{.id = id,
.group = translator_group::about_to_expire,
.weight = -duration_to_expire.count()});
} else {
auto total_time = clock::now() - translator.start_time();
auto total_running_time = translator.total_running_time();
if (total_running_time < minimum_allotment_coeff * total_time) {
// within the unfulfilled_quota group, we want to order them
// by least allotted time first.
candidates.push_back(
{.id = id,
.group = translator_group::unfulfilled_quota,
.weight = -total_running_time.count()});
} else {
// todo: perhaps we can order by pending_bytes_to_translate
candidates.push_back(
{.id = id,
.group = translator_group::other,
.weight = random_generators::get_int<long>()});
Review thread (resolved):

Contributor: Not sure, but I wonder if we need to bound the random number, given we're going to be comparing against something relatively bounded (a time duration).

Contributor (author): > given we're going to be comparing against something relatively bounded (a time duration)

Not sure what you mean by this exactly; this weight is used for ordering within the group, so we are just comparing amongst these random numbers. (I'm OK with clamping it with a bound, but I just want to make sure we are on the same page.)

Contributor: Ah oops, thanks for clarifying. I misunderstood that we don't compare across groups.
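
For reference, a hypothetical bounded variant of the weight assignment discussed in this thread could look like the sketch below. The helper name and the bound are illustrative only; the merged change keeps the unbounded random_generators::get_int<long>().

```cpp
// Hypothetical bounded weight for the `other` group. As clarified in the
// thread, weights are only compared within a group, so bounding is
// optional; this simply clamps the value to a fixed range.
#include <random>

long bounded_weight() {
    static thread_local std::mt19937_64 rng{std::random_device{}()};
    constexpr long bound = 1L << 40; // illustrative upper bound
    return std::uniform_int_distribution<long>(0, bound)(rng);
}
```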

}
}
const auto& back = candidates.back();
vlog(
datalake_log.trace,
"scheduling candidate: {}, group: {}, weight: {}",
translator,
back.group,
back.weight);
}

co_await ss::maybe_yield();

// Reactor friendly sort using the comparator
std::priority_queue<entry, chunked_vector<entry>, decltype(max_heap_cmp)>
prioritized(max_heap_cmp);
for (auto& entry : candidates) {
executor.as.check();
auto it = executor.translators.find(entry.id);
if (
it == executor.translators.end()
|| !it->second._waiting_hook.is_linked()) {
continue;
}
prioritized.push(entry);
co_await ss::maybe_yield();
}

executor.as.check();

while (!prioritized.empty()) {
Review thread (resolved):

Contributor: If the above loop needs to be reactor friendly, doesn't this one too?

Contributor (author): I don't think this loop is expensive in most cases because we start the first available translator and break the loop; added a yield just in case.

auto& entry = prioritized.top();
auto it = executor.translators.find(entry.id);
if (
it != executor.translators.end()
&& it->second._waiting_hook.is_linked()) {
vlog(
datalake_log.trace,
"[{}] chosen translator to run, group: {}, weight: {}",
it->second,
entry.group,
entry.weight);
executor.start_translation(it->second, _translation_time_quota);
co_return;
}
prioritized.pop();
co_await ss::maybe_yield();
}
}
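
For reference, a self-contained sketch of how the two-dimensional comparator behaves when driving a max-heap, assuming `expired` happens to be the randomly prioritized group; the ids, groups, and weights are made up:

```cpp
// Minimal sketch of the comparator: entries from the prioritized group
// float to the top regardless of weight; within the prioritized group,
// higher weight wins; across non-prioritized groups, higher shares win.
#include <cstdio>
#include <queue>
#include <vector>

enum class group { other, unfulfilled_quota, about_to_expire, expired };

struct entry {
    int id;
    group g;
    long weight;
};

// shares mirror the defaults in scheduling_policies.h
long shares(group g) {
    switch (g) {
    case group::other:
        return 5;
    case group::unfulfilled_quota:
        return 15;
    case group::about_to_expire:
        return 30;
    case group::expired:
        return 50;
    }
    return 0;
}

int main() {
    const group prioritized = group::expired;
    auto cmp = [&](const entry& lhs, const entry& rhs) {
        if (lhs.g == prioritized && rhs.g == prioritized) {
            return lhs.weight < rhs.weight;
        }
        if (lhs.g == prioritized || rhs.g == prioritized) {
            return rhs.g == prioritized;
        }
        return shares(lhs.g) == shares(rhs.g) ? lhs.weight < rhs.weight
                                              : shares(lhs.g) < shares(rhs.g);
    };
    std::priority_queue<entry, std::vector<entry>, decltype(cmp)> pq(cmp);
    pq.push({.id = 1, .g = group::other, .weight = 99});
    pq.push({.id = 2, .g = group::expired, .weight = 10});
    pq.push({.id = 3, .g = group::about_to_expire, .weight = 5});
    std::printf("chosen id=%d\n", pq.top().id); // prints 2 (prioritized group)
}
```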

ss::future<> fair_scheduling_policy::on_resource_exhaustion(
executor& executor, const reservations_tracker& mem_tracker) {
if (!mem_tracker.memory_exhausted() || executor.running.empty()) {
co_return;
}
// stop the translator with highest memory usage first.
// note: the size of this list is super small (low single
// digits)
executor.running.sort(
[](const translator_executable& a, const translator_executable& b) {
return a.status().memory_bytes_reserved
> b.status().memory_bytes_reserved;
});

auto num_running = executor.running.size();
// stop the translator at the front of the sorted list, i.e. the one
// with the highest memory usage.
vlog(
datalake_log.debug,
"[{}] stopping translator due to memory exhaustion",
*executor.running.begin());
executor.stop_translation(*executor.running.begin());

while (mem_tracker.memory_exhausted() && !executor.as.abort_requested()
&& executor.running.size() == num_running) {
co_await ss::sleep_abortable(polling_interval, executor.as);
}
}
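
For intuition, a standalone sketch of the eviction order on memory exhaustion; the ids and byte values are made up:

```cpp
// Minimal sketch: sort running translators by reserved memory, descending,
// then stop the one at the front (the largest reservation).
#include <cstdio>
#include <list>

struct status_t {
    long memory_bytes_reserved;
};
struct translator {
    int id;
    status_t status;
};

int main() {
    std::list<translator> running = {
      {.id = 1, .status = {5L << 20}},
      {.id = 2, .status = {64L << 20}},
      {.id = 3, .status = {16L << 20}}};
    running.sort([](const translator& a, const translator& b) {
        return a.status.memory_bytes_reserved > b.status.memory_bytes_reserved;
    });
    // the front now holds the largest reservation and is stopped first
    std::printf("stop translator %d first\n", running.begin()->id); // 2
}
```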

} // namespace datalake::translation::scheduling
86 changes: 86 additions & 0 deletions src/v/datalake/translation/scheduling_policies.h
@@ -12,6 +12,8 @@

#include "datalake/translation/scheduling.h"

#include <absl/container/flat_hash_map.h>

namespace datalake::translation::scheduling {

/**
@@ -35,4 +37,88 @@ class simple_fcfs_scheduling_policy : public scheduling_policy {
clock::duration _translation_time_quota;
};

/**
* A scheduling policy that strives to meet the target lag deadline for
* the translators while still being fair so that translators
* with a small lag do not starve translators with large lag.
*
* The policy uses a heuristic with shares to guarantee a degree of
* fairness proportional to the allotted shares.
*/
class fair_scheduling_policy : public scheduling_policy {
public:
explicit fair_scheduling_policy(
size_t max_concurrent_translators,
clock::duration translation_time_quota);

ss::future<>
schedule_one_translation(executor&, const reservations_tracker&) override;

ss::future<>
on_resource_exhaustion(executor&, const reservations_tracker&) override;

private:
// Minimum expected time-slice allotment as a share of the total target
// lag. For example, if target lag = 1 hr, 0.3 * 1 hr = 0.3 hr of
// translation time is guaranteed to each translator over the lag interval.
static constexpr double minimum_allotment_coeff = 0.3;

// If the remaining time in the lag window falls within this fraction
// of the total lag time, the translator is considered 'about to expire'.
static constexpr double about_to_expire_window = 0.2;

// A classification of waiting translators for prioritization.
// Each of these groups is assigned shares that determine the likelihood
// with which a translator from one of these groups is picked to run next.
// During a translation tick, every waiting translator is classified into
// one of these groups.

// The rationale here is that we cannot always prioritize translators with
// impending deadline as that will starve translators with larger lag
// windows.
enum translator_group : uint8_t {
// A translator whose lag deadline has expired while it is waiting
expired,
// A translator whose lag deadline is about to expire soon.
// Check {@link about_to_expire_window}.
about_to_expire,
// A translator that does not fall into the categories above
// but has not met the minimum allotment quota.
// Check {@link minimum_allotment_coeff}
unfulfilled_quota,
// A translator that cannot be classified into any of the groups above.
other,
};

friend std::ostream& operator<<(std::ostream&, translator_group);

/**
* Picks a random translator group category with probability proportional
* to the shares defined below.
*/
translator_group choose_random_translator_group() const;

// The higher the shares, the higher the likelihood of the group being
// picked. E.g., the `other` group with 5 shares has a
// 5 / (5 + 15 + 30 + 50) = 5% chance of one of its translators being
// picked to run next.

// todo: these weights can be made dynamic or mapped to a dynamic runtime
// configuration.
static constexpr long default_other_shares = 5;
static constexpr long default_unfulfilled_group_shares = 15;
static constexpr long default_about_to_expire_group_shares = 30;
static constexpr long default_expired_group_shares = 50;

using group_shares_t = absl::flat_hash_map<translator_group, long>;
using group_intervals_t
= absl::flat_hash_map<translator_group, std::pair<double, double>>;

static group_shares_t _group_to_shares;
static group_intervals_t _group_intervals;
static void initialize_group_shares();

size_t _max_concurrent_translations;
clock::duration _translation_time_quota;
};

} // namespace datalake::translation::scheduling
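
As a worked example of the two coefficients above, assuming an illustrative 1 hr target lag (the constants mirror the header; the lag and elapsed-time figures are made up):

```cpp
// Worked example of the classification thresholds. With a 1 hr target lag,
// a waiting translator becomes `about_to_expire` once less than
// 0.2 * 1 hr = 12 min remain before its deadline, and counts as
// `unfulfilled_quota` while its accumulated running time is below
// 0.3x the time elapsed since it started.
#include <chrono>
#include <cstdio>

int main() {
    using namespace std::chrono;
    constexpr double minimum_allotment_coeff = 0.3;
    constexpr double about_to_expire_window = 0.2;

    constexpr auto target_lag = hours{1};
    const auto expire_threshold
      = duration_cast<minutes>(about_to_expire_window * target_lag);
    std::printf(
      "about_to_expire when remaining time < %lld min\n",
      static_cast<long long>(expire_threshold.count())); // 12 min

    const auto elapsed = minutes{40}; // time since the translator started
    const auto guaranteed
      = duration_cast<minutes>(minimum_allotment_coeff * elapsed);
    std::printf(
      "unfulfilled_quota while running time < %lld min\n",
      static_cast<long long>(guaranteed.count())); // 12 min
}
```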
18 changes: 18 additions & 0 deletions src/v/datalake/translation/tests/BUILD
@@ -50,6 +50,24 @@ redpanda_cc_gtest(
deps = [
":scheduler_fixture",
"//src/v/datalake:logger",
"//src/v/test_utils:gtest",
"//src/v/test_utils:random",
"@seastar",
],
)

redpanda_cc_gtest(
name = "fair_scheduler_test",
timeout = "moderate",
srcs = [
"fair_scheduling_policy_tests.cc",
],
cpu = 1,
deps = [
":scheduler_fixture",
"//src/v/datalake:logger",
"//src/v/datalake/translation:scheduler",
"//src/v/test_utils:gtest",
"//src/v/test_utils:random",
"@seastar",
],