diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp index a971d6ac5ff0e4a..0e8a3b9d3da321b 100644 --- a/be/src/olap/wal_table.cpp +++ b/be/src/olap/wal_table.cpp @@ -219,9 +219,8 @@ Status WalTable::_get_wal_info(const std::string& wal, return Status::OK(); } -Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) { - bool need_retry = false; -#ifndef BE_TEST +Status WalTable::_parse_sql(int64_t wal_id, const std::string& wal, const std::string& label, + std::string& sql_str) { std::string columns; RETURN_IF_ERROR(_read_wal_header(wal, columns)); std::vector column_id_element; @@ -254,6 +253,15 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " (" << name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \"" << std::to_string(_table_id) << "\")"; + sql_str = ss.str(); + return Status::OK(); +} + +Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, + const std::string& label) { + std::string sql_str; + RETURN_IF_ERROR(_parse_sql(wal_id, wal, label, sql_str)); + LOG(INFO) << "sql_str:" << sql_str; std::shared_ptr ctx = std::make_shared(_exec_env); ctx->wal_id = wal_id; ctx->auth.auth_code = wal_id; @@ -263,7 +271,7 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std tload_id.__set_lo(load_id.lo); TStreamLoadPutRequest request; request.__set_auth_code(ctx->auth.auth_code); - request.__set_load_sql(ss.str()); + request.__set_load_sql(sql_str); request.__set_loadId(tload_id); request.__set_label(label); if (_exec_env->master_info()->__isset.backend_id) { @@ -284,12 +292,13 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std if (!plan_status.ok()) { auto msg = plan_status.msg(); LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief(); - if (msg.find("LabelAlreadyUsedException") != msg.npos) { - LOG(INFO) << "skip relay wal " << wal << ",reason " << msg; - need_retry = false; - } else { - need_retry = true; - } +// if (msg.find("LabelAlreadyUsedException") != msg.npos) { +// LOG(INFO) << "skip relay wal " << wal << ",reason " << msg; +// return Status::OK(); +// } else { +// return plan_status; +// } + return plan_status; } else { ctx->db = ctx->put_result.params.db_name; ctx->table = ctx->put_result.params.table_name; @@ -302,33 +311,152 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std RETURN_IF_ERROR(ctx->future.get()); if (ctx->status.ok()) { auto commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get()); - if (commit_st.ok() || commit_st.is()) { - need_retry = false; - } else { - need_retry = true; - } +// if (commit_st.ok() || commit_st.is()) { +// return Status::OK(); +// } else { +// return commit_st; +// } + return commit_st; } else if (!ctx->status.ok()) { LOG(WARNING) << "handle streaming load failed, id=" << ctx->id << ", errmsg=" << ctx->status; _exec_env->stream_load_executor()->rollback_txn(ctx.get()); - need_retry = true; + return ctx->status; } + } else { + LOG(WARNING) << "execute_plan_fragment fail, id=" << ctx->id << ", errmsg=" << st; + return st; } } -#else - std::stringstream out; - out << k_request_line; - auto out_str = out.str(); - rapidjson::Document doc; - doc.Parse(out_str.c_str()); - auto status = std::string(doc["Status"].GetString()); - if (status.find("Fail") != status.npos) { - need_retry = true; + return Status::OK(); +} +Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) { +// bool need_retry = false; +//#ifndef BE_TEST +//// std::string columns; +//// RETURN_IF_ERROR(_read_wal_header(wal, columns)); +//// std::vector column_id_element; +//// doris::vectorized::WalReader::string_split(columns, ",", column_id_element); +//// std::vector index_vector; +//// std::stringstream ss_name; +//// std::stringstream ss_id; +//// int index_raw = 0; +//// for (auto column_id_str : column_id_element) { +//// try { +//// int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10); +//// auto it = _column_id_name_map.find(column_id); +//// auto it2 = _column_id_index_map.find(column_id); +//// if (it != _column_id_name_map.end() && it2 != _column_id_index_map.end()) { +//// ss_name << "`" << it->second << "`,"; +//// ss_id << "c" << std::to_string(_column_id_index_map[column_id]) << ","; +//// index_vector.emplace_back(index_raw); +//// _column_id_name_map.erase(column_id); +//// _column_id_index_map.erase(column_id); +//// } +//// index_raw++; +//// } catch (const std::invalid_argument& e) { +//// return Status::InvalidArgument("Invalid format, {}", e.what()); +//// } +//// } +//// _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector); +//// auto name = ss_name.str().substr(0, ss_name.str().size() - 1); +//// auto id = ss_id.str().substr(0, ss_id.str().size() - 1); +//// std::stringstream ss; +//// ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " (" +//// << name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \"" +//// << std::to_string(_table_id) << "\")"; +//// std::string sql_str; +//// RETURN_IF_ERROR(_parse_sql(wal_id, wal, label, sql_str)); +//// LOG(INFO) << "sql_str:" << sql_str; +//// std::shared_ptr ctx = std::make_shared(_exec_env); +//// ctx->wal_id = wal_id; +//// ctx->auth.auth_code = wal_id; +//// UniqueId load_id = UniqueId::gen_uid(); +//// TUniqueId tload_id; +//// tload_id.__set_hi(load_id.hi); +//// tload_id.__set_lo(load_id.lo); +//// TStreamLoadPutRequest request; +//// request.__set_auth_code(ctx->auth.auth_code); +//// request.__set_load_sql(sql_str); +//// request.__set_loadId(tload_id); +//// request.__set_label(label); +//// if (_exec_env->master_info()->__isset.backend_id) { +//// request.__set_backend_id(_exec_env->master_info()->backend_id); +//// } else { +//// LOG(WARNING) << "_exec_env->master_info not set backend_id"; +//// } +//// // plan this load +//// TNetworkAddress master_addr = _exec_env->master_info()->network_address; +//// int64_t stream_load_put_start_time = MonotonicNanos(); +//// RETURN_IF_ERROR(ThriftRpcHelper::rpc( +//// master_addr.hostname, master_addr.port, +//// [&request, ctx](FrontendServiceConnection& client) { +//// client->streamLoadPut(ctx->put_result, request); +//// })); +//// ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time; +//// Status plan_status(Status::create(ctx->put_result.status)); +//// if (!plan_status.ok()) { +//// auto msg = plan_status.msg(); +//// LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief(); +//// if (msg.find("LabelAlreadyUsedException") != msg.npos) { +//// LOG(INFO) << "skip relay wal " << wal << ",reason " << msg; +//// need_retry = false; +//// } else { +//// need_retry = true; +//// } +//// } else { +//// ctx->db = ctx->put_result.params.db_name; +//// ctx->table = ctx->put_result.params.table_name; +//// ctx->txn_id = ctx->put_result.params.txn_conf.txn_id; +//// ctx->label = ctx->put_result.params.import_label; +//// ctx->put_result.params.__set_wal_id(ctx->wal_id); +//// auto st = _exec_env->stream_load_executor()->execute_plan_fragment(ctx); +//// if (st.ok()) { +//// // wait stream load finish +//// RETURN_IF_ERROR(ctx->future.get()); +//// if (ctx->status.ok()) { +//// auto commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get()); +//// if (commit_st.ok() || commit_st.is()) { +//// need_retry = false; +//// } else { +//// need_retry = true; +//// } +//// } else if (!ctx->status.ok()) { +//// LOG(WARNING) << "handle streaming load failed, id=" << ctx->id +//// << ", errmsg=" << ctx->status; +//// _exec_env->stream_load_executor()->rollback_txn(ctx.get()); +//// need_retry = true; +//// } +//// } +//// } +//#else +// std::stringstream out; +// out << k_request_line; +// auto out_str = out.str(); +// rapidjson::Document doc; +// doc.Parse(out_str.c_str()); +// auto status = std::string(doc["Status"].GetString()); +// if (status.find("Fail") != status.npos) { +// need_retry = true; +// } else { +// need_retry = false; +// } +//#endif + auto st = _handle_stream_load(wal_id, wal, label); + auto msg = st.msg(); + bool success = st.ok() || st.is() || + msg.find("LabelAlreadyUsedException") != msg.npos; + if (success) { + LOG(INFO) << "success to replay wal =" << wal; + RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id)); + RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id)); + std::lock_guard lock(_replay_wal_lock); + if (_replay_wal_map.erase(wal)) { + LOG(INFO) << "erase " << wal << " from _replay_wal_map"; + } else { + LOG(WARNING) << "fail to erase " << wal << " from _replay_wal_map"; + } } else { - need_retry = false; - } -#endif - if (need_retry) { LOG(INFO) << "fail to replay wal =" << wal; std::lock_guard lock(_replay_wal_lock); auto it = _replay_wal_map.find(wal); @@ -338,16 +466,6 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std } else { _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false}); } - } else { - LOG(INFO) << "success to replay wal =" << wal; - RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id)); - RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id)); - std::lock_guard lock(_replay_wal_lock); - if (_replay_wal_map.erase(wal)) { - LOG(INFO) << "erase " << wal << " from _replay_wal_map"; - } else { - LOG(WARNING) << "fail to erase " << wal << " from _replay_wal_map"; - } } _exec_env->wal_mgr()->erase_wal_column_index(wal_id); return Status::OK(); diff --git a/be/src/olap/wal_table.h b/be/src/olap/wal_table.h index 354f4f16b05cc4a..a59de6994d6f1f6 100644 --- a/be/src/olap/wal_table.h +++ b/be/src/olap/wal_table.h @@ -50,6 +50,9 @@ class WalTable { Status _read_wal_header(const std::string& wal, std::string& columns); bool _need_replay(const replay_wal_info& info); Status _replay_wal_internal(const std::string& wal); + Status _parse_sql(int64_t wal_id, const std::string& wal, const std::string& label, + std::string& sql_str); + Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label); private: ExecEnv* _exec_env;