Skip to content

Commit

Permalink
Run task first and detect local execution (#1997)
Browse files Browse the repository at this point in the history
* Run task first and detect local execution

Signed-off-by: troychiu <[email protected]>

* fix log

Signed-off-by: troychiu <[email protected]>

* fix log

Signed-off-by: troychiu <[email protected]>

* lint

Signed-off-by: troychiu <[email protected]>

---------

Signed-off-by: troychiu <[email protected]>
  • Loading branch information
troychiu authored Nov 27, 2023
1 parent 690650c commit 67b3eef
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Callable, List, Optional

import fsspec
from flytekit.core.context_manager import FlyteContextManager
import flytekit
from .constants import (
DEFAULT_CODE_SERVER_DIR_NAME,
Expand Down Expand Up @@ -175,6 +176,7 @@ def vscode(
max_idle_seconds: Optional[int] = MAX_IDLE_SECONDS,
port: Optional[int] = 8080,
enable: Optional[bool] = True,
run_task_first: Optional[bool] = False,
pre_execute: Optional[Callable] = None,
post_execute: Optional[Callable] = None,
config: Optional[VscodeConfig] = None,
Expand All @@ -191,6 +193,7 @@ def vscode(
max_idle_seconds (int, optional): The duration in seconds to live after no activity detected.
port (int, optional): The port to be used by the VSCode server. Defaults to 8080.
enable (bool, optional): Whether to enable the VSCode decorator. Defaults to True.
run_task_first (bool, optional): Executes the user's task first when True. Launches the VSCode server only if the user's task fails. Defaults to False.
pre_execute (function, optional): The function to be executed before the vscode setup function.
post_execute (function, optional): The function to be executed before the vscode is self-terminated.
config (VscodeConfig, optional): VSCode config contains default URLs of the VSCode server and extension remote paths.
Expand All @@ -207,6 +210,19 @@ def wrapper(fn):
def inner_wrapper(*args, **kwargs):
logger = flytekit.current_context().logging

# When user use pyflyte run or python to execute the task, we don't launch the VSCode server.
# Only when user use pyflyte run --remote to submit the task to cluster, we launch the VSCode server.
if FlyteContextManager.current_context().execution_state.is_local_execution():
return fn(*args, **kwargs)

if run_task_first:
logger.info("Run user's task first")
try:
return fn(*args, **kwargs)
except Exception as e:
logger.error(f"Task Error: {e}")
logger.info("Launching VSCode server")

# 0. Executes the pre_execute function if provided.
if pre_execute is not None:
pre_execute()
Expand Down
73 changes: 70 additions & 3 deletions plugins/flytekit-flyin/tests/test_flyin_plugin.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
import mock
from flytekitplugins.flyin import vscode
from flytekitplugins.flyin import jupyter
import pytest

from flytekitplugins.flyin import vscode, jupyter
from flytekit import task, workflow
from flytekit.core.context_manager import ExecutionState


@pytest.fixture
def mock_local_execution():
with mock.patch.object(ExecutionState, "is_local_execution", return_value=True) as mock_func:
yield mock_func


@pytest.fixture
def mock_remote_execution():
with mock.patch.object(ExecutionState, "is_local_execution", return_value=False) as mock_func:
yield mock_func


@mock.patch("multiprocessing.Process")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.exit_handler")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.download_vscode")
def test_vscode(mock_download_vscode, mock_exit_handler, mock_process):
def test_vscode_remote_execution(mock_download_vscode, mock_exit_handler, mock_process, mock_remote_execution):
@task
@vscode
def t():
Expand All @@ -24,6 +37,60 @@ def wf():
mock_exit_handler.assert_called_once()


@mock.patch("multiprocessing.Process")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.exit_handler")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.download_vscode")
def test_vscode_local_execution(mock_download_vscode, mock_exit_handler, mock_process, mock_local_execution):
@task
@vscode
def t():
return

@workflow
def wf():
t()

wf()
mock_download_vscode.assert_not_called()
mock_process.assert_not_called()
mock_exit_handler.assert_not_called()


def test_vscode_run_task_first_succeed(mock_remote_execution):
@task
@vscode(run_task_first=True)
def t(a: int, b: int) -> int:
return a + b

@workflow
def wf(a: int, b: int) -> int:
out = t(a=a, b=b)
return out

res = wf(a=10, b=5)
assert res == 15


@mock.patch("multiprocessing.Process")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.exit_handler")
@mock.patch("flytekitplugins.flyin.vscode_lib.decorator.download_vscode")
def test_vscode_run_task_first_fail(mock_download_vscode, mock_exit_handler, mock_process, mock_remote_execution):
@task
@vscode
def t(a: int, b: int):
dummy = a // b # noqa: F841
return

@workflow
def wf(a: int, b: int):
t(a=a, b=b)

wf(a=10, b=0)
mock_download_vscode.assert_called_once()
mock_process.assert_called_once()
mock_exit_handler.assert_called_once()


@mock.patch("flytekitplugins.flyin.jupyter_lib.decorator.subprocess.Popen")
@mock.patch("flytekitplugins.flyin.jupyter_lib.decorator.sys.exit")
def test_jupyter(mock_exit, mock_popen):
Expand Down

0 comments on commit 67b3eef

Please sign in to comment.