From 53f2df9aa8199182c7d60b937d5a776030dc5960 Mon Sep 17 00:00:00 2001 From: ienkovich Date: Thu, 22 Aug 2019 08:34:12 -0500 Subject: [PATCH] Add row payload type to join hash table --- MapDServer.cpp | 7 + QueryEngine/BaselineJoinHashTable.cpp | 2 +- QueryEngine/CgenState.h | 1 + QueryEngine/ColumnIR.cpp | 15 ++ QueryEngine/CompilationOptions.h | 1 + QueryEngine/Execute.cpp | 6 +- QueryEngine/Execute.h | 19 +- QueryEngine/GroupByRuntime.cpp | 190 ++++++++++++++ QueryEngine/HashJoinRuntime.cpp | 267 +++++++++++++------- QueryEngine/HashJoinRuntime.h | 17 ++ QueryEngine/HashJoinRuntimeGpu.cu | 23 +- QueryEngine/IRCodegen.cpp | 55 +++- QueryEngine/JoinHashImpl.h | 48 ++++ QueryEngine/JoinHashTable.cpp | 293 ++++++++++++++++++---- QueryEngine/JoinHashTable.h | 88 +++++-- QueryEngine/JoinHashTableInterface.h | 26 ++ QueryEngine/LoopControlFlow/JoinLoop.cpp | 14 +- QueryEngine/LoopControlFlow/JoinLoop.h | 6 + QueryEngine/OverlapsJoinHashTable.cpp | 2 +- QueryEngine/RelAlgExecutor.cpp | 10 +- QueryEngine/ResultSetReductionCodegen.cpp | 1 + QueryEngine/RuntimeFunctions.h | 29 +++ ThriftHandler/MapDHandler.cpp | 8 +- ThriftHandler/MapDHandler.h | 2 + 24 files changed, 954 insertions(+), 176 deletions(-) diff --git a/MapDServer.cpp b/MapDServer.cpp index 21e5c32cd0..8bb1e56349 100644 --- a/MapDServer.cpp +++ b/MapDServer.cpp @@ -274,6 +274,7 @@ class MapDProgramOptions { bool allow_multifrag = true; bool read_only = false; bool allow_loop_joins = false; + bool join_hash_row_payload = false; bool enable_legacy_syntax = true; AuthMetadata authMetadata; @@ -361,6 +362,11 @@ void MapDProgramOptions::fillOptions() { ->default_value(allow_loop_joins) ->implicit_value(true), "Enable loop joins."); + help_desc.add_options()("enable-join-hash-row-payload", + po::value(&join_hash_row_payload) + ->default_value(join_hash_row_payload) + ->implicit_value(true), + "Enable row payload in join hash indexes"); help_desc.add_options()("bigint-count", po::value(&g_bigint_count) ->default_value(g_bigint_count) @@ -1054,6 +1060,7 @@ int startMapdServer(MapDProgramOptions& prog_config_opts, bool start_http_server prog_config_opts.intel_jit_profile, prog_config_opts.read_only, prog_config_opts.allow_loop_joins, + prog_config_opts.join_hash_row_payload, prog_config_opts.enable_rendering, prog_config_opts.enable_auto_clear_render_mem, prog_config_opts.render_oom_retry_threshold, diff --git a/QueryEngine/BaselineJoinHashTable.cpp b/QueryEngine/BaselineJoinHashTable.cpp index a695f0be20..9bd581a5bf 100644 --- a/QueryEngine/BaselineJoinHashTable.cpp +++ b/QueryEngine/BaselineJoinHashTable.cpp @@ -846,7 +846,7 @@ int BaselineJoinHashTable::initHashTableOnCpu( if (layout == JoinHashTableInterface::HashType::OneToMany) { auto one_to_many_buff = reinterpret_cast(&(*cpu_hash_table_buff_)[0] + entry_count_ * entry_size); - init_hash_join_buff(one_to_many_buff, entry_count_, -1, 0, 1); + init_hash_join_buff(one_to_many_buff, entry_count_, 1, -1, 0, 1); switch (key_component_width) { case 4: { const auto composite_key_dict = diff --git a/QueryEngine/CgenState.h b/QueryEngine/CgenState.h index b3d224e1cf..b44798ee63 100644 --- a/QueryEngine/CgenState.h +++ b/QueryEngine/CgenState.h @@ -279,6 +279,7 @@ struct CgenState { const bool contains_left_deep_outer_join_; std::vector outer_join_match_found_per_level_; std::unordered_map scan_idx_to_hash_pos_; + std::unordered_map hashed_cols_; std::vector> in_values_bitmaps_; const std::vector& query_infos_; bool needs_error_check_; diff --git a/QueryEngine/ColumnIR.cpp b/QueryEngine/ColumnIR.cpp index 5bce093111..8534909ff3 100644 --- a/QueryEngine/ColumnIR.cpp +++ b/QueryEngine/ColumnIR.cpp @@ -173,6 +173,21 @@ std::vector CodeGenerator::codegenColVar(const Analyzer::ColumnVar } return codegen(hash_join_lhs.get(), fetch_column, co); } + + if (!plan_state_->isLazyFetchColumn(col_var)) { + InputColDescriptor col{ + col_var->get_column_id(), col_var->get_table_id(), col_var->get_rte_idx()}; + if (cgen_state_->hashed_cols_.count(col)) { + auto val = codegenFixedLengthColVar(col_var, + cgen_state_->hashed_cols_.at(col), + ll_int((int64_t)0, cgen_state_->context_)); + auto it_ok = cgen_state_->fetch_cache_.insert( + std::make_pair(local_col_id, std::vector{val})); + CHECK(it_ok.second); + return {it_ok.first->second}; + } + } + auto pos_arg = posArg(col_var); if (window_func_context) { pos_arg = codegenWindowPosition(window_func_context, pos_arg); diff --git a/QueryEngine/CompilationOptions.h b/QueryEngine/CompilationOptions.h index 8d49453328..d4a891e7a7 100644 --- a/QueryEngine/CompilationOptions.h +++ b/QueryEngine/CompilationOptions.h @@ -48,6 +48,7 @@ struct ExecutionOptions { const bool find_push_down_candidates; const bool just_calcite_explain; const double gpu_input_mem_limit_percent; // punt to CPU if input memory exceeds this + const bool join_hash_row_payload; }; #endif // QUERYENGINE_COMPILATIONOPTIONS_H diff --git a/QueryEngine/Execute.cpp b/QueryEngine/Execute.cpp index 660f83846f..dc0c154633 100644 --- a/QueryEngine/Execute.cpp +++ b/QueryEngine/Execute.cpp @@ -2896,7 +2896,8 @@ Executor::JoinHashTableOrError Executor::buildHashTableForQualifier( const std::vector& query_infos, const MemoryLevel memory_level, const JoinHashTableInterface::HashType preferred_hash_type, - ColumnCacheMap& column_cache) { + ColumnCacheMap& column_cache, + InputColDescriptorsByScanIdx& payload_cols) { std::shared_ptr join_hash_table; const int device_count = deviceCountForMemoryLevel(memory_level); CHECK_GT(device_count, 0); @@ -2924,7 +2925,8 @@ Executor::JoinHashTableOrError Executor::buildHashTableForQualifier( preferred_hash_type, device_count, column_cache, - this); + this, + payload_cols); } catch (TooManyHashEntries&) { const auto join_quals = coalesce_singleton_equi_join(qual_bin_oper); CHECK_EQ(join_quals.size(), size_t(1)); diff --git a/QueryEngine/Execute.h b/QueryEngine/Execute.h index 36a8458b84..4069dc01a2 100644 --- a/QueryEngine/Execute.h +++ b/QueryEngine/Execute.h @@ -149,6 +149,12 @@ inline const ColumnDescriptor* get_column_descriptor( return col_desc; } +inline const ColumnDescriptor* get_column_descriptor( + const InputColDescriptor& col, + const Catalog_Namespace::Catalog& cat) { + return get_column_descriptor(col.getColId(), col.getScanDesc().getTableId(), cat); +} + inline const Analyzer::Expr* extract_cast_arg(const Analyzer::Expr* expr) { const auto cast_expr = dynamic_cast(expr); if (!cast_expr || cast_expr->get_optype() != kCAST) { @@ -176,6 +182,12 @@ inline const ColumnDescriptor* get_column_descriptor_maybe( return table_id > 0 ? get_column_descriptor(col_id, table_id, cat) : nullptr; } +inline const ColumnDescriptor* get_column_descriptor_maybe( + const InputColDescriptor& col, + const Catalog_Namespace::Catalog& cat) { + return get_column_descriptor_maybe(col.getColId(), col.getScanDesc().getTableId(), cat); +} + inline const ResultSetPtr& get_temporary_table(const TemporaryTables* temporary_tables, const int table_id) { CHECK_LT(table_id, 0); @@ -844,12 +856,16 @@ class Executor { std::shared_ptr buildCurrentLevelHashTable( const JoinCondition& current_level_join_conditions, RelAlgExecutionUnit& ra_exe_unit, + InputColDescriptorsByScanIdx& payload_cols, const CompilationOptions& co, const std::vector& query_infos, ColumnCacheMap& column_cache, std::vector& fail_reasons); llvm::Value* addJoinLoopIterator(const std::vector& prev_iters, const size_t level_idx); + void addPayloadColumnIterators(const JoinHashTableInterface& hash_table); + void codegenPayloadColumnIterators(const std::vector& prev_iters, + const JoinHashTableInterface& hash_table); void codegenJoinLoops(const std::vector& join_loops, const RelAlgExecutionUnit& ra_exe_unit, GroupByAndAggregate& group_by_and_aggregate, @@ -880,7 +896,8 @@ class Executor { const std::vector& query_infos, const MemoryLevel memory_level, const JoinHashTableInterface::HashType preferred_hash_type, - ColumnCacheMap& column_cache); + ColumnCacheMap& column_cache, + InputColDescriptorsByScanIdx& payload_cols); void nukeOldState(const bool allow_lazy_fetch, const std::vector& query_infos, const RelAlgExecutionUnit* ra_exe_unit); diff --git a/QueryEngine/GroupByRuntime.cpp b/QueryEngine/GroupByRuntime.cpp index 96139fa475..cc75140685 100644 --- a/QueryEngine/GroupByRuntime.cpp +++ b/QueryEngine/GroupByRuntime.cpp @@ -281,6 +281,26 @@ bucketized_hash_join_idx(int64_t hash_buff, return -1; } +extern "C" ALWAYS_INLINE DEVICE int64_t +bucketized_hash_join_idx_payload(int64_t hash_buff, + int64_t const key, + int64_t const min_key, + int64_t const max_key, + int64_t bucket_normalization, + int64_t entry_size, + int32_t** entry_ptr) { + *entry_ptr = + SUFFIX(get_bucketized_hash_slot_payload)(reinterpret_cast(hash_buff), + key, + min_key, + entry_size, + bucket_normalization); + if (key >= min_key && key <= max_key) { + return **entry_ptr; + } + return -1; +} + extern "C" ALWAYS_INLINE DEVICE int64_t hash_join_idx(int64_t hash_buff, const int64_t key, const int64_t min_key, @@ -291,6 +311,20 @@ extern "C" ALWAYS_INLINE DEVICE int64_t hash_join_idx(int64_t hash_buff, return -1; } +extern "C" ALWAYS_INLINE DEVICE int64_t hash_join_idx_payload(int64_t hash_buff, + const int64_t key, + const int64_t min_key, + const int64_t max_key, + const int64_t entry_size, + int32_t** entry_ptr) { + *entry_ptr = SUFFIX(get_hash_slot_payload)( + reinterpret_cast(hash_buff), key, min_key, entry_size); + if (key >= min_key && key <= max_key) { + return **entry_ptr; + } + return -1; +} + extern "C" ALWAYS_INLINE DEVICE int64_t bucketized_hash_join_idx_nullable(int64_t hash_buff, const int64_t key, @@ -303,6 +337,25 @@ bucketized_hash_join_idx_nullable(int64_t hash_buff, : -1; } +extern "C" ALWAYS_INLINE DEVICE int64_t +bucketized_hash_join_idx_nullable_payload(int64_t hash_buff, + const int64_t key, + const int64_t min_key, + const int64_t max_key, + const int64_t null_val, + const int64_t bucket_normalization, + const int64_t entry_size, + int32_t** entry_ptr) { + return key != null_val ? bucketized_hash_join_idx_payload(hash_buff, + key, + min_key, + max_key, + bucket_normalization, + entry_size, + entry_ptr) + : -1; +} + extern "C" ALWAYS_INLINE DEVICE int64_t hash_join_idx_nullable(int64_t hash_buff, const int64_t key, const int64_t min_key, @@ -311,6 +364,19 @@ extern "C" ALWAYS_INLINE DEVICE int64_t hash_join_idx_nullable(int64_t hash_buff return key != null_val ? hash_join_idx(hash_buff, key, min_key, max_key) : -1; } +extern "C" ALWAYS_INLINE DEVICE int64_t +hash_join_idx_nullable_payload(int64_t hash_buff, + const int64_t key, + const int64_t min_key, + const int64_t max_key, + const int64_t null_val, + const int64_t entry_size, + int32_t** entry_ptr) { + return key != null_val ? hash_join_idx_payload( + hash_buff, key, min_key, max_key, entry_size, entry_ptr) + : -1; +} + extern "C" ALWAYS_INLINE DEVICE int64_t bucketized_hash_join_idx_bitwise(int64_t hash_buff, const int64_t key, @@ -328,6 +394,32 @@ bucketized_hash_join_idx_bitwise(int64_t hash_buff, bucket_normalization); } +extern "C" ALWAYS_INLINE DEVICE int64_t +bucketized_hash_join_idx_bitwise_payload(int64_t hash_buff, + const int64_t key, + const int64_t min_key, + const int64_t max_key, + const int64_t null_val, + const int64_t translated_val, + const int64_t bucket_normalization, + const int64_t entry_size, + int32_t** entry_ptr) { + return key != null_val ? bucketized_hash_join_idx_payload(hash_buff, + key, + min_key, + max_key, + bucket_normalization, + entry_size, + entry_ptr) + : bucketized_hash_join_idx_payload(hash_buff, + translated_val, + min_key, + translated_val, + bucket_normalization, + entry_size, + entry_ptr); +} + extern "C" ALWAYS_INLINE DEVICE int64_t hash_join_idx_bitwise(int64_t hash_buff, const int64_t key, @@ -340,6 +432,25 @@ hash_join_idx_bitwise(int64_t hash_buff, : hash_join_idx(hash_buff, translated_val, min_key, translated_val); } +extern "C" ALWAYS_INLINE DEVICE int64_t +hash_join_idx_bitwise_payload(int64_t hash_buff, + const int64_t key, + const int64_t min_key, + const int64_t max_key, + const int64_t null_val, + const int64_t translated_val, + const int64_t entry_size, + int32_t** entry_ptr) { + return key != null_val ? hash_join_idx_payload( + hash_buff, key, min_key, max_key, entry_size, entry_ptr) + : hash_join_idx_payload(hash_buff, + translated_val, + min_key, + translated_val, + entry_size, + entry_ptr); +} + extern "C" ALWAYS_INLINE DEVICE int64_t hash_join_idx_sharded(int64_t hash_buff, const int64_t key, @@ -359,6 +470,30 @@ hash_join_idx_sharded(int64_t hash_buff, return -1; } +extern "C" ALWAYS_INLINE DEVICE int64_t +hash_join_idx_sharded_payload(int64_t hash_buff, + const int64_t key, + const int64_t min_key, + const int64_t max_key, + const uint32_t entry_count_per_shard, + const uint32_t num_shards, + const uint32_t device_count, + const int64_t entry_size, + int32_t** entry_ptr) { + *entry_ptr = + SUFFIX(get_hash_slot_sharded_payload)(reinterpret_cast(hash_buff), + key, + min_key, + entry_size, + entry_count_per_shard, + num_shards, + device_count); + if (key >= min_key && key <= max_key) { + return **entry_ptr; + } + return -1; +} + extern "C" ALWAYS_INLINE DEVICE int64_t hash_join_idx_sharded_nullable(int64_t hash_buff, const int64_t key, @@ -378,6 +513,29 @@ hash_join_idx_sharded_nullable(int64_t hash_buff, : -1; } +extern "C" ALWAYS_INLINE DEVICE int64_t +hash_join_idx_sharded_nullable_payload(int64_t hash_buff, + const int64_t key, + const int64_t min_key, + const int64_t max_key, + const uint32_t entry_count_per_shard, + const uint32_t num_shards, + const uint32_t device_count, + const int64_t null_val, + const int64_t entry_size, + int32_t** entry_ptr) { + return key != null_val ? hash_join_idx_sharded_payload(hash_buff, + key, + min_key, + max_key, + entry_count_per_shard, + num_shards, + device_count, + entry_size, + entry_ptr) + : -1; +} + extern "C" ALWAYS_INLINE DEVICE int64_t hash_join_idx_bitwise_sharded(int64_t hash_buff, const int64_t key, @@ -404,6 +562,38 @@ hash_join_idx_bitwise_sharded(int64_t hash_buff, device_count); } +extern "C" ALWAYS_INLINE DEVICE int64_t +hash_join_idx_bitwise_sharded_payload(int64_t hash_buff, + const int64_t key, + const int64_t min_key, + const int64_t max_key, + const uint32_t entry_count_per_shard, + const uint32_t num_shards, + const uint32_t device_count, + const int64_t null_val, + const int64_t translated_val, + const int64_t entry_size, + int32_t** entry_ptr) { + return key != null_val ? hash_join_idx_sharded_payload(hash_buff, + key, + min_key, + max_key, + entry_count_per_shard, + num_shards, + device_count, + entry_size, + entry_ptr) + : hash_join_idx_sharded_payload(hash_buff, + translated_val, + min_key, + translated_val, + entry_count_per_shard, + num_shards, + device_count, + entry_size, + entry_ptr); +} + #define DEF_TRANSLATE_NULL_KEY(key_type) \ extern "C" NEVER_INLINE DEVICE int64_t translate_null_key_##key_type( \ const key_type key, const key_type null_val, const int64_t translated_val) { \ diff --git a/QueryEngine/HashJoinRuntime.cpp b/QueryEngine/HashJoinRuntime.cpp index fc7b383a08..1494731f74 100644 --- a/QueryEngine/HashJoinRuntime.cpp +++ b/QueryEngine/HashJoinRuntime.cpp @@ -89,17 +89,20 @@ inline int64_t translate_str_id_to_outer_dict(const int64_t elem, DEVICE void SUFFIX(init_hash_join_buff)(int32_t* groups_buffer, const int32_t hash_entry_count, + const int32_t hash_entry_size, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count) { #ifdef __CUDACC__ int32_t start = threadIdx.x + blockDim.x * blockIdx.x; + int32_t end = hash_entry_count; int32_t step = blockDim.x * gridDim.x; #else - int32_t start = cpu_thread_idx; - int32_t step = cpu_thread_count; + int32_t start = cpu_thread_idx * hash_entry_size; + int32_t step = cpu_thread_count * hash_entry_size; + int32_t end = hash_entry_count * hash_entry_size; #endif - for (int32_t i = start; i < hash_entry_count; i += step) { + for (int32_t i = start; i < end; i += step) { groups_buffer[i] = invalid_slot_val; } } @@ -110,11 +113,34 @@ DEVICE void SUFFIX(init_hash_join_buff)(int32_t* groups_buffer, #define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val) #endif +#ifndef __CUDACC__ +DEVICE void store_payload(int32_t* hash_entry, + const PayloadColumn* payload, + const size_t payload_count, + size_t pos) { + for (size_t p = 0; p < payload_count; ++p) { + // TODO: noinline version doesn't seem good to be used here + int64_t val = + fixed_width_int_decode_noinline(payload[p].col_buff, payload[p].elem_sz, pos); + int32_t* pos = hash_entry + payload[p].payload_offs; + if (payload[p].payload_size == 1) { + *pos = (int32_t)val; + } else if (payload[p].payload_size == 2) { + *(int64_t*)pos = val; + } else { + CHECK(false); + } + } +} +#endif + template DEVICE auto fill_hash_join_buff_impl(int32_t* buff, const int32_t invalid_slot_val, const JoinColumn join_column, const JoinColumnTypeInfo type_info, + const PayloadColumn* payload, + const size_t payload_count, const void* sd_inner_proxy, const void* sd_outer_proxy, const int32_t cpu_thread_idx, @@ -153,27 +179,35 @@ DEVICE auto fill_hash_join_buff_impl(int32_t* buff, if (mapd_cas(entry_ptr, invalid_slot_val, i) != invalid_slot_val) { return -1; } +#ifndef __CUDACC__ + store_payload(entry_ptr, payload, payload_count, i); +#endif } return 0; }; DEVICE int SUFFIX(fill_hash_join_buff_bucketized)(int32_t* buff, + const size_t entry_size, const int32_t invalid_slot_val, const JoinColumn join_column, const JoinColumnTypeInfo type_info, + const PayloadColumn* payload, + const size_t payload_count, const void* sd_inner_proxy, const void* sd_outer_proxy, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization) { auto slot_selector = [&](auto elem) { - return SUFFIX(get_bucketized_hash_slot)( - buff, elem, type_info.min_val, bucket_normalization); + return SUFFIX(get_bucketized_hash_slot_payload)( + buff, elem, type_info.min_val, entry_size, bucket_normalization); }; return fill_hash_join_buff_impl(buff, invalid_slot_val, join_column, type_info, + payload, + payload_count, sd_inner_proxy, sd_outer_proxy, cpu_thread_idx, @@ -196,6 +230,8 @@ DEVICE int SUFFIX(fill_hash_join_buff)(int32_t* buff, invalid_slot_val, join_column, type_info, + nullptr, + 0, sd_inner_proxy, sd_outer_proxy, cpu_thread_idx, @@ -746,20 +782,23 @@ GLOBAL void SUFFIX(count_matches_baseline)(int32_t* count_buff, } template -DEVICE void fill_row_ids_impl(int32_t* buff, - const int32_t hash_entry_count, - const int32_t invalid_slot_val, - const JoinColumn join_column, - const JoinColumnTypeInfo type_info +DEVICE void fill_row_payload_impl(int32_t* buff, + const int32_t hash_entry_count, + const int32_t hash_entry_size, + const int32_t invalid_slot_val, + const JoinColumn join_column, + const JoinColumnTypeInfo type_info #ifndef __CUDACC__ - , - const void* sd_inner_proxy, - const void* sd_outer_proxy, - const int32_t cpu_thread_idx, - const int32_t cpu_thread_count + , + const PayloadColumn* payload, + const size_t payload_count, + const void* sd_inner_proxy, + const void* sd_outer_proxy, + const int32_t cpu_thread_idx, + const int32_t cpu_thread_count #endif - , - SLOT_SELECTOR slot_selector) { + , + SLOT_SELECTOR slot_selector) { int32_t* pos_buff = buff; int32_t* count_buff = buff + hash_entry_count; int32_t* id_buff = count_buff + hash_entry_count; @@ -799,75 +838,90 @@ DEVICE void fill_row_ids_impl(int32_t* buff, #endif const auto bin_idx = pos_ptr - pos_buff; const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr; - id_buff[id_buff_idx] = static_cast(i); + id_buff[id_buff_idx * hash_entry_size] = static_cast(i); +#ifndef __CUDACC__ + store_payload(id_buff + id_buff_idx * hash_entry_size, payload, payload_count, i); +#endif } } -GLOBAL void SUFFIX(fill_row_ids)(int32_t* buff, - const int32_t hash_entry_count, - const int32_t invalid_slot_val, - const JoinColumn join_column, - const JoinColumnTypeInfo type_info +GLOBAL void SUFFIX(fill_row_payload)(int32_t* buff, + const int32_t hash_entry_count, + const int32_t hash_entry_size, + const int32_t invalid_slot_val, + const JoinColumn join_column, + const JoinColumnTypeInfo type_info #ifndef __CUDACC__ - , - const void* sd_inner_proxy, - const void* sd_outer_proxy, - const int32_t cpu_thread_idx, - const int32_t cpu_thread_count + , + const PayloadColumn* payload, + const size_t payload_count, + const void* sd_inner_proxy, + const void* sd_outer_proxy, + const int32_t cpu_thread_idx, + const int32_t cpu_thread_count #endif ) { auto slot_sel = [&type_info](auto pos_buff, auto elem) { return SUFFIX(get_hash_slot)(pos_buff, elem, type_info.min_val); }; - fill_row_ids_impl(buff, - hash_entry_count, - invalid_slot_val, - join_column, - type_info + fill_row_payload_impl(buff, + hash_entry_count, + hash_entry_size, + invalid_slot_val, + join_column, + type_info #ifndef __CUDACC__ - , - sd_inner_proxy, - sd_outer_proxy, - cpu_thread_idx, - cpu_thread_count + , + payload, + payload_count, + sd_inner_proxy, + sd_outer_proxy, + cpu_thread_idx, + cpu_thread_count #endif - , - slot_sel); + , + slot_sel); } -GLOBAL void SUFFIX(fill_row_ids_bucketized)(int32_t* buff, - const int32_t hash_entry_count, - const int32_t invalid_slot_val, - const JoinColumn join_column, - const JoinColumnTypeInfo type_info +GLOBAL void SUFFIX(fill_row_payload_bucketized)(int32_t* buff, + const int32_t hash_entry_count, + const int32_t hash_entry_size, + const int32_t invalid_slot_val, + const JoinColumn join_column, + const JoinColumnTypeInfo type_info #ifndef __CUDACC__ - , - const void* sd_inner_proxy, - const void* sd_outer_proxy, - const int32_t cpu_thread_idx, - const int32_t cpu_thread_count + , + const PayloadColumn* payload, + const size_t payload_count, + const void* sd_inner_proxy, + const void* sd_outer_proxy, + const int32_t cpu_thread_idx, + const int32_t cpu_thread_count #endif - , - const int64_t bucket_normalization) { + , + const int64_t bucket_normalization) { auto slot_sel = [&type_info, bucket_normalization](auto pos_buff, auto elem) { return SUFFIX(get_bucketized_hash_slot)( pos_buff, elem, type_info.min_val, bucket_normalization); }; - fill_row_ids_impl(buff, - hash_entry_count, - invalid_slot_val, - join_column, - type_info + fill_row_payload_impl(buff, + hash_entry_count, + hash_entry_size, + invalid_slot_val, + join_column, + type_info #ifndef __CUDACC__ - , - sd_inner_proxy, - sd_outer_proxy, - cpu_thread_idx, - cpu_thread_count + , + payload, + payload_count, + sd_inner_proxy, + sd_outer_proxy, + cpu_thread_idx, + cpu_thread_count #endif - , - slot_sel); + , + slot_sel); } template @@ -953,20 +1007,23 @@ GLOBAL void SUFFIX(fill_row_ids_sharded)(int32_t* buff, shard_info.device_count); }; - fill_row_ids_impl(buff, - hash_entry_count, - invalid_slot_val, - join_column, - type_info + fill_row_payload_impl(buff, + hash_entry_count, + 1, + invalid_slot_val, + join_column, + type_info #ifndef __CUDACC__ - , - sd_inner_proxy, - sd_outer_proxy, - cpu_thread_idx, - cpu_thread_count + , + nullptr, + 0, + sd_inner_proxy, + sd_outer_proxy, + cpu_thread_idx, + cpu_thread_count #endif - , - slot_sel); + , + slot_sel); } GLOBAL void SUFFIX(fill_row_ids_sharded_bucketized)(int32_t* buff, @@ -995,20 +1052,23 @@ GLOBAL void SUFFIX(fill_row_ids_sharded_bucketized)(int32_t* buff, bucket_normalization); }; - fill_row_ids_impl(buff, - hash_entry_count, - invalid_slot_val, - join_column, - type_info + fill_row_payload_impl(buff, + hash_entry_count, + 1, + invalid_slot_val, + join_column, + type_info #ifndef __CUDACC__ - , - sd_inner_proxy, - sd_outer_proxy, - cpu_thread_idx, - cpu_thread_count + , + nullptr, + 0, + sd_inner_proxy, + sd_outer_proxy, + cpu_thread_idx, + cpu_thread_count #endif - , - slot_sel); + , + slot_sel); } template @@ -1249,9 +1309,12 @@ void inclusive_scan(InputIterator first, template void fill_one_to_many_hash_table_impl(int32_t* buff, const int32_t hash_entry_count, + const int32_t hash_entry_size, const int32_t invalid_slot_val, const JoinColumn& join_column, const JoinColumnTypeInfo& type_info, + const PayloadColumn* payload, + const size_t payload_count, const void* sd_inner_proxy, const void* sd_outer_proxy, const unsigned cpu_thread_count, @@ -1310,10 +1373,13 @@ void fill_one_to_many_hash_table_impl(int32_t* buff, } void fill_one_to_many_hash_table(int32_t* buff, + const int32_t hash_entry_size, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn& join_column, const JoinColumnTypeInfo& type_info, + const PayloadColumn* payload, + const size_t payload_count, const void* sd_inner_proxy, const void* sd_outer_proxy, const unsigned cpu_thread_count) { @@ -1335,19 +1401,25 @@ void fill_one_to_many_hash_table(int32_t* buff, cpu_thread_count); }; auto launch_fill_row_ids = [hash_entry_count = hash_entry_info.hash_entry_count, + hash_entry_size, buff, invalid_slot_val, &join_column, &type_info, + payload, + payload_count, sd_inner_proxy, sd_outer_proxy](auto cpu_thread_idx, auto cpu_thread_count) { - SUFFIX(fill_row_ids) + SUFFIX(fill_row_payload) (buff, hash_entry_count, + hash_entry_size, invalid_slot_val, join_column, type_info, + payload, + payload_count, sd_inner_proxy, sd_outer_proxy, cpu_thread_idx, @@ -1356,9 +1428,12 @@ void fill_one_to_many_hash_table(int32_t* buff, fill_one_to_many_hash_table_impl(buff, hash_entry_info.hash_entry_count, + hash_entry_size, invalid_slot_val, join_column, type_info, + payload, + payload_count, sd_inner_proxy, sd_outer_proxy, cpu_thread_count, @@ -1367,10 +1442,13 @@ void fill_one_to_many_hash_table(int32_t* buff, } void fill_one_to_many_hash_table_bucketized(int32_t* buff, + const int32_t hash_entry_size, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn& join_column, const JoinColumnTypeInfo& type_info, + const PayloadColumn* payload, + const size_t payload_count, const void* sd_inner_proxy, const void* sd_outer_proxy, const unsigned cpu_thread_count) { @@ -1397,19 +1475,25 @@ void fill_one_to_many_hash_table_bucketized(int32_t* buff, }; auto launch_fill_row_ids = [bucket_normalization, hash_entry_count, + hash_entry_size, buff, invalid_slot_val, &join_column, &type_info, + payload, + payload_count, sd_inner_proxy, sd_outer_proxy](auto cpu_thread_idx, auto cpu_thread_count) { - SUFFIX(fill_row_ids_bucketized) + SUFFIX(fill_row_payload_bucketized) (buff, hash_entry_count, + hash_entry_size, invalid_slot_val, join_column, type_info, + payload, + payload_count, sd_inner_proxy, sd_outer_proxy, cpu_thread_idx, @@ -1419,9 +1503,12 @@ void fill_one_to_many_hash_table_bucketized(int32_t* buff, fill_one_to_many_hash_table_impl(buff, hash_entry_count, + hash_entry_size, invalid_slot_val, join_column, type_info, + payload, + payload_count, sd_inner_proxy, sd_outer_proxy, cpu_thread_count, diff --git a/QueryEngine/HashJoinRuntime.h b/QueryEngine/HashJoinRuntime.h index a092962427..2a96c7dd28 100644 --- a/QueryEngine/HashJoinRuntime.h +++ b/QueryEngine/HashJoinRuntime.h @@ -54,6 +54,7 @@ const size_t g_maximum_conditions_to_coalesce{8}; void init_hash_join_buff(int32_t* buff, const int32_t entry_count, + const int32_t entry_size, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count); @@ -113,6 +114,13 @@ struct JoinColumnTypeInfo { const ColumnType column_type; }; +struct PayloadColumn { + const int8_t* col_buff; + const size_t elem_sz; + const size_t payload_offs; + const size_t payload_size; +}; + inline ColumnType get_join_column_type_kind(const SQLTypeInfo& ti) { if (ti.is_date_in_days()) { return SmallDate; @@ -127,9 +135,12 @@ struct JoinBucketInfo { }; int fill_hash_join_buff_bucketized(int32_t* buff, + const size_t entry_size, const int32_t invalid_slot_val, const JoinColumn join_column, const JoinColumnTypeInfo type_info, + const PayloadColumn* payload, + const size_t payload_count, const void* sd_inner, const void* sd_outer, const int32_t cpu_thread_idx, @@ -188,19 +199,25 @@ void fill_hash_join_buff_on_device_sharded_bucketized(int32_t* buff, const int64_t bucket_normalization); void fill_one_to_many_hash_table(int32_t* buff, + const int32_t hash_entry_size, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn& join_column, const JoinColumnTypeInfo& type_info, + const PayloadColumn* payload, + const size_t payload_count, const void* sd_inner_proxy, const void* sd_outer_proxy, const unsigned cpu_thread_count); void fill_one_to_many_hash_table_bucketized(int32_t* buff, + const int32_t hash_entry_size, const HashEntryInfo hash_entry_info, const int32_t invalid_slot_val, const JoinColumn& join_column, const JoinColumnTypeInfo& type_info, + const PayloadColumn* payload, + const size_t payload_count, const void* sd_inner_proxy, const void* sd_outer_proxy, const unsigned cpu_thread_count); diff --git a/QueryEngine/HashJoinRuntimeGpu.cu b/QueryEngine/HashJoinRuntimeGpu.cu index 06bc7697ca..062e6c2160 100644 --- a/QueryEngine/HashJoinRuntimeGpu.cu +++ b/QueryEngine/HashJoinRuntimeGpu.cu @@ -36,11 +36,14 @@ __global__ void fill_hash_join_buff_bucketized_wrapper( int* err, const int64_t bucket_normalization) { int partial_err = SUFFIX(fill_hash_join_buff_bucketized)(buff, + 1, invalid_slot_val, join_column, type_info, NULL, NULL, + NULL, + 0, -1, -1, bucket_normalization); @@ -137,7 +140,7 @@ void fill_hash_join_buff_on_device_sharded(int32_t* buff, __global__ void init_hash_join_buff_wrapper(int32_t* buff, const int32_t hash_entry_count, const int32_t invalid_slot_val) { - SUFFIX(init_hash_join_buff)(buff, hash_entry_count, invalid_slot_val, -1, -1); + SUFFIX(init_hash_join_buff)(buff, hash_entry_count, 1, invalid_slot_val, -1, -1); } void init_hash_join_buff_on_device(int32_t* buff, @@ -227,8 +230,8 @@ void fill_one_to_many_hash_table_on_device(int32_t* buff, invalid_slot_val, join_column, type_info] { - SUFFIX(fill_row_ids)<<>>( - buff, hash_entry_count, invalid_slot_val, join_column, type_info); + SUFFIX(fill_row_payload)<<>>( + buff, hash_entry_count, 1, invalid_slot_val, join_column, type_info); }; fill_one_to_many_hash_table_on_device_impl(buff, @@ -271,12 +274,14 @@ void fill_one_to_many_hash_table_on_device_bucketized(int32_t* buff, join_column, type_info, bucket_normalization = hash_entry_info.bucket_normalization] { - SUFFIX(fill_row_ids_bucketized)<<>>(buff, - hash_entry_count, - invalid_slot_val, - join_column, - type_info, - bucket_normalization); + SUFFIX(fill_row_payload_bucketized)<<>>( + buff, + hash_entry_count, + 1, + invalid_slot_val, + join_column, + type_info, + bucket_normalization); }; fill_one_to_many_hash_table_on_device_impl(buff, diff --git a/QueryEngine/IRCodegen.cpp b/QueryEngine/IRCodegen.cpp index edc0d15c82..35829e5659 100644 --- a/QueryEngine/IRCodegen.cpp +++ b/QueryEngine/IRCodegen.cpp @@ -223,6 +223,18 @@ std::vector Executor::buildJoinLoops( ColumnCacheMap& column_cache) { INJECT_TIMER(buildJoinLoops); std::vector join_loops; + InputColDescriptorsByScanIdx payload_cols; + auto& cat = *getCatalog(); + if (eo.join_hash_row_payload && co.device_type_ == ExecutorDeviceType::CPU) { + for (auto& col : ra_exe_unit.input_col_descs) { + auto* desc = get_column_descriptor_maybe(*col, cat); + if (desc && !desc->isSystemCol && !desc->isVirtualCol && !desc->isDeletedCol && + !desc->columnType.get_physical_cols() && !desc->columnType.is_varlen() && + (desc->columnType.get_size() <= 8)) { + payload_cols[col->getScanDesc().getNestLevel()].push_back(col); + } + } + } for (size_t level_idx = 0, current_hash_table_idx = 0; level_idx < ra_exe_unit.join_quals.size(); ++level_idx) { @@ -231,6 +243,7 @@ std::vector Executor::buildJoinLoops( const auto current_level_hash_table = buildCurrentLevelHashTable(current_level_join_conditions, ra_exe_unit, + payload_cols, co, query_infos, column_cache, @@ -254,8 +267,13 @@ std::vector Executor::buildJoinLoops( JoinLoopDomain domain{{0}}; domain.slot_lookup_result = current_level_hash_table->codegenSlot(co, current_hash_table_idx); + domain.entry_size = current_level_hash_table->getPayloadSize(); return domain; }, + [this, + current_level_hash_table](const std::vector& prev_iters) { + addPayloadColumnIterators(*current_level_hash_table); + }, nullptr, current_level_join_conditions.type == JoinType::LEFT ? std::function(found_outer_join_matches_cb) @@ -273,8 +291,13 @@ std::vector Executor::buildJoinLoops( co, current_hash_table_idx); domain.values_buffer = matching_set.elements; domain.element_count = matching_set.count; + domain.entry_size = current_level_hash_table->getPayloadSize(); return domain; }, + [this, + current_level_hash_table](const std::vector& prev_iters) { + codegenPayloadColumnIterators(prev_iters, *current_level_hash_table); + }, nullptr, current_level_join_conditions.type == JoinType::LEFT ? std::function(found_outer_join_matches_cb) @@ -323,6 +346,7 @@ std::vector Executor::buildJoinLoops( "num_rows_per_scan"); return domain; }, + nullptr, current_level_join_conditions.type == JoinType::LEFT ? std::function&)>( outer_join_condition_cb) @@ -396,6 +420,7 @@ Executor::buildIsDeletedCb(const RelAlgExecutionUnit& ra_exe_unit, std::shared_ptr Executor::buildCurrentLevelHashTable( const JoinCondition& current_level_join_conditions, RelAlgExecutionUnit& ra_exe_unit, + InputColDescriptorsByScanIdx& payload_cols, const CompilationOptions& co, const std::vector& query_infos, ColumnCacheMap& column_cache, @@ -423,7 +448,8 @@ std::shared_ptr Executor::buildCurrentLevelHashTable( co.device_type_ == ExecutorDeviceType::GPU ? MemoryLevel::GPU_LEVEL : MemoryLevel::CPU_LEVEL, JoinHashTableInterface::HashType::OneToOne, - column_cache); + column_cache, + payload_cols); current_level_hash_table = hash_table_or_error.hash_table; } if (hash_table_or_error.hash_table) { @@ -456,6 +482,33 @@ llvm::Value* Executor::addJoinLoopIterator(const std::vector& prev return matching_row_index; } +void Executor::addPayloadColumnIterators(const JoinHashTableInterface& hash_table) { + if (hash_table.getPayloadType() != JoinHashTableInterface::PayloadType::RowId) { + for (auto& col : hash_table.getPayloadColumns()) { + auto ptr = hash_table.getPayloadColumnPtr(*col); + CHECK(ptr); + cgen_state_->hashed_cols_[*col] = ptr; + } + } +} + +void Executor::codegenPayloadColumnIterators(const std::vector& prev_iters, + const JoinHashTableInterface& hash_table) { + if (hash_table.getPayloadType() == JoinHashTableInterface::PayloadType::RowId) { + return; + } + + CHECK(!prev_iters.empty()); + auto payload_ptr = prev_iters.back(); + auto ptr_ty = cgen_state_->ir_builder_.getInt8PtrTy(); + for (auto& col : hash_table.getPayloadColumns()) { + auto offs = ll_int(hash_table.getPayloadColumnOffset(*col), cgen_state_->context_); + auto val_ptr = cgen_state_->ir_builder_.CreateGEP(payload_ptr, {offs}); + val_ptr = cgen_state_->ir_builder_.CreateBitCast(val_ptr, ptr_ty); + cgen_state_->hashed_cols_[*col] = val_ptr; + } +} + void Executor::codegenJoinLoops(const std::vector& join_loops, const RelAlgExecutionUnit& ra_exe_unit, GroupByAndAggregate& group_by_and_aggregate, diff --git a/QueryEngine/JoinHashImpl.h b/QueryEngine/JoinHashImpl.h index e5f75d3c42..38d9f0c677 100644 --- a/QueryEngine/JoinHashImpl.h +++ b/QueryEngine/JoinHashImpl.h @@ -36,12 +36,29 @@ extern "C" ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_bucketized_hash_slot)( return buff + (key - min_key) / bucket_normalization; } +extern "C" ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_bucketized_hash_slot_payload)( + int32_t* buff, + const int64_t key, + const int64_t min_key, + const size_t entry_size, + const int64_t bucket_normalization) { + return buff + (key - min_key) / bucket_normalization * entry_size; +} + extern "C" ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_hash_slot)(int32_t* buff, const int64_t key, const int64_t min_key) { return buff + (key - min_key); } +extern "C" ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_hash_slot_payload)( + int32_t* buff, + const int64_t key, + const int64_t min_key, + const size_t entry_size) { + return buff + (key - min_key) * entry_size; +} + extern "C" ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_bucketized_hash_slot_sharded)( int32_t* buff, const int64_t key, @@ -57,6 +74,22 @@ extern "C" ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_bucketized_hash_slot_sharded return shard_buffer + (key - min_key) / bucket_normalization / num_shards; } +extern "C" ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_bucketized_hash_slot_sharded_payload)( + int32_t* buff, + const int64_t key, + const int64_t min_key, + const int64_t entry_size, + const uint32_t entry_count_per_shard, + const uint32_t num_shards, + const uint32_t device_count, + const int64_t bucket_normalization) { + const uint32_t shard = SHARD_FOR_KEY(key, num_shards); + const uint32_t shard_buffer_index = + shard / device_count; // shard sub-buffer index within `buff` + int32_t* shard_buffer = buff + shard_buffer_index * entry_count_per_shard * entry_size; + return shard_buffer + (key - min_key) / bucket_normalization / num_shards * entry_size; +} + extern "C" ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_hash_slot_sharded)( int32_t* buff, const int64_t key, @@ -71,4 +104,19 @@ extern "C" ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_hash_slot_sharded)( return shard_buffer + (key - min_key) / num_shards; } +extern "C" ALWAYS_INLINE DEVICE int32_t* SUFFIX(get_hash_slot_sharded_payload)( + int32_t* buff, + const int64_t key, + const int64_t min_key, + const int64_t entry_size, + const uint32_t entry_count_per_shard, + const uint32_t num_shards, + const uint32_t device_count) { + const uint32_t shard = SHARD_FOR_KEY(key, num_shards); + const uint32_t shard_buffer_index = + shard / device_count; // shard sub-buffer index within `buff` + int32_t* shard_buffer = buff + shard_buffer_index * entry_count_per_shard * entry_size; + return shard_buffer + (key - min_key) / num_shards * entry_size; +} + #endif // QUERYENGINE_GROUPBYFASTIMPL_H diff --git a/QueryEngine/JoinHashTable.cpp b/QueryEngine/JoinHashTable.cpp index 1c85e930d1..2899e01d58 100644 --- a/QueryEngine/JoinHashTable.cpp +++ b/QueryEngine/JoinHashTable.cpp @@ -19,7 +19,6 @@ #include "ColumnFetcher.h" #include "Execute.h" #include "ExpressionRewrite.h" -#include "HashJoinRuntime.h" #include "RangeTableIndexVisitor.h" #include "RuntimeFunctions.h" #include "Shared/Logger.h" @@ -37,6 +36,8 @@ class NeedsOneToManyHash : public HashJoinFail { } // namespace +const InputColDescriptors JoinHashTableInterface::EMPTY_PAYLOAD = {}; + InnerOuter normalize_column_pair(const Analyzer::Expr* lhs, const Analyzer::Expr* rhs, const Catalog_Namespace::Catalog& cat, @@ -308,7 +309,8 @@ std::shared_ptr JoinHashTable::getInstance( const HashType preferred_hash_type, const int device_count, ColumnCacheMap& column_cache, - Executor* executor) { + Executor* executor, + InputColDescriptorsByScanIdx& payload_cols_candidates) { CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype())); const auto cols = get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_); @@ -359,9 +361,23 @@ std::shared_ptr JoinHashTable::getInstance( col_range.getIntMax() >= std::numeric_limits::max()) { throw HashJoinFail("Cannot translate null value for kBW_EQ"); } + + // Build a set of columns for table payload. + InputColDescriptors payload_cols; + auto scan_it = payload_cols_candidates.find(inner_col->get_rte_idx()); + if (scan_it != payload_cols_candidates.end()) { + for (auto& col : scan_it->second) { + if (col->getScanDesc().getTableId() == inner_col->get_table_id() && + col->getColId() != inner_col->get_column_id()) { + payload_cols.push_back(col); + } + } + } + auto join_hash_table = std::shared_ptr(new JoinHashTable(qual_bin_oper, inner_col, + payload_cols, query_infos, memory_level, preferred_hash_type, @@ -370,29 +386,35 @@ std::shared_ptr JoinHashTable::getInstance( executor, device_count)); try { - join_hash_table->reify(device_count); - } catch (const TableMustBeReplicated& e) { - // Throw a runtime error to abort the query - join_hash_table->freeHashBufferMemory(); - throw std::runtime_error(e.what()); - } catch (const HashJoinFail& e) { - // HashJoinFail exceptions log an error and trigger a retry with a join loop (if - // possible) - join_hash_table->freeHashBufferMemory(); - throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns " - "involved in equijoin | ") + - e.what()); - } catch (const ColumnarConversionNotSupported& e) { - throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") + - e.what()); - } catch (const OutOfMemory& e) { - throw HashJoinFail( - std::string("Ran out of memory while building hash tables for equijoin | ") + - e.what()); - } catch (const std::exception& e) { - throw std::runtime_error( - std::string("Fatal error while attempting to build hash tables for join: ") + - e.what()); + join_hash_table->tryReify(device_count, true); + if (!payload_cols.empty()) { + for (auto col_it = scan_it->second.begin(); + !payload_cols.empty() && col_it != scan_it->second.end();) { + auto cur = col_it++; + if (*cur == payload_cols.front()) { + payload_cols.pop_front(); + scan_it->second.erase(cur); + } + } + } + } catch (const OutOfMemory&) { + if (payload_cols.empty()) { + throw; + } + + payload_cols.clear(); + join_hash_table = + std::shared_ptr(new JoinHashTable(qual_bin_oper, + inner_col, + payload_cols, + query_infos, + memory_level, + preferred_hash_type, + col_range, + column_cache, + executor, + device_count)); + join_hash_table->tryReify(device_count, false); } return join_hash_table; } @@ -450,17 +472,60 @@ std::shared_ptr JoinHashTable::getSyntheticInstance( executor->setupCaching(phys_inputs, phys_table_ids); + InputColDescriptorsByScanIdx dummy; auto hash_table = JoinHashTable::getInstance(op, query_infos, memory_level, preferred_hash_type, device_count, column_cache, - executor); + executor, + dummy); return hash_table; } +JoinHashTable::JoinHashTable(const std::shared_ptr qual_bin_oper, + const Analyzer::ColumnVar* col_var, + const InputColDescriptors& payload_cols, + const std::vector& query_infos, + const Data_Namespace::MemoryLevel memory_level, + const HashType preferred_hash_type, + const ExpressionRange& col_range, + ColumnCacheMap& column_cache, + Executor* executor, + const int device_count) + : qual_bin_oper_(qual_bin_oper) + , col_var_(std::dynamic_pointer_cast(col_var->deep_copy())) + , payload_cols_(payload_cols) + , query_infos_(query_infos) + , memory_level_(memory_level) + , hash_type_(preferred_hash_type) + , hash_entry_count_(0) + , col_range_(col_range) + , executor_(executor) + , column_cache_(column_cache) + , device_count_(device_count) { + CHECK(col_range.getType() == ExpressionRangeType::Integer); + if (payload_cols_.empty()) { + payload_size_ = 1; + payload_type_ = PayloadType::RowId; + } else { + payload_type_ = PayloadType::RowIdAndRow; + const auto& cat = *executor_->getCatalog(); + + payload_size_ = 1; + for (auto& col : payload_cols_) { + auto* desc = get_column_descriptor(*col, cat); + CHECK(desc); + size_t elem_sz = + (desc->columnType.get_size() + sizeof(int32_t) - 1) / sizeof(int32_t); + payload_col_pos_[*col] = std::make_pair(payload_size_, elem_sz); + payload_size_ += elem_sz; + } + } +} + std::pair JoinHashTable::getOneColumnFragment( const Analyzer::ColumnVar& hash_col, const Fragmenter_Namespace::FragmentInfo& fragment, @@ -482,7 +547,7 @@ std::pair JoinHashTable::getAllColumnFragments( std::vector>& chunks_owner) { std::lock_guard linearized_multifrag_column_lock( linearized_multifrag_column_mutex_); - if (linearized_multifrag_column_.first) { + if (hash_col == *col_var_ && linearized_multifrag_column_.first) { return linearized_multifrag_column_; } const int8_t* col_buff; @@ -490,7 +555,7 @@ std::pair JoinHashTable::getAllColumnFragments( std::tie(col_buff, total_elem_count) = ColumnFetcher::getAllColumnFragments( executor_, hash_col, fragments, chunks_owner, column_cache_); linearized_multifrag_column_owner_.addColBuffer(col_buff); - if (!shardCount()) { + if (hash_col == *col_var_ && !shardCount()) { linearized_multifrag_column_ = {col_buff, total_elem_count}; } return {col_buff, total_elem_count}; @@ -610,6 +675,37 @@ void JoinHashTable::reify(const int device_count) { } } +void JoinHashTable::tryReify(const int device_count, bool throw_out_of_memory) { + try { + reify(device_count); + } catch (const TableMustBeReplicated& e) { + // Throw a runtime error to abort the query + freeHashBufferMemory(); + throw std::runtime_error(e.what()); + } catch (const HashJoinFail& e) { + // HashJoinFail exceptions log an error and trigger a retry with a join loop (if + // possible) + freeHashBufferMemory(); + throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns " + "involved in equijoin | ") + + e.what()); + } catch (const ColumnarConversionNotSupported& e) { + throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") + + e.what()); + } catch (const OutOfMemory& e) { + if (throw_out_of_memory) { + throw; + } + throw HashJoinFail( + std::string("Ran out of memory while building hash tables for equijoin | ") + + e.what()); + } catch (const std::exception& e) { + throw std::runtime_error( + std::string("Fatal error while attempting to build hash tables for join: ") + + e.what()); + } +} + std::pair JoinHashTable::fetchFragments( const Analyzer::ColumnVar* hash_col, const std::deque& fragment_info, @@ -653,6 +749,40 @@ std::pair JoinHashTable::fetchFragments( return {col_buff, elem_count}; } +std::vector JoinHashTable::fetchPayload( + const std::deque& fragments, + const Data_Namespace::MemoryLevel effective_memory_level, + const int device_id, + std::vector>& chunks_owner, + ThrustAllocator& dev_buff_owner) { + std::vector payload; + const auto& cat = *executor_->getCatalog(); + + for (auto& col : payload_cols_) { + auto* desc = get_column_descriptor(*col, cat); + CHECK(desc); + auto var = std::make_shared(desc->columnType, + col->getScanDesc().getTableId(), + col->getColId(), + col->getScanDesc().getNestLevel()); + const int8_t* buff = nullptr; + size_t count = 0; + std::tie(buff, count) = fetchFragments(var.get(), + fragments, + effective_memory_level, + device_id, + chunks_owner, + dev_buff_owner); + CHECK(desc->columnType.get_size() > 0); + payload.push_back(PayloadColumn{buff, + static_cast(desc->columnType.get_size()), + payload_col_pos_.at(*col).first, + payload_col_pos_.at(*col).second}); + } + + return payload; +} + ChunkKey JoinHashTable::genHashTableKey( const std::deque& fragments, const Analyzer::Expr* outer_col_expr, @@ -703,7 +833,7 @@ void JoinHashTable::reifyOneToOneForDevice( // properly. ChunkKey empty_chunk; initHashTableForDevice( - empty_chunk, nullptr, 0, cols, effective_memory_level, device_id); + empty_chunk, nullptr, 0, cols, {}, effective_memory_level, device_id); } std::vector> chunks_owner; @@ -718,10 +848,14 @@ void JoinHashTable::reifyOneToOneForDevice( chunks_owner, dev_buff_owner); + std::vector payload = fetchPayload( + fragments, effective_memory_level, device_id, chunks_owner, dev_buff_owner); + initHashTableForDevice(genHashTableKey(fragments, cols.second, inner_col), col_buff, elem_count, cols, + payload, effective_memory_level, device_id); } @@ -749,7 +883,7 @@ void JoinHashTable::reifyOneToManyForDevice( if (fragments.empty()) { ChunkKey empty_chunk; initOneToManyHashTable( - empty_chunk, nullptr, 0, cols, effective_memory_level, device_id); + empty_chunk, nullptr, 0, cols, {}, effective_memory_level, device_id); return; } @@ -765,10 +899,14 @@ void JoinHashTable::reifyOneToManyForDevice( chunks_owner, dev_buff_owner); + std::vector payload = fetchPayload( + fragments, effective_memory_level, device_id, chunks_owner, dev_buff_owner); + initOneToManyHashTable(genHashTableKey(fragments, cols.second, inner_col), col_buff, elem_count, cols, + payload, effective_memory_level, device_id); } @@ -792,6 +930,7 @@ void JoinHashTable::initHashTableOnCpu( const int8_t* col_buff, const size_t num_elements, const std::pair& cols, + const std::vector& payload, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val) { const auto inner_col = cols.first; @@ -799,7 +938,7 @@ void JoinHashTable::initHashTableOnCpu( const auto& ti = inner_col->get_type_info(); if (!cpu_hash_table_buff_) { cpu_hash_table_buff_ = std::make_shared>( - hash_entry_info.getNormalizedHashEntryCount()); + hash_entry_info.getNormalizedHashEntryCount() * payload_size_); const StringDictionaryProxy* sd_inner_proxy{nullptr}; const StringDictionaryProxy* sd_outer_proxy{nullptr}; if (ti.is_string()) { @@ -820,6 +959,7 @@ void JoinHashTable::initHashTableOnCpu( [this, hash_entry_info, hash_join_invalid_val, thread_idx, thread_count] { init_hash_join_buff(&(*cpu_hash_table_buff_)[0], hash_entry_info.getNormalizedHashEntryCount(), + payload_size_, hash_join_invalid_val, thread_idx, thread_count); @@ -841,9 +981,11 @@ void JoinHashTable::initHashTableOnCpu( thread_count, &ti, &err, - hash_entry_info] { + hash_entry_info, + &payload] { int partial_err = fill_hash_join_buff_bucketized(&(*cpu_hash_table_buff_)[0], + payload_size_, hash_join_invalid_val, {col_buff, num_elements}, {static_cast(ti.get_size()), @@ -853,6 +995,8 @@ void JoinHashTable::initHashTableOnCpu( isBitwiseEq(), col_range_.getIntMax() + 1, get_join_column_type_kind(ti)}, + payload.data(), + payload.size(), sd_inner_proxy, sd_outer_proxy, thread_idx, @@ -869,11 +1013,6 @@ void JoinHashTable::initHashTableOnCpu( // Too many hash entries, need to retry with a 1:many table throw NeedsOneToManyHash(); } - } else { - if (cpu_hash_table_buff_->size() > hash_entry_info.getNormalizedHashEntryCount()) { - // Too many hash entries, need to retry with a 1:many table - throw NeedsOneToManyHash(); - } } } @@ -881,6 +1020,7 @@ void JoinHashTable::initOneToManyHashTableOnCpu( const int8_t* col_buff, const size_t num_elements, const std::pair& cols, + const std::vector& payload, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val) { const auto inner_col = cols.first; @@ -890,7 +1030,7 @@ void JoinHashTable::initOneToManyHashTableOnCpu( return; } cpu_hash_table_buff_ = std::make_shared>( - 2 * hash_entry_info.getNormalizedHashEntryCount() + num_elements); + 2 * hash_entry_info.getNormalizedHashEntryCount() + num_elements * payload_size_); const StringDictionaryProxy* sd_inner_proxy{nullptr}; const StringDictionaryProxy* sd_outer_proxy{nullptr}; if (ti.is_string()) { @@ -911,6 +1051,7 @@ void JoinHashTable::initOneToManyHashTableOnCpu( init_hash_join_buff, &(*cpu_hash_table_buff_)[0], hash_entry_info.getNormalizedHashEntryCount(), + 1, hash_join_invalid_val, thread_idx, thread_count)); @@ -924,6 +1065,7 @@ void JoinHashTable::initOneToManyHashTableOnCpu( if (ti.get_type() == kDATE) { fill_one_to_many_hash_table_bucketized(&(*cpu_hash_table_buff_)[0], + payload_size_, hash_entry_info, hash_join_invalid_val, {col_buff, num_elements}, @@ -934,11 +1076,14 @@ void JoinHashTable::initOneToManyHashTableOnCpu( isBitwiseEq(), col_range_.getIntMax() + 1, get_join_column_type_kind(ti)}, + payload.data(), + payload.size(), sd_inner_proxy, sd_outer_proxy, thread_count); } else { fill_one_to_many_hash_table(&(*cpu_hash_table_buff_)[0], + payload_size_, hash_entry_info, hash_join_invalid_val, {col_buff, num_elements}, @@ -949,6 +1094,8 @@ void JoinHashTable::initOneToManyHashTableOnCpu( isBitwiseEq(), col_range_.getIntMax() + 1, get_join_column_type_kind(ti)}, + payload.data(), + payload.size(), sd_inner_proxy, sd_outer_proxy, thread_count); @@ -972,6 +1119,7 @@ void JoinHashTable::initHashTableForDevice( const int8_t* col_buff, const size_t num_elements, const std::pair& cols, + const std::vector& payload, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id) { const auto inner_col = cols.first; @@ -1017,7 +1165,7 @@ void JoinHashTable::initHashTableForDevice( { std::lock_guard cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_); initHashTableOnCpu( - col_buff, num_elements, cols, hash_entry_info, hash_join_invalid_val); + col_buff, num_elements, cols, payload, hash_entry_info, hash_join_invalid_val); } if (inner_col->get_table_id() > 0) { putHashTableOnCpuToCache(chunk_key, num_elements, cols); @@ -1109,6 +1257,7 @@ void JoinHashTable::initOneToManyHashTable( const int8_t* col_buff, const size_t num_elements, const std::pair& cols, + const std::vector& payload, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id) { auto const inner_col = cols.first; @@ -1152,7 +1301,7 @@ void JoinHashTable::initOneToManyHashTable( { std::lock_guard cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_); initOneToManyHashTableOnCpu( - col_buff, num_elements, cols, hash_entry_info, hash_join_invalid_val); + col_buff, num_elements, cols, payload, hash_entry_info, hash_join_invalid_val); } if (inner_col->get_table_id() > 0) { putHashTableOnCpuToCache(chunk_key, num_elements, cols); @@ -1245,10 +1394,18 @@ void JoinHashTable::initHashTableOnCpuFromCache( outer_col ? *outer_col : *cols.first, num_elements, chunk_key, - qual_bin_oper_->get_optype()}; + qual_bin_oper_->get_optype(), + hash_type_, + payload_type_, + payload_cols_}; std::lock_guard join_hash_table_cache_lock(join_hash_table_cache_mutex_); for (const auto& kv : join_hash_table_cache_) { if (kv.first == cache_key) { + if (hash_type_ != kv.first.hash_type) { + CHECK(hash_type_ == HashType::OneToOne && + kv.first.hash_type == HashType::OneToMany); + throw NeedsOneToManyHash(); + } std::lock_guard cpu_hash_table_buff_lock(cpu_hash_table_buff_mutex_); cpu_hash_table_buff_ = kv.second; break; @@ -1266,7 +1423,10 @@ void JoinHashTable::putHashTableOnCpuToCache( outer_col ? *outer_col : *cols.first, num_elements, chunk_key, - qual_bin_oper_->get_optype()}; + qual_bin_oper_->get_optype(), + hash_type_, + payload_type_, + payload_cols_}; std::lock_guard join_hash_table_cache_lock(join_hash_table_cache_mutex_); for (const auto& kv : join_hash_table_cache_) { if (kv.first == cache_key) { @@ -1356,6 +1516,16 @@ std::vector JoinHashTable::getHashJoinArgs(llvm::Value* hash_ptr, executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization)); } + if (getHashType() == JoinHashTableInterface::HashType::OneToOne && + payload_type_ != PayloadType::RowId) { + hash_join_idx_args.emplace_back( + executor_->cgen_state_->llInt((int64_t)payload_size_)); + auto int32ptr_ty = llvm::Type::getInt32PtrTy(executor_->cgen_state_->context_); + auto entry_ptr_ptr = executor_->cgen_state_->ir_builder_.CreateAlloca( + int32ptr_ty, nullptr, "entry_ptr_ptr"); + hash_join_idx_args.emplace_back(entry_ptr_ptr); + } + return hash_join_idx_args; } @@ -1371,6 +1541,7 @@ HashJoinMatchingSet JoinHashTable::codegenMatchingSet(const CompilationOptions& CHECK(pos_ptr); const int shard_count = shardCount(); auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_col, shard_count, co); + const int64_t sub_buff_size = getComponentBufferSize(); const auto& key_col_ti = key_col->get_type_info(); @@ -1381,7 +1552,8 @@ HashJoinMatchingSet JoinHashTable::codegenMatchingSet(const CompilationOptions& isBitwiseEq(), sub_buff_size, executor_, - bucketize); + bucketize, + payload_size_); } HashJoinMatchingSet JoinHashTable::codegenMatchingSet( @@ -1391,7 +1563,8 @@ HashJoinMatchingSet JoinHashTable::codegenMatchingSet( const bool is_bw_eq, const int64_t sub_buff_size, Executor* executor, - bool is_bucketized) { + const bool is_bucketized, + const size_t payload_size) { using namespace std::string_literals; std::string fname(is_bucketized ? "bucketized_hash_join_idx"s : "hash_join_idx"s); @@ -1427,11 +1600,18 @@ HashJoinMatchingSet JoinHashTable::codegenMatchingSet( executor->cgen_state_->ir_builder_.CreateAdd( pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)), llvm::Type::getInt32PtrTy(executor->cgen_state_->context_)); + auto rowid_offs = executor->cgen_state_->ir_builder_.CreateMul( + slot_lv, ll_int(payload_size, executor->cgen_state_->context_)); auto rowid_ptr_i32 = - executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv); + executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, rowid_offs); return {rowid_ptr_i32, row_count_lv, slot_lv}; } +size_t JoinHashTable::getPayloadColumnOffset(const InputColDescriptor& col) const + noexcept { + return payload_col_pos_.at(col).first; +} + size_t JoinHashTable::offsetBufferOff() const noexcept { CHECK(hash_type_ == JoinHashTableInterface::HashType::OneToMany); return 0; @@ -1580,7 +1760,28 @@ llvm::Value* JoinHashTable::codegenSlot(const CompilationOptions& co, if (!isBitwiseEq() && !key_col_ti.get_notnull()) { fname += "_nullable"; } - return executor_->cgen_state_->emitCall(fname, hash_join_idx_args); + + if (payload_type_ != PayloadType::RowId) { + fname += "_payload"; + } + + auto* res = executor_->cgen_state_->emitCall(fname, hash_join_idx_args); + + if (payload_type_ != PayloadType::RowId) { + auto int8ptr_ty = llvm::Type::getInt8PtrTy(executor_->cgen_state_->context_); + auto int32ptr_ty = llvm::Type::getInt32PtrTy(executor_->cgen_state_->context_); + auto payload_ptr = executor_->cgen_state_->ir_builder_.CreateLoad( + int32ptr_ty, hash_join_idx_args.back()); + + for (auto& pr : payload_col_pos_) { + auto offs = ll_int(pr.second.first, executor_->cgen_state_->context_); + auto val = executor_->cgen_state_->ir_builder_.CreateGEP(payload_ptr, {offs}); + val = executor_->cgen_state_->ir_builder_.CreateBitCast(val, int8ptr_ty); + payload_ptrs_[pr.first] = val; + } + } + + return res; } const InputTableInfo& JoinHashTable::getInnerQueryInfo( diff --git a/QueryEngine/JoinHashTable.h b/QueryEngine/JoinHashTable.h index f5a587896e..38f067584f 100644 --- a/QueryEngine/JoinHashTable.h +++ b/QueryEngine/JoinHashTable.h @@ -33,6 +33,7 @@ #include "Descriptors/InputDescriptors.h" #include "Descriptors/RowSetMemoryOwner.h" #include "ExpressionRange.h" +#include "HashJoinRuntime.h" #include "InputMetadata.h" #include "JoinHashTableInterface.h" @@ -59,7 +60,8 @@ class JoinHashTable : public JoinHashTableInterface { const HashType preferred_hash_type, const int device_count, ColumnCacheMap& column_cache, - Executor* executor); + Executor* executor, + InputColDescriptorsByScanIdx& payload_cols_candidate); //! Make hash table from named tables and columns (such as for testing). static std::shared_ptr getSyntheticInstance( @@ -102,6 +104,21 @@ class JoinHashTable : public JoinHashTableInterface { HashType getHashType() const noexcept override { return hash_type_; } + PayloadType getPayloadType() const noexcept override { return payload_type_; } + + size_t getPayloadSize() const noexcept override { return payload_size_; } + + const InputColDescriptors& getPayloadColumns() const noexcept override { + return payload_cols_; + } + + size_t getPayloadColumnOffset(const InputColDescriptor& col) const noexcept override; + + llvm::Value* getPayloadColumnPtr(const InputColDescriptor& col) const + noexcept override { + return payload_ptrs_.at(col); + } + size_t offsetBufferOff() const noexcept override; size_t countBufferOff() const noexcept override; @@ -115,7 +132,8 @@ class JoinHashTable : public JoinHashTableInterface { const bool is_bw_eq, const int64_t sub_buff_size, Executor* executor, - const bool is_bucketized = false); + const bool is_bucketized = false, + const size_t payload_size = 1); static llvm::Value* codegenHashTableLoad(const size_t table_idx, Executor* executor); @@ -131,25 +149,14 @@ class JoinHashTable : public JoinHashTableInterface { private: JoinHashTable(const std::shared_ptr qual_bin_oper, const Analyzer::ColumnVar* col_var, + const InputColDescriptors& payload_cols, const std::vector& query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const ExpressionRange& col_range, ColumnCacheMap& column_cache, Executor* executor, - const int device_count) - : qual_bin_oper_(qual_bin_oper) - , col_var_(std::dynamic_pointer_cast(col_var->deep_copy())) - , query_infos_(query_infos) - , memory_level_(memory_level) - , hash_type_(preferred_hash_type) - , hash_entry_count_(0) - , col_range_(col_range) - , executor_(executor) - , column_cache_(column_cache) - , device_count_(device_count) { - CHECK(col_range.getType() == ExpressionRangeType::Integer); - } + const int device_count); std::pair getOneColumnFragment( const Analyzer::ColumnVar& hash_col, @@ -168,6 +175,7 @@ class JoinHashTable : public JoinHashTableInterface { const Analyzer::Expr* outer_col, const Analyzer::ColumnVar* inner_col) const; + void tryReify(const int device_count, bool throw_out_of_memory); void reify(const int device_count); void reifyOneToOneForDevice( const std::deque& fragments, @@ -181,6 +189,7 @@ class JoinHashTable : public JoinHashTableInterface { const int8_t* col_buff, const size_t num_elements, const std::pair& cols, + const std::vector& payload, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id); void initOneToManyHashTable( @@ -188,6 +197,7 @@ class JoinHashTable : public JoinHashTableInterface { const int8_t* col_buff, const size_t num_elements, const std::pair& cols, + const std::vector& payload, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id); void initHashTableOnCpuFromCache( @@ -202,12 +212,14 @@ class JoinHashTable : public JoinHashTableInterface { const int8_t* col_buff, const size_t num_elements, const std::pair& cols, + const std::vector& payload, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val); void initOneToManyHashTableOnCpu( const int8_t* col_buff, const size_t num_elements, const std::pair& cols, + const std::vector& payload, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val); @@ -229,6 +241,12 @@ class JoinHashTable : public JoinHashTableInterface { const int device_id, std::vector>& chunks_owner, ThrustAllocator& dev_buff_owner); + std::vector fetchPayload( + const std::deque& fragments, + const Data_Namespace::MemoryLevel effective_memory_level, + const int device_id, + std::vector>& chunks_owner, + ThrustAllocator& dev_buff_owner); bool isBitwiseEq() const; @@ -238,12 +256,21 @@ class JoinHashTable : public JoinHashTableInterface { size_t getComponentBufferSize() const noexcept; + llvm::Value* codegenPayload(const CompilationOptions& co, const size_t index); + std::shared_ptr qual_bin_oper_; std::shared_ptr col_var_; + InputColDescriptors payload_cols_; + const std::vector& query_infos_; const Data_Namespace::MemoryLevel memory_level_; HashType hash_type_; size_t hash_entry_count_; + PayloadType payload_type_; + size_t payload_size_; + // Payload col -> + std::unordered_map> payload_col_pos_; + std::unordered_map payload_ptrs_; std::shared_ptr> cpu_hash_table_buff_; std::mutex cpu_hash_table_buff_mutex_; #ifdef HAVE_CUDA @@ -265,11 +292,36 @@ class JoinHashTable : public JoinHashTableInterface { const size_t num_elements; const ChunkKey chunk_key; const SQLOps optype; + // Hash type is a part of a key but is not used in + // comparison operator. So we can find required + // hash table in a cache and then fix-up used hash + // type. + const HashType hash_type; + const JoinHashTableInterface::PayloadType payload_type; + // TODO(ilya): We should be able to re-use payload with different + // (extended) set of rows and different rows order. + const InputColDescriptors payload_rows; bool operator==(const struct JoinHashTableCacheKey& that) const { - return col_range == that.col_range && inner_col == that.inner_col && - outer_col == that.outer_col && num_elements == that.num_elements && - chunk_key == that.chunk_key && optype == that.optype; + if (col_range == that.col_range && inner_col == that.inner_col && + outer_col == that.outer_col && num_elements == that.num_elements && + chunk_key == that.chunk_key && optype == that.optype && + payload_type == that.payload_type && + payload_rows.size() == that.payload_rows.size()) { + auto it1 = payload_rows.begin(); + auto it2 = that.payload_rows.begin(); + while (it1 != payload_rows.end()) { + if (!(**it1 == **it2)) + return false; + + ++it1; + ++it2; + } + + return true; + } + + return false; } }; diff --git a/QueryEngine/JoinHashTableInterface.h b/QueryEngine/JoinHashTableInterface.h index b5935f6feb..20bff6f687 100644 --- a/QueryEngine/JoinHashTableInterface.h +++ b/QueryEngine/JoinHashTableInterface.h @@ -22,6 +22,7 @@ #include #include "Analyzer/Analyzer.h" #include "CompilationOptions.h" +#include "Descriptors/InputDescriptors.h" class TooManyHashEntries : public std::runtime_error { public: @@ -72,6 +73,8 @@ struct DecodedJoinHashBufferEntry { }; // struct DecodedJoinHashBufferEntry using InnerOuter = std::pair; +using InputColDescriptors = std::list>; +using InputColDescriptorsByScanIdx = std::unordered_map; class JoinHashTableInterface { public: @@ -108,11 +111,34 @@ class JoinHashTableInterface { virtual HashType getHashType() const noexcept = 0; + enum class PayloadType { RowId, RowIdAndRow, Row }; + + virtual PayloadType getPayloadType() const noexcept { return PayloadType::RowId; } + + virtual size_t getPayloadSize() const noexcept { return 1; } + + virtual const InputColDescriptors& getPayloadColumns() const noexcept { + return EMPTY_PAYLOAD; + } + + virtual size_t getPayloadColumnOffset(const InputColDescriptor& col) const noexcept { + CHECK(false); + return 0; + } + + virtual llvm::Value* getPayloadColumnPtr(const InputColDescriptor& col) const noexcept { + CHECK(false); + return nullptr; + } + virtual size_t offsetBufferOff() const noexcept = 0; virtual size_t countBufferOff() const noexcept = 0; virtual size_t payloadBufferOff() const noexcept = 0; + + private: + static const InputColDescriptors EMPTY_PAYLOAD; }; std::string decodeJoinHashBufferToString( diff --git a/QueryEngine/LoopControlFlow/JoinLoop.cpp b/QueryEngine/LoopControlFlow/JoinLoop.cpp index b1d769da1b..82ab501dcc 100644 --- a/QueryEngine/LoopControlFlow/JoinLoop.cpp +++ b/QueryEngine/LoopControlFlow/JoinLoop.cpp @@ -25,6 +25,8 @@ JoinLoop::JoinLoop(const JoinLoopKind kind, const JoinType type, const std::function&)>& iteration_domain_codegen, + const std::function&)>& + payload_iterators_codegen, const std::function&)>& outer_condition_match, const std::function& found_outer_matches, @@ -34,6 +36,7 @@ JoinLoop::JoinLoop(const JoinLoopKind kind, : kind_(kind) , type_(type) , iteration_domain_codegen_(iteration_domain_codegen) + , payload_iterators_codegen_(payload_iterators_codegen) , outer_condition_match_(outer_condition_match) , found_outer_matches_(found_outer_matches) , is_deleted_(is_deleted) @@ -95,10 +98,14 @@ llvm::BasicBlock* JoinLoop::codegen( auto iteration_val = iteration_counter; CHECK(join_loop.kind_ == JoinLoopKind::Set || !iteration_domain.values_buffer); if (join_loop.kind_ == JoinLoopKind::Set) { - iteration_val = - builder.CreateGEP(iteration_domain.values_buffer, iteration_counter); + auto offs = builder.CreateMul(iteration_counter, + ll_int(iteration_domain.entry_size, context)); + iteration_val = builder.CreateGEP(iteration_domain.values_buffer, offs); } iterators.push_back(iteration_val); + if (join_loop.payload_iterators_codegen_) { + join_loop.payload_iterators_codegen_(iterators); + } const auto have_more_inner_rows = builder.CreateICmpSLT( iteration_counter, join_loop.kind_ == JoinLoopKind::UpperBound ? iteration_domain.upper_bound @@ -162,6 +169,9 @@ llvm::BasicBlock* JoinLoop::codegen( const auto iteration_domain = join_loop.iteration_domain_codegen_(iterators); CHECK(!iteration_domain.values_buffer); iterators.push_back(iteration_domain.slot_lookup_result); + if (join_loop.payload_iterators_codegen_) { + join_loop.payload_iterators_codegen_(iterators); + } auto match_found = builder.CreateICmpSGE(iteration_domain.slot_lookup_result, ll_int(0, context)); if (join_loop.is_deleted_) { diff --git a/QueryEngine/LoopControlFlow/JoinLoop.h b/QueryEngine/LoopControlFlow/JoinLoop.h index 83afc9fdca..cb8a97248c 100644 --- a/QueryEngine/LoopControlFlow/JoinLoop.h +++ b/QueryEngine/LoopControlFlow/JoinLoop.h @@ -38,6 +38,7 @@ enum class JoinLoopKind { // 2. For one-to-one joins, at most one value: `slot_lookup_result` if valid (greater than // or equal to zero). // 3. For one-to-many joins, the `element_count` values in `values_buffer`. +// 4. Payload entry size (number of int32_t elements in payload). struct JoinLoopDomain { union { llvm::Value* upper_bound; // for UpperBound @@ -45,6 +46,7 @@ struct JoinLoopDomain { llvm::Value* slot_lookup_result; // for Singleton }; llvm::Value* values_buffer; // used for Set + size_t entry_size = 1; }; // Any join is logically a loop. Hash joins just limit the domain of iteration, @@ -55,6 +57,7 @@ class JoinLoop { JoinLoop(const JoinLoopKind, const JoinType, const std::function&)>&, + const std::function&)>&, const std::function&)>&, const std::function&, const std::function& prev_iters, @@ -86,6 +89,9 @@ class JoinLoop { // domain of iteration. const std::function&)> iteration_domain_codegen_; + // Callback provided from the executor which generates iterators for all + // columns stored in join hash table as a payload. + const std::function&)> payload_iterators_codegen_; // Callback provided from the executor which generates true iff the outer condition // evaluates to true. const std::function&)> diff --git a/QueryEngine/OverlapsJoinHashTable.cpp b/QueryEngine/OverlapsJoinHashTable.cpp index b3a92efc8a..debc71886c 100644 --- a/QueryEngine/OverlapsJoinHashTable.cpp +++ b/QueryEngine/OverlapsJoinHashTable.cpp @@ -607,7 +607,7 @@ int OverlapsJoinHashTable::initHashTableOnCpu( } auto one_to_many_buff = reinterpret_cast(&(*cpu_hash_table_buff_)[0] + entry_count_ * entry_size); - init_hash_join_buff(one_to_many_buff, entry_count_, -1, 0, 1); + init_hash_join_buff(one_to_many_buff, entry_count_, 1, -1, 0, 1); switch (key_component_width) { case 4: { const auto composite_key_dict = diff --git a/QueryEngine/RelAlgExecutor.cpp b/QueryEngine/RelAlgExecutor.cpp index 6e0618a9d5..77c82dcff3 100644 --- a/QueryEngine/RelAlgExecutor.cpp +++ b/QueryEngine/RelAlgExecutor.cpp @@ -330,7 +330,8 @@ void RelAlgExecutor::executeRelAlgStep(const RaExecutionSequence& seq, eo.dynamic_watchdog_time_limit, eo.find_push_down_candidates, eo.just_calcite_explain, - eo.gpu_input_mem_limit_percent}; + eo.gpu_input_mem_limit_percent, + eo.join_hash_row_payload}; const auto compound = dynamic_cast(body); if (compound) { @@ -1492,12 +1493,14 @@ std::unique_ptr RelAlgExecutor::createWindowFunctionConte const auto memory_level = co.device_type_ == ExecutorDeviceType::GPU ? MemoryLevel::GPU_LEVEL : MemoryLevel::CPU_LEVEL; + InputColDescriptorsByScanIdx dummy; const auto join_table_or_err = executor_->buildHashTableForQualifier(partition_key_cond, query_infos, memory_level, JoinHashTableInterface::HashType::OneToMany, - column_cache_map); + column_cache_map, + dummy); if (!join_table_or_err.fail_reason.empty()) { throw std::runtime_error(join_table_or_err.fail_reason); } @@ -2182,7 +2185,8 @@ ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry( eo.dynamic_watchdog_time_limit, false, false, - eo.gpu_input_mem_limit_percent}; + eo.gpu_input_mem_limit_percent, + eo.join_hash_row_payload}; if (was_multifrag_kernel_launch) { try { diff --git a/QueryEngine/ResultSetReductionCodegen.cpp b/QueryEngine/ResultSetReductionCodegen.cpp index 4d92c2cf4b..f601044328 100644 --- a/QueryEngine/ResultSetReductionCodegen.cpp +++ b/QueryEngine/ResultSetReductionCodegen.cpp @@ -304,6 +304,7 @@ void translate_for(const For* for_loop, nullptr, nullptr, nullptr, + nullptr, "reduction_loop"); const auto bb_loop_body = JoinLoop::codegen( {join_loop}, diff --git a/QueryEngine/RuntimeFunctions.h b/QueryEngine/RuntimeFunctions.h index 75483070b0..80951e8277 100644 --- a/QueryEngine/RuntimeFunctions.h +++ b/QueryEngine/RuntimeFunctions.h @@ -177,10 +177,21 @@ extern "C" int32_t* get_bucketized_hash_slot(int32_t* buff, const int64_t min_key, const int64_t bucket_normalization = 1); +extern "C" int32_t* get_bucketized_hash_slot_payload(int32_t* buff, + const int64_t key, + const int64_t min_key, + const size_t entry_size, + const int64_t bucket_normalization); + extern "C" int32_t* get_hash_slot(int32_t* buff, const int64_t key, const int64_t min_key); +extern "C" int32_t* get_hash_slot_payload(int32_t* buff, + const int64_t key, + const int64_t min_key, + const size_t entry_size); + extern "C" int32_t* get_hash_slot_sharded(int32_t* buff, const int64_t key, const int64_t min_key, @@ -188,6 +199,14 @@ extern "C" int32_t* get_hash_slot_sharded(int32_t* buff, const uint32_t num_shards, const uint32_t device_count); +extern "C" int32_t* get_hash_slot_sharded_payload(int32_t* buff, + const int64_t key, + const int64_t min_key, + const int64_t entry_size, + const uint32_t entry_count_per_shard, + const uint32_t num_shards, + const uint32_t device_count); + extern "C" int32_t* get_bucketized_hash_slot_sharded(int32_t* buff, const int64_t key, const int64_t min_key, @@ -196,6 +215,16 @@ extern "C" int32_t* get_bucketized_hash_slot_sharded(int32_t* buff, const uint32_t device_count, const int64_t bucket_normalization); +extern "C" int32_t* get_bucketized_hash_slot_sharded_payload( + int32_t* buff, + const int64_t key, + const int64_t min_key, + const int64_t entry_size, + const uint32_t entry_count_per_shard, + const uint32_t num_shards, + const uint32_t device_count, + const int64_t bucket_normalization); + extern "C" void linear_probabilistic_count(uint8_t* bitmap, const uint32_t bitmap_bytes, const uint8_t* key_bytes, diff --git a/ThriftHandler/MapDHandler.cpp b/ThriftHandler/MapDHandler.cpp index 12ccf7e2df..1d4f8dad24 100644 --- a/ThriftHandler/MapDHandler.cpp +++ b/ThriftHandler/MapDHandler.cpp @@ -173,6 +173,7 @@ MapDHandler::MapDHandler(const std::vector& db_leaves, const bool intel_jit_profile, const bool read_only, const bool allow_loop_joins, + const bool join_hash_row_payload, const bool enable_rendering, const bool enable_auto_clear_render_mem, const int render_oom_retry_threshold, @@ -198,6 +199,7 @@ MapDHandler::MapDHandler(const std::vector& db_leaves, , allow_multifrag_(allow_multifrag) , read_only_(read_only) , allow_loop_joins_(allow_loop_joins) + , join_hash_row_payload_(join_hash_row_payload) , authMetadata_(authMetadata) , mapd_parameters_(mapd_parameters) , legacy_syntax_(legacy_syntax) @@ -4493,7 +4495,8 @@ std::vector MapDHandler::execute_rel_alg( g_dynamic_watchdog_time_limit, find_push_down_candidates, just_calcite_explain, - mapd_parameters_.gpu_input_mem_limit}; + mapd_parameters_.gpu_input_mem_limit, + join_hash_row_payload_}; auto executor = Executor::getExecutor(cat.getCurrentDB().dbId, jit_debug_ ? "/tmp" : "", jit_debug_ ? "mapdquery" : "", @@ -4554,7 +4557,8 @@ void MapDHandler::execute_rel_alg_df(TDataFrame& _return, g_dynamic_watchdog_time_limit, false, false, - mapd_parameters_.gpu_input_mem_limit}; + mapd_parameters_.gpu_input_mem_limit, + join_hash_row_payload_}; auto executor = Executor::getExecutor(cat.getCurrentDB().dbId, jit_debug_ ? "/tmp" : "", jit_debug_ ? "mapdquery" : "", diff --git a/ThriftHandler/MapDHandler.h b/ThriftHandler/MapDHandler.h index 3d8c2e3f94..4f7e1fe2a2 100644 --- a/ThriftHandler/MapDHandler.h +++ b/ThriftHandler/MapDHandler.h @@ -123,6 +123,7 @@ class MapDHandler : public MapDIf { const bool intel_jit_profile, const bool read_only, const bool allow_loop_joins, + const bool join_hash_row_payload, const bool enable_rendering, const bool enable_auto_clear_render_mem, const int render_oom_retry_threshold, @@ -452,6 +453,7 @@ class MapDHandler : public MapDIf { bool allow_multifrag_; const bool read_only_; const bool allow_loop_joins_; + const bool join_hash_row_payload_; bool cpu_mode_only_; mapd_shared_mutex sessions_mutex_; std::mutex render_mutex_;