Skip to content

Commit

Permalink
[opt]fixed partitioners, local_exchange and murmur-hash calculations
Browse files Browse the repository at this point in the history
  • Loading branch information
Nitin-Kashyap committed Dec 13, 2024
1 parent a4f29a1 commit 5c27041
Show file tree
Hide file tree
Showing 39 changed files with 484 additions and 212 deletions.
12 changes: 10 additions & 2 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -728,14 +728,22 @@ inline std::string get_exchange_type_name(ExchangeType idx) {
}

struct DataDistribution {
DataDistribution(ExchangeType type) : distribution_type(type) {}
DataDistribution(ExchangeType type) : distribution_type(type), hash_type(THashType::CRC32) {}
DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_)
: distribution_type(type), partition_exprs(partition_exprs_) {}
: distribution_type(type),
partition_exprs(partition_exprs_),
hash_type(THashType::CRC32) {}
DataDistribution(ExchangeType type, const THashType::type hash_type)
: distribution_type(type), hash_type(hash_type) {}
DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_,
const THashType::type hash)
: distribution_type(type), partition_exprs(partition_exprs_), hash_type(hash) {}
DataDistribution(const DataDistribution& other) = default;
bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; }
DataDistribution& operator=(const DataDistribution& other) = default;
ExchangeType distribution_type;
std::vector<TExpr> partition_exprs;
THashType::type hash_type;
};

class ExchangerBase;
Expand Down
141 changes: 137 additions & 4 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf

auto& p = _parent->cast<ExchangeSinkOperatorX>();
_part_type = p._part_type;
_hash_type = p._hash_type;
std::map<int64_t, int64_t> fragment_id_to_channel_index;
for (int i = 0; i < p._dests.size(); ++i) {
const auto& fragment_instance_id = p._dests[i].fragment_instance_id;
Expand Down Expand Up @@ -132,9 +133,18 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
fmt::format("Crc32HashPartitioner({})", _partition_count));
} else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
_partition_count = channels.size();
_partitioner =
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
channels.size());
if (_hash_type == THashType::SPARK_MURMUR32) {
_partitioner.reset(
new vectorized::Murmur32HashPartitioner<vectorized::ShufflePModChannelIds>(
channels.size()));
_profile->add_info_string("Partitioner",
fmt::format("Murmur32HashPartitioner({})", _partition_count));
} else {
_partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
channels.size()));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
}
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
Expand Down Expand Up @@ -199,6 +209,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
_writer.reset(new Writer());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
_part_type = p._part_type;
_hash_type = p._hash_type;

if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
Expand Down Expand Up @@ -243,6 +255,100 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
}
}
}
if (_part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
_partitioner =
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
channels.size());
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
} else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
_partition_count = channels.size();
if (_hash_type == THashType::SPARK_MURMUR32) {
_partitioner = std::make_unique<
vectorized::Murmur32HashPartitioner<vectorized::ShufflePModChannelIds>>(
channels.size());
_profile->add_info_string("Partitioner",
fmt::format("Murmur32HashPartitioner({})", _partition_count));
} else {
_partitioner = std::make_unique<
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
channels.size());
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
}
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));

} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
_partition_count = channels.size();
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
_txn_id = p._tablet_sink_txn_id;
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(p._tablet_sink_schema));
_vpartition = std::make_unique<VOlapTablePartitionParam>(_schema, p._tablet_sink_partition);
RETURN_IF_ERROR(_vpartition->init());
auto find_tablet_mode = vectorized::OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
_tablet_finder =
std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(), find_tablet_mode);
_tablet_sink_tuple_desc = _state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id);
_tablet_sink_row_desc = p._pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false));
_tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size());
for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i]));
}
// if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column
// on exchange node rather than on TabletWriter
_block_convertor =
std::make_unique<vectorized::OlapTableBlockConvertor>(_tablet_sink_tuple_desc);
_block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
_state->batch_size());
_location = p._pool->add(new OlapTableLocationParam(p._tablet_sink_location));
_row_distribution.init(
{.state = _state,
.block_convertor = _block_convertor.get(),
.tablet_finder = _tablet_finder.get(),
.vpartition = _vpartition.get(),
.add_partition_request_timer = _add_partition_request_timer,
.txn_id = _txn_id,
.pool = p._pool.get(),
.location = _location,
.vec_output_expr_ctxs = &_tablet_sink_expr_ctxs,
.schema = _schema,
.caller = (void*)this,
.create_partition_callback = &ExchangeSinkLocalState::empty_callback_function});
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
_partition_count =
channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer;
_partitioner =
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
_partition_count);
_partition_function = std::make_unique<HashPartitionFunction>(_partitioner.get());

scale_writer_partitioning_exchanger = std::make_unique<
vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>(
channels.size(), *_partition_function, _partition_count, channels.size(), 1,
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
state->task_num() ==
0
? config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
: config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
state->task_num(),
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
state->task_num() ==
0
? config::table_sink_partition_write_min_data_processed_rebalance_threshold
: config::table_sink_partition_write_min_data_processed_rebalance_threshold /
state->task_num());

RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
}

