Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[rebase] Add data columns to join hash table payload #457

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions MapDServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ class MapDProgramOptions {
bool allow_multifrag = true;
bool read_only = false;
bool allow_loop_joins = false;
bool join_hash_row_payload = false;
bool enable_legacy_syntax = true;
AuthMetadata authMetadata;

Expand Down Expand Up @@ -361,6 +362,11 @@ void MapDProgramOptions::fillOptions() {
->default_value(allow_loop_joins)
->implicit_value(true),
"Enable loop joins.");
help_desc.add_options()("enable-join-hash-row-payload",
po::value<bool>(&join_hash_row_payload)
->default_value(join_hash_row_payload)
->implicit_value(true),
"Enable row payload in join hash indexes");
help_desc.add_options()("bigint-count",
po::value<bool>(&g_bigint_count)
->default_value(g_bigint_count)
Expand Down Expand Up @@ -1054,6 +1060,7 @@ int startMapdServer(MapDProgramOptions& prog_config_opts, bool start_http_server
prog_config_opts.intel_jit_profile,
prog_config_opts.read_only,
prog_config_opts.allow_loop_joins,
prog_config_opts.join_hash_row_payload,
prog_config_opts.enable_rendering,
prog_config_opts.enable_auto_clear_render_mem,
prog_config_opts.render_oom_retry_threshold,
Expand Down
2 changes: 1 addition & 1 deletion QueryEngine/BaselineJoinHashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ int BaselineJoinHashTable::initHashTableOnCpu(
if (layout == JoinHashTableInterface::HashType::OneToMany) {
auto one_to_many_buff = reinterpret_cast<int32_t*>(&(*cpu_hash_table_buff_)[0] +
entry_count_ * entry_size);
init_hash_join_buff(one_to_many_buff, entry_count_, -1, 0, 1);
init_hash_join_buff(one_to_many_buff, entry_count_, 1, -1, 0, 1);
switch (key_component_width) {
case 4: {
const auto composite_key_dict =
Expand Down
1 change: 1 addition & 0 deletions QueryEngine/CgenState.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ struct CgenState {
const bool contains_left_deep_outer_join_;
std::vector<llvm::Value*> outer_join_match_found_per_level_;
std::unordered_map<int, llvm::Value*> scan_idx_to_hash_pos_;
std::unordered_map<InputColDescriptor, llvm::Value*> hashed_cols_;
std::vector<std::unique_ptr<const InValuesBitmap>> in_values_bitmaps_;
const std::vector<InputTableInfo>& query_infos_;
bool needs_error_check_;
Expand Down
15 changes: 15 additions & 0 deletions QueryEngine/ColumnIR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,21 @@ std::vector<llvm::Value*> CodeGenerator::codegenColVar(const Analyzer::ColumnVar
}
return codegen(hash_join_lhs.get(), fetch_column, co);
}

if (!plan_state_->isLazyFetchColumn(col_var)) {
InputColDescriptor col{
col_var->get_column_id(), col_var->get_table_id(), col_var->get_rte_idx()};
if (cgen_state_->hashed_cols_.count(col)) {
auto val = codegenFixedLengthColVar(col_var,
cgen_state_->hashed_cols_.at(col),
ll_int((int64_t)0, cgen_state_->context_));
auto it_ok = cgen_state_->fetch_cache_.insert(
std::make_pair(local_col_id, std::vector<llvm::Value*>{val}));
CHECK(it_ok.second);
return {it_ok.first->second};
}
}

auto pos_arg = posArg(col_var);
if (window_func_context) {
pos_arg = codegenWindowPosition(window_func_context, pos_arg);
Expand Down
1 change: 1 addition & 0 deletions QueryEngine/CompilationOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ struct ExecutionOptions {
const bool find_push_down_candidates;
const bool just_calcite_explain;
const double gpu_input_mem_limit_percent; // punt to CPU if input memory exceeds this
const bool join_hash_row_payload;
};

#endif // QUERYENGINE_COMPILATIONOPTIONS_H
6 changes: 4 additions & 2 deletions QueryEngine/Execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2896,7 +2896,8 @@ Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
const std::vector<InputTableInfo>& query_infos,
const MemoryLevel memory_level,
const JoinHashTableInterface::HashType preferred_hash_type,
ColumnCacheMap& column_cache) {
ColumnCacheMap& column_cache,
InputColDescriptorsByScanIdx& payload_cols) {
std::shared_ptr<JoinHashTableInterface> join_hash_table;
const int device_count = deviceCountForMemoryLevel(memory_level);
CHECK_GT(device_count, 0);
Expand Down Expand Up @@ -2924,7 +2925,8 @@ Executor::JoinHashTableOrError Executor::buildHashTableForQualifier(
preferred_hash_type,
device_count,
column_cache,
this);
this,
payload_cols);
} catch (TooManyHashEntries&) {
const auto join_quals = coalesce_singleton_equi_join(qual_bin_oper);
CHECK_EQ(join_quals.size(), size_t(1));
Expand Down
19 changes: 18 additions & 1 deletion QueryEngine/Execute.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ inline const ColumnDescriptor* get_column_descriptor(
return col_desc;
}

// Convenience overload: looks up the catalog descriptor for an input column.
// Takes the column id from `col` and the table id from its scan descriptor,
// then defers to the (col_id, table_id, cat) overload of get_column_descriptor.
inline const ColumnDescriptor* get_column_descriptor(
    const InputColDescriptor& col,
    const Catalog_Namespace::Catalog& cat) {
  const auto column_id = col.getColId();
  const auto table_id = col.getScanDesc().getTableId();
  return get_column_descriptor(column_id, table_id, cat);
}

inline const Analyzer::Expr* extract_cast_arg(const Analyzer::Expr* expr) {
const auto cast_expr = dynamic_cast<const Analyzer::UOper*>(expr);
if (!cast_expr || cast_expr->get_optype() != kCAST) {
Expand Down Expand Up @@ -176,6 +182,12 @@ inline const ColumnDescriptor* get_column_descriptor_maybe(
return table_id > 0 ? get_column_descriptor(col_id, table_id, cat) : nullptr;
}

// Convenience overload: same lookup as the (col_id, table_id, cat) overload of
// get_column_descriptor_maybe, with the ids taken from an InputColDescriptor.
// The column id comes from `col`; the table id comes from its scan descriptor.
// NOTE(review): the three-argument overload appears to yield nullptr for
// non-positive table ids -- confirm against its definition.
inline const ColumnDescriptor* get_column_descriptor_maybe(
    const InputColDescriptor& col,
    const Catalog_Namespace::Catalog& cat) {
  const auto column_id = col.getColId();
  const auto table_id = col.getScanDesc().getTableId();
  return get_column_descriptor_maybe(column_id, table_id, cat);
}

inline const ResultSetPtr& get_temporary_table(const TemporaryTables* temporary_tables,
const int table_id) {
CHECK_LT(table_id, 0);
Expand Down Expand Up @@ -844,12 +856,16 @@ class Executor {
std::shared_ptr<JoinHashTableInterface> buildCurrentLevelHashTable(
const JoinCondition& current_level_join_conditions,
RelAlgExecutionUnit& ra_exe_unit,
InputColDescriptorsByScanIdx& payload_cols,
const CompilationOptions& co,
const std::vector<InputTableInfo>& query_infos,
ColumnCacheMap& column_cache,
std::vector<std::string>& fail_reasons);
llvm::Value* addJoinLoopIterator(const std::vector<llvm::Value*>& prev_iters,
const size_t level_idx);
void addPayloadColumnIterators(const JoinHashTableInterface& hash_table);
void codegenPayloadColumnIterators(const std::vector<llvm::Value*>& prev_iters,
const JoinHashTableInterface& hash_table);
void codegenJoinLoops(const std::vector<JoinLoop>& join_loops,
const RelAlgExecutionUnit& ra_exe_unit,
GroupByAndAggregate& group_by_and_aggregate,
Expand Down Expand Up @@ -880,7 +896,8 @@ class Executor {
const std::vector<InputTableInfo>& query_infos,
const MemoryLevel memory_level,
const JoinHashTableInterface::HashType preferred_hash_type,
ColumnCacheMap& column_cache);
ColumnCacheMap& column_cache,
InputColDescriptorsByScanIdx& payload_cols);
void nukeOldState(const bool allow_lazy_fetch,
const std::vector<InputTableInfo>& query_infos,
const RelAlgExecutionUnit* ra_exe_unit);
Expand Down
Loading