Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the compatibility of queryTimeout in more version clients #6787

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,18 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
val action: Runnable = () => cleanup(OperationState.TIMEOUT)
val action: Runnable = () =>
// Clients less than version 2.1 have no HIVE-4924 Patch,
// no queryTimeout parameter and no TIMEOUT status.
// When the server enables kyuubi.operation.query.timeout,
// this will cause the client of the lower version to get stuck.
// Check thrift protocol version <= HIVE_CLI_SERVICE_PROTOCOL_V8(Hive 2.1.0),
// convert TIMEDOUT_STATE to CANCELED.
if (isHive21OrLower) {
cleanup(OperationState.CANCELED)
} else {
cleanup(OperationState.TIMEOUT)
}
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
statementTimeoutCleaner = Some(timeoutExecutor)
}
Expand Down Expand Up @@ -275,4 +286,8 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
}
}
}

protected def isHive21OrLower: Boolean = {
getProtocolVersion.getValue <= TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8.getValue
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TGetOperationStatusResp, TOperationState, TProtocolVersion}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TGetOperationStatusResp, TOperationState}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TOperationState._

class ExecuteStatement(
Expand Down Expand Up @@ -124,9 +124,7 @@ class ExecuteStatement(
// this will cause the client of the lower version to get stuck.
// Check thrift protocol version <= HIVE_CLI_SERVICE_PROTOCOL_V8(Hive 2.1.0),
// convert TIMEDOUT_STATE to CANCELED.
if getProtocolVersion.getValue <=
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8.getValue =>
setState(OperationState.CANCELED)
if isHive21OrLower => setState(OperationState.CANCELED)

case TIMEDOUT_STATE =>
setState(OperationState.TIMEOUT)
Expand Down
Loading