Skip to content

Commit

Permalink
edit
Browse files Browse the repository at this point in the history
  • Loading branch information
hust-hhb committed Dec 21, 2023
1 parent 062d166 commit d9485c6
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 130 deletions.
138 changes: 9 additions & 129 deletions be/src/olap/wal_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,11 @@ Status WalTable::_get_wal_info(const std::string& wal,
}

Status WalTable::_parse_sql(int64_t wal_id, const std::string& wal, const std::string& label,
std::string& sql_str) {
std::string& sql_str, std::vector<size_t>& index_vector) {
std::string columns;
RETURN_IF_ERROR(_read_wal_header(wal, columns));
std::vector<std::string> column_id_element;
doris::vectorized::WalReader::string_split(columns, ",", column_id_element);
std::vector<size_t> index_vector;
std::stringstream ss_name;
std::stringstream ss_id;
int index_raw = 0;
Expand Down Expand Up @@ -253,15 +252,15 @@ Status WalTable::_parse_sql(int64_t wal_id, const std::string& wal, const std::s
ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " ("
<< name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \""
<< std::to_string(_table_id) << "\")";
sql_str = ss.str();
sql_str = ss.str().data();
return Status::OK();
}

Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
const std::string& label) {
std::string sql_str;
RETURN_IF_ERROR(_parse_sql(wal_id, wal, label, sql_str));
LOG(INFO) << "sql_str:" << sql_str;
std::vector<size_t> index_vector;
RETURN_IF_ERROR(_parse_sql(wal_id, wal, label, sql_str, index_vector));
std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
ctx->wal_id = wal_id;
ctx->auth.auth_code = wal_id;
Expand Down Expand Up @@ -292,12 +291,6 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
if (!plan_status.ok()) {
auto msg = plan_status.msg();
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
// if (msg.find("LabelAlreadyUsedException") != msg.npos) {
// LOG(INFO) << "skip relay wal " << wal << ",reason " << msg;
// return Status::OK();
// } else {
// return plan_status;
// }
return plan_status;
} else {
ctx->db = ctx->put_result.params.db_name;
Expand All @@ -311,11 +304,6 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
RETURN_IF_ERROR(ctx->future.get());
if (ctx->status.ok()) {
auto commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
// if (commit_st.ok() || commit_st.is<ErrorCode::PUBLISH_TIMEOUT>()) {
// return Status::OK();
// } else {
// return commit_st;
// }
return commit_st;
} else if (!ctx->status.ok()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
Expand All @@ -331,121 +319,13 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
return Status::OK();
}
Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) {
// bool need_retry = false;
//#ifndef BE_TEST
//// std::string columns;
//// RETURN_IF_ERROR(_read_wal_header(wal, columns));
//// std::vector<std::string> column_id_element;
//// doris::vectorized::WalReader::string_split(columns, ",", column_id_element);
//// std::vector<size_t> index_vector;
//// std::stringstream ss_name;
//// std::stringstream ss_id;
//// int index_raw = 0;
//// for (auto column_id_str : column_id_element) {
//// try {
//// int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10);
//// auto it = _column_id_name_map.find(column_id);
//// auto it2 = _column_id_index_map.find(column_id);
//// if (it != _column_id_name_map.end() && it2 != _column_id_index_map.end()) {
//// ss_name << "`" << it->second << "`,";
//// ss_id << "c" << std::to_string(_column_id_index_map[column_id]) << ",";
//// index_vector.emplace_back(index_raw);
//// _column_id_name_map.erase(column_id);
//// _column_id_index_map.erase(column_id);
//// }
//// index_raw++;
//// } catch (const std::invalid_argument& e) {
//// return Status::InvalidArgument("Invalid format, {}", e.what());
//// }
//// }
//// _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector);
//// auto name = ss_name.str().substr(0, ss_name.str().size() - 1);
//// auto id = ss_id.str().substr(0, ss_id.str().size() - 1);
//// std::stringstream ss;
//// ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " ("
//// << name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \""
//// << std::to_string(_table_id) << "\")";
//// std::string sql_str;
//// RETURN_IF_ERROR(_parse_sql(wal_id, wal, label, sql_str));
//// LOG(INFO) << "sql_str:" << sql_str;
//// std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
//// ctx->wal_id = wal_id;
//// ctx->auth.auth_code = wal_id;
//// UniqueId load_id = UniqueId::gen_uid();
//// TUniqueId tload_id;
//// tload_id.__set_hi(load_id.hi);
//// tload_id.__set_lo(load_id.lo);
//// TStreamLoadPutRequest request;
//// request.__set_auth_code(ctx->auth.auth_code);
//// request.__set_load_sql(sql_str);
//// request.__set_loadId(tload_id);
//// request.__set_label(label);
//// if (_exec_env->master_info()->__isset.backend_id) {
//// request.__set_backend_id(_exec_env->master_info()->backend_id);
//// } else {
//// LOG(WARNING) << "_exec_env->master_info not set backend_id";
//// }
//// // plan this load
//// TNetworkAddress master_addr = _exec_env->master_info()->network_address;
//// int64_t stream_load_put_start_time = MonotonicNanos();
//// RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
//// master_addr.hostname, master_addr.port,
//// [&request, ctx](FrontendServiceConnection& client) {
//// client->streamLoadPut(ctx->put_result, request);
//// }));
//// ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;
//// Status plan_status(Status::create(ctx->put_result.status));
//// if (!plan_status.ok()) {
//// auto msg = plan_status.msg();
//// LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
//// if (msg.find("LabelAlreadyUsedException") != msg.npos) {
//// LOG(INFO) << "skip relay wal " << wal << ",reason " << msg;
//// need_retry = false;
//// } else {
//// need_retry = true;
//// }
//// } else {
//// ctx->db = ctx->put_result.params.db_name;
//// ctx->table = ctx->put_result.params.table_name;
//// ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
//// ctx->label = ctx->put_result.params.import_label;
//// ctx->put_result.params.__set_wal_id(ctx->wal_id);
//// auto st = _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
//// if (st.ok()) {
//// // wait stream load finish
//// RETURN_IF_ERROR(ctx->future.get());
//// if (ctx->status.ok()) {
//// auto commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
//// if (commit_st.ok() || commit_st.is<ErrorCode::PUBLISH_TIMEOUT>()) {
//// need_retry = false;
//// } else {
//// need_retry = true;
//// }
//// } else if (!ctx->status.ok()) {
//// LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
//// << ", errmsg=" << ctx->status;
//// _exec_env->stream_load_executor()->rollback_txn(ctx.get());
//// need_retry = true;
//// }
//// }
//// }
//#else
// std::stringstream out;
// out << k_request_line;
// auto out_str = out.str();
// rapidjson::Document doc;
// doc.Parse(out_str.c_str());
// auto status = std::string(doc["Status"].GetString());
// if (status.find("Fail") != status.npos) {
// need_retry = true;
// } else {
// need_retry = false;
// }
//#endif
bool success = false;
#ifndef BE_TEST
auto st = _handle_stream_load(wal_id, wal, label);
auto msg = st.msg();
bool success = st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() ||
msg.find("LabelAlreadyUsedException") != msg.npos;
success = st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() ||
msg.find("LabelAlreadyUsedException") != msg.npos;
#endif
if (success) {
LOG(INFO) << "success to replay wal =" << wal;
RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/wal_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class WalTable {
bool _need_replay(const replay_wal_info& info);
Status _replay_wal_internal(const std::string& wal);
Status _parse_sql(int64_t wal_id, const std::string& wal, const std::string& label,
std::string& sql_str);
std::string& sql_str, std::vector<size_t>& index_vector);
Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label);

private:
Expand Down

0 comments on commit d9485c6

Please sign in to comment.