Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Dec 16, 2024
1 parent 942f8b2 commit 4dd79a2
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 4 deletions.
27 changes: 25 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public enum ConnectType {
protected volatile long loginTime;
// for arrow flight
protected volatile String peerIdentity;
protected volatile boolean readyFinalizeArrowFlightSqlRequest = false;
private final Map<String, String> preparedQuerys = new HashMap<>();
private String runningQuery;
private final List<FlightSqlEndpointsLocation> flightSqlEndpointsLocations = Lists.newArrayList();
Expand Down Expand Up @@ -731,6 +732,14 @@ public String getPeerIdentity() {
return peerIdentity;
}

public void setReadyFinalizeArrowFlightSqlRequest() {
this.readyFinalizeArrowFlightSqlRequest = true;
}

public boolean getReadyFinalizeArrowFlightSqlRequest() {
return readyFinalizeArrowFlightSqlRequest;
}

public FlightSqlChannel getFlightSqlChannel() {
throw new RuntimeException("getFlightSqlChannel not in flight sql connection");
}
Expand Down Expand Up @@ -818,8 +827,20 @@ public void addReturnResultFromRemoteExecutor(StmtExecutor executor) {
this.returnResultFromRemoteExecutor.add(executor);
}

/**
* The event that occurs later between FlightSqlConnectProcessorClose and ExecStatusDone will execute finalize.
* Usually, `select` queries will execute FlightSqlConnectProcessorClose first, and then execute ExecStatusDone
* after data generation is completed.
* `insert into values` query will wait for ExecStatusDone in LoadProcessor during `executor.execute()`, and then
* execute FlightSqlConnectProcessorClose.
*/
public void finalizeArrowFlightSqlRequest() {
setThreadLocalInfo();
boolean setThreadLocal = false;
if (get() == null) {
setThreadLocalInfo();
setThreadLocal = true;
}

if (executor != null && executor.getParsedStmt() != null && !executor.getParsedStmt().isExplain()
&& (executor.getParsedStmt() instanceof QueryStmt // currently only QueryStmt and insert need profile
|| executor.getParsedStmt() instanceof LogicalPlanAdapter
Expand All @@ -839,7 +860,9 @@ public void finalizeArrowFlightSqlRequest() {
// and the query returning results from FE and `insert into` will call unregisterQuery after execute.
executor.finalizeQuery();

remove();
if (setThreadLocal) {
remove();
}
setCommand(MysqlCommand.COM_SLEEP);
clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,12 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params,
DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id), e);
return result;
}
if (params.isDone() && info.getConnectContext().getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
info.getConnectContext().finalizeArrowFlightSqlRequest();
ConnectContext ctx = info.getConnectContext();
if (ctx.getConnectType() == ConnectType.ARROW_FLIGHT_SQL && params.isDone()) {
if (ctx.getReadyFinalizeArrowFlightSqlRequest()) {
ctx.finalizeArrowFlightSqlRequest();
}
ctx.setReadyFinalizeArrowFlightSqlRequest();
}
result.setStatus(new TStatus(TStatusCode.OK));
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ public void fetchArrowFlightSchema(int timeoutMs) {

@Override
public void close() throws Exception {
if (ctx.getReadyFinalizeArrowFlightSqlRequest()) {
ctx.finalizeArrowFlightSqlRequest();
}
ctx.setReadyFinalizeArrowFlightSqlRequest();
ConnectContext.remove();
}
}

0 comments on commit 4dd79a2

Please sign in to comment.