[BugFix] Fix be calling fe not setting rpc timeout (backport #45101) #45206

Merged: 2 commits, May 7, 2024
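Summary of the change: before this backport, the BE-side reporters (audit statistics, exec-status, and batched fragment reports) acquired their FrontendServiceConnection without passing an RPC timeout, and reopened it after transport errors without one either, so BE-to-FE Thrift report calls ran with no explicit timeout set. The fix threads config::thrift_rpc_timeout_ms through both the connection constructor and reopen() at every call site in the diff below. The following is a minimal, self-contained sketch of that calling pattern with stand-in types; FeConnectionSketch, its members, and the literal timeout value are illustrative assumptions, not the actual FrontendServiceConnection implementation.

// Minimal stand-in sketch (not the real FrontendServiceConnection) of the pattern
// this PR enforces: every acquisition and every reopen of the FE client carries
// an explicit timeout instead of leaving the Thrift socket's timeout unset.
#include <iostream>
#include <string>
#include <utility>

struct Status {
    bool ok_ = true;
    std::string msg_;
    bool ok() const { return ok_; }
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
};

class FeConnectionSketch {  // hypothetical stand-in type
public:
    FeConnectionSketch(const std::string& addr, int timeout_ms, Status* st)
            : _addr(addr), _timeout_ms(timeout_ms) {
        *st = connect();  // mirrors FrontendServiceConnection(cache, fe_addr, timeout_ms, &fe_status)
    }
    Status reopen(int timeout_ms) {  // mirrors coord.reopen(config::thrift_rpc_timeout_ms)
        _timeout_ms = timeout_ms;    // the rebuilt socket gets the explicit timeout too
        return connect();
    }

private:
    Status connect() {
        // A real client would create the Thrift transport here and apply
        // _timeout_ms as its send/recv timeout before opening it.
        return _timeout_ms > 0 ? Status::OK() : Status::Error("rpc timeout not set");
    }
    std::string _addr;
    int _timeout_ms = 0;
};

int main() {
    const int thrift_rpc_timeout_ms = 10000;  // stand-in for config::thrift_rpc_timeout_ms
    Status st;
    FeConnectionSketch coord("fe_host:9020", thrift_rpc_timeout_ms, &st);
    if (!st.ok()) {
        std::cerr << "Couldn't get a client: " << st.msg_ << "\n";
        return 1;
    }
    Status rpc_status = coord.reopen(thrift_rpc_timeout_ms);  // retry path passes the timeout as well
    if (rpc_status.ok()) {
        std::cout << "reopened with explicit timeout\n";
    }
    return rpc_status.ok() ? 0 : 1;
}

The per-file hunks follow.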
5 changes: 3 additions & 2 deletions be/src/exec/pipeline/audit_statistics_reporter.cpp
@@ -43,7 +43,8 @@ AuditStatisticsReporter::AuditStatisticsReporter() {
 Status AuditStatisticsReporter::report_audit_statistics(const TReportAuditStatisticsParams& params, ExecEnv* exec_env,
                                                         const TNetworkAddress& fe_addr) {
     Status fe_status;
-    FrontendServiceConnection coord(exec_env->frontend_client_cache(), fe_addr, &fe_status);
+    FrontendServiceConnection coord(exec_env->frontend_client_cache(), fe_addr, config::thrift_rpc_timeout_ms,
+                                    &fe_status);
     if (!fe_status.ok()) {
         LOG(WARNING) << "Couldn't get a client for " << fe_addr;
         return fe_status;
@@ -59,7 +60,7 @@ Status AuditStatisticsReporter::report_audit_statistics(const TReportAuditStatis
         TTransportException::TTransportExceptionType type = e.getType();
         if (type != TTransportException::TTransportExceptionType::TIMED_OUT) {
             // if not TIMED_OUT, retry
-            rpc_status = coord.reopen();
+            rpc_status = coord.reopen(config::thrift_rpc_timeout_ms);

             if (!rpc_status.ok()) {
                 return rpc_status;
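The retry shape around these calls is the same in every file of this backport: the report RPC sits in a try/catch on TTransportException, and only a non-TIMED_OUT transport error leads to a reopen followed by one more attempt, presumably because a fresh socket would not rescue a call that has already run out of time. The only behavioral change inside that shape is that reopen() now receives config::thrift_rpc_timeout_ms. Below is a dependency-free sketch of the branch; TransportErrorSketch, report_once, and reopen_with_timeout are illustrative stand-ins, not StarRocks or Thrift APIs.

// Dependency-free sketch of the retry branch repeated at every call site below;
// TransportErrorSketch stands in for Thrift's TTransportException and report_once()
// for the actual reportAuditStatistics / reportExecStatus call (all hypothetical).
#include <iostream>
#include <stdexcept>
#include <string>

enum class TransportErrorType { TIMED_OUT, NOT_OPEN };

struct TransportErrorSketch : std::runtime_error {
    TransportErrorType type;
    TransportErrorSketch(TransportErrorType t, const std::string& msg)
            : std::runtime_error(msg), type(t) {}
};

// Pretend report call: fails once with a non-timeout transport error.
bool report_once(bool first_attempt) {
    if (first_attempt) {
        throw TransportErrorSketch(TransportErrorType::NOT_OPEN, "stale cached client");
    }
    return true;
}

bool reopen_with_timeout(int timeout_ms) {
    std::cout << "reopen with explicit timeout " << timeout_ms << " ms\n";
    return true;  // a real reopen would rebuild the Thrift transport
}

int main() {
    const int thrift_rpc_timeout_ms = 10000;  // stand-in for config::thrift_rpc_timeout_ms
    try {
        report_once(/*first_attempt=*/true);
    } catch (TransportErrorSketch& e) {
        if (e.type != TransportErrorType::TIMED_OUT) {
            // Not a timeout: the cached client is probably broken, so reopen it
            // (now passing the timeout, which is the point of this PR) and retry once.
            if (!reopen_with_timeout(thrift_rpc_timeout_ms)) return 1;
            std::cout << (report_once(false) ? "retry succeeded\n" : "retry failed\n");
        }
        // A TIMED_OUT error is not retried here, mirroring the
        // "// if not TIMED_OUT, retry" branch in the hunks above.
    }
    return 0;
}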
5 changes: 3 additions & 2 deletions be/src/exec/pipeline/exec_state_reporter.cpp
@@ -119,7 +119,8 @@ using apache::thrift::transport::TTransportException;
 Status ExecStateReporter::report_exec_status(const TReportExecStatusParams& params, ExecEnv* exec_env,
                                              const TNetworkAddress& fe_addr) {
     Status fe_status;
-    FrontendServiceConnection coord(exec_env->frontend_client_cache(), fe_addr, &fe_status);
+    FrontendServiceConnection coord(exec_env->frontend_client_cache(), fe_addr, config::thrift_rpc_timeout_ms,
+                                    &fe_status);
     if (!fe_status.ok()) {
         LOG(WARNING) << "Couldn't get a client for " << fe_addr;
         return fe_status;
@@ -135,7 +136,7 @@ Status ExecStateReporter::report_exec_status(const TReportExecStatusParams& para
         TTransportException::TTransportExceptionType type = e.getType();
         if (type != TTransportException::TTransportExceptionType::TIMED_OUT) {
             // if not TIMED_OUT, retry
-            rpc_status = coord.reopen();
+            rpc_status = coord.reopen(config::thrift_rpc_timeout_ms);

             if (!rpc_status.ok()) {
                 return rpc_status;
5 changes: 3 additions & 2 deletions be/src/exec/pipeline/query_context.cpp
@@ -492,7 +492,8 @@ void QueryContextManager::report_fragments(
         auto* runtime_state = fragment_ctx->runtime_state();
         DCHECK(runtime_state != nullptr);

-        FrontendServiceConnection fe_connection(exec_env->frontend_client_cache(), fe_addr, &fe_connection_status);
+        FrontendServiceConnection fe_connection(exec_env->frontend_client_cache(), fe_addr,
+                                                config::thrift_rpc_timeout_ms, &fe_connection_status);
         if (!fe_connection_status.ok()) {
             std::stringstream ss;
             ss << "couldn't get a client for " << fe_addr;
@@ -545,7 +546,7 @@ void QueryContextManager::report_fragments(
         fe_connection->batchReportExecStatus(res, report_batch);
     } catch (TTransportException& e) {
         LOG(WARNING) << "Retrying ReportExecStatus: " << e.what();
-        rpc_status = fe_connection.reopen();
+        rpc_status = fe_connection.reopen(config::thrift_rpc_timeout_ms);
         if (!rpc_status.ok()) {
             continue;
         }
10 changes: 6 additions & 4 deletions be/src/runtime/fragment_mgr.cpp
@@ -220,7 +220,8 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
     Status exec_status = update_status(status);

     Status coord_status;
-    FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, &coord_status);
+    FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, config::thrift_rpc_timeout_ms,
+                                    &coord_status);
     if (!coord_status.ok()) {
         std::stringstream ss;
         ss << "couldn't get a client for " << _coord_addr;
@@ -306,7 +307,7 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
         coord->reportExecStatus(res, params);
     } catch (TTransportException& e) {
         LOG(WARNING) << "Retrying ReportExecStatus: " << e.what();
-        rpc_status = coord.reopen();
+        rpc_status = coord.reopen(config::thrift_rpc_timeout_ms);

         if (!rpc_status.ok()) {
             // we need to cancel the execution of this fragment
@@ -657,7 +658,8 @@ void FragmentMgr::report_fragments(const std::vector<TUniqueId>& non_pipeline_ne
         Status fe_connection_status;

         FrontendServiceConnection fe_connection(fragment_exec_state->exec_env()->frontend_client_cache(),
-                                                fragment_exec_state->coord_addr(), &fe_connection_status);
+                                                fragment_exec_state->coord_addr(), config::thrift_rpc_timeout_ms,
+                                                &fe_connection_status);
         if (!fe_connection_status.ok()) {
             std::stringstream ss;
             ss << "couldn't get a client for " << fragment_exec_state->coord_addr();
@@ -711,7 +713,7 @@ void FragmentMgr::report_fragments(const std::vector<TUniqueId>& non_pipeline_ne
         fe_connection->batchReportExecStatus(res, report_batch);
     } catch (TTransportException& e) {
         LOG(WARNING) << "Retrying ReportExecStatus: " << e.what();
-        rpc_status = fe_connection.reopen();
+        rpc_status = fe_connection.reopen(config::thrift_rpc_timeout_ms);
         if (!rpc_status.ok()) {
             continue;
         }
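All four files make the same mechanical change, so after this backport every BE-to-FE report RPC, including the reopen on its retry path, runs under config::thrift_rpc_timeout_ms rather than with the timeout left unset. If that proves too tight for a heavily loaded FE, the value these call sites read should be tunable through the BE configuration; the line below is an illustrative be.conf entry, and the value shown is an example, not necessarily the shipped default.

# be.conf (illustrative; check your StarRocks version's default for thrift_rpc_timeout_ms)
thrift_rpc_timeout_ms = 10000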