Skip to content

Commit

Permalink
convert operation status TIMEDOUT to CANCELED in KyuubiBackendService
Browse files Browse the repository at this point in the history
  • Loading branch information
lsm1 committed Nov 12, 2024
1 parent dddb037 commit 3d0a8d6
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
val action: Runnable = () =>
// Clients less than version 2.1 have no HIVE-4924 Patch,
// no queryTimeout parameter and no TIMEOUT status.
// When the server enables kyuubi.operation.query.timeout,
// this will cause the client of the lower version to get stuck.
// Check thrift protocol version <= HIVE_CLI_SERVICE_PROTOCOL_V8(Hive 2.1.0),
// convert TIMEDOUT_STATE to CANCELED.
if (isHive21OrLower) {
cleanup(OperationState.CANCELED)
} else {
cleanup(OperationState.TIMEOUT)
}
val action: Runnable = () => cleanup(OperationState.TIMEOUT)
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
statementTimeoutCleaner = Some(timeoutExecutor)
}
Expand Down Expand Up @@ -286,8 +275,4 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
}
}
}

protected def isHive21OrLower: Boolean = {
getProtocolVersion.getValue <= TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8.getValue
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,6 @@ class ExecuteStatement(
case CANCELED_STATE =>
setState(OperationState.CANCELED)

case TIMEDOUT_STATE
// Clients less than version 2.1 have no HIVE-4924 Patch,
// no queryTimeout parameter and no TIMEOUT status.
// When the server enables kyuubi.operation.query.timeout,
// this will cause the client of the lower version to get stuck.
// Check thrift protocol version <= HIVE_CLI_SERVICE_PROTOCOL_V8(Hive 2.1.0),
// convert TIMEDOUT_STATE to CANCELED.
if isHive21OrLower => setState(OperationState.CANCELED)

case TIMEDOUT_STATE =>
setState(OperationState.TIMEOUT)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,33 @@

package org.apache.kyuubi.server

import org.apache.kyuubi.operation.{OperationHandle, OperationState, OperationStatus}
import org.apache.kyuubi.service.AbstractBackendService
import org.apache.kyuubi.session.{KyuubiSessionManager, SessionManager}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion

class KyuubiBackendService(name: String) extends AbstractBackendService(name) {

def this() = this(classOf[KyuubiBackendService].getSimpleName)

override val sessionManager: SessionManager = new KyuubiSessionManager()

override def getOperationStatus(
operationHandle: OperationHandle,
maxWait: Option[Long]): OperationStatus = {
val operation = sessionManager.operationManager.getOperation(operationHandle)
val operationStatus = super.getOperationStatus(operationHandle, maxWait)
// Clients less than version 2.1 have no HIVE-4924 Patch,
// no queryTimeout parameter and no TIMEOUT status.
// When the server enables kyuubi.operation.query.timeout,
// this will cause the client of the lower version to get stuck.
// Check thrift protocol version <= HIVE_CLI_SERVICE_PROTOCOL_V8(Hive 2.1.0),
// convert TIMEDOUT_STATE to CANCELED.
if (operationStatus.state == OperationState.TIMEOUT && operation.getSession.protocol.getValue <=
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8.getValue) {
operationStatus.copy(state = OperationState.CANCELED)
} else {
operationStatus
}
}
}

0 comments on commit 3d0a8d6

Please sign in to comment.