diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index 3110b064125..fdcc0c3406d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -62,18 +62,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin if (queryTimeout > 0) { val timeoutExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false) - val action: Runnable = () => - // Clients less than version 2.1 have no HIVE-4924 Patch, - // no queryTimeout parameter and no TIMEOUT status. - // When the server enables kyuubi.operation.query.timeout, - // this will cause the client of the lower version to get stuck. - // Check thrift protocol version <= HIVE_CLI_SERVICE_PROTOCOL_V8(Hive 2.1.0), - // convert TIMEDOUT_STATE to CANCELED. - if (isHive21OrLower) { - cleanup(OperationState.CANCELED) - } else { - cleanup(OperationState.TIMEOUT) - } + val action: Runnable = () => cleanup(OperationState.TIMEOUT) timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS) statementTimeoutCleaner = Some(timeoutExecutor) } @@ -286,8 +275,4 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin } } } - - protected def isHive21OrLower: Boolean = { - getProtocolVersion.getValue <= TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8.getValue - } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala index 02c22e4f6a7..83e67abb67b 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala @@ -117,15 +117,6 @@ class ExecuteStatement( case CANCELED_STATE => setState(OperationState.CANCELED) - case TIMEDOUT_STATE - // Clients less than version 2.1 have no HIVE-4924 Patch, - // no queryTimeout parameter and no TIMEOUT status. - // When the server enables kyuubi.operation.query.timeout, - // this will cause the client of the lower version to get stuck. - // Check thrift protocol version <= HIVE_CLI_SERVICE_PROTOCOL_V8(Hive 2.1.0), - // convert TIMEDOUT_STATE to CANCELED. - if isHive21OrLower => setState(OperationState.CANCELED) - case TIMEDOUT_STATE => setState(OperationState.TIMEOUT) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBackendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBackendService.scala index b974bdf5833..d58722df118 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBackendService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBackendService.scala @@ -17,12 +17,33 @@ package org.apache.kyuubi.server +import org.apache.kyuubi.operation.{OperationHandle, OperationState, OperationStatus} import org.apache.kyuubi.service.AbstractBackendService import org.apache.kyuubi.session.{KyuubiSessionManager, SessionManager} +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion class KyuubiBackendService(name: String) extends AbstractBackendService(name) { def this() = this(classOf[KyuubiBackendService].getSimpleName) override val sessionManager: SessionManager = new KyuubiSessionManager() + + override def getOperationStatus( + operationHandle: OperationHandle, + maxWait: Option[Long]): OperationStatus = { + val operation = sessionManager.operationManager.getOperation(operationHandle) + val operationStatus = super.getOperationStatus(operationHandle, maxWait) + // Clients less than version 2.1 have no HIVE-4924 Patch, + // no queryTimeout parameter and no TIMEOUT status. + // When the server enables kyuubi.operation.query.timeout, + // this will cause the client of the lower version to get stuck. + // Check thrift protocol version <= HIVE_CLI_SERVICE_PROTOCOL_V8(Hive 2.1.0), + // convert TIMEDOUT_STATE to CANCELED. + if (operationStatus.state == OperationState.TIMEOUT && operation.getSession.protocol.getValue <= + TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8.getValue) { + operationStatus.copy(state = OperationState.CANCELED) + } else { + operationStatus + } + } }