Skip to content

Commit

Permalink
[Enhancement] support incremental scan ranges deployment at BE side (#…
Browse files Browse the repository at this point in the history
…50254)

Signed-off-by: yanz <[email protected]>
  • Loading branch information
dirtysalt committed Sep 19, 2024
1 parent ce25287 commit 7f57234
Show file tree
Hide file tree
Showing 28 changed files with 259 additions and 73 deletions.
13 changes: 4 additions & 9 deletions be/src/connector/connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,9 @@ StatusOr<pipeline::MorselQueuePtr> DataSourceProvider::convert_scan_range_to_mor
peek_scan_ranges(scan_ranges);

pipeline::Morsels morsels;
// If this scan node does not accept non-empty scan ranges, create a placeholder one.
if (!accept_empty_scan_ranges() && scan_ranges.empty()) {
morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(node_id, TScanRangeParams()));
} else {
for (const auto& scan_range : scan_ranges) {
morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(node_id, scan_range));
}
}
bool has_more_morsel = false;
pipeline::ScanMorsel::build_scan_morsels(node_id, scan_ranges, accept_empty_scan_ranges(), &morsels,
&has_more_morsel);

if (partition_order_hint().has_value()) {
bool asc = partition_order_hint().value();
Expand All @@ -143,7 +138,7 @@ StatusOr<pipeline::MorselQueuePtr> DataSourceProvider::convert_scan_range_to_mor
});
}

auto morsel_queue = std::make_unique<pipeline::DynamicMorselQueue>(std::move(morsels));
auto morsel_queue = std::make_unique<pipeline::DynamicMorselQueue>(std::move(morsels), has_more_morsel);
if (scan_parallelism > 0) {
morsel_queue->set_max_degree_of_parallelism(scan_parallelism);
}
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/capture_version_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ pipeline::OpFactories CaptureVersionNode::decompose_to_pipeline(pipeline::Pipeli
StatusOr<pipeline::MorselQueueFactoryPtr> CaptureVersionNode::scan_range_to_morsel_queue_factory(
const std::vector<TScanRangeParams>& scan_ranges) {
pipeline::Morsels morsels;
for (const auto& scan_range : scan_ranges) {
morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(id(), scan_range));
}
[[maybe_unused]] bool has_more_morsel = false;
pipeline::ScanMorsel::build_scan_morsels(id(), scan_ranges, true, &morsels, &has_more_morsel);
DCHECK(has_more_morsel == false);
auto morsel_queue = std::make_unique<pipeline::FixedMorselQueue>(std::move(morsels));
return std::make_unique<pipeline::SharedMorselQueueFactory>(std::move(morsel_queue), 1);
}
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/connector_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,6 @@ StatusOr<pipeline::MorselQueuePtr> ConnectorScanNode::convert_scan_range_to_mors
const std::vector<TScanRangeParams>& scan_ranges, int node_id, int32_t pipeline_dop,
bool enable_tablet_internal_parallel, TTabletInternalParallelMode::type tablet_internal_parallel_mode,
size_t num_total_scan_ranges) {
_data_source_provider->peek_scan_ranges(scan_ranges);
return _data_source_provider->convert_scan_range_to_morsel_queue(
scan_ranges, node_id, pipeline_dop, enable_tablet_internal_parallel, tablet_internal_parallel_mode,
num_total_scan_ranges);
Expand Down
7 changes: 4 additions & 3 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,10 @@ StatusOr<pipeline::MorselQueuePtr> OlapScanNode::convert_scan_range_to_morsel_qu
bool enable_tablet_internal_parallel, TTabletInternalParallelMode::type tablet_internal_parallel_mode,
size_t num_total_scan_ranges) {
pipeline::Morsels morsels;
for (const auto& scan_range : scan_ranges) {
morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(node_id, scan_range));
}
[[maybe_unused]] bool has_more_morsel = false;
pipeline::ScanMorsel::build_scan_morsels(node_id, scan_ranges, accept_empty_scan_ranges(), &morsels,
&has_more_morsel);
DCHECK(has_more_morsel == false);

