Skip to content

Commit

Permalink
[pipelineX](improvement) Support local shuffle for join and agg (#27852)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Dec 2, 2023
1 parent 6549842 commit 421ab56
Show file tree
Hide file tree
Showing 41 changed files with 281 additions and 144 deletions.
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,8 @@ AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int operato
_pool(pool),
_limit(tnode.limit),
_have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()),
_is_streaming(is_streaming) {
_is_streaming(is_streaming),
_partition_exprs(tnode.agg_node.grouping_exprs) {
_is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase;
}

Expand Down
10 changes: 10 additions & 0 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,14 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

// Returns (by value) the expressions kept in _partition_exprs, which the
// constructor fills from tnode.agg_node.grouping_exprs.
std::vector<TExpr> get_local_shuffle_exprs() const override {
    return _partition_exprs;
}
// Picks the local exchange type for this aggregation sink:
//   - SHUFFLE      when _probe_expr_ctxs is non-empty;
//   - PASSTHROUGH  when it is empty and _needs_finalize is set;
//   - NOOP         otherwise.
ExchangeType get_local_exchange_type() const override {
    if (!_probe_expr_ctxs.empty()) {
        return ExchangeType::SHUFFLE;
    }
    if (_needs_finalize) {
        return ExchangeType::PASSTHROUGH;
    }
    return ExchangeType::NOOP;
}

using DataSinkOperatorX<LocalStateType>::id;
using DataSinkOperatorX<LocalStateType>::operator_id;
using DataSinkOperatorX<LocalStateType>::get_local_state;
Expand Down Expand Up @@ -405,6 +413,8 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
int64_t _limit; // -1: no limit
bool _have_conjuncts;
const bool _is_streaming;

const std::vector<TExpr> _partition_exprs;
};

} // namespace pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class DistinctStreamingAggSinkOperatorX final
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
// The distinct streaming aggregation sink always reports PASSTHROUGH.
ExchangeType get_local_exchange_type() const override {
    return ExchangeType::PASSTHROUGH;
}
};

} // namespace pipeline
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}

std::string ExchangeSinkLocalState::id_name() {
std::string ExchangeSinkLocalState::name_suffix() {
std::string name = " (id=" + std::to_string(_parent->node_id());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
name += ",dest_id=" + std::to_string(p._dest_node_id);
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {

[[nodiscard]] int sender_id() const { return _sender_id; }

std::string id_name() override;
std::string name_suffix() override;
segment_v2::CompressionTypePB& compression_type();
std::string debug_string(int indentation_level) const override;

Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
: OperatorX<ExchangeLocalState>(pool, tnode, operator_id, descs),
_num_senders(num_senders),
_is_merging(tnode.exchange_node.__isset.sort_info),
_is_hash_partition(
tnode.exchange_node.__isset.partition_type &&
(tnode.exchange_node.partition_type == TPartitionType::HASH_PARTITIONED ||
tnode.exchange_node.partition_type ==
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED)),
_input_row_desc(descs, tnode.exchange_node.input_row_tuples,
std::vector<bool>(tnode.nullable_tuples.begin(),
tnode.nullable_tuples.begin() +
Expand Down
6 changes: 2 additions & 4 deletions be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,13 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
return _sub_plan_query_statistics_recvr;
}

bool need_to_local_shuffle() const override {
// TODO(gabriel):
return false;
}
// A local shuffle is requested unless _is_hash_partition is set for this
// exchange source.
bool need_to_local_shuffle() const override {
    return !_is_hash_partition;
}

private:
friend class ExchangeLocalState;
const int _num_senders;
const bool _is_merging;
const bool _is_hash_partition;
RowDescriptor _input_row_desc;
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;

Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
return Status::OK();
}

// Builds the suffix appended to this local state's name: the parent operator's
// node id followed by the table name stored on the FileScanOperatorX.
std::string FileScanLocalState::name_suffix() const {
    const auto& scan_op = _parent->cast<FileScanOperatorX>();
    const std::string node_id = std::to_string(_parent->node_id());
    return fmt::format(" (id={}. table name = {})", node_id, scan_op._table_name);
}

void FileScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
int max_scanners =
Expand Down
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
int parent_id() { return _parent->node_id(); }
std::string name_suffix() const override;

private:
std::vector<TScanRangeParams> _scan_ranges;
Expand All @@ -70,14 +71,18 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
public:
FileScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs)
: ScanOperatorX<FileScanLocalState>(pool, tnode, operator_id, descs) {
: ScanOperatorX<FileScanLocalState>(pool, tnode, operator_id, descs),
_table_name(tnode.file_scan_node.__isset.table_name ? tnode.file_scan_node.table_name
: "") {
_output_tuple_id = tnode.file_scan_node.tuple_id;
}

Status prepare(RuntimeState* state) override;

private:
friend class FileScanLocalState;

const std::string _table_name;
};

} // namespace doris::pipeline
1 change: 1 addition & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st
for (const auto& eq_join_conjunct : eq_join_conjuncts) {
vectorized::VExprContextSPtr ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.right, ctx));
_partition_exprs.push_back(eq_join_conjunct.right);
_build_expr_ctxs.push_back(ctx);

