Skip to content

Commit

Permalink
fixed partitioners
Browse files Browse the repository at this point in the history
  • Loading branch information
Nitin-Kashyap committed Dec 5, 2024
1 parent 6d43a23 commit c76ddc5
Show file tree
Hide file tree
Showing 26 changed files with 256 additions and 140 deletions.
9 changes: 8 additions & 1 deletion be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,14 @@ inline std::string get_exchange_type_name(ExchangeType idx) {
struct DataDistribution {
DataDistribution(ExchangeType type) : distribution_type(type), hash_type(THashType::CRC32) {}
DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_)
: distribution_type(type), partition_exprs(partition_exprs_) {}
: distribution_type(type),
partition_exprs(partition_exprs_),
hash_type(THashType::CRC32) {}
DataDistribution(ExchangeType type, const THashType::type hash_type)
: distribution_type(type), hash_type(hash_type) {}
DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_,
const THashType::type hash)
: distribution_type(type), partition_exprs(partition_exprs_), hash_type(hash) {}
DataDistribution(const DataDistribution& other) = default;
bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; }
DataDistribution& operator=(const DataDistribution& other) = default;
Expand Down
37 changes: 31 additions & 6 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
} else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
_partition_count = channel_shared_ptrs.size();
_partition_count = channels.size();
if (_hash_type == THashType::SPARK_MURMUR32) {
_partitioner.reset(new vectorized::Murmur32HashPartitioner<vectorized::ShufflePModChannelIds>(
channel_shared_ptrs.size()));
_partitioner = std::make_unique<
vectorized::Murmur32HashPartitioner<vectorized::ShufflePModChannelIds>>(
channels.size());
_profile->add_info_string("Partitioner",
fmt::format("Murmur32HashPartitioner({})", _partition_count));
} else {
_partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
channel_shared_ptrs.size()));
_partitioner = std::make_unique<
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
channels.size());
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
}
Expand Down Expand Up @@ -358,6 +360,28 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
return Status::OK();
}

std::string ExchangeSinkOperatorX::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));

string dest_names;
for (const auto& dest : _dests) {
if (dest_names.empty()) {
dest_names += print_id(dest.fragment_instance_id);
} else {
dest_names += ", " + print_id(dest.fragment_instance_id);
}
}

fmt::format_to(debug_string_buffer,
", Info: (_num_recievers = {}, _dest_node_id = {},"
", _partition_type = {}, _hash_type = {},"
" _destinations = [{}])",
_dests.size(), _dest_node_id, to_string(_part_type), to_string(_hash_type),
dest_names);
return fmt::to_string(debug_string_buffer);
}

Status ExchangeSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
_state = state;
Expand Down Expand Up @@ -430,7 +454,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (serialized) {
auto cur_block = local_state._serializer.get_block()->to_block();
if (!cur_block.empty()) {
DCHECK(eos || local_state._serializer.is_local()) << debug_string(state, 0);
DCHECK(eos || local_state._serializer.is_local())
<< Base::debug_string(state, 0);
RETURN_IF_ERROR(local_state._serializer.serialize_block(
&cur_block, block_holder->get_block(),
local_state._rpc_channels_num));
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,16 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
};

class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
using Base = DataSinkOperatorX<ExchangeSinkLocalState>;

public:
ExchangeSinkOperatorX(RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations);
Status init(const TDataSink& tsink) override;

[[nodiscard]] std::string debug_string(int indentation_level) const override;

RuntimeState* state() { return _state; }

Status open(RuntimeState* state) override;
Expand Down
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
OperatorX<ExchangeLocalState>::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Info: (_num_senders = {}, _is_merging = {})",
_num_senders, _is_merging);
fmt::format_to(debug_string_buffer,
", Info: (_num_senders = {}, _is_merging = {}, _hash_type = {})", _num_senders,
_is_merging, to_string(_hash_type));
return fmt::to_string(debug_string_buffer);
}

Expand Down Expand Up @@ -106,6 +107,8 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
_partition_type(tnode.exchange_node.__isset.partition_type
? tnode.exchange_node.partition_type
: TPartitionType::UNPARTITIONED),
_hash_type(tnode.exchange_node.__isset.hash_type ? tnode.exchange_node.hash_type
: THashType::CRC32),
_input_row_desc(descs, tnode.exchange_node.input_row_tuples,
std::vector<bool>(tnode.nullable_tuples.begin(),
tnode.nullable_tuples.begin() +
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
return _partition_type == TPartitionType::HASH_PARTITIONED
? DataDistribution(ExchangeType::HASH_SHUFFLE)
: _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE)
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _hash_type)
: DataDistribution(ExchangeType::NOOP);
}

Expand All @@ -99,6 +99,7 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
const int _num_senders;
const bool _is_merging;
const TPartitionType::type _partition_type;
const THashType::type _hash_type;
RowDescriptor _input_row_desc;

