Skip to content

Commit

Permalink
use thrift
Browse files Browse the repository at this point in the history
  • Loading branch information
hust-hhb committed Dec 21, 2023
1 parent 2bab3dc commit eaa3c29
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 41 deletions.
88 changes: 56 additions & 32 deletions be/src/olap/wal_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "http/http_headers.h"
#include "http/utils.h"
#include "io/fs/local_file_system.h"
#include "io/fs/stream_load_pipe.h"
#include "olap/wal_manager.h"
#include "runtime/client_cache.h"
#include "runtime/fragment_mgr.h"
Expand All @@ -37,11 +36,7 @@
namespace doris {

WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
: _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) {
_http_stream_action = std::make_shared<HttpStreamAction>(_exec_env);
_evhttp_req = evhttp_request_new(nullptr, nullptr);
_req = std::make_shared<HttpRequest>(_evhttp_req);
}
: _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) {}
WalTable::~WalTable() {}

#ifdef BE_TEST
Expand Down Expand Up @@ -236,7 +231,7 @@ Status WalTable::_get_wal_info(const std::string& wal,
}

Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) {
bool retry = false;
bool need_retry = false;
#ifndef BE_TEST
std::string columns;
RETURN_IF_ERROR(_read_wal_header(wal, columns));
Expand Down Expand Up @@ -270,33 +265,62 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " ("
<< name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \""
<< std::to_string(_table_id) << "\")";
_req->clear_header();
_req->add_header(HTTP_SQL.c_str(), ss.str().c_str());
_req->add_header(HTTP_LABEL_KEY.c_str(), label.c_str());
_req->add_header(HTTP_AUTH_CODE.c_str(), std::to_string(wal_id).c_str());
_req->add_header(HTTP_WAL_ID_KY.c_str(), std::to_string(wal_id).c_str());
std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
ctx->wal_id = wal_id;
ctx->auth.auth_code = wal_id;
auto st = _http_stream_action->process_put(_req.get(), ctx);
auto msg = st.msg();
if (st.ok()) {
// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());
if (ctx->status.ok()) {
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
}
if (!ctx->status.ok() && !ctx->status.is<ErrorCode::PUBLISH_TIMEOUT>()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status;
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
retry = true;
UniqueId load_id = UniqueId::gen_uid();
TUniqueId tload_id;
tload_id.__set_hi(load_id.hi);
tload_id.__set_lo(load_id.lo);
TStreamLoadPutRequest request;
request.__set_auth_code(ctx->auth.auth_code);
request.__set_load_sql(ss.str());
request.__set_loadId(tload_id);
request.__set_label(label);
if (_exec_env->master_info()->__isset.backend_id) {
request.__set_backend_id(_exec_env->master_info()->backend_id);
} else {
LOG(WARNING) << "_exec_env->master_info not set backend_id";
}
// plan this load
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
int64_t stream_load_put_start_time = MonotonicNanos();
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, ctx](FrontendServiceConnection& client) {
client->streamLoadPut(ctx->put_result, request);
}));
ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;
Status plan_status(Status::create(ctx->put_result.status));
if (!plan_status.ok()) {
auto msg = plan_status.msg();
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
if (msg.find("LabelAlreadyUsedException") != msg.npos) {
LOG(INFO) << "skip relay wal " << wal << ",reason " << msg;
need_retry = false;
} else {
need_retry = true;
}
} else if (msg.find("LabelAlreadyUsedException") != msg.npos) {
LOG(INFO) << "skip relay wal " << wal << ",reason " << msg;
retry = false;
} else {
retry = true;
ctx->db = ctx->put_result.params.db_name;
ctx->table = ctx->put_result.params.table_name;
ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
auto st = _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
if (st.ok()) {
// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());
if (ctx->status.ok()) {
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
need_retry = false;
} else if (!ctx->status.ok()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status;
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
need_retry = true;
}
}
}
#else
std::stringstream out;
Expand All @@ -306,12 +330,12 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
doc.Parse(out_str.c_str());
auto status = std::string(doc["Status"].GetString());
if (status.find("Fail") != status.npos) {
retry = true;
need_retry = true;
} else {
retry = false;
need_retry = false;
}
#endif
if (retry) {
if (need_retry) {
LOG(INFO) << "fail to replay wal =" << wal;
std::lock_guard<std::mutex> lock(_replay_wal_lock);
auto it = _replay_wal_map.find(wal);
Expand Down
9 changes: 0 additions & 9 deletions be/src/olap/wal_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,16 @@
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/http.h>

#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/status.h"
#include "evhttp.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/FrontendService_types.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "http/action/http_stream.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_context.h"
namespace doris {
Expand Down Expand Up @@ -70,8 +64,5 @@ class WalTable {
std::atomic<bool> _stop;
std::map<int64_t, std::string> _column_id_name_map;
std::map<int64_t, int64_t> _column_id_index_map;
std::shared_ptr<HttpStreamAction> _http_stream_action = nullptr;
evhttp_request* _evhttp_req = nullptr;
std::shared_ptr<HttpRequest> _req = nullptr;
};
} // namespace doris

0 comments on commit eaa3c29

Please sign in to comment.