Skip to content

Commit

Permalink
[feature](datalake) Add BucketShuffleJoin support for Hive table data…
Browse files Browse the repository at this point in the history
… generated by Spark. (27783)

    1. Original planner updated to consider BucketShuffle for bucketed hive table
    2. Neerids planner updated for bucketShuffle join on hive tables.
    3. Added spark style hash calculation in BE for shuffle on one side.
    4. Added shuffle hash selection based on left(non-shuffling) side.
  • Loading branch information
Nitin-Kashyap committed Dec 13, 2024
1 parent ffa02e3 commit 994d239
Show file tree
Hide file tree
Showing 41 changed files with 1,391 additions and 51 deletions.
8 changes: 8 additions & 0 deletions be/src/util/hash_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,21 @@ class HashUtil {
// refer to https://github.com/apache/commons-codec/blob/master/src/main/java/org/apache/commons/codec/digest/MurmurHash3.java
static const uint32_t MURMUR3_32_SEED = 104729;

// refer https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L615
static const uint32_t SPARK_MURMUR_32_SEED = 42;

// modify from https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp
static uint32_t murmur_hash3_32(const void* key, int64_t len, uint32_t seed) {
uint32_t out = 0;
murmur_hash3_x86_32(key, len, seed, &out);
return out;
}

static uint32_t murmur_hash3_32_null(uint32_t seed) {
static const int INT_VALUE = 0;
return murmur_hash3_32((const unsigned char*)(&INT_VALUE), 4, seed);
}

static const int MURMUR_R = 47;

// Murmur2 hash implementation returning 64-bit hashes.
Expand Down
27 changes: 27 additions & 0 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ class SipHash;
} \
}

#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(SEED) \
if (null_data == nullptr) { \
for (size_t i = 0; i < s; i++) { \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \
} \
} else { \
for (size_t i = 0; i < s; i++) { \
if (null_data[i] == 0) \
hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \
} \
}

namespace doris::vectorized {

class Arena;
Expand Down Expand Up @@ -398,6 +410,21 @@ class IColumn : public COW<IColumn> {
"Method update_crc_with_value is not supported for " + get_name());
}

/// Update state of murmur3 hash function (spark files) with value of n elements to avoid the virtual
/// function call null_data to mark whether need to do hash compute, null_data == nullptr
/// means all element need to do hash function, else only *null_data != 0 need to do hash func
virtual void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type,
int32_t rows, uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const {
LOG(FATAL) << get_name() << "update_murmurs_with_value not supported";
}

// use range for one hash value to avoid virtual function call in loop
virtual void update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const {
LOG(FATAL) << get_name() << " update_murmur_with_value not supported";
}

