diff --git a/QueryEngine/Execute.cpp b/QueryEngine/Execute.cpp
index 3640381078..f80f87d031 100644
--- a/QueryEngine/Execute.cpp
+++ b/QueryEngine/Execute.cpp
@@ -28,6 +28,7 @@
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
 #include 
@@ -75,6 +76,8 @@ bool g_enable_watchdog{false};
 bool g_enable_dynamic_watchdog{false};
 bool g_use_tbb_pool{false};
+bool g_enable_cpu_sub_tasks{false};
+size_t g_cpu_sub_task_size{500'000};
 bool g_enable_filter_function{true};
 unsigned g_dynamic_watchdog_time_limit{10000};
 bool g_allow_cpu_retry{true};
@@ -341,6 +344,17 @@ size_t Executor::getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetc
   return num_bytes;
 }
 
+bool Executor::hasLazyFetchColumns(
+    const std::vector<Analyzer::Expr*>& target_exprs) const {
+  CHECK(plan_state_);
+  for (const auto target_expr : target_exprs) {
+    if (plan_state_->isLazyFetchColumn(target_expr)) {
+      return true;
+    }
+  }
+  return false;
+}
+
 std::vector<ColumnLazyFetchInfo> Executor::getColLazyFetchInfo(
     const std::vector<Analyzer::Expr*>& target_exprs) const {
   CHECK(plan_state_);
@@ -2239,6 +2253,21 @@ void Executor::launchKernels(SharedKernelContext& shared_context,
   kernel_queue_time_ms_ += timer_stop(clock_begin);
 
   THREAD_POOL thread_pool;
+  // A hack to get an execution unit for results collection.
+  const RelAlgExecutionUnit* ra_exe_unit =
+      kernels.empty() ? nullptr : &kernels[0]->ra_exe_unit_;
+
+#ifdef HAVE_TBB
+  if constexpr (std::is_same<THREAD_POOL, threadpool::ThreadPool>::value) {
+    if (g_use_tbb_pool && g_enable_cpu_sub_tasks &&
+        device_type == ExecutorDeviceType::CPU) {
+      shared_context.setThreadPool(&thread_pool);
+    }
+  }
+  ScopeGuard pool_guard([&shared_context]() { shared_context.setThreadPool(nullptr); });
+#endif  // HAVE_TBB
+
   VLOG(1) << "Launching " << kernels.size() << " kernels for query on "
           << (device_type == ExecutorDeviceType::CPU ? "CPU"s : "GPU"s) << ".";
   size_t kernel_idx = 1;
@@ -2255,6 +2284,21 @@
                  kernel_idx++);
   }
   thread_pool.join();
+
+  for (auto& exec_ctx : shared_context.getTlsExecutionContext()) {
+    // The first arg is used for GPU only; it doesn't apply here.
+    // TODO: add a QueryExecutionContext::getRowSet() interface
+    // for this case.
+    if (exec_ctx) {
+      ResultSetPtr results;
+      if (ra_exe_unit->estimator) {
+        results = std::shared_ptr<ResultSet>(exec_ctx->estimator_result_set_.release());
+      } else {
+        results = exec_ctx->getRowSet(*ra_exe_unit, exec_ctx->query_mem_desc_);
+      }
+      shared_context.addDeviceResults(std::move(results), {});
+    }
+  }
 }
 
 std::vector<size_t> Executor::getTableFragmentIndices(
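Note: the per-thread contexts walked above come from tbb::enumerable_thread_specific — each worker lazily creates one context, and after thread_pool.join() the coordinating thread iterates all instances to collect partial results. A minimal standalone sketch of that pattern (toy names, not the engine's actual types):

#include <tbb/enumerable_thread_specific.h>
#include <tbb/parallel_for.h>

#include <cstddef>
#include <memory>
#include <vector>

struct ToyContext {
  std::vector<int> partial;  // stands in for a per-thread result buffer
};

int main() {
  tbb::enumerable_thread_specific<std::unique_ptr<ToyContext>> tls;

  tbb::parallel_for(0, 1000, [&tls](int i) {
    auto& ctx = tls.local();  // created on first use by each worker thread
    if (!ctx) {
      ctx = std::make_unique<ToyContext>();
    }
    ctx->partial.push_back(i);  // each sub-task writes only its own buffers
  });

  // Single-threaded collection, analogous to the loop after thread_pool.join().
  size_t total = 0;
  for (auto& ctx : tls) {
    if (ctx) {
      total += ctx->partial.size();
    }
  }
  return total == 1000 ? 0 : 1;
}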
@@ -2891,7 +2935,7 @@ int32_t Executor::executePlanWithoutGroupBy(
     const RelAlgExecutionUnit& ra_exe_unit,
     const CompilationResult& compilation_result,
     const bool hoist_literals,
-    ResultSetPtr& results,
+    ResultSetPtr* results,
     const std::vector<Analyzer::Expr*>& target_exprs,
     const ExecutorDeviceType device_type,
     std::vector<std::vector<const int8_t*>>& col_buffers,
@@ -2903,10 +2947,11 @@
     const uint32_t start_rowid,
     const uint32_t num_tables,
     const bool allow_runtime_interrupt,
-    RenderInfo* render_info) {
+    RenderInfo* render_info,
+    const int64_t rows_to_process) {
   INJECT_TIMER(executePlanWithoutGroupBy);
   auto timer = DEBUG_TIMER(__func__);
-  CHECK(!results);
+  CHECK(!results || !(*results));
   if (col_buffers.empty()) {
     return 0;
   }
@@ -2954,7 +2999,8 @@
                                     0,
                                     &error_code,
                                     num_tables,
-                                    join_hash_table_ptrs);
+                                    join_hash_table_ptrs,
+                                    rows_to_process);
     output_memory_scope.reset(new OutVecOwner(out_vec));
   } else {
     auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
@@ -2997,10 +3043,14 @@
   }
   if (ra_exe_unit.estimator) {
     CHECK(!error_code);
-    results =
-        std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
+    if (results) {
+      *results =
+          std::shared_ptr<ResultSet>(query_exe_context->estimator_result_set_.release());
+    }
     return 0;
   }
+  // Delayed results extraction (used for sub-fragments) is expected for estimators only.
+  CHECK(results);
   std::vector<int64_t> reduced_outs;
   const auto num_frags = col_buffers.size();
   const size_t entry_count =
@@ -3087,7 +3137,7 @@
   auto rows_ptr = std::shared_ptr<ResultSet>(
       query_exe_context->query_buffers_->result_sets_[0].release());
   rows_ptr->fillOneEntry(reduced_outs);
-  results = std::move(rows_ptr);
+  *results = std::move(rows_ptr);
   return error_code;
 }
 
@@ -3104,7 +3154,7 @@ int32_t Executor::executePlanWithGroupBy(
     const RelAlgExecutionUnit& ra_exe_unit,
     const CompilationResult& compilation_result,
     const bool hoist_literals,
-    ResultSetPtr& results,
+    ResultSetPtr* results,
     const ExecutorDeviceType device_type,
     std::vector<std::vector<const int8_t*>>& col_buffers,
     const std::vector<size_t> outer_tab_frag_ids,
@@ -3118,10 +3168,12 @@
     const uint32_t start_rowid,
     const uint32_t num_tables,
     const bool allow_runtime_interrupt,
-    RenderInfo* render_info) {
+    RenderInfo* render_info,
+    const int64_t rows_to_process) {
   auto timer = DEBUG_TIMER(__func__);
   INJECT_TIMER(executePlanWithGroupBy);
-  CHECK(!results);
+  // TODO: get results via a separate method, but we need to do something with literals.
+  CHECK(!results || !(*results));
   if (col_buffers.empty()) {
     return 0;
   }
@@ -3212,7 +3264,8 @@
                                     max_matched,
                                     &error_code,
                                     num_tables,
-                                    join_hash_table_ptrs);
+                                    join_hash_table_ptrs,
+                                    rows_to_process);
   } else {
     try {
       auto gpu_generated_code = std::dynamic_pointer_cast<GpuCompilationContext>(
@@ -3257,13 +3310,13 @@
     return error_code;
   }
 
-  if (error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
+  if (results && error_code != Executor::ERR_OVERFLOW_OR_UNDERFLOW &&
       error_code != Executor::ERR_DIV_BY_ZERO && !render_allocator_map_ptr) {
-    results = query_exe_context->getRowSet(ra_exe_unit_copy,
-                                           query_exe_context->query_mem_desc_);
-    CHECK(results);
-    VLOG(2) << "results->rowCount()=" << results->rowCount();
-    results->holdLiterals(hoist_buf);
+    *results = query_exe_context->getRowSet(ra_exe_unit_copy,
+                                            query_exe_context->query_mem_desc_);
+    CHECK(*results);
+    VLOG(2) << "results->rowCount()=" << (*results)->rowCount();
+    (*results)->holdLiterals(hoist_buf);
   }
   if (error_code < 0 && render_allocator_map_ptr) {
     auto const adjusted_scan_limit =
@@ -3276,7 +3329,8 @@
       return error_code;
     }
   }
-  if (error_code && (!scan_limit || check_rows_less_than_needed(results, scan_limit))) {
+  if (results && error_code &&
+      (!scan_limit || check_rows_less_than_needed(*results, scan_limit))) {
     return error_code;  // unlucky, not enough results and we ran out of slots
   }
 
diff --git a/QueryEngine/Execute.h b/QueryEngine/Execute.h
index 7afd0bd5ca..d9d17a005f 100644
--- a/QueryEngine/Execute.h
+++ b/QueryEngine/Execute.h
@@ -462,6 +462,7 @@ class Executor {
 
   size_t getNumBytesForFetchedRow(const std::set<int>& table_ids_to_fetch) const;
 
+  bool hasLazyFetchColumns(const std::vector<Analyzer::Expr*>& target_exprs) const;
   std::vector<ColumnLazyFetchInfo> getColLazyFetchInfo(
       const std::vector<Analyzer::Expr*>& target_exprs) const;
 
@@ -710,10 +711,11 @@ class Executor {
                                  const size_t scan_idx,
                                  const RelAlgExecutionUnit& ra_exe_unit);
 
+  // Pass nullptr for results if they shouldn't be extracted from the execution context.
   int32_t executePlanWithGroupBy(const RelAlgExecutionUnit& ra_exe_unit,
                                  const CompilationResult&,
                                  const bool hoist_literals,
-                                 ResultSetPtr& results,
+                                 ResultSetPtr* results,
                                  const ExecutorDeviceType device_type,
                                  std::vector<std::vector<const int8_t*>>& col_buffers,
                                  const std::vector<size_t> outer_tab_frag_ids,
@@ -727,12 +729,14 @@ class Executor {
                                  const uint32_t start_rowid,
                                  const uint32_t num_tables,
                                  const bool allow_runtime_interrupt,
-                                 RenderInfo* render_info);
+                                 RenderInfo* render_info,
+                                 const int64_t rows_to_process = -1);
+  // Pass nullptr for results if they shouldn't be extracted from the execution context.
   int32_t executePlanWithoutGroupBy(
       const RelAlgExecutionUnit& ra_exe_unit,
       const CompilationResult&,
       const bool hoist_literals,
-      ResultSetPtr& results,
+      ResultSetPtr* results,
       const std::vector<Analyzer::Expr*>& target_exprs,
       const ExecutorDeviceType device_type,
       std::vector<std::vector<const int8_t*>>& col_buffers,
@@ -744,7 +748,8 @@ class Executor {
       const uint32_t start_rowid,
      const uint32_t num_tables,
      const bool allow_runtime_interrupt,
-      RenderInfo* render_info);
+      RenderInfo* render_info,
+      const int64_t rows_to_process = -1);
 
  public:  // Temporary, ask saman about this
   static std::pair<int64_t, int32_t> reduceResults(const SQLAgg agg,
@@ -1164,6 +1169,7 @@ class Executor {
   friend class ColumnFetcher;
   friend struct DiamondCodegen;  // cgen_state_
   friend class ExecutionKernel;
+  friend class KernelSubtask;
   friend class HashJoin;  // cgen_state_
   friend class OverlapsJoinHashTable;
   friend class GroupByAndAggregate;
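Note: the ResultSetPtr& → ResultSetPtr* change encodes an optional out-parameter. Kernels that run whole fragments pass &device_results_; sub-tasks pass nullptr and leave rows in the shared thread-local execution context for later extraction. A minimal sketch of the convention (illustrative types only, not the engine's signatures):

#include <memory>

struct ResultSet {};
using ResultSetPtr = std::shared_ptr<ResultSet>;

// Returns an error code; fills *results only when the caller asked for it.
int executePlanSketch(ResultSetPtr* results) {
  if (results) {
    *results = std::make_shared<ResultSet>();  // eager extraction path
  }
  // With results == nullptr the rows stay in the execution context,
  // to be collected once all sub-tasks of the fragment have finished.
  return 0;
}

int main() {
  ResultSetPtr rs;
  int err = executePlanSketch(&rs);   // kernel-per-fragment path
  err |= executePlanSketch(nullptr);  // sub-task path
  return err;
}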
diff --git a/QueryEngine/ExecutionKernel.cpp b/QueryEngine/ExecutionKernel.cpp
index 2188f0f1a4..77fd933d8a 100644
--- a/QueryEngine/ExecutionKernel.cpp
+++ b/QueryEngine/ExecutionKernel.cpp
@@ -77,6 +77,19 @@ bool need_to_hold_chunk(const Chunk_NS::Chunk* chunk,
   return false;
 }
 
+bool need_to_hold_chunk(const std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks,
+                        const RelAlgExecutionUnit& ra_exe_unit,
+                        const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
+                        const ExecutorDeviceType device_type) {
+  for (const auto& chunk : chunks) {
+    if (need_to_hold_chunk(chunk.get(), ra_exe_unit, lazy_fetch_info, device_type)) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
 }  // namespace
 
 const std::vector<uint64_t>& SharedKernelContext::getFragOffsets() {
@@ -174,38 +187,38 @@ void ExecutionKernel::runImpl(Executor* executor,
         new std::lock_guard<std::mutex>(executor->gpu_exec_mutex_[chosen_device_id]));
     device_allocator = std::make_unique<CudaAllocator>(data_mgr, chosen_device_id);
   }
-  FetchResult fetch_result;
+  std::shared_ptr<FetchResult> fetch_result(new FetchResult);
   try {
     std::map<int, const TableFragments*> all_tables_fragments;
     QueryFragmentDescriptor::computeAllTablesFragments(
         all_tables_fragments, ra_exe_unit_, shared_context.getQueryInfos());
 
-    fetch_result = ra_exe_unit_.union_all
-                       ? executor->fetchUnionChunks(column_fetcher,
-                                                    ra_exe_unit_,
-                                                    chosen_device_id,
-                                                    memory_level,
-                                                    all_tables_fragments,
-                                                    frag_list,
-                                                    *catalog,
-                                                    *chunk_iterators_ptr,
-                                                    chunks,
-                                                    device_allocator.get(),
-                                                    thread_idx,
-                                                    eo.allow_runtime_query_interrupt)
-                       : executor->fetchChunks(column_fetcher,
-                                               ra_exe_unit_,
-                                               chosen_device_id,
-                                               memory_level,
-                                               all_tables_fragments,
-                                               frag_list,
-                                               *catalog,
-                                               *chunk_iterators_ptr,
-                                               chunks,
-                                               device_allocator.get(),
-                                               thread_idx,
-                                               eo.allow_runtime_query_interrupt);
-    if (fetch_result.num_rows.empty()) {
+    *fetch_result = ra_exe_unit_.union_all
+                        ? executor->fetchUnionChunks(column_fetcher,
+                                                     ra_exe_unit_,
+                                                     chosen_device_id,
+                                                     memory_level,
+                                                     all_tables_fragments,
+                                                     frag_list,
+                                                     *catalog,
+                                                     *chunk_iterators_ptr,
+                                                     chunks,
+                                                     device_allocator.get(),
+                                                     thread_idx,
+                                                     eo.allow_runtime_query_interrupt)
+                        : executor->fetchChunks(column_fetcher,
+                                                ra_exe_unit_,
+                                                chosen_device_id,
+                                                memory_level,
+                                                all_tables_fragments,
+                                                frag_list,
+                                                *catalog,
+                                                *chunk_iterators_ptr,
+                                                chunks,
+                                                device_allocator.get(),
+                                                thread_idx,
+                                                eo.allow_runtime_query_interrupt);
+    if (fetch_result->num_rows.empty()) {
      return;
    }
    if (eo.with_dynamic_watchdog &&
@@ -241,7 +254,7 @@
     group_by_and_aggregate.initQueryMemoryDescriptor(false, 0, 8, nullptr, false);
     device_results_ = run_query_external(
         query,
-        fetch_result,
+        *fetch_result,
         executor->plan_state_.get(),
         ExternalQueryOutputSpec{
             *query_mem_desc,
@@ -258,8 +271,8 @@
   if (kernel_dispatch_mode == ExecutorDispatchMode::KernelPerFragment &&
       query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection) {
     total_num_input_rows = 0;
-    std::for_each(fetch_result.num_rows.begin(),
-                  fetch_result.num_rows.end(),
+    std::for_each(fetch_result->num_rows.begin(),
+                  fetch_result->num_rows.end(),
                   [&total_num_input_rows](const std::vector<int64_t>& frag_row_count) {
                     total_num_input_rows = std::accumulate(frag_row_count.begin(),
                                                            frag_row_count.end(),
@@ -278,6 +291,61 @@
     }
   }
 
+  uint32_t start_rowid{0};
+  if (rowid_lookup_key >= 0) {
+    if (!frag_list.empty()) {
+      const auto& all_frag_row_offsets = shared_context.getFragOffsets();
+      start_rowid = rowid_lookup_key -
+                    all_frag_row_offsets[frag_list.begin()->fragment_ids.front()];
+    }
+  }
+
+#ifdef HAVE_TBB
+  bool can_run_subkernels = shared_context.getThreadPool() != nullptr;
+
+  // Sub-tasks are supported for group-by queries and estimators only for now.
+  bool is_groupby =
+      (ra_exe_unit_.groupby_exprs.size() > 1) ||
+      (ra_exe_unit_.groupby_exprs.size() == 1 && ra_exe_unit_.groupby_exprs.front());
+  can_run_subkernels = can_run_subkernels && (is_groupby || ra_exe_unit_.estimator);
+
+  // If any column is lazily fetched, we cannot mix different fragments in a single
+  // ResultSet.
+  can_run_subkernels =
+      can_run_subkernels && !executor->hasLazyFetchColumns(ra_exe_unit_.target_exprs);
+
+  // TODO: Use another structure to hold chunks. Currently, ResultSet holds them, but
+  // with sub-tasks a chunk can be referenced by many ResultSets, so some outer
+  // structure holding all ResultSets and all chunks is required.
+  can_run_subkernels =
+      can_run_subkernels &&
+      !need_to_hold_chunk(
+          chunks, ra_exe_unit_, std::vector<ColumnLazyFetchInfo>(), chosen_device_type);
+
+  // TODO: check for literals? We serialize literals before execution and hold them in
+  // result sets. Can we simply do it once and hold them in an outer structure?
+  if (can_run_subkernels) {
+    size_t total_rows = fetch_result->num_rows[0][0];
+    size_t sub_size = g_cpu_sub_task_size;
+
+    for (size_t sub_start = start_rowid; sub_start < total_rows; sub_start += sub_size) {
+      sub_size = (sub_start + sub_size > total_rows) ? total_rows - sub_start : sub_size;
+      auto subtask = std::make_shared<KernelSubtask>(*this,
+                                                     shared_context,
+                                                     fetch_result,
+                                                     chunk_iterators_ptr,
+                                                     total_num_input_rows,
+                                                     sub_start,
+                                                     sub_size,
+                                                     thread_idx);
+      shared_context.getThreadPool()->spawn(
+          [subtask, executor] { subtask->run(executor); });
+    }
+
+    return;
+  }
+#endif  // HAVE_TBB
+
   if (eo.executor_type == ExecutorType::Native) {
     try {
       query_exe_context_owned =
@@ -287,8 +355,8 @@
                                            kernel_dispatch_mode,
                                            chosen_device_id,
                                            total_num_input_rows,
-                                           fetch_result.col_buffers,
-                                           fetch_result.frag_offsets,
+                                           fetch_result->col_buffers,
+                                           fetch_result->frag_offsets,
                                            executor->getRowSetMemoryOwner(),
                                            compilation_result.output_columnar,
                                            query_mem_desc.sortOnGpu(),
@@ -301,26 +369,18 @@
   QueryExecutionContext* query_exe_context{query_exe_context_owned.get()};
   CHECK(query_exe_context);
   int32_t err{0};
-  uint32_t start_rowid{0};
-  if (rowid_lookup_key >= 0) {
-    if (!frag_list.empty()) {
-      const auto& all_frag_row_offsets = shared_context.getFragOffsets();
-      start_rowid = rowid_lookup_key -
-                    all_frag_row_offsets[frag_list.begin()->fragment_ids.front()];
-    }
-  }
   if (ra_exe_unit_.groupby_exprs.empty()) {
     err = executor->executePlanWithoutGroupBy(ra_exe_unit_,
                                               compilation_result,
                                               query_comp_desc.hoistLiterals(),
-                                              device_results_,
+                                              &device_results_,
                                               ra_exe_unit_.target_exprs,
                                               chosen_device_type,
-                                              fetch_result.col_buffers,
+                                              fetch_result->col_buffers,
                                               query_exe_context,
-                                              fetch_result.num_rows,
-                                              fetch_result.frag_offsets,
+                                              fetch_result->num_rows,
+                                              fetch_result->frag_offsets,
                                               data_mgr,
                                               chosen_device_id,
                                               start_rowid,
@@ -335,13 +395,13 @@
     err = executor->executePlanWithGroupBy(ra_exe_unit_,
                                            compilation_result,
                                            query_comp_desc.hoistLiterals(),
-                                           device_results_,
+                                           &device_results_,
                                            chosen_device_type,
-                                           fetch_result.col_buffers,
+                                           fetch_result->col_buffers,
                                            outer_tab_frag_ids,
                                            query_exe_context,
-                                           fetch_result.num_rows,
-                                           fetch_result.frag_offsets,
+                                           fetch_result->num_rows,
+                                           fetch_result->frag_offsets,
                                            data_mgr,
                                            chosen_device_id,
                                            outer_table_id,
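Note: the spawning loop above carves a fragment's row range into g_cpu_sub_task_size pieces, with the last sub-task taking the remainder. A standalone sketch of just that splitting policy (names mirror the patch, but this is illustrative, not the engine's code):

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

size_t g_cpu_sub_task_size{500'000};  // default introduced by this patch

struct SubTaskRange {
  size_t start;
  size_t num_rows;
};

std::vector<SubTaskRange> splitFragment(size_t start_rowid, size_t total_rows) {
  std::vector<SubTaskRange> ranges;
  for (size_t s = start_rowid; s < total_rows; s += g_cpu_sub_task_size) {
    // Clamp the final piece to the fragment boundary.
    ranges.push_back({s, std::min(g_cpu_sub_task_size, total_rows - s)});
  }
  return ranges;
}

int main() {
  // A 1.2M-row fragment yields sub-tasks of 500K, 500K, and 200K rows.
  for (const auto& r : splitFragment(0, 1'200'000)) {
    std::cout << "sub-task: start=" << r.start << " rows=" << r.num_rows << "\n";
  }
}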
@@ -371,3 +431,126 @@ void ExecutionKernel::runImpl(Executor* executor,
   }
   shared_context.addDeviceResults(std::move(device_results_), outer_tab_frag_ids);
 }
+
+#ifdef HAVE_TBB
+
+void KernelSubtask::run(Executor* executor) {
+  try {
+    runImpl(executor);
+  } catch (const OutOfHostMemory& e) {
+    throw QueryExecutionError(Executor::ERR_OUT_OF_CPU_MEM, e.what());
+  } catch (const std::bad_alloc& e) {
+    throw QueryExecutionError(Executor::ERR_OUT_OF_CPU_MEM, e.what());
+  } catch (const OutOfRenderMemory& e) {
+    throw QueryExecutionError(Executor::ERR_OUT_OF_RENDER_MEM, e.what());
+  } catch (const OutOfMemory& e) {
+    throw QueryExecutionError(
+        Executor::ERR_OUT_OF_GPU_MEM,
+        e.what(),
+        QueryExecutionProperties{
+            kernel_.query_mem_desc.getQueryDescriptionType(),
+            kernel_.kernel_dispatch_mode == ExecutorDispatchMode::MultifragmentKernel});
+  } catch (const ColumnarConversionNotSupported& e) {
+    throw QueryExecutionError(Executor::ERR_COLUMNAR_CONVERSION_NOT_SUPPORTED, e.what());
+  } catch (const TooManyLiterals& e) {
+    throw QueryExecutionError(Executor::ERR_TOO_MANY_LITERALS, e.what());
+  } catch (const StringConstInResultSet& e) {
+    throw QueryExecutionError(Executor::ERR_STRING_CONST_IN_RESULTSET, e.what());
+  } catch (const QueryExecutionError& e) {
+    throw e;
+  }
+}
+
+void KernelSubtask::runImpl(Executor* executor) {
+  auto& query_exe_context_owned = shared_context_.getTlsExecutionContext().local();
+  const bool do_render =
+      kernel_.render_info_ && kernel_.render_info_->isPotentialInSituRender();
+  const CompilationResult& compilation_result =
+      kernel_.query_comp_desc.getCompilationResult();
+
+  if (!query_exe_context_owned) {
+    try {
+      // We pass fake col_buffers and frag_offsets. They are not actually used for
+      // sub-tasks, but we avoid passing empty structures so we don't get empty results.
+      std::vector<std::vector<const int8_t*>> col_buffers(
+          fetch_result_->col_buffers.size(),
+          std::vector<const int8_t*>(fetch_result_->col_buffers[0].size()));
+      std::vector<std::vector<uint64_t>> frag_offsets(
+          fetch_result_->frag_offsets.size(),
+          std::vector<uint64_t>(fetch_result_->frag_offsets[0].size()));
+      query_exe_context_owned = kernel_.query_mem_desc.getQueryExecutionContext(
+          kernel_.ra_exe_unit_,
+          executor,
+          kernel_.chosen_device_type,
+          kernel_.kernel_dispatch_mode,
+          kernel_.chosen_device_id,
+          total_num_input_rows_,
+          col_buffers,
+          frag_offsets,
+          executor->getRowSetMemoryOwner(),
+          compilation_result.output_columnar,
+          kernel_.query_mem_desc.sortOnGpu(),
+          // TODO: use a TBB thread id to choose the allocator
+          thread_idx_,
+          do_render ? kernel_.render_info_ : nullptr);
+    } catch (const OutOfHostMemory& e) {
+      throw QueryExecutionError(Executor::ERR_OUT_OF_CPU_MEM);
+    }
+  }
+
+  const int outer_table_id = kernel_.ra_exe_unit_.union_all
+                                 ? kernel_.frag_list[0].table_id
+                                 : kernel_.ra_exe_unit_.input_descs[0].getTableId();
+  const auto& outer_tab_frag_ids = kernel_.frag_list[0].fragment_ids;
+  auto catalog = executor->getCatalog();
+  CHECK(catalog);
+  QueryExecutionContext* query_exe_context{query_exe_context_owned.get()};
+  CHECK(query_exe_context);
+  int32_t err{0};
+
+  if (kernel_.ra_exe_unit_.groupby_exprs.empty()) {
+    err = executor->executePlanWithoutGroupBy(kernel_.ra_exe_unit_,
+                                              compilation_result,
+                                              kernel_.query_comp_desc.hoistLiterals(),
+                                              nullptr,
+                                              kernel_.ra_exe_unit_.target_exprs,
+                                              kernel_.chosen_device_type,
+                                              fetch_result_->col_buffers,
+                                              query_exe_context,
+                                              fetch_result_->num_rows,
+                                              fetch_result_->frag_offsets,
+                                              &catalog->getDataMgr(),
+                                              kernel_.chosen_device_id,
+                                              start_rowid_,
+                                              kernel_.ra_exe_unit_.input_descs.size(),
+                                              kernel_.eo.allow_runtime_query_interrupt,
+                                              do_render ? kernel_.render_info_ : nullptr,
+                                              start_rowid_ + num_rows_to_process_);
+  } else {
+    err = executor->executePlanWithGroupBy(kernel_.ra_exe_unit_,
+                                           compilation_result,
+                                           kernel_.query_comp_desc.hoistLiterals(),
+                                           nullptr,
+                                           kernel_.chosen_device_type,
+                                           fetch_result_->col_buffers,
+                                           outer_tab_frag_ids,
+                                           query_exe_context,
+                                           fetch_result_->num_rows,
+                                           fetch_result_->frag_offsets,
+                                           &catalog->getDataMgr(),
+                                           kernel_.chosen_device_id,
+                                           outer_table_id,
+                                           kernel_.ra_exe_unit_.scan_limit,
+                                           start_rowid_,
+                                           kernel_.ra_exe_unit_.input_descs.size(),
+                                           kernel_.eo.allow_runtime_query_interrupt,
+                                           do_render ? kernel_.render_info_ : nullptr,
+                                           start_rowid_ + num_rows_to_process_);
+  }
+
+  if (err) {
+    throw QueryExecutionError(err);
+  }
+}
+
+#endif  // HAVE_TBB
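Note: KernelSubtask::run() above translates worker-side exceptions into QueryExecutionError codes so the coordinating thread can handle all failures uniformly. A condensed, self-contained sketch of that idiom (simplified types; the real error codes live on Executor, and the value below is illustrative):

#include <new>
#include <stdexcept>
#include <string>
#include <utility>

struct QueryExecutionError : std::runtime_error {
  int code;
  QueryExecutionError(int c, const std::string& what)
      : std::runtime_error(what), code(c) {}
};

constexpr int ERR_OUT_OF_CPU_MEM = 2;  // hypothetical value

template <typename Body>
void runGuarded(Body&& body) {
  try {
    std::forward<Body>(body)();
  } catch (const std::bad_alloc& e) {
    // Allocation failures surface as an out-of-CPU-memory error code.
    throw QueryExecutionError(ERR_OUT_OF_CPU_MEM, e.what());
  }
  // ... further catch clauses would map the engine's other failure types,
  // as the chain in KernelSubtask::run() does.
}

int main() {
  try {
    runGuarded([] { throw std::bad_alloc(); });
  } catch (const QueryExecutionError& e) {
    return e.code == ERR_OUT_OF_CPU_MEM ? 0 : 1;
  }
  return 1;
}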
diff --git a/QueryEngine/ExecutionKernel.h b/QueryEngine/ExecutionKernel.h
index 6e748cfdf4..293ea718f9 100644
--- a/QueryEngine/ExecutionKernel.h
+++ b/QueryEngine/ExecutionKernel.h
@@ -20,10 +20,21 @@
 
 #include "QueryEngine/ColumnFetcher.h"
 #include "QueryEngine/Descriptors/QueryCompilationDescriptor.h"
+#include "Shared/threadpool.h"
+
+#ifdef HAVE_TBB
+#include "tbb/enumerable_thread_specific.h"
+#endif
 
 class SharedKernelContext {
  public:
   SharedKernelContext(const std::vector<InputTableInfo>& query_infos)
-      : query_infos_(query_infos) {}
+      : query_infos_(query_infos)
+#ifdef HAVE_TBB
+      , thread_pool_(nullptr)
+#endif
+  {
+  }
 
   const std::vector<uint64_t>& getFragOffsets();
 
@@ -36,6 +47,12 @@ class SharedKernelContext {
 
   std::atomic_flag dynamic_watchdog_set = ATOMIC_FLAG_INIT;
 
+#ifdef HAVE_TBB
+  auto getThreadPool() { return thread_pool_; }
+  void setThreadPool(threadpool::ThreadPool* pool) { thread_pool_ = pool; }
+  auto& getTlsExecutionContext() { return tls_execution_context_; }
+#endif  // HAVE_TBB
+
  private:
   std::mutex reduce_mutex_;
   std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> all_fragment_results_;
@@ -44,6 +61,12 @@ class SharedKernelContext {
   std::mutex all_frag_row_offsets_mutex_;
   const std::vector<InputTableInfo>& query_infos_;
   const RegisteredQueryHint query_hint_;
+
+#ifdef HAVE_TBB
+  threadpool::ThreadPool* thread_pool_;
+  tbb::enumerable_thread_specific<std::unique_ptr<QueryExecutionContext>>
+      tls_execution_context_;
+#endif  // HAVE_TBB
 };
 
 class ExecutionKernel {
@@ -75,8 +98,9 @@ class ExecutionKernel {
            const size_t thread_idx,
            SharedKernelContext& shared_context);
 
- private:
   const RelAlgExecutionUnit& ra_exe_unit_;
+
+ private:
   const ExecutorDeviceType chosen_device_type;
   int chosen_device_id;
   const ExecutionOptions& eo;
@@ -93,4 +117,42 @@ class ExecutionKernel {
   void runImpl(Executor* executor,
                const size_t thread_idx,
                SharedKernelContext& shared_context);
+
+  friend class KernelSubtask;
 };
+
+#ifdef HAVE_TBB
+class KernelSubtask {
+ public:
+  KernelSubtask(ExecutionKernel& k,
+                SharedKernelContext& shared_context,
+                std::shared_ptr<FetchResult> fetch_result,
+                std::shared_ptr<std::list<ChunkIter>> chunk_iterators,
+                int64_t total_num_input_rows,
+                size_t start_rowid,
+                size_t num_rows_to_process,
+                size_t thread_idx)
+      : kernel_(k)
+      , shared_context_(shared_context)
+      , fetch_result_(fetch_result)
+      , chunk_iterators_(chunk_iterators)
+      , total_num_input_rows_(total_num_input_rows)
+      , start_rowid_(start_rowid)
+      , num_rows_to_process_(num_rows_to_process)
+      , thread_idx_(thread_idx) {}
+
+  void run(Executor* executor);
+
+ private:
+  void runImpl(Executor* executor);
+
+  ExecutionKernel& kernel_;
+  SharedKernelContext& shared_context_;
+  std::shared_ptr<FetchResult> fetch_result_;
+  std::shared_ptr<std::list<ChunkIter>> chunk_iterators_;
+  int64_t total_num_input_rows_;
+  size_t start_rowid_;
+  size_t num_rows_to_process_;
+  size_t thread_idx_;
+};
+#endif  // HAVE_TBB
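Note: in the QueryExecutionContext changes below, the generated kernel reads its row bound through num_rows_ptr, so overriding the first entry with num_rows_to_process clamps how many rows a sub-task scans; the callers above pass start_rowid_ + num_rows_to_process_ as that bound. A toy model of the mechanism (runKernel is a hypothetical stand-in for the JIT'ed query function):

#include <cstdint>
#include <vector>

// Stand-in for the generated query function: scans rows [start_rowid, *num_rows).
int64_t runKernel(const std::vector<int64_t>& col,
                  uint32_t start_rowid,
                  const int64_t* num_rows_ptr) {
  int64_t acc = 0;
  for (int64_t i = start_rowid; i < *num_rows_ptr; ++i) {
    acc += col[i];
  }
  return acc;
}

int main() {
  std::vector<int64_t> col(1000, 1);
  int64_t fragment_rows = static_cast<int64_t>(col.size());

  // Whole fragment, as before the patch.
  int64_t full = runKernel(col, 0, &fragment_rows);

  // Sub-task over rows [250, 750): start_rowid = 250, and the bound passed
  // down is start_rowid + num_rows_to_process = 750, as in KernelSubtask.
  int64_t bound = 250 + 500;
  int64_t part = runKernel(col, 250, &bound);

  return (full == 1000 && part == 500) ? 0 : 1;
}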
diff --git a/QueryEngine/QueryExecutionContext.cpp b/QueryEngine/QueryExecutionContext.cpp
index 440fb29b9a..65c03d184f 100644
--- a/QueryEngine/QueryExecutionContext.cpp
+++ b/QueryEngine/QueryExecutionContext.cpp
@@ -607,7 +607,8 @@ std::vector<int64_t*> QueryExecutionContext::launchCpuCode(
     const int32_t scan_limit,
     int32_t* error_code,
     const uint32_t num_tables,
-    const std::vector<int64_t>& join_hash_tables) {
+    const std::vector<int64_t>& join_hash_tables,
+    const int64_t num_rows_to_process) {
   auto timer = DEBUG_TIMER(__func__);
   INJECT_TIMER(lauchCpuCode);
 
@@ -627,8 +628,12 @@
   const bool is_group_by{query_mem_desc_.isGroupBy()};
   std::vector<int64_t*> out_vec;
   if (ra_exe_unit.estimator) {
-    estimator_result_set_.reset(
-        new ResultSet(ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr));
+    // Sub-fragments collect results from multiple runs into a single
+    // result set.
+    if (!estimator_result_set_) {
+      estimator_result_set_.reset(
+          new ResultSet(ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr));
+    }
     out_vec.push_back(
         reinterpret_cast<int64_t*>(estimator_result_set_->getHostEstimatorBuffer()));
   } else {
@@ -651,8 +656,14 @@
         flatened_frag_offsets.end(), offsets.begin(), offsets.end());
   }
   int64_t rowid_lookup_num_rows{*error_code ? *error_code + 1 : 0};
-  auto num_rows_ptr =
-      rowid_lookup_num_rows ? &rowid_lookup_num_rows : &flatened_num_rows[0];
+  int64_t* num_rows_ptr;
+  if (num_rows_to_process > 0) {
+    flatened_num_rows[0] = num_rows_to_process;
+    num_rows_ptr = flatened_num_rows.data();
+  } else {
+    num_rows_ptr =
+        rowid_lookup_num_rows ? &rowid_lookup_num_rows : flatened_num_rows.data();
+  }
   int32_t total_matched_init{0};
   std::vector<int64_t> cmpt_val_buff;
 
diff --git a/QueryEngine/QueryExecutionContext.h b/QueryEngine/QueryExecutionContext.h
index d4e36fff73..b130e0bb89 100644
--- a/QueryEngine/QueryExecutionContext.h
+++ b/QueryEngine/QueryExecutionContext.h
@@ -89,7 +89,8 @@ class QueryExecutionContext : boost::noncopyable {
       const int32_t scan_limit,
       int32_t* error_code,
      const uint32_t num_tables,
-      const std::vector<int64_t>& join_hash_tables);
+      const std::vector<int64_t>& join_hash_tables,
+      const int64_t num_rows_to_process = -1);
 
   int64_t getAggInitValForIndex(const size_t index) const;
 
diff --git a/Tests/GroupByTest.cpp b/Tests/GroupByTest.cpp
index 4e0ba4fcca..6ef299096a 100644
--- a/Tests/GroupByTest.cpp
+++ b/Tests/GroupByTest.cpp
@@ -132,7 +132,7 @@ TEST_F(HighCardinalityStringEnv, PerfectHashNoFallback) {
 
   auto result =
       executor->executeWorkUnit(max_groups_buffer_entry_guess,
-                                /*is_agg=*/false,
+                                /*is_agg=*/true,
                                 table_infos,
                                 ra_exe_unit,
                                 CompilationOptions::defaults(ExecutorDeviceType::CPU),
@@ -228,7 +228,7 @@ TEST_F(HighCardinalityStringEnv, BaselineFallbackTest) {
   // expect throw w/out cardinality estimation
   EXPECT_THROW(
       executor->executeWorkUnit(max_groups_buffer_entry_guess,
-                                /*is_agg=*/false,
+                                /*is_agg=*/true,
                                 table_infos,
                                 ra_exe_unit,
                                 CompilationOptions::defaults(ExecutorDeviceType::CPU),
@@ -241,7 +241,7 @@
 
   auto result =
       executor->executeWorkUnit(max_groups_buffer_entry_guess,
-                                /*is_agg=*/false,
+                                /*is_agg=*/true,
                                 table_infos,
                                 ra_exe_unit,
                                 CompilationOptions::defaults(ExecutorDeviceType::CPU),
@@ -309,7 +309,7 @@ TEST_F(HighCardinalityStringEnv, BaselineNoFilters) {
   // no filters, so expect no throw w/out cardinality estimation
   auto result =
       executor->executeWorkUnit(max_groups_buffer_entry_guess,
-                                /*is_agg=*/false,
+                                /*is_agg=*/true,
                                 table_infos,
                                 ra_exe_unit,
                                 CompilationOptions::defaults(ExecutorDeviceType::CPU),
diff --git a/ThriftHandler/CommandLineOptions.cpp b/ThriftHandler/CommandLineOptions.cpp
index 7be59065f7..07fd375a97 100644
--- a/ThriftHandler/CommandLineOptions.cpp
+++ b/ThriftHandler/CommandLineOptions.cpp
@@ -540,6 +540,17 @@ void CommandLineOptions::fillAdvancedOptions() {
           ->default_value(g_use_tbb_pool)
           ->implicit_value(true),
       "Enable a new thread pool implementation for queuing kernels for execution.");
+  developer_desc.add_options()(
+      "enable-cpu-sub-tasks",
+      po::value<bool>(&g_enable_cpu_sub_tasks)
+          ->default_value(g_enable_cpu_sub_tasks)
+          ->implicit_value(true),
+      "Enable parallel processing of a single data fragment on CPU. This can improve CPU "
+      "load balance and decrease reduction overhead.");
+  developer_desc.add_options()(
+      "cpu-sub-task-size",
+      po::value<size_t>(&g_cpu_sub_task_size)->default_value(g_cpu_sub_task_size),
+      "Set CPU sub-task size in rows.");
   developer_desc.add_options()(
       "skip-intermediate-count",
       po::value<bool>(&g_skip_intermediate_count)
diff --git a/ThriftHandler/CommandLineOptions.h b/ThriftHandler/CommandLineOptions.h
index b210ebfca6..6231280036 100644
--- a/ThriftHandler/CommandLineOptions.h
+++ b/ThriftHandler/CommandLineOptions.h
@@ -195,6 +195,8 @@ extern bool g_enable_s3_fsi;
 extern bool g_enable_interop;
 extern bool g_enable_union;
 extern bool g_use_tbb_pool;
+extern bool g_enable_cpu_sub_tasks;
+extern size_t g_cpu_sub_task_size;
 extern bool g_enable_filter_function;
 extern size_t g_max_import_threads;
 extern bool g_enable_auto_metadata_update;
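Note: taken together, the new options make sub-task execution strictly opt-in. Per the guard in launchKernels(), sub-tasks run only when the TBB thread pool option shown above is enabled, --enable-cpu-sub-tasks is set, and the query executes on CPU. For example, starting the server with --enable-cpu-sub-tasks --cpu-sub-task-size=250000 would split each CPU fragment into 250K-row sub-tasks (flag names as registered above; the default sub-task size remains 500000 rows).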