Skip to content

Commit

Permalink
Flyin - interactive debugging (#2000)
Browse files Browse the repository at this point in the history
* WIP

Signed-off-by: troychiu <[email protected]>

* finish functionality

Signed-off-by: troychiu <[email protected]>

* add hint

Signed-off-by: troychiu <[email protected]>

* add util test

Signed-off-by: troychiu <[email protected]>

* fix suggestions

Signed-off-by: troychiu <[email protected]>

* lint

Signed-off-by: troychiu <[email protected]>

* add launch.json

Signed-off-by: troychiu <[email protected]>

* fix comment

Signed-off-by: troychiu <[email protected]>

* change generated file name

Signed-off-by: troychiu <[email protected]>

* fix suggestions

Signed-off-by: troychiu <[email protected]>

---------

Signed-off-by: troychiu <[email protected]>
  • Loading branch information
troychiu authored Nov 30, 2023
1 parent eb80682 commit ff7aadc
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 7 deletions.
3 changes: 3 additions & 0 deletions plugins/flytekit-flyin/flytekitplugins/flyin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
VscodeConfig
DEFAULT_CODE_SERVER_REMOTE_PATH
DEFAULT_CODE_SERVER_EXTENSIONS
jupyter
get_task_inputs
"""

from .vscode_lib.decorator import vscode, VscodeConfig
from .vscode_lib.constants import DEFAULT_CODE_SERVER_REMOTE_PATH, DEFAULT_CODE_SERVER_EXTENSIONS
from .jupyter_lib.decorator import jupyter
from .utils import get_task_inputs
56 changes: 56 additions & 0 deletions plugins/flytekit-flyin/flytekitplugins/flyin/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import importlib
import os
import sys

from flytekit.core import utils
from flytekit.core.context_manager import FlyteContextManager
from flyteidl.core import literals_pb2 as _literals_pb2
from flytekit.core.type_engine import TypeEngine
from flytekit.models import literals as _literal_models


def load_module_from_path(module_name, path):
"""
Imports a Python module from a specified file path.
Args:
module_name (str): The name you want to assign to the imported module.
path (str): The file system path to the Python file (.py) that contains the module you want to import.
Returns:
module: The imported module.
Raises:
ImportError: If the module cannot be loaded from the provided path, an ImportError is raised.
"""
spec = importlib.util.spec_from_file_location(module_name, path)
if spec is not None:
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module
else:
raise ImportError(f"Module at {path} could not be loaded")


def get_task_inputs(task_module_name, task_name, context_working_dir):
"""
Read task input data from inputs.pb for a specific task function and convert it into Python types and structures.
Args:
task_module_name (str): The name of the Python module containing the task function.
task_name (str): The name of the task function within the module.
context_working_dir (str): The directory path where the input file and module file are located.
Returns:
dict: A dictionary containing the task inputs, converted into Python types and structures.
"""
local_inputs_file = os.path.join(context_working_dir, "inputs.pb")
input_proto = utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file)
idl_input_literals = _literal_models.LiteralMap.from_flyte_idl(input_proto)
task_module = load_module_from_path(task_module_name, os.path.join(context_working_dir, f"{task_module_name}.py"))
task_def = getattr(task_module, task_name)
native_inputs = TypeEngine.literal_map_to_kwargs(
FlyteContextManager(), idl_input_literals, task_def.python_interface.inputs
)
return native_inputs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@
# The path is hardcoded by code-server
# https://coder.com/docs/code-server/latest/FAQ#what-is-the-heartbeat-file
HEARTBEAT_PATH = os.path.expanduser("~/.local/share/code-server/heartbeat")

INTERACTIVE_DEBUGGING_FILE_NAME = "flyin_interactive_entrypoint.py"
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import multiprocessing
import os
import shutil
Expand All @@ -21,6 +22,7 @@
HEARTBEAT_CHECK_SECONDS,
HEARTBEAT_PATH,
MAX_IDLE_SECONDS,
INTERACTIVE_DEBUGGING_FILE_NAME,
)


Expand Down Expand Up @@ -171,6 +173,64 @@ def download_vscode(vscode_config: VscodeConfig):
execute_command(f"code-server --install-extension {p}")


def prepare_interactive_python(task_function):
"""
1. Copy the original task file to the context working directory. This ensures that the inputs.pb can be loaded, as loading requires the original task interface.
By doing so, even if users change the task interface in their code, we can use the copied task file to load the inputs as native Python objects.
2. Generate a Python script and a launch.json for users to debug interactively.
Args:
task_function (function): User's task function.
"""

context_working_dir = FlyteContextManager.current_context().execution_state.working_dir

# Copy the user's Python file to the working directory.
shutil.copy(f"{task_function.__module__}.py", os.path.join(context_working_dir, f"{task_function.__module__}.py"))

# Generate a Python script
task_module_name, task_name = task_function.__module__, task_function.__name__
python_script = f"""# This file is auto-generated by flyin
from {task_module_name} import {task_name}
from flytekitplugins.flyin import get_task_inputs
if __name__ == "__main__":
inputs = get_task_inputs(
task_module_name="{task_module_name}",
task_name="{task_name}",
context_working_dir="{context_working_dir}",
)
# You can modify the inputs! Ex: inputs['a'] = 5
print({task_name}(**inputs))
"""

with open(INTERACTIVE_DEBUGGING_FILE_NAME, "w") as file:
file.write(python_script)

# Generate a launch.json
launch_json = {
"version": "0.2.0",
"configurations": [
{
"name": "Interactive Debugging",
"type": "python",
"request": "launch",
"program": os.path.join(os.getcwd(), INTERACTIVE_DEBUGGING_FILE_NAME),
"console": "integratedTerminal",
"justMyCode": True,
}
],
}

vscode_directory = ".vscode"
if not os.path.exists(vscode_directory):
os.makedirs(vscode_directory)

with open(os.path.join(vscode_directory, "launch.json"), "w") as file:
json.dump(launch_json, file, indent=4)


def vscode(
_task_function: Optional[Callable] = None,
max_idle_seconds: Optional[int] = MAX_IDLE_SECONDS,
Expand All @@ -185,8 +245,9 @@ def vscode(
vscode decorator modifies a container to run a VSCode server:
1. Overrides the user function with a VSCode setup function.
2. Download vscode server and extension from remote to local.
3. Launches and monitors the VSCode server.
4. Terminates if the server is idle for a set duration.
3. Prepare the interactive debugging Python script and launch.json.
4. Launches and monitors the VSCode server.
5. Terminates if the server is idle for a set duration.
Args:
_task_function (function, optional): The user function to be decorated. Defaults to None.
Expand All @@ -208,11 +269,12 @@ def wrapper(fn):

@wraps(fn)
def inner_wrapper(*args, **kwargs):
ctx = FlyteContextManager.current_context()
logger = flytekit.current_context().logging

# When user use pyflyte run or python to execute the task, we don't launch the VSCode server.
# Only when user use pyflyte run --remote to submit the task to cluster, we launch the VSCode server.
if FlyteContextManager.current_context().execution_state.is_local_execution():
if ctx.execution_state.is_local_execution():
return fn(*args, **kwargs)

if run_task_first:
Expand All @@ -231,7 +293,10 @@ def inner_wrapper(*args, **kwargs):
# 1. Downloads the VSCode server from Internet to local.
download_vscode(config)

# 2. Launches and monitors the VSCode server.
# 2. Prepare the interactive debugging Python script and launch.json.
prepare_interactive_python(fn)

# 3. Launches and monitors the VSCode server.
# Run the function in the background
child_process = multiprocessing.Process(
target=execute_command, kwargs={"cmd": f"code-server --bind-addr 0.0.0.0:{port} --auth none"}
Expand Down
18 changes: 15 additions & 3 deletions plugins/flytekit-flyin/tests/test_flyin_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ def mock_remote_execution():


@mock.patch("multiprocessing.Process")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.prepare_interactive_python")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.exit_handler")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.download_vscode")
def test_vscode_remote_execution(mock_download_vscode, mock_exit_handler, mock_process, mock_remote_execution):
def test_vscode_remote_execution(
mock_download_vscode, mock_exit_handler, mock_process, mock_prepare_interactive_python, mock_remote_execution
):
@task
@vscode
def t():
Expand All @@ -35,12 +38,16 @@ def wf():
mock_download_vscode.assert_called_once()
mock_process.assert_called_once()
mock_exit_handler.assert_called_once()
mock_prepare_interactive_python.assert_called_once()


@mock.patch("multiprocessing.Process")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.prepare_interactive_python")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.exit_handler")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.download_vscode")
def test_vscode_local_execution(mock_download_vscode, mock_exit_handler, mock_process, mock_local_execution):
def test_vscode_local_execution(
mock_download_vscode, mock_exit_handler, mock_process, mock_prepare_interactive_python, mock_local_execution
):
@task
@vscode
def t():
Expand All @@ -54,6 +61,7 @@ def wf():
mock_download_vscode.assert_not_called()
mock_process.assert_not_called()
mock_exit_handler.assert_not_called()
mock_prepare_interactive_python.assert_not_called()


def test_vscode_run_task_first_succeed(mock_remote_execution):
Expand All @@ -72,9 +80,12 @@ def wf(a: int, b: int) -> int:


@mock.patch("multiprocessing.Process")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.prepare_interactive_python")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.exit_handler")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.download_vscode")
def test_vscode_run_task_first_fail(mock_download_vscode, mock_exit_handler, mock_process, mock_remote_execution):
def test_vscode_run_task_first_fail(
mock_download_vscode, mock_exit_handler, mock_process, mock_prepare_interactive_python, mock_remote_execution
):
@task
@vscode
def t(a: int, b: int):
Expand All @@ -89,6 +100,7 @@ def wf(a: int, b: int):
mock_download_vscode.assert_called_once()
mock_process.assert_called_once()
mock_exit_handler.assert_called_once()
mock_prepare_interactive_python.assert_called_once()


@mock.patch("flytekitplugins.flyin.jupyter_lib.decorator.subprocess.Popen")
Expand Down
20 changes: 20 additions & 0 deletions plugins/flytekit-flyin/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import os

from flytekitplugins.flyin import get_task_inputs
from flytekitplugins.flyin.utils import load_module_from_path


def test_load_module_from_path():
module_name = "task"
module_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "testdata", "task.py")
task_name = "t1"
task_module = load_module_from_path(module_name, module_path)
assert hasattr(task_module, task_name)
task_def = getattr(task_module, task_name)
assert task_def(a=6, b=3) == 2


def test_get_task_inputs():
test_working_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "testdata")
native_inputs = get_task_inputs("task", "t1", test_working_dir)
assert native_inputs == {"a": 30, "b": 0}
Binary file added plugins/flytekit-flyin/tests/testdata/inputs.pb
Binary file not shown.
8 changes: 8 additions & 0 deletions plugins/flytekit-flyin/tests/testdata/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from flytekitplugins.flyin import vscode
from flytekit import task


@task()
@vscode(run_task_first=True)
def t1(a: int, b: int) -> int:
return a // b

0 comments on commit ff7aadc

Please sign in to comment.