Skip to content

Commit

Permalink
[KYUUBI #6583] Support to cancel Spark python operation
Browse files Browse the repository at this point in the history
# 🔍 Description
## Issue References 🔗

This pull request fixes #6583

## Background and Goals
Currently, Kyuubi cannot perform operation-level interrupts when executing Python code. When an operation that has been running for a long time needs to be cancelled, the entire session has to be interrupted and the execution context is lost, which is very unfriendly to users. Therefore, operation-level interrupts need to be supported so that the execution context is not lost when the user terminates an operation.

## Describe Your Solution 🔧

Following the implementation of Jupyter Notebook, let the Python process listen for the SIGINT signal. When SIGINT is received, interrupt the currently executing code and catch the resulting KeyboardInterrupt, treating the operation as cancelled.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6612 from yoock/features/support-operation-cancel.

Closes #6583

bf6334d [Wang, Fei] log error to do not break the cleanup process
ae7ad3f [Wang, Fei] comments
509627e [王龙] PySpark support operation cancel

Lead-authored-by: Wang, Fei <[email protected]>
Co-authored-by: 王龙 <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
  • Loading branch information
turboFei and 王龙 committed Aug 20, 2024
1 parent 87c01e1 commit 705bb2a
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -471,53 +471,68 @@ def main():
try:

while True:
line = sys_stdin.readline()

if line == "":
break
elif line == "\n":
continue

try:
content = json.loads(line)
except ValueError:
continue

if content["cmd"] == "exit_worker":
break

result = execute_request(content)

try:
result = json.dumps(result)
except ValueError:
line = sys_stdin.readline()

if line == "":
break
elif line == "\n":
continue

try:
content = json.loads(line)
except ValueError:
continue

if content["cmd"] == "exit_worker":
break

result = execute_request(content)

try:
result = json.dumps(result)
except ValueError:
result = json.dumps(
{
"msg_type": "inspect_reply",
"content": {
"status": "error",
"ename": "ValueError",
"evalue": "cannot json-ify %s" % result,
"traceback": [],
},
}
)
except Exception:
exc_type, exc_value, tb = sys.exc_info()
result = json.dumps(
{
"msg_type": "inspect_reply",
"content": {
"status": "error",
"ename": str(exc_type.__name__),
"evalue": "cannot json-ify %s: %s"
% (result, str(exc_value)),
"traceback": [],
},
}
)

print(result, file=sys_stdout)
except KeyboardInterrupt:
result = json.dumps(
{
"msg_type": "inspect_reply",
"content": {
"status": "error",
"ename": "ValueError",
"evalue": "cannot json-ify %s" % result,
"status": "canceled",
"ename": "KeyboardInterrupt",
"evalue": "execution interrupted by user",
"traceback": [],
},
}
)
except Exception:
exc_type, exc_value, tb = sys.exc_info()
result = json.dumps(
{
"msg_type": "inspect_reply",
"content": {
"status": "error",
"ename": str(exc_type.__name__),
"evalue": "cannot json-ify %s: %s"
% (result, str(exc_value)),
"traceback": [],
},
}
)

print(result, file=sys_stdout)
print(result, file=sys_stdout)
print("execution interrupted by user: " + line, file=sys_stderr)
sys_stdout.flush()
clearOutputs()
finally:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYU
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.util.JsonUtils
import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle, OperationState}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.reflect.DynFields

class ExecutePython(
session: Session,
Expand Down Expand Up @@ -171,6 +173,14 @@ class ExecutePython(
}
}
}

/**
 * Cleans up this operation, first asking the python worker to interrupt the running
 * statement when the operation has not reached a terminal state yet.
 *
 * The interrupt is best-effort: a non-fatal failure while signalling the worker is logged
 * and never prevents `super.cleanup` from running.
 *
 * @param targetState the state handed through to `super.cleanup`
 */
override def cleanup(targetState: OperationState): Unit = {
  try {
    if (!isTerminalState(state)) {
      info(s"Starting to cancel python code: $statement")
      worker.interrupt()
    }
  } catch {
    // Fully-qualified so that no additional file-level import is required.
    case scala.util.control.NonFatal(e) =>
      error(s"Failed to interrupt python code: $statement", e)
  } finally {
    // Always run the regular cleanup, even when the interrupt attempt failed.
    super.cleanup(targetState)
  }
}
}

case class SessionPythonWorker(
Expand Down Expand Up @@ -225,6 +235,21 @@ case class SessionPythonWorker(
pythonWorkerMonitor.interrupt()
workerProcess.destroy()
}

/**
 * Interrupts the code currently running in the python worker by sending SIGINT to the
 * worker process through the external `kill` command.
 *
 * Blocks until the `kill` command exits. A non-zero exit code is logged as an error
 * instead of being thrown; failures to read the pid or to start `kill` still propagate
 * to the caller.
 */
def interrupt(): Unit = {
  // Look up the `pid` field on the runtime class of the worker process via DynFields
  // (presumably a non-public field of the JDK process implementation -- TODO confirm).
  val pid = DynFields.builder()
    .hiddenImpl(workerProcess.getClass, "pid")
    .build[java.lang.Integer](workerProcess)
    .get()
  // sends a SIGINT (interrupt) signal, similar to Ctrl-C
  val builder = new ProcessBuilder(Seq("kill", "-2", pid.toString).asJava)
  val process = builder.start()
  val exitCode =
    try {
      process.waitFor()
    } finally {
      // Release the helper process even when waiting for it is interrupted.
      process.destroy()
    }
  if (exitCode != 0) {
    error(s"Process `${builder.command().asScala.mkString(" ")}` exit with value: $exitCode")
  }
}
}

object ExecutePython extends Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,41 @@ class PySparkTests extends WithKyuubiServer with HiveJDBCTestHelper {
}
}

test("Support to cancel Spark python operation") {
  checkPythonRuntimeAndVersion()
  withMultipleConnectionJdbcStatement()({ stmt =>
    // Python snippets used by the three phases of the scenario.
    val initCounterCode =
      """
        |i = 0
        |i
        |""".stripMargin
    val endlessLoopCode =
      """
        |import time
        |while True:
        | i +=1
        | time.sleep(1)
        |""".stripMargin
    val readCounterCode =
      """
        |i
        |""".stripMargin

    val kyuubiStatement = stmt.asInstanceOf[KyuubiStatement]
    kyuubiStatement.executeQuery("SET kyuubi.operation.language=PYTHON")

    // Phase 1: define the counter `i` in the python execution context.
    val initResult = kyuubiStatement.executeQuery(initCounterCode)
    assert(initResult.next())
    assert(initResult.getString("status") === "ok")
    assert(initResult.getString("output") === "0")

    // Phase 2: start a never-ending loop asynchronously, then cancel that operation.
    // NOTE(review): cancel is issued right after executeAsync; if it could land before
    // the loop body has run once, `i` would still be 0 in phase 3 -- confirm.
    kyuubiStatement.executeAsync(endlessLoopCode)
    kyuubiStatement.cancel()

    // Phase 3: the same statement must still work and `i` must have been incremented,
    // i.e. the python context outlived the cancelled operation.
    val counterResult = kyuubiStatement.executeQuery(readCounterCode)
    assert(counterResult.next())
    assert(counterResult.getString("status") === "ok")
    assert(counterResult.getString("output").toInt > 0)
  })
}

private def runPySparkTest(
pyCode: String,
output: String): Unit = {
Expand Down

0 comments on commit 705bb2a

Please sign in to comment.