/** Removes elements that don't match the filter.
* Is used in WHERE and HAVING operations.
* If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column;
Expand Down
54 changes: 54 additions & 0 deletions be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,60 @@ void ColumnArray::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTyp
}
}

// for every array row calculate murmurHash
void ColumnArray::update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const {
auto& offsets_column = get_offsets();
if (hash == 0) {
hash = HashUtil::SPARK_MURMUR_32_SEED;
}
if (null_data) {
for (size_t i = start; i < end; ++i) {
if (null_data[i] == 0) {
size_t elem_size = offsets_column[i] - offsets_column[i - 1];
if (elem_size == 0) {
hash = HashUtil::murmur_hash3_32(reinterpret_cast<const char*>(&elem_size),
sizeof(elem_size), hash);
} else {
get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i],
hash, nullptr);
}
}
}
} else {
for (size_t i = start; i < end; ++i) {
size_t elem_size = offsets_column[i] - offsets_column[i - 1];
if (elem_size == 0) {
hash = HashUtil::murmur_hash3_32(reinterpret_cast<const char*>(&elem_size),
sizeof(elem_size), hash);
} else {
get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i], hash,
nullptr);
}
}
}
}

void ColumnArray::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type,
int32_t rows, uint32_t offset,
const uint8_t* __restrict null_data) const {
auto s = rows;
DCHECK(s == size());

if (null_data) {
for (size_t i = 0; i < s; ++i) {
// every row
if (null_data[i] == 0) {
update_murmur_with_value(i, i + 1, hash[i], nullptr);
}
}
} else {
for (size_t i = 0; i < s; ++i) {
update_murmur_with_value(i, i + 1, hash[i], nullptr);
}
}
}

void ColumnArray::insert(const Field& x) {
if (x.is_null()) {
get_data().insert(Null());
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
const uint8_t* __restrict null_data) const override;
void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const override;
void update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const override;

void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data = nullptr) const override;
Expand All @@ -148,6 +150,10 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;

void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows,
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;

void insert_range_from(const IColumn& src, size_t start, size_t length) override;
void insert_range_from_ignore_overflow(const IColumn& src, size_t start,
size_t length) override;
Expand Down
50 changes: 50 additions & 0 deletions be/src/vec/columns/column_decimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,56 @@ void ColumnDecimal<T>::update_crcs_with_value(uint32_t* __restrict hashes, Primi
}
}

template <typename T>
void ColumnDecimal<T>::update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const {
if (null_data == nullptr) {
for (size_t i = start; i < end; i++) {
if constexpr (!IsDecimalV2<T>) {
hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T),
HashUtil::SPARK_MURMUR_32_SEED);
} else {
decimalv2_do_murmur(i, hash);
}
}
} else {
for (size_t i = start; i < end; i++) {
if (null_data[i] == 0) {
if constexpr (!IsDecimalV2<T>) {
hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T),
HashUtil::SPARK_MURMUR_32_SEED);
} else {
decimalv2_do_murmur(i, hash);
}
}
}
}
}

template <typename T>
void ColumnDecimal<T>::update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type,
int32_t rows, uint32_t offset,
const uint8_t* __restrict null_data) const {
auto s = rows;
DCHECK(s == size());

if constexpr (!IsDecimalV2<T>) {
DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED)
} else {
if (null_data == nullptr) {
for (size_t i = 0; i < s; i++) {
decimalv2_do_murmur(i, hashes[i]);
}
} else {
for (size_t i = 0; i < s; i++) {
if (null_data[i] == 0) {
decimalv2_do_murmur(i, hashes[i]);
}
}
}
}
}

template <typename T>
void ColumnDecimal<T>::update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
const uint8_t* __restrict null_data) const {
Expand Down
14 changes: 13 additions & 1 deletion be/src/vec/columns/column_decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,16 @@ class ColumnDecimal final : public COWHelper<IColumn, ColumnDecimal<T>> {
void update_crcs_with_value(uint32_t* __restrict hashes, PrimitiveType type, uint32_t rows,
uint32_t offset,
const uint8_t* __restrict null_data) const override;
void update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, int32_t rows,
uint32_t offset,
const uint8_t* __restrict null_data) const override;

void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
const uint8_t* __restrict null_data) const override;
void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const override;

void update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const override;
int compare_at(size_t n, size_t m, const IColumn& rhs_, int nan_direction_hint) const override;
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
IColumn::Permutation& res) const override;
Expand Down Expand Up @@ -284,6 +288,14 @@ class ColumnDecimal final : public COWHelper<IColumn, ColumnDecimal<T>> {
hash = HashUtil::zlib_crc_hash(&int_val, sizeof(int_val), hash);
hash = HashUtil::zlib_crc_hash(&frac_val, sizeof(frac_val), hash);
};

void ALWAYS_INLINE decimalv2_do_murmur(size_t i, int32_t& hash) const {
const auto& dec_val = (const DecimalV2Value&)data[i];
int64_t int_val = dec_val.int_value();
int32_t frac_val = dec_val.frac_value();
hash = HashUtil::murmur_hash3_32(&int_val, sizeof(int_val), hash);
hash = HashUtil::murmur_hash3_32(&frac_val, sizeof(frac_val), hash);
};
};

template <typename>
Expand Down
54 changes: 54 additions & 0 deletions be/src/vec/columns/column_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,40 @@ void ColumnMap::update_crc_with_value(size_t start, size_t end, uint32_t& hash,
}
}

void ColumnMap::update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const {
auto& offsets = get_offsets();
if (hash == 0) {
hash = HashUtil::SPARK_MURMUR_32_SEED;
}
if (null_data) {
for (size_t i = start; i < end; ++i) {
if (null_data[i] == 0) {
size_t kv_size = offsets[i] - offsets[i - 1];
if (kv_size == 0) {
hash = HashUtil::murmur_hash3_32(reinterpret_cast<const char*>(&kv_size),
sizeof(kv_size), hash);
} else {
get_keys().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr);
get_values().update_murmur_with_value(offsets[i - 1], offsets[i], hash,
nullptr);
}
}
}
} else {
for (size_t i = start; i < end; ++i) {
size_t kv_size = offsets[i] - offsets[i - 1];
if (kv_size == 0) {
hash = HashUtil::murmur_hash3_32(reinterpret_cast<const char*>(&kv_size),
sizeof(kv_size), hash);
} else {
get_keys().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr);
get_values().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr);
}
}
}
}

void ColumnMap::update_hashes_with_value(uint64_t* hashes, const uint8_t* null_data) const {
size_t s = size();
if (null_data) {
Expand Down Expand Up @@ -378,6 +412,26 @@ void ColumnMap::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType
}
}

void ColumnMap::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type,
int32_t rows, uint32_t offset,
const uint8_t* __restrict null_data) const {
auto s = rows;
DCHECK(s == size());

if (null_data) {
for (size_t i = 0; i < s; ++i) {
// every row
if (null_data[i] == 0) {
update_murmur_with_value(i, i + 1, hash[i], nullptr);
}
}
} else {
for (size_t i = 0; i < s; ++i) {
update_murmur_with_value(i, i + 1, hash[i], nullptr);
}
}
}

void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t length) {
if (length == 0) {
return;
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/columns/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
const uint8_t* __restrict null_data) const override;
void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const override;
void update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const override;

void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data = nullptr) const override;
Expand All @@ -169,6 +171,10 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;

void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows,
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;

/******************** keys and values ***************/
const ColumnPtr& get_keys_ptr() const { return keys_column; }
ColumnPtr& get_keys_ptr() { return keys_column; }
Expand Down
38 changes: 38 additions & 0 deletions be/src/vec/columns/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,23 @@ void ColumnNullable::update_crc_with_value(size_t start, size_t end, uint32_t& h
}
}