const auto vexpr = _build_expr_ctxs.back()->root();
Expand Down
9 changes: 9 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ class HashJoinBuildSinkOperatorX final
._should_build_hash_table;
}

// Returns (by value) the expressions collected in _partition_exprs; init()
// appends the right-hand side of every equal-join conjunct to it.
std::vector<TExpr> get_local_shuffle_exprs() const override {
    return _partition_exprs;
}
// Build side of a hash join: NOOP for broadcast joins and for
// NULL_AWARE_LEFT_ANTI_JOIN, SHUFFLE for every other case.
ExchangeType get_local_exchange_type() const override {
    const bool skip_exchange =
            _is_broadcast_join || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
    return skip_exchange ? ExchangeType::NOOP : ExchangeType::SHUFFLE;
}

private:
friend class HashJoinBuildSinkLocalState;

Expand All @@ -171,6 +179,7 @@ class HashJoinBuildSinkOperatorX final

vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
std::vector<TExpr> _partition_exprs;
};

} // namespace pipeline
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ void HashJoinProbeLocalState::_prepare_probe_block() {
// Constructs the probe-side hash join operator from the thrift plan node.
// Optional thrift fields are only read when their __isset flag is set:
//   - _is_broadcast_join is false when is_broadcast_join is unset;
//   - _hash_output_slot_ids is an empty vector when hash_output_slot_ids is unset.
HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode,
int operator_id, const DescriptorTbl& descs)
: JoinProbeOperatorX<HashJoinProbeLocalState>(pool, tnode, operator_id, descs),
_is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join &&
tnode.hash_join_node.is_broadcast_join),
_hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids
? tnode.hash_join_node.hash_output_slot_ids
: std::vector<SlotId> {}) {}
Expand Down Expand Up @@ -549,6 +551,7 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
for (const auto& eq_join_conjunct : eq_join_conjuncts) {
vectorized::VExprContextSPtr ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left, ctx));
_partition_exprs.push_back(eq_join_conjunct.left);
_probe_expr_ctxs.push_back(ctx);
bool null_aware = eq_join_conjunct.__isset.opcode &&
eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL;
Expand Down
9 changes: 9 additions & 0 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,21 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
SourceState& source_state) const override;

bool need_more_input_data(RuntimeState* state) const override;
// Returns (by value) the expressions collected in _partition_exprs; init()
// appends the left-hand side of every equal-join conjunct to it.
std::vector<TExpr> get_local_shuffle_exprs() const override {
    return _partition_exprs;
}
// Probe side of a hash join:
//   - NOOP         for NULL_AWARE_LEFT_ANTI_JOIN (checked first);
//   - PASSTHROUGH  for broadcast joins;
//   - SHUFFLE      for every other case.
ExchangeType get_local_exchange_type() const override {
    if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
        return ExchangeType::NOOP;
    }
    if (_is_broadcast_join) {
        return ExchangeType::PASSTHROUGH;
    }
    return ExchangeType::SHUFFLE;
}

private:
Status _do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs,
RuntimeProfile::Counter& expr_call_timer,
std::vector<int>& res_col_ids) const;
friend class HashJoinProbeLocalState;

const bool _is_broadcast_join;
// other expr
vectorized::VExprContextSPtrs _other_join_conjuncts;
// probe expr
Expand All @@ -182,6 +190,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;
std::vector<std::string> _right_table_column_names;
std::vector<TExpr> _partition_exprs;
};

} // namespace pipeline
Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/jdbc_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

