Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job launcher #3049

Merged
merged 35 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
21b5092
WIP: implement the ProcessJobLaunch.
yhwen Oct 10, 2024
4c15158
WIP: working ProcessJobLauncher implementation.
yhwen Oct 14, 2024
610f460
Added k8s_launcher implementation.
yhwen Oct 14, 2024
048df0d
Added logger for k8s_launcher.py
yhwen Oct 14, 2024
4296f96
Added empty launcher_map check.
yhwen Oct 14, 2024
0dfae5e
Added more config for K8sJobLauncher.
yhwen Oct 16, 2024
044b769
renamed RunProcessKey.JOB_LAUNCHER.
yhwen Oct 16, 2024
02c71e7
Separated out the JobHandleSpec.
yhwen Oct 17, 2024
b14dc75
Support for the launcher deploy image.
yhwen Oct 18, 2024
c88e00b
Changed the _get_job_launcher logic.
yhwen Oct 18, 2024
a7721ed
add more handled for the deployment_map change.
yhwen Oct 18, 2024
9a7e89e
Added logging for job launcher.
yhwen Oct 18, 2024
0c57c7b
Fixed extract_job_image usage.
yhwen Oct 18, 2024
68c025d
Added job_meta for launch_job.
yhwen Oct 18, 2024
24469b4
codestyle fix.
yhwen Oct 18, 2024
1d2fece
refactored.
yhwen Oct 19, 2024
ceaeb8e
extract to use constants.
yhwen Oct 21, 2024
b974d9a
Change the JobLauncherSpec API signature.
yhwen Oct 22, 2024
765357b
Added can_launch() to JobLauncherSpec.
yhwen Oct 22, 2024
1d7ebc7
refactor.
yhwen Oct 23, 2024
5103592
Merged from main.
yhwen Oct 23, 2024
dc94e01
removed duplicate const.
yhwen Oct 23, 2024
7f1a9d9
removed no use import.
yhwen Oct 23, 2024
ae8a2dc
changed to raise NotImplementedError().
yhwen Oct 24, 2024
f923e97
refactored.
yhwen Oct 25, 2024
037c7bb
Changed to use event to get the job launcher.
yhwen Oct 25, 2024
e6123b1
updated K8sJobLauncher.
yhwen Oct 25, 2024
90f8687
codestyle fix.
yhwen Oct 25, 2024
1aa8d39
removed no use import.
yhwen Oct 25, 2024
8153dd3
JobReturnCode standard.
yhwen Oct 28, 2024
baecfff
fixed the _get_job_launcher() condition logic.
yhwen Oct 29, 2024
a84d7a3
Merge branch 'main' into job_launcher
YuanTingHsieh Oct 30, 2024
1d94eb4
Merge branch 'main' into job_launcher
YuanTingHsieh Oct 31, 2024
b51ef76
fixed the missing client_name in the workspace object.
yhwen Nov 1, 2024
6db64b3
Merge branch 'main' into job_launcher
YuanTingHsieh Nov 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nvflare/apis/event_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,4 @@ class EventType(object):

AUTHORIZE_COMMAND_CHECK = "_authorize_command_check"
BEFORE_BUILD_COMPONENT = "_before_build_component"
GET_JOB_LAUNCHER = "_get_job_launcher"
8 changes: 7 additions & 1 deletion nvflare/apis/fl_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ class FLContextKey(object):
AUTHORIZATION_REASON = "_authorization_reason"
DISCONNECTED_CLIENT_NAME = "_disconnected_client_name"
RECONNECTED_CLIENT_NAME = "_reconnected_client_name"
SITE_OBJ = "_site_obj_"
JOB_LAUNCHER = "_job_launcher"

CLIENT_REGISTER_DATA = "_client_register_data"
SECURITY_ITEMS = "_security_items"
Expand Down Expand Up @@ -324,7 +326,7 @@ class SnapshotKey(object):
class RunProcessKey(object):
LISTEN_PORT = "_listen_port"
CONNECTION = "_conn"
CHILD_PROCESS = "_child_process"
JOB_HANDLE = "_job_launcher"
STATUS = "_status"
JOB_ID = "_job_id"
PARTICIPANTS = "_participants"
Expand Down Expand Up @@ -356,6 +358,10 @@ class JobConstants:
CLIENT_JOB_CONFIG = "config_fed_client.json"
META_FILE = "meta.json"
META = "meta"
SITES = "sites"
JOB_IMAGE = "image"
JOB_ID = "job_id"
JOB_LAUNCHER = "job_launcher"