void ColumnNullable::update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const {
if (!has_null()) {
nested_column->update_murmur_with_value(start, end, hash, nullptr);
} else {
const auto* __restrict real_null_data =
assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
hash = HashUtil::SPARK_MURMUR_32_SEED;
for (int i = start; i < end; ++i) {
if (real_null_data[i] != 0) {
hash = HashUtil::murmur_hash3_32_null(hash);
}
}
nested_column->update_murmur_with_value(start, end, hash, real_null_data);
}
}

void ColumnNullable::update_hash_with_value(size_t n, SipHash& hash) const {
if (is_null_at(n)) {
hash.update(0);
Expand Down Expand Up @@ -116,6 +133,27 @@ void ColumnNullable::update_crcs_with_value(uint32_t* __restrict hashes, doris::
}
}

void ColumnNullable::update_murmurs_with_value(int32_t* __restrict hashes,
doris::PrimitiveType type, int32_t rows,
uint32_t offset,
const uint8_t* __restrict null_data) const {
DCHECK(null_data == nullptr);
auto s = rows;
DCHECK(s == size());
const auto* __restrict real_null_data =
assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
if (!has_null()) {
nested_column->update_murmurs_with_value(hashes, type, rows, offset, nullptr);
} else {
for (int i = 0; i < s; ++i) {
if (real_null_data[i] != 0) {
hashes[i] = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED);
}
}
nested_column->update_murmurs_with_value(hashes, type, rows, offset, real_null_data);
}
}

void ColumnNullable::update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data) const {
DCHECK(null_data == nullptr);
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/columns/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,18 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable>, public N
const uint8_t* __restrict null_data) const override;
void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const override;
void update_murmur_with_value(size_t start, size_t end, int32_t& hash,
const uint8_t* __restrict null_data) const override;

void update_hash_with_value(size_t n, SipHash& hash) const override;
void update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType type, uint32_t rows,
uint32_t offset,
const uint8_t* __restrict null_data) const override;
void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data) const override;
void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows,
uint32_t offset,
const uint8_t* __restrict null_data) const override;

ColumnPtr convert_column_if_overflow() override {
nested_column = nested_column->convert_column_if_overflow();
Expand Down
Loading

0 comments on commit 994d239

Please sign in to comment.