Skip to content

Commit

Permalink
[Refactor](exec) Remove unless code and add comment (#46503)
Browse files Browse the repository at this point in the history
Remove unless code and add comment be/src/pipeline/pipeline_task.h/
be/src/vec/runtime/vdatetime_value.h
  • Loading branch information
HappenLee authored Jan 14, 2025
1 parent 8e2cf8a commit fe3b56c
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 112 deletions.
1 change: 0 additions & 1 deletion be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
std::vector<std::shared_ptr<Pipeline>> _children;

PipelineId _pipeline_id;
int _previous_schedule_id = -1;

// pipline id + operator names. init when:
// build_operators(), if pipeline;
Expand Down
20 changes: 6 additions & 14 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,14 @@ class PipelineTask {

QueryContext* query_context();

int get_previous_core_id() const {
return _previous_schedule_id != -1 ? _previous_schedule_id
: _pipeline->_previous_schedule_id;
}
int get_core_id() const { return _core_id; }

void set_previous_core_id(int id) {
if (id != _previous_schedule_id) {
if (_previous_schedule_id != -1) {
void set_core_id(int id) {
if (id != _core_id) {
if (_core_id != -1) {
COUNTER_UPDATE(_core_change_times, 1);
}
_previous_schedule_id = id;
_core_id = id;
}
}

Expand Down Expand Up @@ -175,10 +172,6 @@ class PipelineTask {
void update_queue_level(int queue_level) { this->_queue_level = queue_level; }
int get_queue_level() const { return this->_queue_level; }

// 1.3 priority queue's core id
void set_core_id(int core_id) { this->_core_id = core_id; }
int get_core_id() const { return this->_core_id; }

/**
* Return true if:
* 1. `enable_force_spill` is true which forces this task to spill data.
Expand Down Expand Up @@ -254,7 +247,7 @@ class PipelineTask {
bool _has_exceed_timeout = false;
bool _opened;
RuntimeState* _state = nullptr;
int _previous_schedule_id = -1;
int _core_id = -1;
uint32_t _schedule_time = 0;
std::unique_ptr<doris::vectorized::Block> _block;
PipelineFragmentContext* _fragment_context = nullptr;
Expand All @@ -269,7 +262,6 @@ class PipelineTask {
// 2 exe task
// 3 update task statistics(update _queue_level/_core_id)
int _queue_level = 0;
int _core_id = 0;

RuntimeProfile* _parent_profile = nullptr;
std::unique_ptr<RuntimeProfile> _task_profile;
Expand Down
14 changes: 7 additions & 7 deletions be/src/pipeline/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
<< " _core_size: " << _core_size << " _next_core: " << _next_core.load();
task = _prio_task_queues[core_id].try_take(false);
if (task) {
task->set_core_id(core_id);
break;
}
task = _steal_take(core_id);
Expand All @@ -162,7 +161,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
}
task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
if (task) {
task->set_core_id(core_id);
break;
}
}
Expand All @@ -183,15 +181,14 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
DCHECK(next_id < _core_size);
auto task = _prio_task_queues[next_id].try_take(true);
if (task) {
task->set_core_id(next_id);
return task;
}
}
return nullptr;
}

Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
int core_id = task->get_previous_core_id();
int core_id = task->get_core_id();
if (core_id < 0) {
core_id = _next_core.fetch_add(1) % _core_size;
}
Expand All @@ -205,9 +202,12 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
}

void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
task->inc_runtime_ns(time_spent);
_prio_task_queues[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
time_spent);
// if the task not execute but exception early close, core_id == -1
// should not do update_statistics
if (auto core_id = task->get_core_id(); core_id >= 0) {
task->inc_runtime_ns(time_spent);
_prio_task_queues[core_id].inc_sub_queue_runtime(task->get_queue_level(), time_spent);
}
}

} // namespace doris::pipeline
22 changes: 8 additions & 14 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

#include "common/logging.h"
#include "pipeline/pipeline_task.h"
#include "pipeline/task_queue.h"
#include "pipeline_fragment_context.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
Expand Down Expand Up @@ -103,6 +102,9 @@ void TaskScheduler::_do_work(int index) {
if (!task) {
continue;
}
// The task is already running, maybe block in now dependency wake up by other thread
// but the block thread still hold the task, so put it back to the queue, until the hold
// thread set task->set_running(false)
if (task->is_running()) {
static_cast<void>(_task_queue.push_back(task, index));
continue;
Expand All @@ -129,12 +131,8 @@ void TaskScheduler::_do_work(int index) {
// task exec
bool eos = false;
auto status = Status::OK();
task->set_core_id(index);

#ifdef __APPLE__
uint32_t core_id = 0;
#else
uint32_t core_id = sched_getcpu();
#endif
ASSIGN_STATUS_IF_CATCH_EXCEPTION(
//TODO: use a better enclose to abstracting these
if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
Expand All @@ -149,12 +147,11 @@ void TaskScheduler::_do_work(int index) {

uint64_t end_time = MonotonicMicros();
ExecEnv::GetInstance()->pipeline_tracer_context()->record(
{query_id, task_name, core_id, thread_id, start_time, end_time});
{query_id, task_name, static_cast<uint32_t>(index), thread_id,
start_time, end_time});
} else { status = task->execute(&eos); },
status);

task->set_previous_core_id(index);

if (!status.ok()) {
// Print detail informations below when you debugging here.
//
Expand All @@ -173,14 +170,11 @@ void TaskScheduler::_do_work(int index) {
if (eos) {
// is pending finish will add the task to dependency's blocking queue, and then the task will be
// added to running queue when dependency is ready.
if (task->is_pending_finish()) {
// Only meet eos, should set task to PENDING_FINISH state
task->set_running(false);
} else {
if (!task->is_pending_finish()) {
Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
_close_task(task, exec_status);
continue;
}
continue;
}

task->set_running(false);
Expand Down
54 changes: 0 additions & 54 deletions be/src/vec/runtime/vdatetime_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2823,40 +2823,6 @@ int date_day_offset_dict::daynr(int year, int month, int day) const {
return DATE_DAY_OFFSET_DICT[year - START_YEAR][month - 1][day - 1];
}

template <typename T>
uint32_t DateV2Value<T>::set_date_uint32(uint32_t int_val) {
union DateV2UInt32Union {
DateV2Value<T> dt;
uint32_t ui32;
~DateV2UInt32Union() {}
};
DateV2UInt32Union conv = {.ui32 = int_val};
if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), 0, 0, 0, 0)) {
return 0;
}
this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(), 0, 0, 0, 0);

return int_val;
}

template <typename T>
uint64_t DateV2Value<T>::set_datetime_uint64(uint64_t int_val) {
union DateTimeV2UInt64Union {
DateV2Value<T> dt;
uint64_t ui64;
~DateTimeV2UInt64Union() {}
};
DateTimeV2UInt64Union conv = {.ui64 = int_val};
if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), conv.dt.hour(), conv.dt.minute(),
conv.dt.second(), conv.dt.microsecond())) {
return 0;
}
this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(), conv.dt.hour(),
conv.dt.minute(), conv.dt.second(), conv.dt.microsecond());

return int_val;
}

template <typename T>
uint8_t DateV2Value<T>::week(uint8_t mode) const {
uint16_t year = 0;
Expand Down Expand Up @@ -3685,26 +3651,6 @@ bool DateV2Value<T>::to_format_string_conservative(const char* format, size_t le
return true;
}

template <typename T>
bool DateV2Value<T>::from_date(uint32_t value) {
DCHECK(!is_datetime);
if (value < MIN_DATE_V2 || value > MAX_DATE_V2) {
return false;
}

return set_date_uint32(value);
}

template <typename T>
bool DateV2Value<T>::from_datetime(uint64_t value) {
DCHECK(is_datetime);
if (value < MIN_DATETIME_V2 || value > MAX_DATETIME_V2) {
return false;
}

return set_datetime_uint64(value);
}

template <typename T>
int64_t DateV2Value<T>::standardize_timevalue(int64_t value) {
if (value <= 0) {
Expand Down
5 changes: 0 additions & 5 deletions be/src/vec/runtime/vdatetime_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -1181,12 +1181,7 @@ class DateV2Value {

underlying_value to_date_int_val() const { return int_val_; }

bool from_date(uint32_t value);
bool from_datetime(uint64_t value);

bool from_date_int64(int64_t value);
uint32_t set_date_uint32(uint32_t int_val);
uint64_t set_datetime_uint64(uint64_t int_val);

bool get_date_from_daynr(uint64_t);

Expand Down
2 changes: 1 addition & 1 deletion be/test/vec/core/block_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ TEST(BlockTest, dump_data) {
auto& date_v2_data = column_vector_date_v2->get_data();
for (int i = 0; i < 1024; ++i) {
DateV2Value<DateV2ValueType> value;
value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
value.unchecked_set_time(2022, 6, 6, 0, 0, 0, 0);
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
}
vectorized::DataTypePtr date_v2_type(std::make_shared<vectorized::DataTypeDateV2>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ void serialize_and_deserialize_mysql_test() {
auto& date_v2_data = column_vector_date_v2->get_data();
for (int i = 0; i < row_num; ++i) {
DateV2Value<DateV2ValueType> value;
value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
value.unchecked_set_time(2022, 6, 6, 0, 0, 0, 0);
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
}
vectorized::DataTypePtr date_v2_type(
Expand Down
7 changes: 2 additions & 5 deletions be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,12 +668,9 @@ TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTestDateTime) {
uint8_t minute = i;
uint8_t second = 0;
uint32_t microsecond = 123000;
auto value = ((uint64_t)(((uint64_t)year << 46) | ((uint64_t)month << 42) |
((uint64_t)day << 37) | ((uint64_t)hour << 32) |
((uint64_t)minute << 26) | ((uint64_t)second << 20) |
(uint64_t)microsecond));

DateV2Value<DateTimeV2ValueType> datetime_v2;
datetime_v2.from_datetime(value);
datetime_v2.unchecked_set_time(year, month, day, hour, minute, second, microsecond);
auto datetime_val = binary_cast<DateV2Value<DateTimeV2ValueType>, UInt64>(datetime_v2);
data.push_back(datetime_val);
}
Expand Down
2 changes: 1 addition & 1 deletion be/test/vec/jsonb/serialize_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ TEST(BlockSerializeTest, JsonbBlock) {
auto& date_v2_data = column_vector_date_v2->get_data();
for (int i = 0; i < 1024; ++i) {
DateV2Value<DateV2ValueType> value;
value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
value.unchecked_set_time(2022, 6, 6, 0, 0, 0, 0);
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
}
vectorized::DataTypePtr date_v2_type(std::make_shared<vectorized::DataTypeDateV2>());
Expand Down
16 changes: 7 additions & 9 deletions be/test/vec/runtime/vdatetime_value_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ TEST(VDateTimeValueTest, date_v2_from_uint32_test) {
uint8_t day = 24;

DateV2Value<DateV2ValueType> date_v2;
date_v2.from_date((uint32_t)((year << 9) | (month << 5) | day));
date_v2.unchecked_set_time(year, month, day, 0, 0, 0, 0);

EXPECT_TRUE(date_v2.year() == year);
EXPECT_TRUE(date_v2.month() == month);
Expand Down Expand Up @@ -114,10 +114,7 @@ TEST(VDateTimeValueTest, datetime_v2_from_uint64_test) {
uint32_t microsecond = 999999;

DateV2Value<DateTimeV2ValueType> datetime_v2;
datetime_v2.from_datetime((uint64_t)(((uint64_t)year << 46) | ((uint64_t)month << 42) |
((uint64_t)day << 37) | ((uint64_t)hour << 32) |
((uint64_t)minute << 26) | ((uint64_t)second << 20) |
(uint64_t)microsecond));
datetime_v2.unchecked_set_time(year, month, day, hour, minute, second, microsecond);

EXPECT_TRUE(datetime_v2.year() == year);
EXPECT_TRUE(datetime_v2.month() == month);
Expand All @@ -142,10 +139,11 @@ TEST(VDateTimeValueTest, datetime_v2_from_uint64_test) {
uint32_t microsecond = 123000;

DateV2Value<DateTimeV2ValueType> datetime_v2;
datetime_v2.from_datetime((uint64_t)(((uint64_t)year << 46) | ((uint64_t)month << 42) |
((uint64_t)day << 37) | ((uint64_t)hour << 32) |
((uint64_t)minute << 26) | ((uint64_t)second << 20) |
(uint64_t)microsecond));
auto ui64 = (uint64_t)(((uint64_t)year << 46) | ((uint64_t)month << 42) |
((uint64_t)day << 37) | ((uint64_t)hour << 32) |
((uint64_t)minute << 26) | ((uint64_t)second << 20) |
(uint64_t)microsecond);
datetime_v2 = (DateV2Value<DateTimeV2ValueType>&)ui64;

EXPECT_TRUE(datetime_v2.year() == year);
EXPECT_TRUE(datetime_v2.month() == month);
Expand Down

0 comments on commit fe3b56c

Please sign in to comment.