namespace doris::pipeline {

// Builds the suffix appended to this local state's name: the parent operator's
// node id followed by the table name stored on the JDBCScanOperatorX.
std::string JDBCScanLocalState::name_suffix() const {
    const auto& scan_op = _parent->cast<JDBCScanOperatorX>();
    const std::string node_id = std::to_string(_parent->node_id());
    return fmt::format(" (id={}. table name = {})", node_id, scan_op._table_name);
}

Status JDBCScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
auto& p = _parent->cast<JDBCScanOperatorX>();
std::unique_ptr<vectorized::NewJdbcScanner> scanner = vectorized::NewJdbcScanner::create_unique(
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/jdbc_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class JDBCScanLocalState final : public ScanLocalState<JDBCScanLocalState> {
: ScanLocalState<JDBCScanLocalState>(state, parent) {}
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;

std::string name_suffix() const override;

private:
friend class vectorized::NewJdbcScanner;
};
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class JoinProbeOperatorX : public StatefulOperatorX<LocalStateType> {
}

Status set_child(OperatorXPtr child) override {
if (OperatorX<LocalStateType>::_child_x) {
if (OperatorX<LocalStateType>::_child_x && _build_side_child == nullptr) {
// A (probe) child is already set, so this child is the build-side child.
set_build_side_child(child);
} else {
Expand Down Expand Up @@ -113,7 +113,7 @@ class JoinProbeOperatorX : public StatefulOperatorX<LocalStateType> {
std::unique_ptr<RowDescriptor> _intermediate_row_desc;
// output expr
vectorized::VExprContextSPtrs _output_expr_ctxs;
OperatorXPtr _build_side_child;
OperatorXPtr _build_side_child = nullptr;
const bool _short_circuit_for_null_in_build_side;
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() {
return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink);
}

std::string MultiCastDataStreamSinkLocalState::id_name() {
std::string MultiCastDataStreamSinkLocalState::name_suffix() {
auto& sinks = static_cast<MultiCastDataStreamSinkOperatorX*>(_parent)->sink_node().sinks;
std::string id_name = " (dst id : ";
for (auto& sink : sinks) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/multi_cast_data_stream_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class MultiCastDataStreamSinkLocalState final
using Base = PipelineXSinkLocalState<MultiCastSinkDependency>;
using Parent = MultiCastDataStreamSinkOperatorX;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
std::string id_name() override;
std::string name_suffix() override;

private:
std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
return Status::OK();
}

TOlapScanNode& OlapScanLocalState::olap_scan_node() {
TOlapScanNode& OlapScanLocalState::olap_scan_node() const {
return _parent->cast<OlapScanOperatorX>()._olap_scan_node;
}

Expand Down
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
OlapScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalState(state, parent) {}

TOlapScanNode& olap_scan_node();
TOlapScanNode& olap_scan_node() const;

// Builds the suffix appended to this local state's name: the parent operator's
// node id followed by the table name taken from the olap scan node.
std::string name_suffix() const override {
    const std::string node_id = std::to_string(_parent->node_id());
    const auto& table_name = olap_scan_node().table_name;
    return fmt::format(" (id={}. table name = {})", node_id, table_name);
}

private:
friend class vectorized::NewOlapScanner;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class OperatorBase {
OperatorPtr _child;

// Used on pipeline X
OperatorXPtr _child_x;
OperatorXPtr _child_x = nullptr;

bool _is_closed;
};
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/streaming_aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class StreamingAggSinkOperatorX final : public AggSinkOperatorX<StreamingAggSink
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
// The streaming aggregation sink always reports PASSTHROUGH.
ExchangeType get_local_exchange_type() const override {
    return ExchangeType::PASSTHROUGH;
}
};

} // namespace pipeline
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {

// Gives mutable access to the list of child pipelines.
std::vector<std::shared_ptr<Pipeline>>& children() {
    return _children;
}
// Appends a single pipeline to the list of children (existing children are kept).
void set_children(std::shared_ptr<Pipeline> child) {
    _children.push_back(child);
}
// Replaces the whole list of children with `children`.
// The argument is already a by-value copy, so its buffer is swapped into
// _children instead of being copy-assigned; this avoids copying every
// shared_ptr a second time. The previous children are released when the
// parameter goes out of scope at the end of the call.
void set_children(std::vector<std::shared_ptr<Pipeline>> children) {
    _children.swap(children);
}

private:
void _init_profile();
Expand Down
9 changes: 9 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ PipelinePtr PipelineFragmentContext::add_pipeline() {
return pipeline;
}

// Creates a new pipeline with the next free pipeline id, registers it in
// _pipelines and attaches it to `parent` as a child.
// NOTE: pipelines should not be added once the context is _prepared,
// _submitted or _canceled.
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent) {
    const PipelineId pipeline_id = _next_pipeline_id++;
    auto child = std::make_shared<Pipeline>(pipeline_id, weak_from_this());
    _pipelines.emplace_back(child);
    parent->set_children(child);
    return child;
}

Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
const size_t idx) {
if (_prepared) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag

PipelinePtr add_pipeline();

PipelinePtr add_pipeline(PipelinePtr parent);

TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; }

virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) {
Expand Down
19 changes: 19 additions & 0 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,25 @@ struct SetSharedState : public BasicSharedState {
}
};

// Kind of local exchange an operator reports through get_local_exchange_type().
// NOTE(review): the per-value meanings below are inferred from the enumerator
// names only — confirm against the Exchanger implementations.
enum class ExchangeType : uint8_t {
// Presumably: no local exchange is needed.
NOOP = 0,
// Presumably: rows are redistributed using the operator's local shuffle exprs.
SHUFFLE = 1,
// Presumably: data is forwarded without re-partitioning.
PASSTHROUGH = 2,
};

// Maps an ExchangeType to its enumerator name ("NOOP", "SHUFFLE", "PASSTHROUGH").
// A value outside the enumeration is a programming error: it is reported with
// LOG(FATAL) and the code after it is marked unreachable.
inline std::string get_exchange_type_name(ExchangeType idx) {
    const char* name = nullptr;
    // No `default:` label on purpose, so the compiler can warn when a new
    // enumerator is not handled here.
    switch (idx) {
    case ExchangeType::NOOP:
        name = "NOOP";
        break;
    case ExchangeType::SHUFFLE:
        name = "SHUFFLE";
        break;
    case ExchangeType::PASSTHROUGH:
        name = "PASSTHROUGH";
        break;
    }
    if (name == nullptr) {
        LOG(FATAL) << "__builtin_unreachable";
        __builtin_unreachable();
    }
    return name;
}

class Exchanger;

struct LocalExchangeSharedState : public BasicSharedState {
Expand Down
Loading

0 comments on commit 421ab56

Please sign in to comment.