diff --git a/kubernetes/pfcon_dev_innetwork.yaml b/kubernetes/pfcon_dev_innetwork.yaml index 2eab7d6..bfcffd5 100755 --- a/kubernetes/pfcon_dev_innetwork.yaml +++ b/kubernetes/pfcon_dev_innetwork.yaml @@ -54,6 +54,10 @@ spec: env: - name: APPLICATION_MODE value: development + - name: PFCON_INNETWORK + value: "true" + - name: STORAGE_ENV + value: swift command: ["python"] args: ["-m", "pfcon"] volumeMounts: diff --git a/kubernetes/pfcon_dev_innetwork_fs.yaml b/kubernetes/pfcon_dev_innetwork_fs.yaml new file mode 100755 index 0000000..9d2923c --- /dev/null +++ b/kubernetes/pfcon_dev_innetwork_fs.yaml @@ -0,0 +1,133 @@ +apiVersion: v1 +kind: Service +metadata: + name: pfcon + labels: + app: pfcon + env: development +spec: + type: NodePort + selector: + app: pfcon + env: development + ports: + - port: 30006 + targetPort: 5005 + nodePort: 30006 + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pfcon + labels: + app: pfcon + env: development +spec: + replicas: 1 + selector: + matchLabels: + app: pfcon + env: development + template: + metadata: + name: pfcon + labels: + app: pfcon + env: development + spec: + initContainers: + - name: init-pfcon + image: busybox:1.32 + command: [ 'sh', '-c', "until wget --spider -S -T 2 http://pman:5010/api/v1/ 2>&1 | grep '200 OK'; do echo waiting for pman; done" ] + containers: + - image: localhost:5000/fnndsc/pfcon:dev + name: pfcon + stdin: true + tty: true + ports: + - containerPort: 5005 + env: + - name: APPLICATION_MODE + value: development + - name: PFCON_INNETWORK + value: "true" + - name: STORAGE_ENV + value: filesystem + command: ["python"] + args: ["-m", "pfcon"] + volumeMounts: + - mountPath: "/var/local/storeBase" + name: "storebase" + - mountPath: "/app/pfcon" + name: "pfcon-source" + - mountPath: "/app/tests" + name: "pfcon-tests" + volumes: + - name: "storebase" + hostPath: + path: ${STOREBASE} + - name: "pfcon-source" + hostPath: + path: ${SOURCEDIR}/pfcon + - name: "pfcon-tests" + hostPath: + path: 
${SOURCEDIR}/tests + +--- + +apiVersion: v1 +kind: Service +metadata: + name: pman + labels: + app: pman + env: production +spec: + selector: + app: pman + env: production + ports: + - port: 5010 + targetPort: 5010 + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pman + labels: + app: pman + env: production +spec: + replicas: 1 + selector: + matchLabels: + app: pman + env: production + template: + metadata: + name: pman + labels: + app: pman + env: production + spec: + containers: + - image: fnndsc/pman + name: pman + ports: + - containerPort: 5010 + # Since pman spins off containers of its own it needs to mount storeBase dir + # (where pfcon shares the data) into the spawned container. This directory is + # passed in the STOREBASE env variable. + env: + - name: STORAGE_TYPE + value: host + - name: SECRET_KEY + value: "anysu^l=@pnsf!5piqz6!!5kdcdpo79y6jebbp+2244yjm*#+k" + - name: STOREBASE + value: ${STOREBASE} + - name: CONTAINER_ENV + value: kubernetes diff --git a/pfcon/config.py b/pfcon/config.py index d7b765c..5ec734a 100755 --- a/pfcon/config.py +++ b/pfcon/config.py @@ -1,6 +1,7 @@ from logging.config import dictConfig from environs import Env +import os from importlib.metadata import Distribution from .swiftmanager import SwiftManager @@ -34,10 +35,8 @@ def __init__(self): if self.STORAGE_ENV != 'zipfile': raise ValueError(f"Unsupported value '{self.STORAGE_ENV}' for STORAGE_ENV") - if self.STORAGE_ENV == 'filesystem': - self.FILESYSTEM_BASEDIR = env('FILESYSTEM_BASEDIR', '/filesystem') + self.STOREBASE_MOUNT = env('STOREBASE_MOUNT', '/var/local/storeBase') - self.STORE_BASE = env('STOREBASE', '/var/local/storeBase') self.env = env diff --git a/pfcon/filesystem_storage.py b/pfcon/filesystem_storage.py index 3f9ce6d..35381d6 100755 --- a/pfcon/filesystem_storage.py +++ b/pfcon/filesystem_storage.py @@ -1,6 +1,6 @@ """ -Handle filesystem-based (eg. mount directory) storage. 
This is used when pfcon is -in-network and configured to directly copy the data from a filesystem. +Handle filesystem-based (eg. mounted directory) storage. This is used when pfcon is +in-network and configured to directly access the data from a filesystem. """ import logging @@ -8,7 +8,6 @@ import os import json import io -import shutil from .base_storage import BaseStorage @@ -23,33 +22,17 @@ def __init__(self, config): super().__init__(config) - self.base_dir = config.get('FILESYSTEM_BASEDIR') + self.fs_mount_base_dir = config.get('STOREBASE_MOUNT') - - def store_data(self, job_id, job_incoming_dir, data, **kwargs): + def store_data(self, job_id, job_incoming_dir, data=None, **kwargs): """ - Copy all the files/folders under each input folder in the specified data list - into the specified incoming directory. + Count the number of files in the specified job incoming directory. """ nfiles = 0 - for rel_path in data: - abs_path = os.path.join(self.base_dir, rel_path.strip('/')) - - for root, dirs, files in os.walk(abs_path): - local_path = root.replace(abs_path, job_incoming_dir, 1) - os.makedirs(local_path, exist_ok=True) - - for filename in files: - fs_file_path = os.path.join(root, filename) - try: - shutil.copy(fs_file_path, local_path) - except Exception as e: - logger.error(f'Failed to copy file {fs_file_path} for ' - f'job {job_id}, detail: {str(e)}') - raise - nfiles += 1 - - logger.info(f'{nfiles} files copied from file system for job {job_id}') + for root, dirs, files in os.walk(job_incoming_dir): + nfiles += len(files) + + logger.info(f'{nfiles} files found in file system for job {job_id}') return { 'jid': job_id, 'nfiles': nfiles, @@ -59,33 +42,32 @@ def store_data(self, job_id, job_incoming_dir, data, **kwargs): def get_data(self, job_id, job_outgoing_dir, **kwargs): """ - Copy output files/folders from the specified outgoing directory into the folder - specified by job_output_path keyword argument (relative to the FS base dir). 
+ List the output files' relative paths from the folder specified by + the job_output_path keyword argument which in turn is relative to the filesystem + base directory (assumed to be the storebase mount directory). Then create job json file ready for transmission to a remote origin. The json file contains the job_output_path prefix and the list of relative file paths. """ - job_output_path = kwargs['job_output_path'] - fs_output_path = os.path.join(self.base_dir, job_output_path) fs_rel_file_paths = [] + job_output_path = kwargs['job_output_path'] + abs_path = os.path.join(self.fs_mount_base_dir, job_output_path) - for root, dirs, files in os.walk(job_outgoing_dir): - rel_path = os.path.relpath(root, job_outgoing_dir) + for root, dirs, files in os.walk(abs_path): + rel_path = os.path.relpath(root, abs_path) if rel_path == '.': rel_path = '' - fs_path = os.path.join(fs_output_path, rel_path) - os.makedirs(fs_path, exist_ok=True) for filename in files: local_file_path = os.path.join(root, filename) if not os.path.islink(local_file_path): - try: - shutil.copy(local_file_path, fs_path) - except Exception as e: - logger.error(f'Failed to copy file {local_file_path} for ' - f'job {job_id}, detail: {str(e)}') - raise fs_rel_file_paths.append(os.path.join(rel_path, filename)) - data = {'job_output_path': job_output_path, + data = {'job_output_path': kwargs['job_output_path'], 'rel_file_paths': fs_rel_file_paths} return io.BytesIO(json.dumps(data).encode()) + + def delete_data(self, job_dir): + """ + Delete job data from the local storage. 
+ """ + pass diff --git a/pfcon/resources.py b/pfcon/resources.py index 40af6cb..b497434 100755 --- a/pfcon/resources.py +++ b/pfcon/resources.py @@ -42,6 +42,7 @@ parser.add_argument('input_dirs', dest='input_dirs', required=False, type=str, action='append', location='form') +parser.add_argument('output_dir', dest='output_dir', required=False, location='form') parser.add_argument('data_file', dest='data_file', required=False, location='files') parser_auth = reqparse.RequestParser(bundle_errors=True) @@ -59,7 +60,7 @@ def __init__(self): self.storage_env = app.config.get('STORAGE_ENV') self.pfcon_innetwork = app.config.get('PFCON_INNETWORK') - self.storebase = app.config.get('STORE_BASE') + self.storebase_mount = app.config.get('STOREBASE_MOUNT') def get(self): response = { @@ -79,6 +80,8 @@ def post(self): if self.pfcon_innetwork: if args.input_dirs is None: abort(400, message='input_dirs: field is required') + if self.storage_env == 'filesystem' and args.output_dir is None: + abort(400, message='output_dir: field is required') else: if request.files['data_file'] is None: abort(400, message='data_file: field is required') @@ -88,42 +91,49 @@ def post(self): # process data - job_dir = os.path.join(self.storebase, 'key-' + job_id) - incoming_dir = os.path.join(job_dir, 'incoming') - outgoing_dir = os.path.join(job_dir, 'outgoing') - os.makedirs(incoming_dir, exist_ok=True) - os.makedirs(outgoing_dir, exist_ok=True) - - if self.pfcon_innetwork: - if self.storage_env == 'swift': - storage = SwiftStorage(app.config) - try: - d_info = storage.store_data(job_id, incoming_dir, args.input_dirs) - except ClientException as e: - logger.error(f'Error while fetching files from swift and ' - f'storing job {job_id} data, detail: {str(e)}') - abort(400, message='input_dirs: Error fetching files from swift') - - elif self.storage_env == 'filesystem': - storage = FileSystemStorage(app.config) - try: - d_info = storage.store_data(job_id, incoming_dir, args.input_dirs) - except 
Exception as e: - logger.error(f'Error while copying files from filesystem and ' - f'storing job {job_id} data, detail: {str(e)}') - abort(400, message='input_dirs: Error copying files from filesystem') + input_dir = 'key-' + job_id + '/incoming' + output_dir = 'key-' + job_id + '/outgoing' + + if self.pfcon_innetwork and self.storage_env == 'filesystem': + # only the first input dir is considered here + input_dir = args.input_dirs[0].strip('/') + output_dir = args.output_dir.strip('/') + incoming_dir = os.path.join(self.storebase_mount, input_dir) + storage = FileSystemStorage(app.config) + try: + d_info = storage.store_data(job_id, incoming_dir) + except Exception as e: + logger.error(f'Error while accessing files from shared filesystem ' + f'for job {job_id}, detail: {str(e)}') + abort(400, message='input_dirs: Error accessing files from shared ' + 'filesystem') else: - if self.storage_env == 'zipfile': - storage = ZipFileStorage(app.config) - data_file = request.files['data_file'] - try: - d_info = storage.store_data(job_id, incoming_dir, data_file) - except zipfile.BadZipFile as e: - logger.error(f'Error while decompressing and storing job {job_id} ' - f'data, detail: {str(e)}') - abort(400, message='data_file: Bad zip file') + incoming_dir = os.path.join(self.storebase_mount, input_dir) + outgoing_dir = os.path.join(self.storebase_mount, output_dir) + os.makedirs(incoming_dir, exist_ok=True) + os.makedirs(outgoing_dir, exist_ok=True) - logger.info(f'Successfully stored job {job_id} input data') + if self.pfcon_innetwork: + if self.storage_env == 'swift': + storage = SwiftStorage(app.config) + try: + d_info = storage.store_data(job_id, incoming_dir, args.input_dirs) + except ClientException as e: + logger.error(f'Error while fetching files from swift and ' + f'storing job {job_id} data, detail: {str(e)}') + abort(400, message='input_dirs: Error fetching files from swift') + else: + if self.storage_env == 'zipfile': + storage = ZipFileStorage(app.config) + 
data_file = request.files['data_file'] + try: + d_info = storage.store_data(job_id, incoming_dir, data_file) + except zipfile.BadZipFile as e: + logger.error(f'Error while decompressing and storing job {job_id} ' + f'data, detail: {str(e)}') + abort(400, message='data_file: Bad zip file') + + logger.info(f'Successfully stored job {job_id} input data') # process compute @@ -138,7 +148,9 @@ def post(self): 'image': args.image, 'entrypoint': args.entrypoint, 'type': args.type, - 'env': args.env + 'env': args.env, + 'input_dir': input_dir, + 'output_dir': output_dir, } pman = PmanService.get_service_obj() try: @@ -159,6 +171,7 @@ def __init__(self): self.storage_env = app.config.get('STORAGE_ENV') self.pfcon_innetwork = app.config.get('PFCON_INNETWORK') + self.storebase_mount = app.config.get('STOREBASE_MOUNT') def get(self, job_id): pman = PmanService.get_service_obj() @@ -183,8 +196,7 @@ def delete(self, job_id): if self.storage_env == 'zipfile': storage = ZipFileStorage(app.config) - storebase = app.config.get('STORE_BASE') - job_dir = os.path.join(storebase, 'key-' + job_id) + job_dir = os.path.join(self.storebase_mount, 'key-' + job_id) if os.path.isdir(job_dir): logger.info(f'Deleting job {job_id} data from store') storage.delete_data(job_dir) @@ -210,46 +222,61 @@ def __init__(self): self.storage_env = app.config.get('STORAGE_ENV') self.pfcon_innetwork = app.config.get('PFCON_INNETWORK') + self.storebase_mount = app.config.get('STOREBASE_MOUNT') def get(self, job_id): - storebase = app.config.get('STORE_BASE') - job_dir = os.path.join(storebase, 'key-' + job_id) - if not os.path.isdir(job_dir): - abort(404) - outgoing_dir = os.path.join(job_dir, 'outgoing') - if not os.path.exists(outgoing_dir): - os.mkdir(outgoing_dir) - - logger.info(f'Retrieving job {job_id} output data') - content = b'' download_name = f'{job_id}.zip' mimetype = 'application/zip' - if self.pfcon_innetwork: + if self.pfcon_innetwork and self.storage_env == 'filesystem': job_output_path = 
request.args.get('job_output_path') + if not job_output_path: + abort(400, message='job_output_path: query parameter is required') - if job_output_path: - storage = None - if self.storage_env == 'swift': - storage = SwiftStorage(app.config) - elif self.storage_env == 'filesystem': - storage = FileSystemStorage(app.config) + job_output_path = job_output_path.strip('/') + outgoing_dir = os.path.join(self.storebase_mount, job_output_path) + if not os.path.isdir(outgoing_dir): + abort(404) + + storage = FileSystemStorage(app.config) + content = storage.get_data(job_id, outgoing_dir, + job_output_path=job_output_path) + download_name = f'{job_id}.json' + mimetype = 'application/json' - content = storage.get_data(job_id, outgoing_dir, - job_output_path=job_output_path.lstrip('/')) - download_name = f'{job_id}.json' - mimetype = 'application/json' - else: - # if no query parameter passed then the job's zip file is returned - storage = ZipFileStorage(app.config) - content = storage.get_data(job_id, outgoing_dir) else: - if self.storage_env == 'zipfile': - storage = ZipFileStorage(app.config) - content = storage.get_data(job_id, outgoing_dir) + job_dir = os.path.join(self.storebase_mount, 'key-' + job_id) + if not os.path.isdir(job_dir): + abort(404) + outgoing_dir = os.path.join(job_dir, 'outgoing') + if not os.path.exists(outgoing_dir): + os.mkdir(outgoing_dir) + + logger.info(f'Retrieving job {job_id} output data') + + if self.pfcon_innetwork: + job_output_path = request.args.get('job_output_path') + + if job_output_path: + job_output_path = job_output_path.lstrip('/') + + if self.storage_env == 'swift': + storage = SwiftStorage(app.config) + content = storage.get_data(job_id, outgoing_dir, + job_output_path=job_output_path) + download_name = f'{job_id}.json' + mimetype = 'application/json' + else: + # if no query parameter passed then the job's zip file is returned + storage = ZipFileStorage(app.config) + content = storage.get_data(job_id, outgoing_dir) + else: + if 
self.storage_env == 'zipfile': + storage = ZipFileStorage(app.config) + content = storage.get_data(job_id, outgoing_dir) - logger.info(f'Successfully retrieved job {job_id} output data') + logger.info(f'Successfully retrieved job {job_id} output data') return send_file(content, download_name=download_name, as_attachment=True, mimetype=mimetype) diff --git a/swarm/docker-compose_dev_innetwork_fs.yml b/swarm/docker-compose_dev_innetwork_fs.yml index 86b27fd..4c3152d 100755 --- a/swarm/docker-compose_dev_innetwork_fs.yml +++ b/swarm/docker-compose_dev_innetwork_fs.yml @@ -28,7 +28,6 @@ services: - PFCON_INNETWORK=true - STORAGE_ENV=filesystem volumes: - - fs_storage_dev:/filesystem - ${STOREBASE:?}:/var/local/storeBase:z - ../pfcon:/app/pfcon:z - ../tests:/app/tests:z @@ -36,7 +35,6 @@ services: - "30006:5005" depends_on: - pman - - swift_service networks: - remote labels: @@ -68,6 +66,3 @@ services: networks: remote: - -volumes: - fs_storage_dev: diff --git a/tests/test_resources.py b/tests/test_resources.py index 789f7ae..c9fad94 100755 --- a/tests/test_resources.py +++ b/tests/test_resources.py @@ -36,6 +36,7 @@ def setUp(self): response = self.client.post(url, data=json.dumps(creds), content_type='application/json') self.headers = {'Authorization': 'Bearer ' + response.json['token']} + self.storebase_mount = self.app.config.get('STOREBASE_MOUNT') self.job_dir = '' def tearDown(self): @@ -64,7 +65,7 @@ def test_get(self): def test_post(self): job_id = 'chris-jid-1' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + self.job_dir = os.path.join(self.storebase_mount, 'key-' + job_id) # create zip data file memory_zip_file = io.BytesIO() with zipfile.ZipFile(memory_zip_file, 'w', zipfile.ZIP_DEFLATED) as job_data_zip: @@ -126,11 +127,16 @@ def setUp(self): def test_get(self): job_id = 'chris-jid-2' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + self.job_dir = os.path.join(self.storebase_mount, 'key-' + job_id) incoming = 
os.path.join(self.job_dir, 'incoming') + input_dir = os.path.relpath(incoming, self.storebase_mount) Path(incoming).mkdir(parents=True, exist_ok=True) outgoing = os.path.join(self.job_dir, 'outgoing') + output_dir = os.path.relpath(outgoing, self.storebase_mount) Path(outgoing).mkdir(parents=True, exist_ok=True) + self.compute_data['input_dir'] = input_dir + self.compute_data['output_dir'] = output_dir + with open(os.path.join(incoming, 'test.txt'), 'w') as f: f.write('job input test file') @@ -155,9 +161,14 @@ def test_delete(self): job_id = 'chris-jid-3' self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) incoming = os.path.join(self.job_dir, 'incoming') + input_dir = os.path.relpath(incoming, self.storebase_mount) Path(incoming).mkdir(parents=True, exist_ok=True) outgoing = os.path.join(self.job_dir, 'outgoing') + output_dir = os.path.relpath(outgoing, self.storebase_mount) Path(outgoing).mkdir(parents=True, exist_ok=True) + self.compute_data['input_dir'] = input_dir + self.compute_data['output_dir'] = output_dir + with open(os.path.join(incoming, 'test.txt'), 'w') as f: f.write('job input test file') @@ -180,13 +191,14 @@ class TestJobFile(ResourceTests): def test_get(self): job_id = 'chris-jid-4' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + self.job_dir = os.path.join(self.storebase_mount, 'key-' + job_id) with self.app.test_request_context(): url = url_for('api.jobfile', job_id=job_id) outgoing = os.path.join(self.job_dir, 'outgoing') test_file_path = os.path.join(outgoing, 'out') Path(test_file_path).mkdir(parents=True, exist_ok=True) + with open(os.path.join(test_file_path, 'test.txt'), 'w') as f: f.write('job input test file') response = self.client.get(url, headers=self.headers) diff --git a/tests/test_resources_innetwork.py b/tests/test_resources_innetwork.py index 694af85..8400438 100755 --- a/tests/test_resources_innetwork.py +++ b/tests/test_resources_innetwork.py @@ -53,6 +53,8 @@ def setUp(self): with 
io.StringIO('Test file') as f: self.swift_manager.upload_obj(self.swift_input_path + '/test.txt', f.read(), content_type='text/plain') + + self.storebase_mount = self.app.config.get('STOREBASE_MOUNT') self.job_dir = '' def tearDown(self): @@ -91,7 +93,7 @@ def test_get(self): def test_post(self): job_id = 'chris-jid-1' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + self.job_dir = os.path.join(self.storebase_mount, 'key-' + job_id) data = { 'jid': job_id, @@ -147,11 +149,16 @@ def setUp(self): def test_get(self): job_id = 'chris-jid-2' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + self.job_dir = os.path.join(self.storebase_mount, 'key-' + job_id) incoming = os.path.join(self.job_dir, 'incoming') + input_dir = os.path.relpath(incoming, self.storebase_mount) Path(incoming).mkdir(parents=True, exist_ok=True) outgoing = os.path.join(self.job_dir, 'outgoing') + output_dir = os.path.relpath(outgoing, self.storebase_mount) Path(outgoing).mkdir(parents=True, exist_ok=True) + self.compute_data['input_dir'] = input_dir + self.compute_data['output_dir'] = output_dir + with open(os.path.join(incoming, 'test.txt'), 'w') as f: f.write('job input test file') @@ -174,11 +181,16 @@ def test_get(self): def test_delete(self): job_id = 'chris-jid-3' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + self.job_dir = os.path.join(self.storebase_mount, 'key-' + job_id) incoming = os.path.join(self.job_dir, 'incoming') + input_dir = os.path.relpath(incoming, self.storebase_mount) Path(incoming).mkdir(parents=True, exist_ok=True) outgoing = os.path.join(self.job_dir, 'outgoing') + output_dir = os.path.relpath(outgoing, self.storebase_mount) Path(outgoing).mkdir(parents=True, exist_ok=True) + self.compute_data['input_dir'] = input_dir + self.compute_data['output_dir'] = output_dir + with open(os.path.join(incoming, 'test.txt'), 'w') as f: f.write('job input test file') @@ -201,13 +213,14 @@ class 
TestJobFile(ResourceTests): def test_get_without_query_parameters(self): job_id = 'chris-jid-4' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + self.job_dir = os.path.join(self.storebase_mount, 'key-' + job_id) with self.app.test_request_context(): url = url_for('api.jobfile', job_id=job_id) outgoing = os.path.join(self.job_dir, 'outgoing') test_file_path = os.path.join(outgoing, 'out') Path(test_file_path).mkdir(parents=True, exist_ok=True) + with open(os.path.join(test_file_path, 'test.txt'), 'w') as f: f.write('job input test file') @@ -221,7 +234,7 @@ def test_get_without_query_parameters(self): def test_get_with_query_parameters(self): job_id = 'chris-jid-4' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + self.job_dir = os.path.join(self.storebase_mount, 'key-' + job_id) with self.app.test_request_context(): url = url_for('api.jobfile', job_id=job_id) @@ -229,7 +242,7 @@ def test_get_with_query_parameters(self): test_file_path = os.path.join(outgoing, 'out') Path(test_file_path).mkdir(parents=True, exist_ok=True) with open(os.path.join(test_file_path, 'test.txt'), 'w') as f: - f.write('job input test file') + f.write('job output test file') response = self.client.get(url, query_string={'job_output_path': self.swift_output_path}, headers=self.headers) diff --git a/tests/test_resources_innetwork_fs.py b/tests/test_resources_innetwork_fs.py index 4bdfa55..be3002b 100755 --- a/tests/test_resources_innetwork_fs.py +++ b/tests/test_resources_innetwork_fs.py @@ -3,9 +3,7 @@ from pathlib import Path import shutil import os -import io import time -import zipfile import json from unittest import TestCase from unittest import mock, skip @@ -25,8 +23,7 @@ def setUp(self): logging.disable(logging.WARNING) self.app = create_app({'PFCON_INNETWORK': True, - 'STORAGE_ENV': 'filesystem', - 'FILESYSTEM_BASEDIR': '/filesystem' + 'STORAGE_ENV': 'filesystem' }) self.client = self.app.test_client() with 
self.app.test_request_context(): @@ -39,8 +36,8 @@ def setUp(self): response = self.client.post(url, data=json.dumps(creds), content_type='application/json') self.headers = {'Authorization': 'Bearer ' + response.json['token']} - self.fs_base_dir = self.app.config.get('FILESYSTEM_BASEDIR') - self.user_dir = os.path.join(self.fs_base_dir, 'foo') + self.storebase_mount = self.app.config.get('STOREBASE_MOUNT') + self.user_dir = os.path.join(self.storebase_mount, 'foo') # copy a file to the filesystem storage input path self.fs_input_path = os.path.join(self.user_dir, 'feed/input') @@ -50,12 +47,7 @@ def setUp(self): with open(self.fs_input_path + '/test.txt', 'w') as f: f.write('Test file') - self.job_dir = '' - def tearDown(self): - if os.path.isdir(self.job_dir): - shutil.rmtree(self.job_dir) - # delete files from filesystem storage if os.path.isdir(self.user_dir): shutil.rmtree(self.user_dir) @@ -83,7 +75,6 @@ def test_get(self): def test_post(self): job_id = 'chris-jid-1' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) data = { 'jid': job_id, @@ -96,7 +87,8 @@ def test_post(self): 'gpu_limit': '0', 'image': 'fnndsc/pl-simplefsapp', 'type': 'fs', - 'input_dirs': [os.path.relpath(self.fs_input_path, self.fs_base_dir)] + 'input_dirs': [os.path.relpath(self.fs_input_path, self.storebase_mount)], + 'output_dir': os.path.relpath(self.fs_output_path, self.storebase_mount) } # make the POST request response = self.client.post(self.url, data=data, headers=self.headers) @@ -134,18 +126,13 @@ def setUp(self): 'memory_limit': '200', 'gpu_limit': '0', 'image': 'fnndsc/pl-simplefsapp', - 'type': 'fs' + 'type': 'fs', + 'input_dir': os.path.relpath(self.fs_input_path, self.storebase_mount), + 'output_dir': os.path.relpath(self.fs_output_path, self.storebase_mount) } def test_get(self): job_id = 'chris-jid-2' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) - incoming = os.path.join(self.job_dir, 'incoming') - 
Path(incoming).mkdir(parents=True, exist_ok=True) - outgoing = os.path.join(self.job_dir, 'outgoing') - Path(outgoing).mkdir(parents=True, exist_ok=True) - with open(os.path.join(incoming, 'test.txt'), 'w') as f: - f.write('job input test file') with self.app.test_request_context(): # create job @@ -166,13 +153,6 @@ def test_get(self): def test_delete(self): job_id = 'chris-jid-3' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) - incoming = os.path.join(self.job_dir, 'incoming') - Path(incoming).mkdir(parents=True, exist_ok=True) - outgoing = os.path.join(self.job_dir, 'outgoing') - Path(outgoing).mkdir(parents=True, exist_ok=True) - with open(os.path.join(incoming, 'test.txt'), 'w') as f: - f.write('job input test file') with self.app.test_request_context(): # create job @@ -193,41 +173,30 @@ class TestJobFile(ResourceTests): def test_get_without_query_parameters(self): job_id = 'chris-jid-4' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) with self.app.test_request_context(): url = url_for('api.jobfile', job_id=job_id) - outgoing = os.path.join(self.job_dir, 'outgoing') - test_file_path = os.path.join(outgoing, 'out') - Path(test_file_path).mkdir(parents=True, exist_ok=True) - with open(os.path.join(test_file_path, 'test.txt'), 'w') as f: - f.write('job input test file') response = self.client.get(url, headers=self.headers) - self.assertEqual(response.status_code, 200) - memory_zip_file = io.BytesIO(response.data) - with zipfile.ZipFile(memory_zip_file, 'r', zipfile.ZIP_DEFLATED) as job_zip: - filenames = job_zip.namelist() - self.assertEqual(len(filenames), 1) - self.assertEqual(filenames[0], 'out/test.txt') + self.assertEqual(response.status_code, 400) def test_get_with_query_parameters(self): job_id = 'chris-jid-4' - self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) with self.app.test_request_context(): url = url_for('api.jobfile', job_id=job_id) - outgoing = os.path.join(self.job_dir, 
'outgoing') - test_file_path = os.path.join(outgoing, 'out') + + test_file_path = os.path.join(self.fs_output_path, 'out') Path(test_file_path).mkdir(parents=True, exist_ok=True) + with open(os.path.join(test_file_path, 'test.txt'), 'w') as f: - f.write('job input test file') + f.write('job output test file') - job_output_path = os.path.relpath(self.fs_output_path, self.fs_base_dir) + job_output_path = os.path.relpath(self.fs_output_path, self.storebase_mount) response = self.client.get(url, query_string={'job_output_path': job_output_path}, headers=self.headers) self.assertEqual(response.status_code, 200) content = json.loads(response.data.decode()) self.assertEqual(content['job_output_path'], job_output_path) - self.assertEqual(content['rel_file_paths'], ["out/test.txt"]) + self.assertEqual(content['rel_file_paths'], ['out/test.txt']) diff --git a/unmake.sh b/unmake.sh index 4b0ad48..a2e5470 100755 --- a/unmake.sh +++ b/unmake.sh @@ -134,18 +134,19 @@ title -d 1 "Destroying pfcon containerized dev environment on $ORCHESTRATOR" echo "docker volume rm -f pfcon_dev_stack_swift_storage_dev" sleep 15 docker volume rm pfcon_dev_stack_swift_storage_dev - elif [[ $STORAGE == 'filesystem' ]]; then - echo "docker volume rm -f pfcon_dev_stack_fs_storage_dev" - sleep 15 - docker volume rm pfcon_dev_stack_fs_storage_dev fi fi elif [[ $ORCHESTRATOR == kubernetes ]]; then if (( b_pfconInNetwork )) ; then - echo "kubectl delete -f kubernetes/pfcon_dev_innetwork.yaml" | ./boxes.sh ${LightCyan} - kubectl delete -f kubernetes/pfcon_dev_innetwork.yaml - echo "Removing swift_storage folder $SOURCEDIR/swift_storage" | ./boxes.sh - rm -fr $SOURCEDIR/swift_storage + if [[ $STORAGE == 'swift' ]]; then + echo "kubectl delete -f kubernetes/pfcon_dev_innetwork.yaml" | ./boxes.sh ${LightCyan} + kubectl delete -f kubernetes/pfcon_dev_innetwork.yaml + echo "Removing swift_storage folder $SOURCEDIR/swift_storage" | ./boxes.sh + rm -fr $SOURCEDIR/swift_storage + elif [[ $STORAGE == 'filesystem' 
]]; then + echo "kubectl delete -f kubernetes/pfcon_dev_innetwork_fs.yaml" | ./boxes.sh ${LightCyan} + kubectl delete -f kubernetes/pfcon_dev_innetwork_fs.yaml + fi else echo "kubectl delete -f kubernetes/pfcon_dev.yaml" | ./boxes.sh ${LightCyan} kubectl delete -f kubernetes/pfcon_dev.yaml