Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Dec 16, 2024
1 parent 4dd79a2 commit 4b6a18d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 9 deletions.
27 changes: 22 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -131,7 +132,9 @@ public enum ConnectType {
protected volatile long loginTime;
// for arrow flight
protected volatile String peerIdentity;
protected volatile boolean readyFinalizeArrowFlightSqlRequest = false;
protected volatile boolean isFlightSqlConnectProcessorClose = false;
protected volatile boolean isExecStatusDone = false;
protected volatile boolean finalizeArrowFlightSqlRequestFinished = false;
private final Map<String, String> preparedQuerys = new HashMap<>();
private String runningQuery;
private final List<FlightSqlEndpointsLocation> flightSqlEndpointsLocations = Lists.newArrayList();
Expand Down Expand Up @@ -732,12 +735,24 @@ public String getPeerIdentity() {
return peerIdentity;
}

public void setReadyFinalizeArrowFlightSqlRequest() {
this.readyFinalizeArrowFlightSqlRequest = true;
public void setIsFlightSqlConnectProcessorClose() {
this.isFlightSqlConnectProcessorClose = true;
}

public boolean getReadyFinalizeArrowFlightSqlRequest() {
return readyFinalizeArrowFlightSqlRequest;
public boolean getIsFlightSqlConnectProcessorClose() {
return isFlightSqlConnectProcessorClose;
}

public void setIsExecStatusDone() {
this.isExecStatusDone = true;
}

public boolean getIsExecStatusDone() {
return isExecStatusDone;
}

public boolean getFinalizeArrowFlightSqlRequestFinished() {
return finalizeArrowFlightSqlRequestFinished;
}

public FlightSqlChannel getFlightSqlChannel() {
Expand Down Expand Up @@ -835,6 +850,8 @@ public void addReturnResultFromRemoteExecutor(StmtExecutor executor) {
* execute FlightSqlConnectProcessorClose.
*/
public void finalizeArrowFlightSqlRequest() {
Preconditions.checkState(!finalizeArrowFlightSqlRequestFinished);
finalizeArrowFlightSqlRequestFinished = true;
boolean setThreadLocal = false;
if (get() == null) {
setThreadLocalInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,11 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params,
}
ConnectContext ctx = info.getConnectContext();
if (ctx.getConnectType() == ConnectType.ARROW_FLIGHT_SQL && params.isDone()) {
if (ctx.getReadyFinalizeArrowFlightSqlRequest()) {
// `params.isDone` may be true in multiple reportExecStatus.
if (ctx.getIsFlightSqlConnectProcessorClose() && !ctx.getFinalizeArrowFlightSqlRequestFinished()) {
ctx.finalizeArrowFlightSqlRequest();
}
ctx.setReadyFinalizeArrowFlightSqlRequest();
ctx.setIsExecStatusDone();
}
result.setStatus(new TStatus(TStatusCode.OK));
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ public void fetchArrowFlightSchema(int timeoutMs) {

@Override
public void close() throws Exception {
if (ctx.getReadyFinalizeArrowFlightSqlRequest()) {
if (ctx.getIsExecStatusDone()) {
ctx.finalizeArrowFlightSqlRequest();
}
ctx.setReadyFinalizeArrowFlightSqlRequest();
ctx.setIsFlightSqlConnectProcessorClose();
ConnectContext.remove();
}
}

0 comments on commit 4b6a18d

Please sign in to comment.