Skip to content

Commit

Permalink
Add initial sub-fragments support
Browse files Browse the repository at this point in the history
Resolves #643

Use sub-fragments for additional parallelism and better load balancing. Currently this works for aggregates only.

* Add initial sub-fragments support.

Signed-off-by: ienkovich <[email protected]>
Signed-off-by: Yoon-Min Nam <[email protected]>

* Fix review comments

Signed-off-by: ienkovich <[email protected]>
Signed-off-by: Yoon-Min Nam <[email protected]>

* Rename sub-fragments and use them for CPU only.

Signed-off-by: ienkovich <[email protected]>
Signed-off-by: Yoon-Min Nam <[email protected]>

* Fix groupby detection and hold fetched chunks in sub-tasks.

Signed-off-by: ienkovich <[email protected]>
Signed-off-by: Yoon-Min Nam <[email protected]>

* Fix inner table iteration for sub-tasks.

Signed-off-by: ienkovich <[email protected]>
Signed-off-by: Yoon-Min Nam <[email protected]>

* Adjust HighCardinalityGroupByTest to sub-tasks.

Signed-off-by: ienkovich <[email protected]>
Signed-off-by: Yoon-Min Nam <[email protected]>

* Add exception handling for KernelSubtask

Signed-off-by: ienkovich <[email protected]>
Signed-off-by: Yoon-Min Nam <[email protected]>

* Fix build

Signed-off-by: ienkovich <[email protected]>
Signed-off-by: Yoon-Min Nam <[email protected]>

* Add comments

Signed-off-by: ienkovich <[email protected]>
Signed-off-by: Yoon-Min Nam <[email protected]>

Co-authored-by: ienkovich <[email protected]>
  • Loading branch information
2 people authored and andrewseidl committed Aug 16, 2021
1 parent bd7702d commit 2f97526
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 82 deletions.
90 changes: 72 additions & 18 deletions QueryEngine/Execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <numeric>
#include <set>
#include <thread>
Expand Down Expand Up @@ -75,6 +76,8 @@
bool g_enable_watchdog{false};
bool g_enable_dynamic_watchdog{false};
bool g_use_tbb_pool{false};
bool g_enable_cpu_sub_tasks{false};
size_t g_cpu_sub_task_size{500'000};
bool g_enable_filter_function{true};
unsigned g_dynamic_watchdog_time_limit{10000};
bool g_allow_cpu_retry{true};
Expand Down Expand Up @@ -341,6 +344,17 @@ size_t Executor::getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetc
return num_bytes;
}

/// Reports whether at least one of the given target expressions is treated as
/// a lazy-fetch column by the current plan state.
///
/// @param target_exprs  Target expressions to inspect.
/// @return true as soon as plan_state_->isLazyFetchColumn() accepts one of the
///         expressions; false if none is accepted (including an empty list).
///
/// plan_state_ must be set before calling; this is enforced with CHECK.
bool Executor::hasLazyFetchColumns(
    const std::vector<Analyzer::Expr*>& target_exprs) const {
  CHECK(plan_state_);
  auto expr_it = target_exprs.begin();
  const auto exprs_end = target_exprs.end();
  // Advance past every expression that is not lazily fetched; stop at the first hit.
  while (expr_it != exprs_end && !plan_state_->isLazyFetchColumn(*expr_it)) {
    ++expr_it;
  }
  return expr_it != exprs_end;
}

std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
const std::vector<Analyzer::Expr*>& target_exprs) const {
CHECK(plan_state_);
Expand Down Expand Up @@ -2239,6 +2253,21 @@ void Executor::launchKernels(SharedKernelContext& shared_context,
kernel_queue_time_ms_ += timer_stop(clock_begin);

THREAD_POOL thread_pool;
// A hack to have unused unit for results collection.
const RelAlgExecutionUnit* ra_exe_unit =
kernels.empty() ? nullptr : &kernels[0]->ra_exe_unit_;

#ifdef HAVE_TBB
if constexpr (std::is_same<decltype(&thread_pool),
decltype(shared_context.getThreadPool())>::value) {
if (g_use_tbb_pool && g_enable_cpu_sub_tasks &&
device_type == ExecutorDeviceType::CPU) {
shared_context.setThreadPool(&thread_pool);
}
}
ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(nullptr); });
#endif // HAVE_TBB