if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
Expand Down Expand Up @@ -273,6 +379,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_texprs(sink.output_partition.partition_exprs),
_row_desc(row_desc),
_part_type(sink.output_partition.type),
_hash_type(sink.output_partition.hash_type),
_dests(destinations),
_dest_node_id(sink.dest_node_id),
_transfer_large_data_by_brpc(config::transfer_large_data_by_brpc),
Expand All @@ -293,6 +400,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::TABLE_SINK_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED);
DCHECK(sink.output_partition.hash_type == THashType::CRC32 ||
sink.output_partition.hash_type == THashType::XXHASH64 ||
sink.output_partition.hash_type == THashType::SPARK_MURMUR32);
_name = "ExchangeSinkOperatorX";
_pool = std::make_shared<ObjectPool>();
if (sink.__isset.output_tuple_id) {
Expand All @@ -312,6 +422,28 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
return Status::OK();
}

std::string ExchangeSinkOperatorX::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));

string dest_names;
for (const auto& dest : _dests) {
if (dest_names.empty()) {
dest_names += print_id(dest.fragment_instance_id);
} else {
dest_names += ", " + print_id(dest.fragment_instance_id);
}
}

fmt::format_to(debug_string_buffer,
", Info: (_num_recievers = {}, _dest_node_id = {},"
", _partition_type = {}, _hash_type = {},"
" _destinations = [{}])",
_dests.size(), _dest_node_id, to_string(_part_type), to_string(_hash_type),
dest_names);
return fmt::to_string(debug_string_buffer);
}

Status ExchangeSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
_state = state;
Expand Down Expand Up @@ -389,7 +521,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (serialized) {
auto cur_block = local_state._serializer.get_block()->to_block();
if (!cur_block.empty()) {
DCHECK(eos || local_state._serializer.is_local()) << debug_string(state, 0);
DCHECK(eos || local_state._serializer.is_local())
<< Base::debug_string(state, 0);
RETURN_IF_ERROR(local_state._serializer.serialize_block(
&cur_block, block_holder->get_block(),
local_state._rpc_channels_num));
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {

RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
TPartitionType::type _part_type;
THashType::type _hash_type;

std::atomic<bool> _reach_limit = false;
int _last_local_channel_idx = -1;
Expand All @@ -183,13 +184,17 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
};

class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
using Base = DataSinkOperatorX<ExchangeSinkLocalState>;

public:
ExchangeSinkOperatorX(RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
const std::vector<TUniqueId>& fragment_instance_ids);
Status init(const TDataSink& tsink) override;

[[nodiscard]] std::string debug_string(int indentation_level) const override;

RuntimeState* state() { return _state; }

Status open(RuntimeState* state) override;
Expand Down Expand Up @@ -228,6 +233,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
TTupleId _output_tuple_id = -1;

TPartitionType::type _part_type;
THashType::type _hash_type;

// serialized batches for broadcasting; we need two so we can write
// one while the other one is still being sent
Expand Down
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
OperatorX<ExchangeLocalState>::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Info: (_num_senders = {}, _is_merging = {})",
_num_senders, _is_merging);
fmt::format_to(debug_string_buffer,
", Info: (_num_senders = {}, _is_merging = {}, _hash_type = {})", _num_senders,
_is_merging, to_string(_hash_type));
return fmt::to_string(debug_string_buffer);
}

Expand Down Expand Up @@ -106,6 +107,8 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
_partition_type(tnode.exchange_node.__isset.partition_type
? tnode.exchange_node.partition_type
: TPartitionType::UNPARTITIONED),
_hash_type(tnode.exchange_node.__isset.hash_type ? tnode.exchange_node.hash_type
: THashType::CRC32),
_input_row_desc(descs, tnode.exchange_node.input_row_tuples,
std::vector<bool>(tnode.nullable_tuples.begin(),
tnode.nullable_tuples.begin() +
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
return _partition_type == TPartitionType::HASH_PARTITIONED
? DataDistribution(ExchangeType::HASH_SHUFFLE)
: _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE)
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _hash_type)
: DataDistribution(ExchangeType::NOOP);
}

Expand All @@ -99,6 +99,7 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
const int _num_senders;
const bool _is_merging;
const TPartitionType::type _partition_type;
const THashType::type _hash_type;
RowDescriptor _input_row_desc;

// use in merge sort
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(ObjectPool* p
descs),
_join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type
: TJoinDistributionType::NONE),
_hash_type(tnode.hash_join_node.__isset.hash_type ? tnode.hash_join_node.hash_type
: THashType::CRC32),
_distribution_partition_exprs(tnode.__isset.distribute_expr_lists
? tnode.distribute_expr_lists[1]
: std::vector<TExpr> {}),
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class PartitionedHashJoinSinkOperatorX
return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
_distribution_partition_exprs)
_distribution_partition_exprs, _hash_type)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_distribution_partition_exprs);
}
Expand All @@ -134,6 +134,7 @@ class PartitionedHashJoinSinkOperatorX
Status _setup_internal_operator(RuntimeState* state);

const TJoinDistributionType::type _join_distribution;
THashType::type _hash_type;

std::vector<TExpr> _build_exprs;

Expand Down
20 changes: 10 additions & 10 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ class SipHash;
} \
}

#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(SEED) \
if (null_data == nullptr) { \
for (size_t i = 0; i < s; i++) { \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \
} \
} else { \
for (size_t i = 0; i < s; i++) { \
if (null_data[i] == 0) \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \
} \
#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL() \
if (null_data == nullptr) { \
for (size_t i = 0; i < s; i++) { \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hashes[i]); \
} \
} else { \
for (size_t i = 0; i < s; i++) { \
if (null_data[i] == 0) \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hashes[i]); \
} \
}

namespace doris::vectorized {
Expand Down
Loading

0 comments on commit 5c27041

Please sign in to comment.