// use in merge sort
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(ObjectPool* p
descs),
_join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type
: TJoinDistributionType::NONE),
_hash_type(tnode.hash_join_node.__isset.__isset.hash_type ? tnode.hash_join_node.hash_type
: THashType::CRC32),
_hash_type(tnode.hash_join_node.__isset.hash_type ? tnode.hash_join_node.hash_type
: THashType::CRC32),
_distribution_partition_exprs(tnode.__isset.distribute_expr_lists
? tnode.distribute_expr_lists[1]
: std::vector<TExpr> {}),
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ class PartitionedHashJoinSinkOperatorX
return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
_distribution_partition_exprs,
hash_type)
_distribution_partition_exprs, _hash_type)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_distribution_partition_exprs);
}
Expand Down
16 changes: 8 additions & 8 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ class SipHash;
} \
}

#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL() \
if (null_data == nullptr) { \
for (size_t i = 0; i < s; i++) { \
#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL() \
if (null_data == nullptr) { \
for (size_t i = 0; i < s; i++) { \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hashes[i]); \
} \
} else { \
for (size_t i = 0; i < s; i++) { \
if (null_data[i] == 0) \
} \
} else { \
for (size_t i = 0; i < s; i++) { \
if (null_data[i] == 0) \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hashes[i]); \
} \
} \
}