VLOG(1) << "Launching " << kernels.size() << " kernels for query on "
<< (device_type == ExecutorDeviceType::CPU ? "CPU"s : "GPU"s) << ".";
size_t kernel_idx = 1;
Expand All @@ -2255,6 +2284,21 @@ void Executor::launchKernels(SharedKernelContext& shared_context,
kernel_idx++);
}
thread_pool.join();

for (auto& exec_ctx : shared_context.getTlsExecutionContext()) {
// The first arg is used for GPU only, it's not our case.
// TODO: add QueryExecutionContext::getRowSet() interface
// for our case.
if (exec_ctx) {
ResultSetPtr results;
if (ra_exe_unit->estimator) {
results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
} else {
results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
}
shared_context.addDeviceResults(std::move(results), {});
}
}
}

std::vector<size_t> Executor::getTableFragmentIndices(
Expand Down Expand Up @@ -2891,7 +2935,7 @@ int32_t Executor::executePlanWithoutGroupBy(
const RelAlgExecutionUnit& ra_exe_unit,
const CompilationResult& compilation_result,
const bool hoist_literals,
ResultSetPtr& results,
ResultSetPtr* results,
const std::vector<Analyzer::Expr*>& target_exprs,
const ExecutorDeviceType device_type,
std::vector<std::vector<const int8_t*>>& col_buffers,
Expand All @@ -2903,10 +2947,11 @@ int32_t Executor::executePlanWithoutGroupBy(
const uint32_t start_rowid,
const uint32_t num_tables,
const bool allow_runtime_interrupt,
RenderInfo* render_info) {
RenderInfo* render_info,
const int64_t rows_to_process) {
INJECT_TIMER(executePlanWithoutGroupBy);
auto timer = DEBUG_TIMER(__func__);
CHECK(!results);
CHECK(!results || !(*results));
if (col_buffers.empty()) {
return 0;
}
Expand Down Expand Up @@ -2954,7 +2999,8 @@ int32_t Executor::executePlanWithoutGroupBy(
0,
&error_code,
num_tables,
join_hash_table_ptrs);
join_hash_table_ptrs,
rows_to_process);
output_memory_scope.reset(new OutVecOwner(out_vec));
} else {
auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
Expand Down Expand Up @@ -2997,10 +3043,14 @@ int32_t Executor::executePlanWithoutGroupBy(
}
if (ra_exe_unit.estimator) {
CHECK(!error_code);
results =
std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
if (results) {
*results =
std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
}
return 0;
}
// Expect delayed results extraction (used for sub-fragments) for estimator only;
CHECK(results);
std::vector<int64_t> reduced_outs;
const auto num_frags = col_buffers.size();
const size_t entry_count =
Expand Down Expand Up @@ -3087,7 +3137,7 @@ int32_t Executor::executePlanWithoutGroupBy(
auto rows_ptr = std::shared_ptr<ResultSet>(
query_exe_context->query_buffers_->result_sets_[0].release());
rows_ptr->fillOneEntry(reduced_outs);
results = std::move(rows_ptr);
*results = std::move(rows_ptr);
return error_code;
}

Expand All @@ -3104,7 +3154,7 @@ int32_t Executor::executePlanWithGroupBy(
const RelAlgExecutionUnit& ra_exe_unit,
const CompilationResult& compilation_result,
const bool hoist_literals,
ResultSetPtr& results,
ResultSetPtr* results,
const ExecutorDeviceType device_type,
std::vector<std::vector<const int8_t*>>& col_buffers,
const std::vector<size_t> outer_tab_frag_ids,
Expand All @@ -3118,10 +3168,12 @@ int32_t Executor::executePlanWithGroupBy(
const uint32_t start_rowid,
const uint32_t num_tables,
const bool allow_runtime_interrupt,
RenderInfo* render_info) {
RenderInfo* render_info,
const int64_t rows_to_process) {
auto timer = DEBUG_TIMER(__func__);
INJECT_TIMER(executePlanWithGroupBy);
CHECK(!results);
// TODO: get results via a separate method, but need to do something with literals.
CHECK(!results || !(*results));
if (col_buffers.empty()) {
return 0;
}
Expand Down Expand Up @@ -3212,7 +3264,8 @@ int32_t Executor::executePlanWithGroupBy(
max_matched,
&error_code,
num_tables,
join_hash_table_ptrs);
join_hash_table_ptrs,
rows_to_process);
} else {
try {
auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
Expand Down Expand Up @@ -3257,13 +3310,13 @@ int32_t Executor::executePlanWithGroupBy(
return error_code;
}

if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
if (results && error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
results = query_exe_context->getRowSet(ra_exe_unit_copy,
query_exe_context->query_mem_desc_);
CHECK(results);
VLOG(2) << "results->rowCount()=" << results->rowCount();
results->holdLiterals(hoist_buf);
*results = query_exe_context->getRowSet(ra_exe_unit_copy,
query_exe_context->query_mem_desc_);
CHECK(*results);
VLOG(2) << "results->rowCount()=" << (*results)->rowCount();
(*results)->holdLiterals(hoist_buf);
}
if (error_code < 0 && render_allocator_map_ptr) {
auto const adjusted_scan_limit =
Expand All @@ -3276,7 +3329,8 @@ int32_t Executor::executePlanWithGroupBy(
return error_code;
}
}
if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
if (results && error_code &&
(!scan_limit || check_rows_less_than_needed(*results, scan_limit))) {
return error_code; // unlucky, not enough results and we ran out of slots
}

Expand Down
14 changes: 10 additions & 4 deletions QueryEngine/Execute.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ class Executor {

size_t getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetch) const;

bool hasLazyFetchColumns(const std::vector<Analyzer::Expr*>& target_exprs) const;
std::vector<ColumnLazyFetchInfo> getColLazyFetchInfo(
const std::vector<Analyzer::Expr*>& target_exprs) const;

Expand Down Expand Up @@ -710,10 +711,11 @@ class Executor {
const size_t scan_idx,
const RelAlgExecutionUnit& ra_exe_unit);

// pass nullptr to results if it shouldn't be extracted from the execution context
int32_t executePlanWithGroupBy(const RelAlgExecutionUnit& ra_exe_unit,
const CompilationResult&,
const bool hoist_literals,
ResultSetPtr& results,
ResultSetPtr* results,
const ExecutorDeviceType device_type,
std::vector<std::vector<const int8_t*>>& col_buffers,
const std::vector<size_t> outer_tab_frag_ids,
Expand All @@ -727,12 +729,14 @@ class Executor {
const uint32_t start_rowid,
const uint32_t num_tables,
const bool allow_runtime_interrupt,
RenderInfo* render_info);
RenderInfo* render_info,
const int64_t rows_to_process = -1);
// pass nullptr to results if it shouldn't be extracted from the execution context
int32_t executePlanWithoutGroupBy(
const RelAlgExecutionUnit& ra_exe_unit,
const CompilationResult&,
const bool hoist_literals,
ResultSetPtr& results,
ResultSetPtr* results,
const std::vector<Analyzer::Expr*>& target_exprs,
const ExecutorDeviceType device_type,
std::vector<std::vector<const int8_t*>>& col_buffers,
Expand All @@ -744,7 +748,8 @@ class Executor {
const uint32_t start_rowid,
const uint32_t num_tables,
const bool allow_runtime_interrupt,
RenderInfo* render_info);
RenderInfo* render_info,
const int64_t rows_to_process = -1);

public: // Temporary, ask saman about this
static std::pair<int64_t, int32_t> reduceResults(const SQLAgg agg,
Expand Down Expand Up @@ -1164,6 +1169,7 @@ class Executor {
friend class ColumnFetcher;
friend struct DiamondCodegen; // cgen_state_
friend class ExecutionKernel;
friend class KernelSubtask;
friend class HashJoin; // cgen_state_
friend class OverlapsJoinHashTable;
friend class GroupByAndAggregate;
Expand Down
Loading

0 comments on commit 2f97526

Please sign in to comment.