if (partition_order_hint().has_value()) {
bool asc = partition_order_hint().value();
Expand Down
30 changes: 30 additions & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -853,4 +853,34 @@ void FragmentExecutor::_fail_cleanup(bool fragment_has_registed) {
}
}

Status FragmentExecutor::append_incremental_scan_ranges(ExecEnv* exec_env, const TExecPlanFragmentParams& request) {
DCHECK(!request.__isset.fragment);
DCHECK(request.__isset.params);
const TPlanFragmentExecParams& params = request.params;
const TUniqueId& query_id = params.query_id;
const TUniqueId& instance_id = params.fragment_instance_id;

QueryContextPtr query_ctx = exec_env->query_context_mgr()->get(query_id);
if (query_ctx == nullptr) return Status::OK();
FragmentContextPtr fragment_ctx = query_ctx->fragment_mgr()->get(instance_id);
if (fragment_ctx == nullptr) return Status::OK();

for (const auto& [node_id, scan_ranges] : params.per_node_scan_ranges) {
auto iter = fragment_ctx->morsel_queue_factories().find(node_id);
if (iter == fragment_ctx->morsel_queue_factories().end()) {
continue;
}
MorselQueueFactory* morsel_queue_factory = iter->second.get();
if (morsel_queue_factory == nullptr) {
continue;
}

pipeline::Morsels morsels;
bool has_more_morsel = false;
pipeline::ScanMorsel::build_scan_morsels(node_id, scan_ranges, true, &morsels, &has_more_morsel);
RETURN_IF_ERROR(morsel_queue_factory->append_morsels(std::move(morsels), has_more_morsel));
}
return Status::OK();
}

} // namespace starrocks::pipeline
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/fragment_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class FragmentExecutor {
const TExecPlanFragmentParams& unique_request);
Status execute(ExecEnv* exec_env);

static Status append_incremental_scan_ranges(ExecEnv* exec_env, const TExecPlanFragmentParams& request);

private:
void _fail_cleanup(bool fragment_has_registed);
uint32_t _calc_dop(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request) const;
Expand Down
16 changes: 9 additions & 7 deletions be/src/exec/pipeline/scan/connector_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ int ConnectorScanOperator::available_pickup_morsel_count() {
return io_tasks;
}

void ConnectorScanOperator::append_morsels(std::vector<MorselPtr>&& morsels) {
Status ConnectorScanOperator::append_morsels(std::vector<MorselPtr>&& morsels) {
query_cache::TicketChecker* ticket_checker = _ticket_checker.get();
if (ticket_checker != nullptr) {
int64_t cached_owner_id = -1;
Expand All @@ -613,7 +613,8 @@ void ConnectorScanOperator::append_morsels(std::vector<MorselPtr>&& morsels) {
}
}
}
_morsel_queue->append_morsels(std::move(morsels));
RETURN_IF_ERROR(_morsel_queue->append_morsels(std::move(morsels)));
return Status::OK();
}

