Skip to content

Commit

Permalink
[Feature](pipeline) Trace pipeline scheduling (part I) (apache#31027)
Browse files Browse the repository at this point in the history
  • Loading branch information
zclllyybb authored Feb 22, 2024
1 parent f74b138 commit 67ec419
Show file tree
Hide file tree
Showing 30 changed files with 543 additions and 195 deletions.
2 changes: 2 additions & 0 deletions be/src/common/compiler_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,5 @@
#define MAY_ALIAS __attribute__((__may_alias__))

#define ALIGN_CACHE_LINE __attribute__((aligned(CACHE_LINE_SIZE)))

#define PURE __attribute_pure__
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ DEFINE_mInt32(download_low_speed_time, "300");
// log dir
DEFINE_String(sys_log_dir, "${DORIS_HOME}/log");
DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
DEFINE_String(pipeline_tracing_log_dir, "${DORIS_HOME}/log/tracing");
// INFO, WARNING, ERROR, FATAL
DEFINE_mString(sys_log_level, "INFO");
// TIME-DAY, TIME-HOUR, SIZE-MB-nnn
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ DECLARE_mInt32(download_low_speed_time);
// log dir
DECLARE_String(sys_log_dir);
DECLARE_String(user_function_dir);
DECLARE_String(pipeline_tracing_log_dir);
// INFO, WARNING, ERROR, FATAL
DECLARE_String(sys_log_level);
// TIME-DAY, TIME-HOUR, SIZE-MB-nnn
Expand Down
19 changes: 0 additions & 19 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,23 +354,4 @@ Status DataSink::init(const TDataSink& thrift_sink) {
Status DataSink::prepare(RuntimeState* state) {
return Status::OK();
}

bool DataSink::_has_inverted_index_or_partial_update(TOlapTableSink sink) {
OlapTableSchemaParam schema;
if (!schema.init(sink.schema).ok()) {
return false;
}
if (schema.is_partial_update()) {
return true;
}
for (const auto& index_schema : schema.indexes()) {
for (const auto& index : index_schema->indexes) {
if (index->index_type() == INVERTED) {
return true;
}
}
}
return false;
}

} // namespace doris
3 changes: 0 additions & 3 deletions be/src/exec/data_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ class DataSink {

std::shared_ptr<QueryStatistics> get_query_statistics_ptr();

private:
static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);

protected:
// Set to true after close() has been called. subclasses should check and set this in
// close().
Expand Down
37 changes: 37 additions & 0 deletions be/src/http/action/adjust_tracing_dump.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "adjust_tracing_dump.h"

#include "common/logging.h"
#include "http/http_channel.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "runtime/exec_env.h"

namespace doris {
void AdjustTracingDump::handle(HttpRequest* req) {
auto* ctx = ExecEnv::GetInstance()->pipeline_tracer_context();
auto* params = req->params();
if (auto status = ctx->change_record_params(*params); status.ok()) {
HttpChannel::send_reply(req, "change record type succeed!\n"); // ok
} else { // not ok
LOG(WARNING) << "adjust pipeline tracing dump method failed:" << status.msg() << '\n';
HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, status.msg().data());
}
}
} // namespace doris
34 changes: 34 additions & 0 deletions be/src/http/action/adjust_tracing_dump.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "http/http_handler.h"

namespace doris {

class HttpRequest;

class AdjustTracingDump : public HttpHandler {
public:
AdjustTracingDump() = default;

~AdjustTracingDump() override = default;

void handle(HttpRequest* req) override;
};
} // namespace doris
13 changes: 5 additions & 8 deletions be/src/io/fs/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

#include <butil/macros.h>
#include <glog/logging.h>
#include <stdint.h>

#include <cstdint>
#include <memory>
#include <string>
#include <vector>
Expand All @@ -29,8 +29,7 @@
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/path.h"

