From ff7aadc8babc3549f8f0030f32f99bae88056f49 Mon Sep 17 00:00:00 2001 From: Troy Chiu <114708546+troychiu@users.noreply.github.com> Date: Wed, 29 Nov 2023 23:09:04 -0800 Subject: [PATCH] Flyin - interactive debugging (#2000) * WIP Signed-off-by: troychiu * finish functionality Signed-off-by: troychiu * add hint Signed-off-by: troychiu * add util test Signed-off-by: troychiu * fix suggestions Signed-off-by: troychiu * lint Signed-off-by: troychiu * add launch.json Signed-off-by: troychiu * fix comment Signed-off-by: troychiu * change generated file name Signed-off-by: troychiu * fix suggestions Signed-off-by: troychiu --------- Signed-off-by: troychiu --- .../flytekitplugins/flyin/__init__.py | 3 + .../flytekitplugins/flyin/utils.py | 56 ++++++++++++++ .../flyin/vscode_lib/constants.py | 2 + .../flyin/vscode_lib/decorator.py | 73 +++++++++++++++++- .../flytekit-flyin/tests/test_flyin_plugin.py | 18 ++++- plugins/flytekit-flyin/tests/test_utils.py | 20 +++++ .../flytekit-flyin/tests/testdata/inputs.pb | Bin 0 -> 26 bytes plugins/flytekit-flyin/tests/testdata/task.py | 8 ++ 8 files changed, 173 insertions(+), 7 deletions(-) create mode 100644 plugins/flytekit-flyin/flytekitplugins/flyin/utils.py create mode 100644 plugins/flytekit-flyin/tests/test_utils.py create mode 100644 plugins/flytekit-flyin/tests/testdata/inputs.pb create mode 100644 plugins/flytekit-flyin/tests/testdata/task.py diff --git a/plugins/flytekit-flyin/flytekitplugins/flyin/__init__.py b/plugins/flytekit-flyin/flytekitplugins/flyin/__init__.py index 6898be3373..2e520451c4 100644 --- a/plugins/flytekit-flyin/flytekitplugins/flyin/__init__.py +++ b/plugins/flytekit-flyin/flytekitplugins/flyin/__init__.py @@ -11,8 +11,11 @@ VscodeConfig DEFAULT_CODE_SERVER_REMOTE_PATH DEFAULT_CODE_SERVER_EXTENSIONS + jupyter + get_task_inputs """ from .vscode_lib.decorator import vscode, VscodeConfig from .vscode_lib.constants import DEFAULT_CODE_SERVER_REMOTE_PATH, DEFAULT_CODE_SERVER_EXTENSIONS from .jupyter_lib.decorator import jupyter +from .utils import get_task_inputs diff --git a/plugins/flytekit-flyin/flytekitplugins/flyin/utils.py b/plugins/flytekit-flyin/flytekitplugins/flyin/utils.py new file mode 100644 index 0000000000..7e879583ff --- /dev/null +++ b/plugins/flytekit-flyin/flytekitplugins/flyin/utils.py @@ -0,0 +1,56 @@ +import importlib +import os +import sys + +from flytekit.core import utils +from flytekit.core.context_manager import FlyteContextManager +from flyteidl.core import literals_pb2 as _literals_pb2 +from flytekit.core.type_engine import TypeEngine +from flytekit.models import literals as _literal_models + + +def load_module_from_path(module_name, path): + """ + Imports a Python module from a specified file path. + + Args: + module_name (str): The name you want to assign to the imported module. + path (str): The file system path to the Python file (.py) that contains the module you want to import. + + Returns: + module: The imported module. + + Raises: + ImportError: If the module cannot be loaded from the provided path, an ImportError is raised. + """ + spec = importlib.util.spec_from_file_location(module_name, path) + if spec is not None: + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + return module + else: + raise ImportError(f"Module at {path} could not be loaded") + + +def get_task_inputs(task_module_name, task_name, context_working_dir): + """ + Read task input data from inputs.pb for a specific task function and convert it into Python types and structures. + + Args: + task_module_name (str): The name of the Python module containing the task function. + task_name (str): The name of the task function within the module. + context_working_dir (str): The directory path where the input file and module file are located. + + Returns: + dict: A dictionary containing the task inputs, converted into Python types and structures. + """ + local_inputs_file = os.path.join(context_working_dir, "inputs.pb") + input_proto = utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file) + idl_input_literals = _literal_models.LiteralMap.from_flyte_idl(input_proto) + task_module = load_module_from_path(task_module_name, os.path.join(context_working_dir, f"{task_module_name}.py")) + task_def = getattr(task_module, task_name) + native_inputs = TypeEngine.literal_map_to_kwargs( + FlyteContextManager(), idl_input_literals, task_def.python_interface.inputs + ) + return native_inputs diff --git a/plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/constants.py b/plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/constants.py index f8c971eaee..cf9022dce0 100644 --- a/plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/constants.py +++ b/plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/constants.py @@ -23,3 +23,5 @@ # The path is hardcoded by code-server # https://coder.com/docs/code-server/latest/FAQ#what-is-the-heartbeat-file HEARTBEAT_PATH = os.path.expanduser("~/.local/share/code-server/heartbeat") + +INTERACTIVE_DEBUGGING_FILE_NAME = "flyin_interactive_entrypoint.py" diff --git a/plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py b/plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py index 5281f62eb5..c7d2e56f09 100644 --- a/plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py +++ b/plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py @@ -1,3 +1,4 @@ +import json import multiprocessing import os import shutil @@ -21,6 +22,7 @@ HEARTBEAT_CHECK_SECONDS, HEARTBEAT_PATH, MAX_IDLE_SECONDS, + INTERACTIVE_DEBUGGING_FILE_NAME, ) @@ -171,6 +173,64 @@ def download_vscode(vscode_config: VscodeConfig): execute_command(f"code-server --install-extension {p}") +def prepare_interactive_python(task_function): + """ + 1. Copy the original task file to the context working directory. This ensures that the inputs.pb can be loaded, as loading requires the original task interface. + By doing so, even if users change the task interface in their code, we can use the copied task file to load the inputs as native Python objects. + 2. Generate a Python script and a launch.json for users to debug interactively. + + Args: + task_function (function): User's task function. + """ + + context_working_dir = FlyteContextManager.current_context().execution_state.working_dir + + # Copy the user's Python file to the working directory. + shutil.copy(f"{task_function.__module__}.py", os.path.join(context_working_dir, f"{task_function.__module__}.py")) + + # Generate a Python script + task_module_name, task_name = task_function.__module__, task_function.__name__ + python_script = f"""# This file is auto-generated by flyin + +from {task_module_name} import {task_name} +from flytekitplugins.flyin import get_task_inputs + +if __name__ == "__main__": + inputs = get_task_inputs( + task_module_name="{task_module_name}", + task_name="{task_name}", + context_working_dir="{context_working_dir}", + ) + # You can modify the inputs! Ex: inputs['a'] = 5 + print({task_name}(**inputs)) +""" + + with open(INTERACTIVE_DEBUGGING_FILE_NAME, "w") as file: + file.write(python_script) + + # Generate a launch.json + launch_json = { + "version": "0.2.0", + "configurations": [ + { + "name": "Interactive Debugging", + "type": "python", + "request": "launch", + "program": os.path.join(os.getcwd(), INTERACTIVE_DEBUGGING_FILE_NAME), + "console": "integratedTerminal", + "justMyCode": True, + } + ], + } + + vscode_directory = ".vscode" + if not os.path.exists(vscode_directory): + os.makedirs(vscode_directory) + + with open(os.path.join(vscode_directory, "launch.json"), "w") as file: + json.dump(launch_json, file, indent=4) + + def vscode( _task_function: Optional[Callable] = None, max_idle_seconds: Optional[int] = MAX_IDLE_SECONDS, @@ -185,8 +245,9 @@ def vscode( vscode decorator modifies a container to run a VSCode server: 1. Overrides the user function with a VSCode setup function. 2. Download vscode server and extension from remote to local. - 3. Launches and monitors the VSCode server. - 4. Terminates if the server is idle for a set duration. + 3. Prepare the interactive debugging Python script and launch.json. + 4. Launches and monitors the VSCode server. + 5. Terminates if the server is idle for a set duration. Args: _task_function (function, optional): The user function to be decorated. Defaults to None. @@ -208,11 +269,12 @@ def wrapper(fn): @wraps(fn) def inner_wrapper(*args, **kwargs): + ctx = FlyteContextManager.current_context() logger = flytekit.current_context().logging # When user use pyflyte run or python to execute the task, we don't launch the VSCode server. # Only when user use pyflyte run --remote to submit the task to cluster, we launch the VSCode server. - if FlyteContextManager.current_context().execution_state.is_local_execution(): + if ctx.execution_state.is_local_execution(): return fn(*args, **kwargs) if run_task_first: @@ -231,7 +293,10 @@ def inner_wrapper(*args, **kwargs): # 1. Downloads the VSCode server from Internet to local. download_vscode(config) - # 2. Launches and monitors the VSCode server. + # 2. Prepare the interactive debugging Python script and launch.json. + prepare_interactive_python(fn) + + # 3. Launches and monitors the VSCode server. # Run the function in the background child_process = multiprocessing.Process( target=execute_command, kwargs={"cmd": f"code-server --bind-addr 0.0.0.0:{port} --auth none"} diff --git a/plugins/flytekit-flyin/tests/test_flyin_plugin.py b/plugins/flytekit-flyin/tests/test_flyin_plugin.py index 9d364e8fb3..fe244e4500 100644 --- a/plugins/flytekit-flyin/tests/test_flyin_plugin.py +++ b/plugins/flytekit-flyin/tests/test_flyin_plugin.py @@ -19,9 +19,12 @@ def mock_remote_execution(): @mock.patch("multiprocessing.Process") +@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.prepare_interactive_python") @mock.patch("flytekitplugins.flyin.vscode_lib.decorator.exit_handler") @mock.patch("flytekitplugins.flyin.vscode_lib.decorator.download_vscode") -def test_vscode_remote_execution(mock_download_vscode, mock_exit_handler, mock_process, mock_remote_execution): +def test_vscode_remote_execution( + mock_download_vscode, mock_exit_handler, mock_process, mock_prepare_interactive_python, mock_remote_execution +): @task @vscode def t(): @@ -35,12 +38,16 @@ def wf(): mock_download_vscode.assert_called_once() mock_process.assert_called_once() mock_exit_handler.assert_called_once() + mock_prepare_interactive_python.assert_called_once() @mock.patch("multiprocessing.Process") +@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.prepare_interactive_python") @mock.patch("flytekitplugins.flyin.vscode_lib.decorator.exit_handler") @mock.patch("flytekitplugins.flyin.vscode_lib.decorator.download_vscode") -def test_vscode_local_execution(mock_download_vscode, mock_exit_handler, mock_process, mock_local_execution): +def test_vscode_local_execution( + mock_download_vscode, mock_exit_handler, mock_process, mock_prepare_interactive_python, mock_local_execution +): @task @vscode def t(): @@ -54,6 +61,7 @@ def wf(): mock_download_vscode.assert_not_called() mock_process.assert_not_called() mock_exit_handler.assert_not_called() + mock_prepare_interactive_python.assert_not_called() def test_vscode_run_task_first_succeed(mock_remote_execution): @@ -72,9 +80,12 @@ def wf(a: int, b: int) -> int: @mock.patch("multiprocessing.Process") +@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.prepare_interactive_python") @mock.patch("flytekitplugins.flyin.vscode_lib.decorator.exit_handler") @mock.patch("flytekitplugins.flyin.vscode_lib.decorator.download_vscode") -def test_vscode_run_task_first_fail(mock_download_vscode, mock_exit_handler, mock_process, mock_remote_execution): +def test_vscode_run_task_first_fail( + mock_download_vscode, mock_exit_handler, mock_process, mock_prepare_interactive_python, mock_remote_execution +): @task @vscode def t(a: int, b: int): @@ -89,6 +100,7 @@ def wf(a: int, b: int): mock_download_vscode.assert_called_once() mock_process.assert_called_once() mock_exit_handler.assert_called_once() + mock_prepare_interactive_python.assert_called_once() @mock.patch("flytekitplugins.flyin.jupyter_lib.decorator.subprocess.Popen") diff --git a/plugins/flytekit-flyin/tests/test_utils.py b/plugins/flytekit-flyin/tests/test_utils.py new file mode 100644 index 0000000000..99e73deba5 --- /dev/null +++ b/plugins/flytekit-flyin/tests/test_utils.py @@ -0,0 +1,20 @@ +import os + +from flytekitplugins.flyin import get_task_inputs +from flytekitplugins.flyin.utils import load_module_from_path + + +def test_load_module_from_path(): + module_name = "task" + module_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "testdata", "task.py") + task_name = "t1" + task_module = load_module_from_path(module_name, module_path) + assert hasattr(task_module, task_name) + task_def = getattr(task_module, task_name) + assert task_def(a=6, b=3) == 2 + + +def test_get_task_inputs(): + test_working_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "testdata") + native_inputs = get_task_inputs("task", "t1", test_working_dir) + assert native_inputs == {"a": 30, "b": 0} diff --git a/plugins/flytekit-flyin/tests/testdata/inputs.pb b/plugins/flytekit-flyin/tests/testdata/inputs.pb new file mode 100644 index 0000000000000000000000000000000000000000..3cb31285034b048b3e34fe2d02a518c87a36513b GIT binary patch literal 26 acmd<$=3-0|V&h`rV&Y)n0&)_e9610MD*=`O literal 0 HcmV?d00001 diff --git a/plugins/flytekit-flyin/tests/testdata/task.py b/plugins/flytekit-flyin/tests/testdata/task.py new file mode 100644 index 0000000000..ba035fcc84 --- /dev/null +++ b/plugins/flytekit-flyin/tests/testdata/task.py @@ -0,0 +1,8 @@ +from flytekitplugins.flyin import vscode +from flytekit import task + + +@task() +@vscode(run_task_first=True) +def t1(a: int, b: int) -> int: + return a // b