namespace doris::vectorized {
Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/columns/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ void ColumnNullable::update_murmur_with_value(size_t start, size_t end, int32_t&
nested_column->update_murmur_with_value(start, end, hash, nullptr);
} else {
const auto* __restrict real_null_data =
assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
for (int i = start; i < end; ++i) {
assert_cast<const ColumnUInt8&>(get_null_map_column()).get_data().data();
for (size_t i = start; i < end; ++i) {
if (real_null_data[i] != 0) {
hash = HashUtil::murmur_hash3_32_null(hash);
}
Expand Down Expand Up @@ -140,7 +140,7 @@ void ColumnNullable::update_murmurs_with_value(int32_t* __restrict hashes,
auto s = rows;
DCHECK(s == size());
const auto* __restrict real_null_data =
assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
assert_cast<const ColumnUInt8&>(get_null_map_column()).get_data().data();
if (!has_null()) {
nested_column->update_murmurs_with_value(hashes, type, rows, offset, nullptr);
} else {
Expand Down
22 changes: 12 additions & 10 deletions be/src/vec/runtime/partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
namespace doris::vectorized {

template <typename HashValueType, typename ChannelIds>
Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState* state, Block* block,
MemTracker* mem_tracker) const {
Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState* state,
Block* block) const {
int rows = block->rows();

if (rows > 0) {
Expand Down Expand Up @@ -55,7 +55,7 @@ Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState* sta
template <typename ChannelIds>
void Crc32HashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column,
uint32_t* __restrict result, int idx) const {
column->update_crcs_with_value(result, _partition_expr_ctxs[idx]->root()->type().type,
column->update_crcs_with_value(result, Base::_partition_expr_ctxs[idx]->root()->type().type,
cast_set<uint32_t>(column->size()));
}

Expand All @@ -69,20 +69,22 @@ void Murmur32HashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column,
template <typename ChannelIds>
Status Crc32HashPartitioner<ChannelIds>::clone(RuntimeState* state,
std::unique_ptr<PartitionerBase>& partitioner) {
auto* new_partitioner = new Crc32HashPartitioner<ChannelIds>(cast_set<int>(_partition_count));
auto* new_partitioner =
new Crc32HashPartitioner<ChannelIds>(cast_set<int>(Base::_partition_count));
partitioner.reset(new_partitioner);
new_partitioner->_partition_expr_ctxs.resize(_partition_expr_ctxs.size());
for (size_t i = 0; i < _partition_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(
_partition_expr_ctxs[i]->clone(state, new_partitioner->_partition_expr_ctxs[i]));
new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size());
for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone(
state, new_partitioner->_partition_expr_ctxs[i]));
}
return Status::OK();
}

template <typename ChannelIds>
Status Murmur32HashPartitioner<ChannelIds>::clone(RuntimeState* state,
std::unique_ptr<PartitionerBase>& partitioner) {
auto* new_partitioner = new Murmur32HashPartitioner(Base::_partition_count);
auto* new_partitioner =
new Murmur32HashPartitioner<ChannelIds>(cast_set<int>(Base::_partition_count));
partitioner.reset(new_partitioner);
new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size());
for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) {
Expand All @@ -94,7 +96,7 @@ Status Murmur32HashPartitioner<ChannelIds>::clone(RuntimeState* state,

template <typename ChannelIds>
int32_t Murmur32HashPartitioner<ChannelIds>::_get_default_seed() const {
return reinterpret_cast<int32_t>(HashUtil::SPARK_MURMUR_32_SEED);
return static_cast<int32_t>(HashUtil::SPARK_MURMUR_32_SEED);
}

template class Crc32HashPartitioner<ShuffleChannelIds>;
Expand Down
48 changes: 33 additions & 15 deletions be/src/vec/runtime/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ class PartitionerBase {
const size_t _partition_count;
};

template <typename ChannelIds>
class Crc32HashPartitioner : public PartitionerBase {
template <typename HashValueType, typename ChannelIds>
class Partitioner : public PartitionerBase {
public:
Crc32HashPartitioner(int partition_count) : PartitionerBase(partition_count) {}
~Crc32HashPartitioner() override = default;
Partitioner(int partition_count) : PartitionerBase(partition_count) {}
~Partitioner() override = default;

Status init(const std::vector<TExpr>& texprs) override {
return VExpr::create_expr_trees(texprs, _partition_expr_ctxs);
Expand All @@ -76,9 +76,9 @@ class Crc32HashPartitioner : public PartitionerBase {

Status do_partitioning(RuntimeState* state, Block* block) const override;

ChannelField get_channel_ids() const override { return {_hash_vals.data(), sizeof(uint32_t)}; }

Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override;
ChannelField get_channel_ids() const override {
return {_hash_vals.data(), sizeof(HashValueType)};
}

protected:
Status _get_partition_column_result(Block* block, std::vector<int>& result) const {
Expand All @@ -89,14 +89,12 @@ class Crc32HashPartitioner : public PartitionerBase {
return Status::OK();
}

void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const;

HashValueType _get_default_seed() const {
return reinterpret_cast<HashValueType>(0);
}
virtual void _do_hash(const ColumnPtr& column, HashValueType* __restrict result,
int idx) const = 0;
virtual HashValueType _get_default_seed() const { return static_cast<HashValueType>(0); }

VExprContextSPtrs _partition_expr_ctxs;
mutable std::vector<uint32_t> _hash_vals;
mutable std::vector<HashValueType> _hash_vals;
};

struct ShuffleChannelIds {
Expand All @@ -113,6 +111,27 @@ struct SpillPartitionChannelIds {
}
};

struct ShufflePModChannelIds {
template <typename HashValueType>
HashValueType operator()(HashValueType l, int32_t r) {
return (l % r + r) % r;
}
};

template <typename ChannelIds>
class Crc32HashPartitioner final : public Partitioner<uint32_t, ChannelIds> {
public:
using Base = Partitioner<uint32_t, ChannelIds>;
Crc32HashPartitioner(int partition_count)
: Partitioner<uint32_t, ChannelIds>(partition_count) {}
~Crc32HashPartitioner() override = default;

Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override;

private:
void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override;
};

template <typename ChannelIds>
class Murmur32HashPartitioner final : public Partitioner<int32_t, ChannelIds> {
public:
Expand All @@ -123,10 +142,9 @@ class Murmur32HashPartitioner final : public Partitioner<int32_t, ChannelIds> {

Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override;

int32_t _get_default_seed() const;

private:
void _do_hash(const ColumnPtr& column, int32_t* __restrict result, int idx) const override;
int32_t _get_default_seed() const override;
};

} // namespace vectorized
Expand Down
7 changes: 0 additions & 7 deletions be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,6 @@ class BlockSerializer {
const int _batch_size;
};

struct ShufflePModChannelIds {
template <typename HashValueType>
HashValueType operator()(HashValueType l, int32_t r) {
return (l % r + r) % r;
}
};

class Channel {
public:
friend class pipeline::ExchangeSinkBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.IndexedPriorityQueue;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ResettableRandomizedIterator;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.ConsistentHash;
Expand Down Expand Up @@ -217,15 +218,16 @@ public void setEnableSplitsRedistribution(boolean enableSplitsRedistribution) {
this.enableSplitsRedistribution = enableSplitsRedistribution;
}

public Multimap<Backend, Split> computeBucketAwareScanRangeAssignmentWith(List<Split> splits) throws UserException {
ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();
public Multimap<Pair<Backend, Integer>, Split> computeBucketAwareScanRangeAssignmentWith(List<Split> splits)
throws UserException {
ListMultimap<Pair<Backend, Integer>, Split> assignment = ArrayListMultimap.create();
int bucketNum = 0;
for (Split split : splits) {
FileSplit fileSplit = (FileSplit) split;
bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getName()).getAsInt();
bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getPath().getName()).getAsInt();

List<Backend> candidateNodes = consistentBucketHash.getNode(bucketNum, 1);
assignment.put(candidateNodes.get(0), split);
assignment.put(Pair.of(candidateNodes.get(0), bucketNum), split);
}

return assignment;
Expand Down
Loading

0 comments on commit c76ddc5

Please sign in to comment.