Skip to content

Commit

Permalink
Merge pull request #173 from jbernal0019/master
Browse files Browse the repository at this point in the history
Add DELETE request handler to explicitly remove a job from the compute cluster
  • Loading branch information
jbernal0019 authored Apr 28, 2021
2 parents 35eb25b + 637f31e commit 9655948
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 36 deletions.
14 changes: 11 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,26 @@ Kubernetes:
$> kubectl exec $pman_dev -- touch /home/localuser/storeBase/key-chris-jid-1/incoming/test.txt
Using `HTTPie <https://httpie.org/>` to run a container
Using `HTTPie <https://httpie.org/>`_ to run a job

.. code-block:: bash
$> http POST http://localhost:30010/api/v1/ cmd_args='--saveinputmeta --saveoutputmeta --dir cube/uploads' cmd_path_flags='--dir' auid=cube number_of_workers=1 cpu_limit=1000 memory_limit=200 gpu_limit=0 image=fnndsc/pl-dircopy selfexec=dircopy selfpath=/usr/local/bin execshell=/usr/local/bin/python type=fs jid=chris-jid-1
Get the result
Get job status

.. code-block:: bash
$> http http://localhost:30010/api/v1/chris-jid-1/
Keep making the previous ``GET`` request until the ``"status"`` descriptor in the response becomes ``"finishedSuccessfully"``:

Delete the job

.. code-block:: bash
$> http DELETE http://localhost:30010/api/v1/chris-jid-1/
``pman`` usage
===============
Expand Down
2 changes: 1 addition & 1 deletion pman/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Config:
"""
DEBUG = False
TESTING = False
SERVER_VERSION = "3.0.0"
SERVER_VERSION = "3.1.0"

def __init__(self):
# Environment variables
Expand Down
77 changes: 47 additions & 30 deletions pman/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,25 @@ def post(self):
share_dir = os.path.join(storebase, 'key-' + job_id)

try:
swarm_mgr = SwarmManager(app.config)
compute_mgr = SwarmManager(app.config)
except RuntimeError as e:
abort(503, message=str(e))

logger.info(f'Scheduling job {job_id} on the Swarm cluster')
try:
service = swarm_mgr.schedule(compute_data['image'], cmd, job_id, 'none',
share_dir)
service = compute_mgr.schedule(compute_data['image'], cmd, job_id, 'none',
share_dir)
except docker.errors.APIError as e:
logger.error(f'Error from Swarm while scheduling job {job_id}, detail: '
f'{str(e)}')
status_code = e.response.status_code
status_code = 503 if status_code == 500 else status_code
abort(status_code, message=str(e))

job_info = swarm_mgr.get_service_task_info(service)
job_info = compute_mgr.get_service_task_info(service)
logger.info(f'Successful job {job_id} schedule response from Swarm: '
f'{job_info}')
job_logs = swarm_mgr.get_service_logs(service)
job_logs = compute_mgr.get_service_logs(service)

return {
'jid': job_id,
Expand All @@ -108,7 +108,7 @@ def post(self):
'exitcode': job_info['exitcode'],
'pid': job_info['pid'],
'logs': job_logs
}
}, 201

def build_app_cmd(self, compute_data):
"""
Expand Down Expand Up @@ -151,39 +151,25 @@ class Job(Resource):
"""
Resource representing a single job running on the compute.
"""
def __init__(self):
super(Job, self).__init__()

self.container_env = app.config.get('CONTAINER_ENV')
self.compute_mgr = None

def get(self, job_id):
job_id = job_id.lstrip('/')
container_env = app.config.get('CONTAINER_ENV')
job_logs = ''
job_info = {'id': '', 'image': '', 'cmd': '', 'timestamp': '', 'message': '',
'status': 'undefined', 'containerid': '', 'exitcode': '', 'pid': ''}

if container_env == 'swarm':
try:
swarm_mgr = SwarmManager(app.config)
except RuntimeError as e:
abort(503, message=str(e))

if self.container_env == 'swarm':
logger.info(f'Getting job {job_id} status from the Swarm cluster')
try:
service = swarm_mgr.get_service(job_id)
except docker.errors.NotFound as e:
abort(404, message=str(e))
except docker.errors.APIError as e:
status_code = e.response.status_code
status_code = 503 if status_code == 500 else status_code
abort(status_code, message=str(e))
except docker.errors.InvalidVersion as e:
abort(400, message=str(e))

job_info = swarm_mgr.get_service_task_info(service)
job = self.get_job(job_id)
job_info = self.compute_mgr.get_service_task_info(job)
logger.info(f'Successful job {job_id} status response from Swarm: '
f'{job_info}')
job_logs = swarm_mgr.get_service_logs(service)

if job_info['status'] in ('undefined', 'finishedWithError',
'finishedSuccessfully'):
service.remove() # remove job from swarm cluster
job_logs = self.compute_mgr.get_service_logs(job)

return {
'jid': job_id,
Expand All @@ -197,3 +183,34 @@ def get(self, job_id):
'pid': job_info['pid'],
'logs': job_logs
}

def delete(self, job_id):
job_id = job_id.lstrip('/')

if self.container_env == 'swarm':
logger.info(f'Deleting job {job_id} from {self.container_env}')
job = self.get_job(job_id)
job.remove() # remove job from swarm cluster
logger.info(f'Successfully removed job {job_id} from {self.container_env}')
return '', 204

def get_job(self, job_id):
"""
Return the job object.
"""
if self.container_env == 'swarm':
try:
self.compute_mgr = SwarmManager(app.config)
except RuntimeError as e:
abort(503, message=str(e))
try:
service = self.compute_mgr.get_service(job_id)
except docker.errors.NotFound as e:
abort(404, message=str(e))
except docker.errors.APIError as e:
status_code = e.response.status_code
status_code = 503 if status_code == 500 else status_code
abort(status_code, message=str(e))
except docker.errors.InvalidVersion as e:
abort(400, message=str(e))
return service
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name = 'pman',
version = '3.0.0',
version = '3.1.0',
description = 'Process Manager',
long_description = readme,
author = 'FNNDSC Developers',
Expand Down
9 changes: 8 additions & 1 deletion tests/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def tearDown(self):

def test_get(self):
response = self.client.get(self.url)
self.assertEqual(response.status_code, 200)
self.assertTrue('server_version' in response.json)

def test_post(self):
Expand All @@ -75,14 +76,20 @@ def test_post(self):
}
# make the POST request
response = self.client.post(self.url, json=data)
self.assertEqual(response.status_code, 201)
self.assertIn('status', response.json)

time.sleep(5)
with self.app.test_request_context():
# test get job status and cleanup swarm job
# test get job status
url = url_for('api.job', job_id=self.job_id)
response = self.client.get(url)
if response.json['status'] != 'finishedSuccessfully':
time.sleep(10)
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json['status'], 'finishedSuccessfully')

# test remove job (cleanup swarm job)
response = self.client.delete(url)
self.assertEqual(response.status_code, 204)

0 comments on commit 9655948

Please sign in to comment.