class WorkspaceConstants:
Expand Down
61 changes: 61 additions & 0 deletions nvflare/apis/job_launcher_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import abstractmethod

from nvflare.apis.fl_component import FLComponent
from nvflare.apis.fl_context import FLContext

yhwen marked this conversation as resolved.
Show resolved Hide resolved

class JobHandleSpec:
    """Handle to a launched job run, used to stop it and to query or await its completion.

    The class does not use an ABC metaclass, so ``@abstractmethod`` is not enforced when the
    class is instantiated; a missing override is reported by the ``NotImplementedError`` raised
    from each method below.
    """

    @abstractmethod
    def terminate(self):
        """Terminate the job run.

        Returns: the job run return code.

        """
        raise NotImplementedError()

    @abstractmethod
    def poll(self):
        """Get the return code of the job run.

        Returns: return_code

        """
        raise NotImplementedError()

    @abstractmethod
    def wait(self):
        """Block until the job run completes.

        Returns: no value is specified by this interface; the call returns once the job run is complete.

        """
        raise NotImplementedError()


class JobLauncherSpec(FLComponent):
    """Component interface for launching a job run."""

    @abstractmethod
    def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:
        """Launch a job run.

        Args:
            job_meta: job meta data
            fl_ctx: FLContext

        Returns: a JobHandleSpec for the launched job run.

        """
        raise NotImplementedError()
4 changes: 2 additions & 2 deletions nvflare/apis/server_engine_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,12 @@ def restore_components(self, snapshot: RunSnapshot, fl_ctx: FLContext):
pass

@abstractmethod
def start_client_job(self, job_id, client_sites, fl_ctx: FLContext):
def start_client_job(self, job, client_sites, fl_ctx: FLContext):
"""To send the start client run commands to the clients

Args:
client_sites: client sites
job_id: job_id
job: job object
fl_ctx: FLContext

Returns:
Expand Down
13 changes: 13 additions & 0 deletions nvflare/app_common/job_launcher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
120 changes: 120 additions & 0 deletions nvflare/app_common/job_launcher/process_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import shlex
import signal
import subprocess
import sys

from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import FLContextKey
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_def import JobMetaKey
from nvflare.apis.job_launcher_spec import JobHandleSpec, JobLauncherSpec
from nvflare.apis.workspace import Workspace
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path, extract_job_image


class ProcessHandle(JobHandleSpec):
    """Job handle backed by a local child process object."""

    def __init__(self, process):
        """Wrap a child process.

        Args:
            process: the child process object (exposes ``pid``, ``poll()``, ``wait()`` and
                ``terminate()``). A falsy value turns every method into a no-op.
        """
        super().__init__()

        self.process = process
        self.logger = logging.getLogger(self.__class__.__name__)

    def terminate(self):
        """Kill the child's process group (best effort), then terminate the child itself."""
        if not self.process:
            return

        try:
            os.killpg(os.getpgid(self.process.pid), signal.SIGKILL)
            self.logger.debug("kill signal sent")
        except Exception as e:
            # Best effort only: the group may already be gone, or killpg/SIGKILL may not exist
            # on this platform. Unlike a bare ``except``, this lets KeyboardInterrupt and
            # SystemExit propagate, and the reason is recorded instead of silently dropped.
            self.logger.debug("could not kill process group of pid %s: %s", self.process.pid, e)

        self.process.terminate()

    def poll(self):
        """Return the child's ``poll()`` result, or None when there is no process."""
        if self.process:
            return self.process.poll()
        return None

    def wait(self):
        """Block until the child process exits; does nothing when there is no process."""
        if self.process:
            self.process.wait()