namespace doris {
namespace io {
namespace doris::io {

#ifndef FILESYSTEM_M
#define FILESYSTEM_M(stmt) \
Expand Down Expand Up @@ -83,7 +82,6 @@ class FileSystem : public std::enable_shared_from_this<FileSystem> {

std::shared_ptr<FileSystem> getSPtr() { return shared_from_this(); }

public:
// the root path of this fs.
// if not empty, all given Path will be "_root_path/path"
const Path& root_path() const { return _root_path; }
Expand All @@ -97,7 +95,8 @@ class FileSystem : public std::enable_shared_from_this<FileSystem> {
virtual ~FileSystem() = default;

// Each derived class should implement create method to create fs.
DISALLOW_COPY_AND_ASSIGN(FileSystem);
FileSystem(const FileSystem&) = delete;
const FileSystem& operator=(const FileSystem&) = delete;

protected:
/// create file and return a FileWriter
Expand Down Expand Up @@ -152,7 +151,6 @@ class FileSystem : public std::enable_shared_from_this<FileSystem> {
return _root_path / path;
}

protected:
FileSystem(Path&& root_path, std::string&& id, FileSystemType type)
: _root_path(std::move(root_path)), _id(std::move(id)), _type(type) {}

Expand All @@ -163,5 +161,4 @@ class FileSystem : public std::enable_shared_from_this<FileSystem> {

using FileSystemSPtr = std::shared_ptr<FileSystem>;

} // namespace io
} // namespace doris
} // namespace doris::io
13 changes: 5 additions & 8 deletions be/src/io/fs/local_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

#pragma once

#include <stdint.h>
#include <time.h>

#include <cstdint>
#include <ctime>
#include <functional>
#include <memory>
#include <string>
Expand All @@ -29,8 +28,7 @@
#include "io/fs/file_system.h"
#include "io/fs/path.h"

namespace doris {
namespace io {
namespace doris::io {

class LocalFileSystem final : public FileSystem {
public:
Expand Down Expand Up @@ -106,7 +104,6 @@ class LocalFileSystem final : public FileSystem {
LocalFileSystem(Path&& root_path, std::string&& id = "");
};

const std::shared_ptr<LocalFileSystem>& global_local_filesystem();
PURE const std::shared_ptr<LocalFileSystem>& global_local_filesystem();

} // namespace io
} // namespace doris
} // namespace doris::io
9 changes: 3 additions & 6 deletions be/src/io/fs/local_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
#include "io/fs/path.h"
#include "util/slice.h"

namespace doris {
namespace io {
namespace doris::io {

class LocalFileWriter final : public FileWriter {
public:
Expand All @@ -42,11 +41,9 @@ class LocalFileWriter final : public FileWriter {
void _abort();
Status _close(bool sync);

private:
int _fd; // owned
bool _dirty = false;
const bool _sync_data;
const bool _sync_data = false;
};

} // namespace io
} // namespace doris
} // namespace doris::io
13 changes: 11 additions & 2 deletions be/src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

#include "pipeline.h"

#include <ostream>
#include <memory>
#include <string>
#include <utility>

#include "pipeline/exec/operator.h"
Expand All @@ -26,17 +27,25 @@ namespace doris::pipeline {

void Pipeline::_init_profile() {
auto s = fmt::format("Pipeline (pipeline id={})", _pipeline_id);
_pipeline_profile.reset(new RuntimeProfile(std::move(s)));
_pipeline_profile = std::make_unique<RuntimeProfile>(std::move(s));
}

Status Pipeline::build_operators() {
_name.reserve(_operator_builders.size() * 10);
_name.append(std::to_string(id()));

OperatorPtr pre;
for (auto& operator_t : _operator_builders) {
auto o = operator_t->build_operator();
if (pre) {
static_cast<void>(o->set_child(pre));
}
_operators.emplace_back(o);

_name.push_back('-');
_name.append(std::to_string(operator_t->id()));
_name.append(o->get_name());

pre = std::move(o);
}
return Status::OK();
Expand Down
18 changes: 12 additions & 6 deletions be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
#pragma once

#include <glog/logging.h>
#include <stdint.h>

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <memory>
#include <string_view>
#include <utility>
#include <vector>

#include "common/status.h"
Expand All @@ -42,18 +42,19 @@ using PipelineId = uint32_t;
class Pipeline : public std::enable_shared_from_this<Pipeline> {
friend class PipelineTask;
friend class PipelineXTask;
friend class PipelineXFragmentContext;

public:
Pipeline() = delete;
explicit Pipeline(PipelineId pipeline_id, int num_tasks,
std::weak_ptr<PipelineFragmentContext> context)
: _pipeline_id(pipeline_id), _context(context), _num_tasks(num_tasks) {
: _pipeline_id(pipeline_id), _context(std::move(context)), _num_tasks(num_tasks) {
_init_profile();
}

void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
pipeline->_parents.push_back({_operator_builders.size(), weak_from_this()});
_dependencies.push_back({_operator_builders.size(), pipeline});
pipeline->_parents.emplace_back(_operator_builders.size(), weak_from_this());
_dependencies.emplace_back(_operator_builders.size(), pipeline);
}

// If all dependencies are finished, this pipeline task should be scheduled.
Expand Down Expand Up @@ -192,6 +193,11 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
std::weak_ptr<PipelineFragmentContext> _context;
int _previous_schedule_id = -1;

// pipline id + operator names. init when:
// build_operators(), if pipeline;
// _build_pipelines() and _create_tree_helper(), if pipelineX.
std::string _name;

std::unique_ptr<RuntimeProfile> _pipeline_profile;

// Operators for pipelineX. All pipeline tasks share operators from this.
Expand Down
32 changes: 7 additions & 25 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ namespace doris::pipeline {
bvar::Adder<int64_t> g_pipeline_tasks_count("doris_pipeline_tasks_count");

PipelineFragmentContext::PipelineFragmentContext(
const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id,
int backend_num, std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
const TUniqueId& query_id, const TUniqueId& instance_id, int fragment_id, int backend_num,
std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
const std::function<void(RuntimeState*, Status*)>& call_back,
const report_status_callback& report_status_cb)
report_status_callback report_status_cb)
: _query_id(query_id),
_fragment_instance_id(instance_id),
_fragment_id(fragment_id),
Expand All @@ -129,7 +129,7 @@ PipelineFragmentContext::PipelineFragmentContext(
_query_ctx(std::move(query_ctx)),
_call_back(call_back),
_is_report_on_cancel(true),
_report_status_cb(report_status_cb),
_report_status_cb(std::move(report_status_cb)),
_create_time(MonotonicNanos()) {
_fragment_watcher.start();
}
Expand Down Expand Up @@ -951,31 +951,13 @@ Status PipelineFragmentContext::send_report(bool done) {
_fragment_instance_id,
_backend_num,
_runtime_state.get(),
[this](auto&& PH1) { return update_status(std::forward<decltype(PH1)>(PH1)); },
[this](auto&& PH1, auto&& PH2) {
cancel(std::forward<decltype(PH1)>(PH1), std::forward<decltype(PH2)>(PH2));
[this](Status st) { return update_status(st); },
[this](const PPlanFragmentCancelReason& reason, const std::string& msg) {
cancel(reason, msg);
}},
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}

bool PipelineFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink sink) {
OlapTableSchemaParam schema;
if (!schema.init(sink.schema).ok()) {
return false;
}
if (schema.is_partial_update()) {
return true;
}
for (const auto& index_schema : schema.indexes()) {
for (const auto& index : index_schema->indexes) {
if (index->index_type() == INVERTED) {
return true;
}
}
}
return false;
}

std::string PipelineFragmentContext::debug_string() {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info: QueryId = {}\n",
Expand Down
Loading

0 comments on commit 67ec419

Please sign in to comment.