// ==================== ConnectorChunkSource ====================
Expand Down Expand Up @@ -728,17 +729,18 @@ Status ConnectorChunkSource::_open_data_source(RuntimeState* state, bool* mem_al

ConnectorScanOperator* scan_op = down_cast<ConnectorScanOperator*>(_scan_op);
if (scan_op->enable_adaptive_io_tasks()) {
ConnectorScanOperatorIOTasksMemLimiter* limiter = _get_io_tasks_mem_limiter();
MemTracker* mem_tracker = state->query_ctx()->connector_scan_mem_tracker();

[[maybe_unused]] auto build_debug_string = [&](const std::string& action) {
std::stringstream ss;
ss << "try_mem_tracker. query_id = " << print_id(state->query_id())
<< ", op_id = " << _scan_op->get_plan_node_id() << "/" << _scan_op->get_driver_sequence() << ", "
<< action << ". this = " << (void*)this << ", value = " << _request_mem_tracker_bytes;
<< action << ". this = " << (void*)this << ", value = " << _request_mem_tracker_bytes
<< ", running = " << limiter->update_running_chunk_source_count(0);
return ss.str();
};

ConnectorScanOperatorIOTasksMemLimiter* limiter = _get_io_tasks_mem_limiter();
MemTracker* mem_tracker = state->query_ctx()->connector_scan_mem_tracker();

int retry = 3;
while (retry > 0) {
retry--;
Expand Down Expand Up @@ -877,7 +879,7 @@ Status ConnectorChunkSource::_read_chunk(RuntimeState* state, ChunkPtr* chunk) {
split_morsels.emplace_back(std::move(m));
}

scan_op->append_morsels(std::move(split_morsels));
RETURN_IF_ERROR(scan_op->append_morsels(std::move(split_morsels)));
}
}
return Status::EndOfFile("");
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/connector_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class ConnectorScanOperator : public ScanOperator {
void end_driver_process(PipelineDriver* driver) override;
bool is_running_all_io_tasks() const override;

void append_morsels(std::vector<MorselPtr>&& morsels);
Status append_morsels(std::vector<MorselPtr>&& morsels);
ConnectorScanOperatorAdaptiveProcessor* adaptive_processor() const { return _adaptive_processor; }
bool enable_adaptive_io_tasks() const { return _enable_adaptive_io_tasks; }

Expand Down
37 changes: 36 additions & 1 deletion be/src/exec/pipeline/scan/morsel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ namespace starrocks::pipeline {

const std::vector<BaseRowsetSharedPtr> ScanMorselX::kEmptyRowsets;

void ScanMorsel::build_scan_morsels(int node_id, const std::vector<TScanRangeParams>& scan_ranges,
bool accept_empty_scan_ranges, pipeline::Morsels* ptr_morsels,
bool* has_more_morsel) {
pipeline::Morsels& morsels = *ptr_morsels;
*has_more_morsel = false;
for (const auto& scan_range : scan_ranges) {
if (scan_range.__isset.empty && scan_range.empty) {
if (scan_range.__isset.has_more) {
*has_more_morsel = scan_range.has_more;
}
continue;
}
morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(node_id, scan_range));
}

if (morsels.empty() && !accept_empty_scan_ranges) {
morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(node_id, TScanRangeParams()));
}
}

void PhysicalSplitScanMorsel::init_tablet_reader_params(TabletReaderParams* params) {
params->rowid_range_option = _rowid_range_option;
}
Expand All @@ -48,6 +68,12 @@ size_t SharedMorselQueueFactory::num_original_morsels() const {
return _queue->num_original_morsels();
}

Status SharedMorselQueueFactory::append_morsels(Morsels&& morsels, bool has_more) {
RETURN_IF_ERROR(_queue->append_morsels(std::move(morsels)));
_queue->set_has_more(has_more);
return Status::OK();
}

size_t IndividualMorselQueueFactory::num_original_morsels() const {
size_t total = 0;
for (const auto& queue : _queue_per_driver_seq) {
Expand All @@ -56,6 +82,10 @@ size_t IndividualMorselQueueFactory::num_original_morsels() const {
return total;
}

Status MorselQueueFactory::append_morsels(Morsels&& morsels, bool has_more) {
return Status::NotSupported("append_morsels not supported");
}

IndividualMorselQueueFactory::IndividualMorselQueueFactory(std::map<int, MorselQueuePtr>&& queue_per_driver_seq,
bool could_local_shuffle)
: _could_local_shuffle(could_local_shuffle) {
Expand Down Expand Up @@ -124,6 +154,10 @@ void MorselQueue::unget(MorselPtr&& morsel) {
_unget_morsel = std::move(morsel);
}

Status MorselQueue::append_morsels(Morsels&& morsels) {
return Status::NotSupported("append_morsels not supported");
}

StatusOr<MorselPtr> FixedMorselQueue::try_get() {
if (_unget_morsel != nullptr) {
return std::move(_unget_morsel);
Expand Down Expand Up @@ -857,12 +891,13 @@ void DynamicMorselQueue::unget(MorselPtr&& morsel) {
_queue.emplace_front(std::move(morsel));
}

void DynamicMorselQueue::append_morsels(std::vector<MorselPtr>&& morsels) {
Status DynamicMorselQueue::append_morsels(std::vector<MorselPtr>&& morsels) {
std::lock_guard<std::mutex> _l(_mutex);
_size += morsels.size();
// add split morsels to front of this queue.
// so this new morsels share same owner_id with recently processed morsel.
_queue.insert(_queue.begin(), std::make_move_iterator(morsels.begin()), std::make_move_iterator(morsels.end()));
return Status::OK();
}

} // namespace starrocks::pipeline
25 changes: 18 additions & 7 deletions be/src/exec/pipeline/scan/morsel.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ class ScanMorsel : public ScanMorselX {
}
}

~ScanMorsel() override = default;

ScanMorsel(int32_t plan_node_id, const TScanRangeParams& scan_range)
: ScanMorsel(plan_node_id, scan_range.scan_range) {}

~ScanMorsel() override = default;

TScanRange* get_scan_range() { return _scan_range.get(); }

TInternalScanRange* get_olap_scan_range() { return &(_scan_range->internal_scan_range); }
Expand All @@ -181,6 +181,9 @@ class ScanMorsel : public ScanMorselX {
bool is_ticket_checker_entered() const { return _ticket_checker_entered; }
void set_ticket_checker_entered(bool v) { _ticket_checker_entered = v; }

static void build_scan_morsels(int node_id, const std::vector<TScanRangeParams>& scan_ranges,
bool accept_empty_scan_ranges, pipeline::Morsels* morsels, bool* has_more_morsel);

private:
std::unique_ptr<TScanRange> _scan_range;
ScanSplitContextPtr _split_context = nullptr;
Expand Down Expand Up @@ -244,6 +247,8 @@ class MorselQueueFactory {

virtual bool is_shared() const = 0;
virtual bool could_local_shuffle() const = 0;

virtual Status append_morsels(Morsels&& morsels, bool has_more);
};

class SharedMorselQueueFactory final : public MorselQueueFactory {
Expand All @@ -258,6 +263,8 @@ class SharedMorselQueueFactory final : public MorselQueueFactory {
bool is_shared() const override { return true; }
bool could_local_shuffle() const override { return true; }

Status append_morsels(Morsels&& morsels, bool has_more) override;

private:
MorselQueuePtr _queue;
const int _size;
Expand Down Expand Up @@ -342,10 +349,13 @@ class MorselQueue {
virtual void unget(MorselPtr&& morsel);
virtual std::string name() const = 0;
virtual StatusOr<bool> ready_for_next() const { return true; }
virtual void append_morsels(Morsels&& morsels) {}
virtual Status append_morsels(Morsels&& morsels);
virtual Type type() const = 0;
bool has_more() const { return _has_more; }
void set_has_more(bool v) { _has_more = v; }

protected:
std::atomic<bool> _has_more = false;
Morsels _morsels;
size_t _num_morsels = 0;
MorselPtr _unget_morsel = nullptr;
Expand Down Expand Up @@ -394,7 +404,7 @@ class BucketSequenceMorselQueue : public MorselQueue {
StatusOr<MorselPtr> try_get() override;
std::string name() const override;
StatusOr<bool> ready_for_next() const override;
void append_morsels(Morsels&& morsels) override { _morsel_queue->append_morsels(std::move(morsels)); }
Status append_morsels(Morsels&& morsels) override { return _morsel_queue->append_morsels(std::move(morsels)); }
Type type() const override { return BUCKET_SEQUENCE; }

private:
Expand Down Expand Up @@ -555,18 +565,19 @@ class LogicalSplitMorselQueue final : public SplitMorselQueue {

class DynamicMorselQueue final : public MorselQueue {
public:
explicit DynamicMorselQueue(Morsels&& morsels) {
append_morsels(std::move(morsels));
explicit DynamicMorselQueue(Morsels&& morsels, bool has_more) {
(void)append_morsels(std::move(morsels));
_size = _num_morsels = _queue.size();
_degree_of_parallelism = _num_morsels;
_has_more = has_more;
}

~DynamicMorselQueue() override = default;
bool empty() const override { return _size.load(std::memory_order_relaxed) == 0; }
StatusOr<MorselPtr> try_get() override;
void unget(MorselPtr&& morsel) override;
std::string name() const override { return "dynamic_morsel_queue"; }
void append_morsels(Morsels&& morsels) override;
Status append_morsels(Morsels&& morsels) override;
void set_ticket_checker(const query_cache::TicketCheckerPtr& ticket_checker) override {
_ticket_checker = ticket_checker;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ bool ScanOperator::is_finished() const {
}

// Any io task is running or needs to run.
if (_num_running_io_tasks > 0 || !_morsel_queue->empty()) {
if (_num_running_io_tasks > 0 || _morsel_queue->has_more() || !_morsel_queue->empty()) {
return false;
}

Expand Down
16 changes: 6 additions & 10 deletions be/src/exec/scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ static std::map<int, pipeline::MorselQueuePtr> uniform_distribute_morsels(pipeli
}
} else {
for (auto& [operator_seq, morsels] : morsels_per_driver) {
queue_per_driver.emplace(operator_seq, std::make_unique<pipeline::DynamicMorselQueue>(std::move(morsels)));
queue_per_driver.emplace(operator_seq, std::make_unique<pipeline::DynamicMorselQueue>(
std::move(morsels), morsel_queue->has_more()));
}
}

Expand Down Expand Up @@ -186,15 +187,10 @@ StatusOr<pipeline::MorselQueuePtr> ScanNode::convert_scan_range_to_morsel_queue(
bool enable_tablet_internal_parallel, TTabletInternalParallelMode::type tablet_internal_parallel_mode,
size_t num_total_scan_ranges) {
pipeline::Morsels morsels;
// If this scan node does not accept non-empty scan ranges, create a placeholder one.
if (!accept_empty_scan_ranges() && scan_ranges.empty()) {
morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(node_id, TScanRangeParams()));
} else {
for (const auto& scan_range : scan_ranges) {
morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(node_id, scan_range));
}
}

[[maybe_unused]] bool has_more_morsel = false;
pipeline::ScanMorsel::build_scan_morsels(node_id, scan_ranges, accept_empty_scan_ranges(), &morsels,
&has_more_morsel);
DCHECK(has_more_morsel == false);
return std::make_unique<pipeline::FixedMorselQueue>(std::move(morsels));
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,11 @@ Status PInternalServiceImplBase<T>::_exec_plan_fragment(brpc::Controller* cntl,
uint32_t len = ser_request.size();
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, request->attachment_protocol(), &t_request));
}
// incremental scan ranges deployment.
if (!t_request.__isset.fragment) {
return pipeline::FragmentExecutor::append_incremental_scan_ranges(_exec_env, t_request);
}

if (UNLIKELY(!t_request.query_options.__isset.batch_size)) {
return Status::InvalidArgument("batch_size is not set");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Status TabletReader::open(const TabletReaderParams& read_params) {
return init_collector(read_params);
}

std::vector<std::unique_ptr<pipeline::ScanMorsel>> morsels;
pipeline::Morsels morsels;
morsels.emplace_back(
std::make_unique<pipeline::ScanMorsel>(read_params.plan_node_id, *(read_params.scan_range)));

Expand Down
Loading

0 comments on commit 7f57234

Please sign in to comment.