Skip to content

Commit

Permalink
edit
Browse files Browse the repository at this point in the history
  • Loading branch information
hust-hhb committed Dec 21, 2023
1 parent a23f75a commit 062d166
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 39 deletions.
196 changes: 157 additions & 39 deletions be/src/olap/wal_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,8 @@ Status WalTable::_get_wal_info(const std::string& wal,
return Status::OK();
}

Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) {
bool need_retry = false;
#ifndef BE_TEST
Status WalTable::_parse_sql(int64_t wal_id, const std::string& wal, const std::string& label,
std::string& sql_str) {
std::string columns;
RETURN_IF_ERROR(_read_wal_header(wal, columns));
std::vector<std::string> column_id_element;
Expand Down Expand Up @@ -254,6 +253,15 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " ("
<< name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \""
<< std::to_string(_table_id) << "\")";
sql_str = ss.str();
return Status::OK();
}

Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
const std::string& label) {
std::string sql_str;
RETURN_IF_ERROR(_parse_sql(wal_id, wal, label, sql_str));
LOG(INFO) << "sql_str:" << sql_str;
std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
ctx->wal_id = wal_id;
ctx->auth.auth_code = wal_id;
Expand All @@ -263,7 +271,7 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
tload_id.__set_lo(load_id.lo);
TStreamLoadPutRequest request;
request.__set_auth_code(ctx->auth.auth_code);
request.__set_load_sql(ss.str());
request.__set_load_sql(sql_str);
request.__set_loadId(tload_id);
request.__set_label(label);
if (_exec_env->master_info()->__isset.backend_id) {
Expand All @@ -284,12 +292,13 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
if (!plan_status.ok()) {
auto msg = plan_status.msg();
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
if (msg.find("LabelAlreadyUsedException") != msg.npos) {
LOG(INFO) << "skip relay wal " << wal << ",reason " << msg;
need_retry = false;
} else {
need_retry = true;
}
// if (msg.find("LabelAlreadyUsedException") != msg.npos) {
// LOG(INFO) << "skip relay wal " << wal << ",reason " << msg;
// return Status::OK();
// } else {
// return plan_status;
// }
return plan_status;
} else {
ctx->db = ctx->put_result.params.db_name;
ctx->table = ctx->put_result.params.table_name;
Expand All @@ -302,33 +311,152 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
RETURN_IF_ERROR(ctx->future.get());
if (ctx->status.ok()) {
auto commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
if (commit_st.ok() || commit_st.is<ErrorCode::PUBLISH_TIMEOUT>()) {
need_retry = false;
} else {
need_retry = true;
}
// if (commit_st.ok() || commit_st.is<ErrorCode::PUBLISH_TIMEOUT>()) {
// return Status::OK();
// } else {
// return commit_st;
// }
return commit_st;
} else if (!ctx->status.ok()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status;
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
need_retry = true;
return ctx->status;
}
} else {
LOG(WARNING) << "execute_plan_fragment fail, id=" << ctx->id << ", errmsg=" << st;
return st;
}
}
#else
std::stringstream out;
out << k_request_line;
auto out_str = out.str();
rapidjson::Document doc;
doc.Parse(out_str.c_str());
auto status = std::string(doc["Status"].GetString());
if (status.find("Fail") != status.npos) {
need_retry = true;
return Status::OK();
}
Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) {
// bool need_retry = false;
//#ifndef BE_TEST
//// std::string columns;
//// RETURN_IF_ERROR(_read_wal_header(wal, columns));
//// std::vector<std::string> column_id_element;
//// doris::vectorized::WalReader::string_split(columns, ",", column_id_element);
//// std::vector<size_t> index_vector;
//// std::stringstream ss_name;
//// std::stringstream ss_id;
//// int index_raw = 0;
//// for (auto column_id_str : column_id_element) {
//// try {
//// int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10);
//// auto it = _column_id_name_map.find(column_id);
//// auto it2 = _column_id_index_map.find(column_id);
//// if (it != _column_id_name_map.end() && it2 != _column_id_index_map.end()) {
//// ss_name << "`" << it->second << "`,";
//// ss_id << "c" << std::to_string(_column_id_index_map[column_id]) << ",";
//// index_vector.emplace_back(index_raw);
//// _column_id_name_map.erase(column_id);
//// _column_id_index_map.erase(column_id);
//// }
//// index_raw++;
//// } catch (const std::invalid_argument& e) {
//// return Status::InvalidArgument("Invalid format, {}", e.what());
//// }
//// }
//// _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector);
//// auto name = ss_name.str().substr(0, ss_name.str().size() - 1);
//// auto id = ss_id.str().substr(0, ss_id.str().size() - 1);
//// std::stringstream ss;
//// ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " ("
//// << name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \""
//// << std::to_string(_table_id) << "\")";
//// std::string sql_str;
//// RETURN_IF_ERROR(_parse_sql(wal_id, wal, label, sql_str));
//// LOG(INFO) << "sql_str:" << sql_str;
//// std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
//// ctx->wal_id = wal_id;
//// ctx->auth.auth_code = wal_id;
//// UniqueId load_id = UniqueId::gen_uid();
//// TUniqueId tload_id;
//// tload_id.__set_hi(load_id.hi);
//// tload_id.__set_lo(load_id.lo);
//// TStreamLoadPutRequest request;
//// request.__set_auth_code(ctx->auth.auth_code);
//// request.__set_load_sql(sql_str);
//// request.__set_loadId(tload_id);
//// request.__set_label(label);
//// if (_exec_env->master_info()->__isset.backend_id) {
//// request.__set_backend_id(_exec_env->master_info()->backend_id);
//// } else {
//// LOG(WARNING) << "_exec_env->master_info not set backend_id";
//// }
//// // plan this load
//// TNetworkAddress master_addr = _exec_env->master_info()->network_address;
//// int64_t stream_load_put_start_time = MonotonicNanos();
//// RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
//// master_addr.hostname, master_addr.port,
//// [&request, ctx](FrontendServiceConnection& client) {
//// client->streamLoadPut(ctx->put_result, request);
//// }));
//// ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;
//// Status plan_status(Status::create(ctx->put_result.status));
//// if (!plan_status.ok()) {
//// auto msg = plan_status.msg();
//// LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
//// if (msg.find("LabelAlreadyUsedException") != msg.npos) {
//// LOG(INFO) << "skip relay wal " << wal << ",reason " << msg;
//// need_retry = false;
//// } else {
//// need_retry = true;
//// }
//// } else {
//// ctx->db = ctx->put_result.params.db_name;
//// ctx->table = ctx->put_result.params.table_name;
//// ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
//// ctx->label = ctx->put_result.params.import_label;
//// ctx->put_result.params.__set_wal_id(ctx->wal_id);
//// auto st = _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
//// if (st.ok()) {
//// // wait stream load finish
//// RETURN_IF_ERROR(ctx->future.get());
//// if (ctx->status.ok()) {
//// auto commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
//// if (commit_st.ok() || commit_st.is<ErrorCode::PUBLISH_TIMEOUT>()) {
//// need_retry = false;
//// } else {
//// need_retry = true;
//// }
//// } else if (!ctx->status.ok()) {
//// LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
//// << ", errmsg=" << ctx->status;
//// _exec_env->stream_load_executor()->rollback_txn(ctx.get());
//// need_retry = true;
//// }
//// }
//// }
//#else
// std::stringstream out;
// out << k_request_line;
// auto out_str = out.str();
// rapidjson::Document doc;
// doc.Parse(out_str.c_str());
// auto status = std::string(doc["Status"].GetString());
// if (status.find("Fail") != status.npos) {
// need_retry = true;
// } else {
// need_retry = false;
// }
//#endif
auto st = _handle_stream_load(wal_id, wal, label);
auto msg = st.msg();
bool success = st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() ||
msg.find("LabelAlreadyUsedException") != msg.npos;
if (success) {
LOG(INFO) << "success to replay wal =" << wal;
RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id));
std::lock_guard<std::mutex> lock(_replay_wal_lock);
if (_replay_wal_map.erase(wal)) {
LOG(INFO) << "erase " << wal << " from _replay_wal_map";
} else {
LOG(WARNING) << "fail to erase " << wal << " from _replay_wal_map";
}
} else {
need_retry = false;
}
#endif
if (need_retry) {
LOG(INFO) << "fail to replay wal =" << wal;
std::lock_guard<std::mutex> lock(_replay_wal_lock);
auto it = _replay_wal_map.find(wal);
Expand All @@ -338,16 +466,6 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
} else {
_replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false});
}
} else {
LOG(INFO) << "success to replay wal =" << wal;
RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id));
std::lock_guard<std::mutex> lock(_replay_wal_lock);
if (_replay_wal_map.erase(wal)) {
LOG(INFO) << "erase " << wal << " from _replay_wal_map";
} else {
LOG(WARNING) << "fail to erase " << wal << " from _replay_wal_map";
}
}
_exec_env->wal_mgr()->erase_wal_column_index(wal_id);
return Status::OK();
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/wal_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class WalTable {
Status _read_wal_header(const std::string& wal, std::string& columns);
bool _need_replay(const replay_wal_info& info);
Status _replay_wal_internal(const std::string& wal);
Status _parse_sql(int64_t wal_id, const std::string& wal, const std::string& label,
std::string& sql_str);
Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label);

private:
ExecEnv* _exec_env;
Expand Down

0 comments on commit 062d166

Please sign in to comment.