class ProcessJobLauncher(JobLauncherSpec):
    """Job launcher that starts the job's worker as a local child process."""

    def __init__(self):
        super().__init__()

        self.logger = logging.getLogger(self.__class__.__name__)

    def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:
        """Start ``nvflare.private.fed.app.client.worker_process`` for the job in a new session.

        Args:
            job_meta: job meta data; the job id is read from ``JobMetaKey.JOB_ID``.
            fl_ctx: FLContext; the ``WORKSPACE_OBJECT``, ``ARGS``, ``SITE_OBJ`` and
                ``SERVER_CONFIG`` props are read from it.

        Returns: a ProcessHandle wrapping the started child process.

        Raises:
            RuntimeError: if the server config is missing, its "service" entry is not a dict,
                or the service entry has no "target".
        """
        workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
        args = fl_ctx.get_prop(FLContextKey.ARGS)
        client = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
        job_id = job_meta.get(JobMetaKey.JOB_ID)

        server_config = fl_ctx.get_prop(FLContextKey.SERVER_CONFIG)
        if not server_config:
            raise RuntimeError(f"missing {FLContextKey.SERVER_CONFIG} in FL context")
        service = server_config[0].get("service", {})
        if not isinstance(service, dict):
            raise RuntimeError(f"expect server config data to be dict but got {type(service)}")
        target = service.get("target")
        if not target:
            # Fail with a clear message instead of an opaque TypeError while building the command.
            raise RuntimeError("missing 'target' in the 'service' section of the server config")

        new_env = os.environ.copy()
        app_custom_folder = workspace_obj.get_app_custom_dir(job_id)
        if app_custom_folder != "":
            add_custom_dir_to_path(app_custom_folder, new_env)

        # Build the argv list directly rather than formatting one string and re-splitting it:
        # values containing whitespace, quotes or '#' (e.g. a workspace path) stay intact.
        command = [
            sys.executable,
            "-m",
            "nvflare.private.fed.app.client.worker_process",
            "-m",
            args.workspace,
            "-w",
            workspace_obj.get_startup_kit_dir(),
            "-t",
            client.token,
            "-d",
            client.ssid,
            "-n",
            job_id,
            "-c",
            client.client_name,
            "-p",
            str(client.cell.get_internal_listener_url()),
            "-g",
            target,
            "-scheme",
            service.get("scheme", "grpc"),
            "-s",
            "fed_client.json",
            "--set",
            *args.set,
            "print_conf=True",
        ]

        # start_new_session=True makes the child call setsid(), giving it its own process group
        # without using preexec_fn, which is not safe in a multi-threaded parent.
        process = subprocess.Popen(command, start_new_session=True, env=new_env)

        self.logger.info("Worker child process ID: %s", process.pid)

        return ProcessHandle(process)

    def handle_event(self, event_type: str, fl_ctx: FLContext):
        """Offer this launcher for jobs that have no job image for this site.

        On ``EventType.GET_JOB_LAUNCHER``, if ``extract_job_image`` yields nothing for this
        site's identity, append this launcher to the ``FLContextKey.JOB_LAUNCHER`` list in
        ``fl_ctx``. All other events are ignored.
        """
        if event_type != EventType.GET_JOB_LAUNCHER:
            return

        job_meta = fl_ctx.get_prop(FLContextKey.JOB_META)
        job_image = extract_job_image(job_meta, fl_ctx.get_identity_name())
        if not job_image:
            job_launcher: list = fl_ctx.get_prop(FLContextKey.JOB_LAUNCHER, [])
            job_launcher.append(self)
            fl_ctx.set_prop(FLContextKey.JOB_LAUNCHER, job_launcher, private=True, sticky=False)
5 changes: 4 additions & 1 deletion nvflare/app_common/job_schedulers/job_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from nvflare.apis.job_def_manager_spec import JobDefManagerSpec
from nvflare.apis.job_scheduler_spec import DispatchInfo, JobSchedulerSpec
from nvflare.apis.server_engine_spec import ServerEngineSpec
from nvflare.private.fed.utils.fed_utils import extract_participants

SCHEDULE_RESULT_OK = 0 # the job is scheduled
SCHEDULE_RESULT_NO_RESOURCE = 1 # job is not scheduled due to lack of resources
Expand Down Expand Up @@ -109,7 +110,9 @@ def _try_job(self, job: Job, fl_ctx: FLContext) -> (int, Optional[Dict[str, Disp
applicable_sites = []
sites_to_app = {}
for app_name in job.deploy_map:
for site_name in job.deploy_map[app_name]:
deployments = job.deploy_map[app_name]
deployments = extract_participants(deployments)
for site_name in deployments:
if site_name.upper() == ALL_SITES:
# deploy_map: {"app_name": ["ALL_SITES"]} will be treated as deploying to all online clients
applicable_sites = online_site_names
Expand Down
13 changes: 13 additions & 0 deletions nvflare/app_opt/job_launcher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
yhwen marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Loading
Loading