From 138ac1ad565fc90208fa2cd67194899e1801352e Mon Sep 17 00:00:00 2001 From: Jorge Date: Fri, 21 Jul 2023 17:50:11 -0400 Subject: [PATCH] Implement pfcon in-network operation mode --- .dockerignore | 1 + .gitignore | 1 + README.rst | 33 +++ kubernetes/pfcon_dev.yaml | 4 +- kubernetes/pfcon_dev_innetwork.yaml | 191 ++++++++++++++++ make.sh | 68 ++++-- openshift/pfcon-openshift-template.json | 2 +- pfcon/base_storage.py | 40 ++++ pfcon/config.py | 41 +++- pfcon/resources.py | 179 ++++++++++----- pfcon/swift_storage.py | 99 ++++++++ pfcon/swift_store.py | 136 ----------- pfcon/swiftmanager.py | 170 ++++++++++++++ pfcon/{mount_dir.py => zip_file_storage.py} | 31 ++- swarm/docker-compose_dev.yml | 5 + swarm/docker-compose_dev_innetwork.yml | 88 +++++++ tests/test_resources.py | 36 +-- tests/test_resources_innetwork.py | 239 ++++++++++++++++++++ unmake.sh | 40 +++- 19 files changed, 1140 insertions(+), 264 deletions(-) create mode 100755 kubernetes/pfcon_dev_innetwork.yaml create mode 100755 pfcon/base_storage.py create mode 100755 pfcon/swift_storage.py delete mode 100755 pfcon/swift_store.py create mode 100755 pfcon/swiftmanager.py rename pfcon/{mount_dir.py => zip_file_storage.py} (77%) create mode 100755 swarm/docker-compose_dev_innetwork.yml create mode 100755 tests/test_resources_innetwork.py diff --git a/.dockerignore b/.dockerignore index 8e86b04..d06106f 100755 --- a/.dockerignore +++ b/.dockerignore @@ -9,3 +9,4 @@ Dockerfile .git LICENSE CHRIS_REMOTE_FS +swift_storage diff --git a/.gitignore b/.gitignore index df5e2ae..f5070c6 100755 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ dc.out CHRIS_REMOTE_FS/ +swift_storage/ diff --git a/README.rst b/README.rst index ce0c962..19aa38b 100755 --- a/README.rst +++ b/README.rst @@ -87,6 +87,22 @@ Remove pfcon's containers $> cd pfcon $> ./unmake.sh +Start pfcon's development server and backend containers operating in-network (with Swift storage) +------------------------------------------------------------------------------------------------- + +.. code-block:: bash + + $> cd pfcon + $> ./make.sh -N + +Remove pfcon's containers operating in-network (with Swift storage) +------------------------------------------------------------------- + +.. code-block:: bash + + $> cd pfcon + $> ./unmake.sh -N + Remove the local Docker Swarm cluster if desired ------------------------------------------------ @@ -132,6 +148,23 @@ Remove pfcon's containers $> ./unmake.sh -O kubernetes +Start pfcon's development server and backend containers operating in-network (with Swift storage) +------------------------------------------------------------------------------------------------- + +.. code-block:: bash + + $> cd pfcon + $> ./make.sh -N -O kubernetes + +Remove pfcon's containers operating in-network (with Swift storage) +------------------------------------------------------------------- + +.. code-block:: bash + + $> cd pfcon + $> ./unmake.sh -N -O kubernetes + + ********************** Production deployments ********************** diff --git a/kubernetes/pfcon_dev.yaml b/kubernetes/pfcon_dev.yaml index 3f8fcf5..6ecf804 100755 --- a/kubernetes/pfcon_dev.yaml +++ b/kubernetes/pfcon_dev.yaml @@ -42,7 +42,7 @@ spec: image: busybox:1.32 command: [ 'sh', '-c', "until wget --spider -S -T 2 http://pman:5010/api/v1/ 2>&1 | grep '200 OK'; do echo waiting for pman; done" ] containers: - - image: fnndsc/pfcon:dev + - image: localhost:5000/fnndsc/pfcon:dev name: pfcon stdin: true tty: true @@ -119,6 +119,8 @@ spec: # (where pfcon shares the data) into the spawned container. This directory is # passed in the STOREBASE env variable. env: + - name: STORAGE_TYPE + value: host - name: SECRET_KEY value: "anysu^l=@pnsf!5piqz6!!5kdcdpo79y6jebbp+2244yjm*#+k" - name: STOREBASE diff --git a/kubernetes/pfcon_dev_innetwork.yaml b/kubernetes/pfcon_dev_innetwork.yaml new file mode 100755 index 0000000..2eab7d6 --- /dev/null +++ b/kubernetes/pfcon_dev_innetwork.yaml @@ -0,0 +1,191 @@ +apiVersion: v1 +kind: Service +metadata: + name: pfcon + labels: + app: pfcon + env: development +spec: + type: NodePort + selector: + app: pfcon + env: development + ports: + - port: 30006 + targetPort: 5005 + nodePort: 30006 + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pfcon + labels: + app: pfcon + env: development +spec: + replicas: 1 + selector: + matchLabels: + app: pfcon + env: development + template: + metadata: + name: pfcon + labels: + app: pfcon + env: development + spec: + initContainers: + - name: init-pfcon + image: busybox:1.32 + command: [ 'sh', '-c', "until wget --spider -S -T 2 http://pman:5010/api/v1/ 2>&1 | grep '200 OK'; do echo waiting for pman; done" ] + - name: init-swift + image: busybox:1.32 + command: [ "sh", "-c", "until wget --spider -S -T 2 http://swift:8080/info 2>&1 | grep '200 OK'; do echo waiting for Swift storage; sleep2; done" ] + containers: + - image: fnndsc/pfcon:dev + name: pfcon + stdin: true + tty: true + ports: + - containerPort: 5005 + env: + - name: APPLICATION_MODE + value: development + command: ["python"] + args: ["-m", "pfcon"] + volumeMounts: + - mountPath: "/var/local/storeBase" + name: "storebase" + - mountPath: "/app/pfcon" + name: "pfcon-source" + - mountPath: "/app/tests" + name: "pfcon-tests" + volumes: + - name: "storebase" + hostPath: + path: ${STOREBASE} + - name: "pfcon-source" + hostPath: + path: ${SOURCEDIR}/pfcon + - name: "pfcon-tests" + hostPath: + path: ${SOURCEDIR}/tests + +--- + +apiVersion: v1 +kind: Service +metadata: + name: pman + labels: + app: pman + env: production +spec: + selector: + app: pman + env: production + ports: + - port: 5010 + targetPort: 5010 + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pman + labels: + app: pman + env: production +spec: + replicas: 1 + selector: + matchLabels: + app: pman + env: production + template: + metadata: + name: pman + labels: + app: pman + env: production + spec: + containers: + - image: fnndsc/pman + name: pman + ports: + - containerPort: 5010 + # Since pman spins off containers of its own it needs to mount storeBase dir + # (where pfcon shares the data) into the spawned container. This directory is + # passed in the STOREBASE env variable. + env: + - name: STORAGE_TYPE + value: host + - name: SECRET_KEY + value: "anysu^l=@pnsf!5piqz6!!5kdcdpo79y6jebbp+2244yjm*#+k" + - name: STOREBASE + value: ${STOREBASE} + - name: CONTAINER_ENV + value: kubernetes + +--- + +apiVersion: v1 +kind: Service +metadata: + name: swift + labels: + app: swift + env: production +spec: + type: NodePort + selector: + app: swift + env: production + ports: + - port: 8080 + targetPort: 8080 + nodePort: 30080 + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: swift + labels: + app: swift + env: production +spec: + replicas: 1 # stateful service, so only a single replica must be used + selector: + matchLabels: + app: swift + env: production + template: + metadata: + name: swift + labels: + app: swift + env: production + spec: + containers: + - name: swift + image: fnndsc/docker-swift-onlyone + ports: + - containerPort: 8080 + env: + - name: SWIFT_USERNAME + value: chris:chris1234 + - name: SWIFT_KEY + value: testing + volumeMounts: + - name: swiftdb + mountPath: "/srv" + volumes: + - name: swiftdb + hostPath: + path: ${SOURCEDIR}/swift_storage diff --git a/make.sh b/make.sh index b624cf7..c2ecdad 100755 --- a/make.sh +++ b/make.sh @@ -6,9 +6,9 @@ # # SYNPOSIS # -# make.sh [-h] [-i] [-s] [-U] \ -# [-O ] \ -# [-S ] \ +# make.sh [-h] [-i] [-s] [-N] [-U] \ +# [-O ] \ +# [-S ] \ # [local|fnndsc[:dev]] # # DESC @@ -19,16 +19,20 @@ # # TYPICAL CASES: # -# Run full pfcon instantiation on Swarm: +# Run full pfcon instantiation operating out-of-network on Swarm: # # unmake.sh ; sudo rm -fr CHRIS_REMOTE_FS; rm -fr CHRIS_REMOTE_FS; make.sh # +# Run full pfcon instantiation operating in-network on Swarm: +# +# unmake.sh -N; sudo rm -fr CHRIS_REMOTE_FS; rm -fr CHRIS_REMOTE_FS; make.sh -N +# # Skip the intro: # # unmake.sh ; sudo rm -fr CHRIS_REMOTE_FS; rm -fr CHRIS_REMOTE_FS; make.sh -s # # -# Run full pfcon instantiation on Kubernetes: +# Run full pfcon instantiation operating out-of-network on Kubernetes: # # unmake.sh -O kubernetes; sudo rm -fr CHRIS_REMOTE_FS; rm -fr CHRIS_REMOTE_FS; make.sh -O kubernetes # @@ -52,6 +56,11 @@ # # Optional do not automatically attach interactive terminal to pfcon container. # +# -N +# +# Optional set pfcon to operate in-network mode (using a swift storage instead of +# a zip file). +# # -U # # Optional skip the UNIT tests. @@ -84,11 +93,11 @@ ORCHESTRATOR=swarm HERE=$(pwd) print_usage () { - echo "Usage: ./make.sh [-h] [-i] [-s] [-U] [-O ] [-S ] [local|fnndsc[:dev]]" + echo "Usage: ./make.sh [-h] [-i] [-s] [-N] [-U] [-O ] [-S ] [local|fnndsc[:dev]]" exit 1 } -while getopts ":hsiUO:S:" opt; do +while getopts ":hsiNUO:S:" opt; do case $opt in h) print_usage ;; @@ -96,6 +105,8 @@ while getopts ":hsiUO:S:" opt; do ;; i) b_norestartinteractive_pfcon_dev=1 ;; + N) b_pfconInNetwork=1 + ;; U) b_skipUnitTests=1 ;; O) ORCHESTRATOR=$OPTARG @@ -116,6 +127,7 @@ while getopts ":hsiUO:S:" opt; do done shift $(($OPTIND - 1)) +export SWIFTREPO=fnndsc export PMANREPO=fnndsc export TAG= if (( $# == 1 )) ; then @@ -127,6 +139,7 @@ if (( $# == 1 )) ; then fi declare -a A_CONTAINER=( + "fnndsc/docker-swift-onlyone^SWIFTREPO" "fnndsc/pman^PMANREPO" "fnndsc/pl-simplefsapp" ) @@ -142,6 +155,11 @@ title -d 1 "Setting global exports..." mkdir -p $STOREBASE fi fi + if (( b_pfconInNetwork )) ; then + echo -e "PFCON_INNETWORK=True" | ./boxes.sh + else + echo -e "PFCON_INNETWORK=False" | ./boxes.sh + fi echo -e "ORCHESTRATOR=$ORCHESTRATOR" | ./boxes.sh echo -e "exporting STOREBASE=$STOREBASE " | ./boxes.sh export STOREBASE=$STOREBASE @@ -168,7 +186,11 @@ windowBottom title -d 1 "Building :dev" cd $HERE - CMD="docker compose -f swarm/docker-compose_dev.yml build" + if (( b_pfconInNetwork )) ; then + CMD="docker compose -f swarm/docker-compose_dev_innetwork.yml build" + else + CMD="docker compose -f swarm/docker-compose_dev.yml build" + fi echo "$CMD" | ./boxes.sh echo $CMD | sh | ./boxes.sh -c windowBottom @@ -209,11 +231,21 @@ windowBottom title -d 1 "Starting pfcon containerized dev environment on $ORCHESTRATOR" if [[ $ORCHESTRATOR == swarm ]]; then - echo "docker stack deploy -c swarm/docker-compose_dev.yml pfcon_dev_stack" | ./boxes.sh ${LightCyan} - docker stack deploy -c swarm/docker-compose_dev.yml pfcon_dev_stack + if (( b_pfconInNetwork )) ; then + echo "docker stack deploy -c swarm/docker-compose_dev_innetwork.yml pfcon_dev_stack" | ./boxes.sh ${LightCyan} + docker stack deploy -c swarm/docker-compose_dev_innetwork.yml pfcon_dev_stack + else + echo "docker stack deploy -c swarm/docker-compose_dev.yml pfcon_dev_stack" | ./boxes.sh ${LightCyan} + docker stack deploy -c swarm/docker-compose_dev.yml pfcon_dev_stack + fi elif [[ $ORCHESTRATOR == kubernetes ]]; then - echo "envsubst < kubernetes/pfcon_dev.yaml | kubectl apply -f -" | ./boxes.sh ${LightCyan} - envsubst < kubernetes/pfcon_dev.yaml | kubectl apply -f - + if (( b_pfconInNetwork )) ; then + echo "envsubst < kubernetes/pfcon_dev_innetwork.yaml | kubectl apply -f -" | ./boxes.sh ${LightCyan} + envsubst < kubernetes/pfcon_dev_innetwork.yaml | kubectl apply -f - + else + echo "envsubst < kubernetes/pfcon_dev.yaml | kubectl apply -f -" | ./boxes.sh ${LightCyan} + envsubst < kubernetes/pfcon_dev.yaml | kubectl apply -f - + fi fi windowBottom @@ -241,9 +273,17 @@ if (( ! b_skipUnitTests )) ; then title -d 1 "Running pfcon tests..." sleep 5 if [[ $ORCHESTRATOR == swarm ]]; then - docker exec $pfcon_dev pytest --color=yes + if (( b_pfconInNetwork )) ; then + docker exec $pfcon_dev pytest tests/test_resources_innetwork.py --color=yes + else + docker exec $pfcon_dev pytest tests/test_resources.py --color=yes + fi elif [[ $ORCHESTRATOR == kubernetes ]]; then - kubectl exec $pfcon_dev -- pytest --color=yes + if (( b_pfconInNetwork )) ; then + kubectl exec $pfcon_dev -- pytest tests/test_resources_innetwork.py --color=yes + else + kubectl exec $pfcon_dev -- pytest tests/test_resources.py --color=yes + fi fi status=$? title -d 1 "pfcon test results" diff --git a/openshift/pfcon-openshift-template.json b/openshift/pfcon-openshift-template.json index 6440a97..9bef333 100755 --- a/openshift/pfcon-openshift-template.json +++ b/openshift/pfcon-openshift-template.json @@ -97,7 +97,7 @@ "value": "http://pman-test-moc.k-apps.osh.massopen.cloud/api/v1/" }, { - "name": "STORE_ENV", + "name": "STORAGE_ENV", "value": "swift" }, { diff --git a/pfcon/base_storage.py b/pfcon/base_storage.py new file mode 100755 index 0000000..be48171 --- /dev/null +++ b/pfcon/base_storage.py @@ -0,0 +1,40 @@ +""" +Module containining the base abstract class for handling data storage. +""" + +import logging +import abc +import shutil + + +logger = logging.getLogger(__name__) + + +class BaseStorage(abc.ABC): + + def __init__(self, config): + + self.config = config + + @abc.abstractmethod + def store_data(self, job_id, job_incoming_dir, data, **kwargs): + """ + Store the files/directories specified in the input data at the specified + local incoming directory. The data parameter and its interpretation depends on + the concrete storage subclass. + """ + ... + + @abc.abstractmethod + def get_data(self, job_id, job_outgoing_dir, **kwargs): + """ + Return the data from the local outgoing directory. The returned data and its + interpretation depends on the concrete storage subclass. + """ + ... + + def delete_data(self, job_dir): + """ + Delete job data from the local storage. + """ + shutil.rmtree(job_dir) diff --git a/pfcon/config.py b/pfcon/config.py index 1b20216..9d3cd1a 100755 --- a/pfcon/config.py +++ b/pfcon/config.py @@ -1,8 +1,10 @@ + from logging.config import dictConfig from environs import Env - from importlib.metadata import Distribution +from .swiftmanager import SwiftManager + pkg = Distribution.from_name(__package__) @@ -21,10 +23,18 @@ def __init__(self): env = Env() env.read_env() # also read .env file, if it exists - self.STORE_ENV = env('STORE_ENV', 'mount') - if self.STORE_ENV == 'mount': - self.STORE_BASE = env('STOREBASE', '/var/local/storeBase') + self.PFCON_INNETWORK = env.bool('PFCON_INNETWORK', False) + + if self.PFCON_INNETWORK: + self.STORAGE_ENV = env('STORAGE_ENV', 'swift') + if self.STORAGE_ENV != 'swift': + raise ValueError(f"Unsupported value '{self.STORAGE_ENV}' for STORAGE_ENV") + else: + self.STORAGE_ENV = env('STORAGE_ENV', 'zipfile') + if self.STORAGE_ENV != 'zipfile': + raise ValueError(f"Unsupported value '{self.STORAGE_ENV}' for STORAGE_ENV") + self.STORE_BASE = env('STOREBASE', '/var/local/storeBase') self.env = env @@ -57,7 +67,7 @@ def __init__(self): }, 'console_stdout': { 'level': 'DEBUG', - 'class': 'logging.StreamHandle', + 'class': 'logging.StreamHandler', 'stream': 'ext://sys.stdout', 'formatter': 'simple' } @@ -83,6 +93,18 @@ def __init__(self): # EXTERNAL SERVICES self.COMPUTE_SERVICE_URL = self.env('COMPUTE_SERVICE_URL', 'http://pman:5010/api/v1/') + if self.STORAGE_ENV == 'swift': + SWIFT_AUTH_URL = self.env('SWIFT_AUTH_URL', + 'http://swift_service:8080/auth/v1.0') + SWIFT_USERNAME = 'chris:chris1234' + SWIFT_KEY = 'testing' + self.SWIFT_CONTAINER_NAME = 'users' + self.SWIFT_CONNECTION_PARAMS = {'user': SWIFT_USERNAME, + 'key': SWIFT_KEY, + 'authurl': SWIFT_AUTH_URL} + SwiftManager(self.SWIFT_CONTAINER_NAME, + self.SWIFT_CONNECTION_PARAMS).create_container() + class ProdConfig(Config): """ @@ -138,3 +160,12 @@ def __init__(self): # EXTERNAL SERVICES self.COMPUTE_SERVICE_URL = env('COMPUTE_SERVICE_URL') + + if self.STORAGE_ENV == 'swift': + SWIFT_AUTH_URL = env('SWIFT_AUTH_URL') + SWIFT_USERNAME = env('SWIFT_USERNAME') + SWIFT_KEY = env('SWIFT_KEY') + self.SWIFT_CONTAINER_NAME = env('SWIFT_CONTAINER_NAME') + self.SWIFT_CONNECTION_PARAMS = {'user': SWIFT_USERNAME, + 'key': SWIFT_KEY, + 'authurl': SWIFT_AUTH_URL} diff --git a/pfcon/resources.py b/pfcon/resources.py index 2e5acd9..590f780 100755 --- a/pfcon/resources.py +++ b/pfcon/resources.py @@ -7,10 +7,11 @@ from flask import request, send_file, current_app as app from flask_restful import reqparse, abort, Resource +from swiftclient.exceptions import ClientException from .services import PmanService, ServiceException -from .mount_dir import MountDir -from .swift_store import SwiftStore +from .zip_file_storage import ZipFileStorage +from .swift_storage import SwiftStorage logger = logging.getLogger(__name__) @@ -36,7 +37,10 @@ location='form') parser.add_argument('env', dest='env', type=str, action='append', location='form', default=[]) -parser.add_argument('data_file', dest='data_file', required=True, location='files') + +parser.add_argument('input_dirs', dest='input_dirs', required=False, type=str, + action='append', location='form') +parser.add_argument('data_file', dest='data_file', required=False, location='files') parser_auth = reqparse.RequestParser(bundle_errors=True) parser_auth.add_argument('pfcon_user', dest='pfcon_user', required=True) @@ -45,48 +49,73 @@ class JobList(Resource): """ - Resource representing the list of jobs running on the compute. + Resource representing the list of jobs running on the compute environment. """ def __init__(self): super(JobList, self).__init__() - self.store_env = app.config.get('STORE_ENV') + self.storage_env = app.config.get('STORAGE_ENV') + self.pfcon_innetwork = app.config.get('PFCON_INNETWORK') + self.storebase = app.config.get('STORE_BASE') def get(self): - return { - 'server_version': app.config.get('SERVER_VERSION') + response = { + 'server_version': app.config.get('SERVER_VERSION'), + 'pfcon_innetwork': self.pfcon_innetwork, + 'storage_env': self.storage_env } + if self.pfcon_innetwork: + if self.storage_env == 'swift': + auth_url = app.config['SWIFT_CONNECTION_PARAMS']['authurl'] + response['swift_auth_url'] = auth_url + return response def post(self): args = parser.parse_args() + + if self.pfcon_innetwork: + if args.input_dirs is None: + abort(400, message='input_dirs: field is required') + else: + if request.files['data_file'] is None: + abort(400, message='data_file: field is required') + job_id = args.jid.lstrip('/') + logger.info(f'Received job {job_id}') # process data - if self.store_env == 'mount': - storebase = app.config.get('STORE_BASE') - job_dir = os.path.join(storebase, 'key-' + job_id) - incoming_dir = os.path.join(job_dir, 'incoming') - outgoing_dir = os.path.join(job_dir, 'outgoing') - os.makedirs(incoming_dir, exist_ok=True) - os.makedirs(outgoing_dir, exist_ok=True) - mdir = MountDir(app.config) - - logger.info(f'Received job {job_id}') - try: - d_info = mdir.store_data(job_id, incoming_dir, request.files['data_file']) - except zipfile.BadZipFile as e: - logger.error(f'Error while decompressing and storing job {job_id} data, ' - f'detail: {str(e)}') - abort(400, message='data_file: Bad zip file') - - if self.store_env == 'swift': - swift = SwiftStore(app.config) - d_info = swift.storeData(job_id, 'incoming', request.files['data_file']) + + job_dir = os.path.join(self.storebase, 'key-' + job_id) + incoming_dir = os.path.join(job_dir, 'incoming') + outgoing_dir = os.path.join(job_dir, 'outgoing') + os.makedirs(incoming_dir, exist_ok=True) + os.makedirs(outgoing_dir, exist_ok=True) + + if self.pfcon_innetwork: + if self.storage_env == 'swift': + storage = SwiftStorage(app.config) + try: + d_info = storage.store_data(job_id, incoming_dir, args.input_dirs) + except ClientException as e: + logger.error(f'Error while fetching files from swift and ' + f'storing job {job_id} data, detail: {str(e)}') + abort(400, message='input_dirs: Error fetching files from swift') + else: + if self.storage_env == 'zipfile': + storage = ZipFileStorage(app.config) + data_file = request.files['data_file'] + try: + d_info = storage.store_data(job_id, incoming_dir, data_file) + except zipfile.BadZipFile as e: + logger.error(f'Error while decompressing and storing job {job_id} ' + f'data, detail: {str(e)}') + abort(400, message='data_file: Bad zip file') logger.info(f'Successfully stored job {job_id} input data') # process compute + compute_data = { 'args': args.args, 'args_path_flags': args.args_path_flags, @@ -105,21 +134,20 @@ def post(self): d_compute_response = pman.run_job(job_id, compute_data) except ServiceException as e: abort(e.code, message=str(e)) - return { - 'data': d_info, - 'compute': d_compute_response - }, 201 + + return {'data': d_info, 'compute': d_compute_response}, 201 class Job(Resource): """ - Resource representing a single job running on the compute. + Resource representing a single job running on the compute environment. """ def __init__(self): super(Job, self).__init__() - self.store_env = app.config.get('STORE_ENV') + self.storage_env = app.config.get('STORAGE_ENV') + self.pfcon_innetwork = app.config.get('PFCON_INNETWORK') def get(self, job_id): pman = PmanService.get_service_obj() @@ -132,53 +160,80 @@ def get(self, job_id): } def delete(self, job_id): - if self.store_env == 'mount': - storebase = app.config.get('STORE_BASE') - job_dir = os.path.join(storebase, 'key-' + job_id) - if os.path.isdir(job_dir): - mdir = MountDir() - logger.info(f'Deleting job {job_id} data from store') - mdir.delete_data(job_dir) - logger.info(f'Successfully removed job {job_id} data from store') + storage = None + + if self.pfcon_innetwork: + if self.storage_env == 'swift': + storage = SwiftStorage(app.config) + else: + if self.storage_env == 'zipfile': + storage = ZipFileStorage(app.config) + + storebase = app.config.get('STORE_BASE') + job_dir = os.path.join(storebase, 'key-' + job_id) + if os.path.isdir(job_dir): + logger.info(f'Deleting job {job_id} data from store') + storage.delete_data(job_dir) + logger.info(f'Successfully removed job {job_id} data from store') + pman = PmanService.get_service_obj() try: pman.delete_job(job_id) except ServiceException as e: abort(e.code, message=str(e)) + logger.info(f'Successfully removed job {job_id} from remote compute') return '', 204 class JobFile(Resource): """ - Resource representing a single job data for a job running on the compute. + Resource representing a single job data for a job running on the compute environment. """ def __init__(self): super(JobFile, self).__init__() - self.store_env = app.config.get('STORE_ENV') + self.storage_env = app.config.get('STORAGE_ENV') + self.pfcon_innetwork = app.config.get('PFCON_INNETWORK') def get(self, job_id): - if self.store_env == 'mount': - storebase = app.config.get('STORE_BASE') - job_dir = os.path.join(storebase, 'key-' + job_id) - if not os.path.isdir(job_dir): - abort(404) - outgoing_dir = os.path.join(job_dir, 'outgoing') - if not os.path.exists(outgoing_dir): - os.mkdir(outgoing_dir) - mdir = MountDir(app.config) - logger.info(f'Retrieving job {job_id} output data') - content = mdir.get_data(job_id, outgoing_dir) - logger.info(f'Successfully retrieved job {job_id} output data') - - if self.store_env == 'swift': - swift = SwiftStore(app.config) - content = swift.getData(job_id) - - return send_file(content, download_name=f'{job_id}.zip', - as_attachment=True, mimetype='application/zip') + storebase = app.config.get('STORE_BASE') + job_dir = os.path.join(storebase, 'key-' + job_id) + if not os.path.isdir(job_dir): + abort(404) + outgoing_dir = os.path.join(job_dir, 'outgoing') + if not os.path.exists(outgoing_dir): + os.mkdir(outgoing_dir) + + logger.info(f'Retrieving job {job_id} output data') + + content = b'' + download_name = f'{job_id}.zip' + mimetype = 'application/zip' + + if self.pfcon_innetwork: + job_output_path = request.args.get('job_output_path') + if job_output_path: + if self.storage_env == 'swift': + storage = SwiftStorage(app.config) + content = storage.get_data(job_id, outgoing_dir, + job_output_path=job_output_path) + download_name = f'{job_id}.json' + mimetype = 'application/json' + else: + # if no query parameter passed then the job's zip file is returned + storage = ZipFileStorage(app.config) + content = storage.get_data(job_id, outgoing_dir) + else: + if self.storage_env == 'zipfile': + storage = ZipFileStorage(app.config) + content = storage.get_data(job_id, outgoing_dir) + + logger.info(f'Successfully retrieved job {job_id} output data') + + return send_file(content, download_name=download_name, as_attachment=True, + mimetype=mimetype) class Auth(Resource): diff --git a/pfcon/swift_storage.py b/pfcon/swift_storage.py new file mode 100755 index 0000000..d263597 --- /dev/null +++ b/pfcon/swift_storage.py @@ -0,0 +1,99 @@ +""" +Handle swift-based storage. This is used when pfcon is in-network and +configured to directly download the data from swift object storage. +""" + +import logging +import datetime +import os +import json +import io + +from swiftclient.exceptions import ClientException + +from .base_storage import BaseStorage +from .swiftmanager import SwiftManager + + +logger = logging.getLogger(__name__) + + +class SwiftStorage(BaseStorage): + + def __init__(self, config): + + super().__init__(config) + + self.swift_manager = SwiftManager(config.get('SWIFT_CONTAINER_NAME'), + config.get('SWIFT_CONNECTION_PARAMS')) + + def store_data(self, job_id, job_incoming_dir, data, **kwargs): + """ + Fetch the files with prefixes in the data list from swift storage into the + specified incoming directory. + """ + nfiles = 0 + for swift_path in data: + try: + l_ls = self.swift_manager.ls(swift_path) + except ClientException as e: + logger.error(f'Error while listing swift storage files in {swift_path} ' + f'for job {job_id}, detail: {str(e)}') + raise + for obj_path in l_ls: + try: + contents = self.swift_manager.download_obj(obj_path) + except ClientException as e: + logger.error(f'Error while downloading file {obj_path} from swift ' + f'storage for job {job_id}, detail: {str(e)}') + raise + + local_file_path = obj_path.replace(swift_path, '', 1).lstrip('/') + local_file_path = os.path.join(job_incoming_dir, local_file_path) + os.makedirs(os.path.dirname(local_file_path), exist_ok=True) + with open(local_file_path, 'wb') as f: + f.write(contents) + nfiles += 1 + + logger.info(f'{nfiles} files fetched from swift storage for job {job_id}') + return { + 'jid': job_id, + 'nfiles': nfiles, + 'timestamp': f'{datetime.datetime.now()}', + 'path': job_incoming_dir + } + + def get_data(self, job_id, job_outgoing_dir, **kwargs): + """ + Upload output files from the specified outgoing directory into swift storage + with the prefix specified by job_output_path keyword argument. + Then create job json file ready for transmission to a remote origin. The json + file contains the job_output_path prefix and the list of relative file paths + created in swift storage. + """ + swift_output_path = kwargs['job_output_path'] + swift_rel_file_paths = [] + + for root, dirs, files in os.walk(job_outgoing_dir): + for filename in files: + local_file_path = os.path.join(root, filename) + if not os.path.islink(local_file_path): + rel_file_path = os.path.relpath(local_file_path, job_outgoing_dir) + swift_file_path = os.path.join(swift_output_path, rel_file_path) + try: + if not self.swift_manager.obj_exists(swift_file_path): + with open(local_file_path, 'rb') as f: + self.swift_manager.upload_obj(swift_file_path, f.read()) + except ClientException as e: + logger.error(f'Error while uploading file {swift_file_path} to ' + f'swift storage for job {job_id}, detail: {str(e)}') + raise + except Exception as e: + logger.error(f'Failed to read file {local_file_path} for ' + f'job {job_id}, detail: {str(e)}') + raise + swift_rel_file_paths.append(rel_file_path) + + data = {'job_output_path': swift_output_path, + 'rel_file_paths': swift_rel_file_paths} + return io.BytesIO(json.dumps(data).encode()) diff --git a/pfcon/swift_store.py b/pfcon/swift_store.py deleted file mode 100755 index b4a283d..0000000 --- a/pfcon/swift_store.py +++ /dev/null @@ -1,136 +0,0 @@ -""" -Handle Swift File Storage Option -""" - -import logging -import zipfile -import configparser -from keystoneauth1.identity import v3 -from keystoneauth1 import session -from swiftclient import service as swift_service -from shutil import copyfileobj - -import base64 -import datetime -import os - -logger = logging.getLogger(__name__) - - -class SwiftStore: - - def __init__(self, config=None): - - self.config = config - - def _createSwiftService(self, configPath): - config = configparser.ConfigParser() - f = open(configPath, 'r') - config.readfp(f) - f.close() - - options = { - 'os_auth_url': config['AUTHORIZATION']['osAuthUrl'], - 'application_id': config['SECRET']['applicationId'], - 'application_secret': config['SECRET']['applicationSecret'], - } - - auth_swift = v3.application_credential.ApplicationCredential( - options['os_auth_url'], - application_credential_id=options['application_id'], - application_credential_secret=options['application_secret'] - ) - - session_client = session.Session(auth=auth_swift) - service = swift_service.Connection(session=session_client) - return service - - def storeData(self, key, file_path, input_stream): - """ - Creates an object of the file and stores it into the container as key-value object - """ - - configPath = "/etc/swift/swift-credentials.cfg" - - - swiftService = self._createSwiftService(configPath) - - f = open('/tmp/{}.zip'.format(key), 'wb') - buf = 16*1024 - while 1: - chunk = input_stream.read(buf) - if not chunk: - break - f.write(chunk) - f.close() - - zip_file_contents = open('/tmp/{}.zip'.format(key), mode='rb') - - - - - try: - success = True - filePath = "input/data" - - resp_headers, containers = swiftService.get_account() - listContainers = [d['name'] for d in containers if 'name' in d] - - if key not in listContainers: - swiftService.put_container(key) - resp_headers, containers = swiftService.get_account() - listContainers = [d['name'] for d in containers if 'name' in d] - if key in listContainers: - logger.info('The container was created successfully') - - else: - raise Exception('The container was not created successfully') - - swiftService.put_object( - key, - filePath, - contents=zip_file_contents, - content_type='application/zip' - ) - zip_file_contents.close() - - - - - # Confirm presence of the object in swift - response_headers = swiftService.head_object(key, file_path) - logger.info('The upload was successful') - except Exception as err: - logger.error(f'Error, detail: {str(err)}') - success = False - - #Headers - return { - 'jid': key, - 'nfiles': 'application/zip', - 'timestamp': f'{datetime.datetime.now()}', - 'path': file_path - } - - - def getData(self, container_name): - """ - Gets the data from the Swift Storage, zips and/or encodes it and sends it to the client - """ - - b_delete = False - configPath = "/etc/swift/swift-credentials.cfg" - - - - swiftService = self._createSwiftService(configPath) - - key = "output/data" - success = True - - response_headers, object_contents = swiftService.get_object(container_name, key) - - - - return object_contents - diff --git a/pfcon/swiftmanager.py b/pfcon/swiftmanager.py new file mode 100755 index 0000000..48a892a --- /dev/null +++ b/pfcon/swiftmanager.py @@ -0,0 +1,170 @@ +""" +Swift storage manager module. +""" + +import logging +import os +import time + +from swiftclient import Connection +from swiftclient.exceptions import ClientException + + +logger = logging.getLogger(__name__) + + +class SwiftManager(object): + + def __init__(self, container_name, conn_params): + self.container_name = container_name + # swift storage connection parameters dictionary + self.conn_params = conn_params + # swift storage connection object + self._conn = None + + def get_connection(self): + """ + Connect to swift storage and return the connection object. + """ + if self._conn is not None: + return self._conn + for i in range(5): # 5 retries at most + try: + self._conn = Connection(**self.conn_params) + except ClientException as e: + logger.error(str(e)) + if i == 4: + raise # give up + time.sleep(0.4) + else: + return self._conn + + def create_container(self): + """ + Create the storage container. + """ + conn = self.get_connection() + try: + conn.put_container(self.container_name) + except ClientException as e: + logger.error(str(e)) + raise + + def ls(self, path, **kwargs): + """ + Return a list of objects in the swift storage with the provided path + as a prefix. + """ + b_full_listing = kwargs.get('full_listing', True) + l_ls = [] # listing of names to return + if path: + conn = self.get_connection() + for i in range(5): + try: + # get the full list of objects in Swift storage with given prefix + ld_obj = conn.get_container(self.container_name, + prefix=path, + full_listing=b_full_listing)[1] + except ClientException as e: + logger.error(str(e)) + if i == 4: + raise + time.sleep(0.4) + else: + l_ls = [d_obj['name'] for d_obj in ld_obj] + break + return l_ls + + def path_exists(self, path): + """ + Return True/False if passed path exists in swift storage. + """ + return len(self.ls(path, full_listing=False)) > 0 + + def obj_exists(self, obj_path): + """ + Return True/False if passed object exists in swift storage. + """ + conn = self.get_connection() + for i in range(5): + try: + conn.head_object(self.container_name, obj_path) + except ClientException as e: + if e.http_status == 404: + return False + else: + logger.error(str(e)) + if i == 4: + raise + time.sleep(0.4) + else: + return True + + def upload_obj(self, swift_path, contents, **kwargs): + """ + Upload an object (a file contents) into swift storage. + """ + conn = self.get_connection() + for i in range(5): + try: + conn.put_object(self.container_name, + swift_path, + contents=contents, + **kwargs) + except ClientException as e: + logger.error(str(e)) + if i == 4: + raise + time.sleep(0.4) + else: + break + + def download_obj(self, obj_path, **kwargs): + """ + Download an object from swift storage. + """ + conn = self.get_connection() + for i in range(5): + try: + resp_headers, obj_contents = conn.get_object(self.container_name, + obj_path, **kwargs) + except ClientException as e: + logger.error(str(e)) + if i == 4: + raise + time.sleep(0.4) + else: + return obj_contents + + def copy_obj(self, obj_path, dest_path, **kwargs): + """ + Copy an object to a new destination in swift storage. + """ + conn = self.get_connection() + dest = os.path.join('/' + self.container_name, dest_path.lstrip('/')) + for i in range(5): + try: + conn.copy_object(self.container_name, obj_path, dest, **kwargs) + except ClientException as e: + logger.error(str(e)) + if i == 4: + raise + time.sleep(0.4) + else: + break + + def delete_obj(self, obj_path): + """ + Delete an object from swift storage. + """ + conn = self.get_connection() + for i in range(5): + try: + conn.delete_object(self.container_name, obj_path) + except ClientException as e: + logger.error(str(e)) + if i == 4: + raise + time.sleep(0.4) + else: + break diff --git a/pfcon/mount_dir.py b/pfcon/zip_file_storage.py similarity index 77% rename from pfcon/mount_dir.py rename to pfcon/zip_file_storage.py index 2dfa62a..52b4da9 100755 --- a/pfcon/mount_dir.py +++ b/pfcon/zip_file_storage.py @@ -1,5 +1,6 @@ """ -Handle MountDir file storage. +Handle zip file-based storage. This is used when pfcon is out-of-network and +configured to receive a zip file with the data. """ import logging @@ -7,28 +8,30 @@ import zipfile import os import io -import shutil + +from .base_storage import BaseStorage logger = logging.getLogger(__name__) -class MountDir: +class ZipFileStorage(BaseStorage): - def __init__(self, config=None): + def __init__(self, config): - self.config = config + super().__init__(config) - def store_data(self, job_id, job_incoming_dir, input_stream): + def store_data(self, job_id, job_incoming_dir, data, **kwargs): """ - Unpack and store the files/directories in the input zip stream at the specified - incoming directory. + Unpack and store the files/directories in the input zip stream data at the + specified incoming directory. """ - with zipfile.ZipFile(input_stream, 'r', zipfile.ZIP_DEFLATED) as job_zip: + with zipfile.ZipFile(data, 'r', zipfile.ZIP_DEFLATED) as job_zip: filenames = job_zip.namelist() nfiles = len(filenames) logger.info(f'{nfiles} files to decompress for job {job_id}') job_zip.extractall(path=job_incoming_dir) + return { 'jid': job_id, 'nfiles': nfiles, @@ -36,13 +39,14 @@ def store_data(self, job_id, job_incoming_dir, input_stream): 'path': job_incoming_dir } - def get_data(self, job_id, job_outgoing_dir): + def get_data(self, job_id, job_outgoing_dir, **kwargs): """ Create job zip file ready for transmission to a remote origin from the outgoing directory. """ memory_zip_file = io.BytesIO() nfiles = 0 + with zipfile.ZipFile(memory_zip_file, 'w', zipfile.ZIP_DEFLATED) as job_zip: for root, dirs, files in os.walk(job_outgoing_dir): for filename in files: @@ -58,11 +62,6 @@ def get_data(self, job_id, job_outgoing_dir): else: nfiles += 1 memory_zip_file.seek(0) + logger.info(f'{nfiles} files compressed for job {job_id}') return memory_zip_file - - def delete_data(self, job_dir): - """ - Delete job data from the store. - """ - shutil.rmtree(job_dir) diff --git a/swarm/docker-compose_dev.yml b/swarm/docker-compose_dev.yml index 3600360..14dc993 100755 --- a/swarm/docker-compose_dev.yml +++ b/swarm/docker-compose_dev.yml @@ -45,11 +45,16 @@ services: # pfcon shares the data) into the spawned container. This directory is passed in the # STOREBASE env variable. environment: + - STORAGE_TYPE=host - STOREBASE - SECRET_KEY="w1kxu^l=@pnsf!5piqz6!!5kdcdpo79y6jebbp+2244yjm*#+k" - CONTAINER_ENV=swarm volumes: - /var/run/docker.sock:/var/run/docker.sock:z + deploy: + placement: + constraints: + - "node.role==manager" networks: - remote labels: diff --git a/swarm/docker-compose_dev_innetwork.yml b/swarm/docker-compose_dev_innetwork.yml new file mode 100755 index 0000000..eef5e96 --- /dev/null +++ b/swarm/docker-compose_dev_innetwork.yml @@ -0,0 +1,88 @@ +# https://docs.docker.com/compose/yml/ +# Each service defined in docker-compose.yml must specify exactly one of +# image or build. Other keys are optional, and are analogous to their +# docker run command-line counterparts. +# +# As with docker run, options specified in the Dockerfile (e.g., CMD, +# EXPOSE, VOLUME, ENV) are respected by default - you don't need to +# specify them again in docker-compose.yml. +# + +version: '3.7' + +services: + pfcon: + image: localhost:5000/fnndsc/pfcon:dev + build: + context: .. + args: + ENVIRONMENT: local + stdin_open: true # docker run -i + tty: true # docker run -t + # We need to mount a physical dir in the HOST onto the key store in pfcon. This dir + # is given by the STOREBASE env variable substitution. The keystore can be specified + # by the --storeBase flag during development. + command: ["python", "-m", "pfcon"] + environment: + - APPLICATION_MODE=development + - PFCON_INNETWORK=true + - STORAGE_ENV=swift + volumes: + - ${STOREBASE:?}:/var/local/storeBase:z + - ../pfcon:/app/pfcon:z + - ../tests:/app/tests:z + ports: + - "30006:5005" + depends_on: + - pman + - swift_service + networks: + - remote + labels: + name: "pfcon" + role: "pfcon service" + + pman: + image: ${PMANREPO:?}/pman + # Since pman spins off containers of its own it needs to mount storeBase dir (where + # pfcon shares the data) into the spawned container. This directory is passed in the + # STOREBASE env variable. + environment: + - STORAGE_TYPE=host + - STOREBASE + - SECRET_KEY="w1kxu^l=@pnsf!5piqz6!!5kdcdpo79y6jebbp+2244yjm*#+k" + - CONTAINER_ENV=swarm + volumes: + - /var/run/docker.sock:/var/run/docker.sock:z + deploy: + placement: + constraints: + - "node.role==manager" + networks: + - remote + labels: + name: "pman" + role: "pman service" + + swift_service: + image: ${SWIFTREPO:?}/docker-swift-onlyone + init: true + volumes: + - swift_storage_dev:/srv + environment: + - SWIFT_USERNAME=chris:chris1234 + - SWIFT_KEY=testing + ports: + - "8080:8080" + networks: + - remote + labels: + name: "Swift" + role: "Swift object storage service" + + +networks: + remote: + +volumes: + swift_storage_dev: diff --git a/tests/test_resources.py b/tests/test_resources.py index 21c9bfc..789f7ae 100755 --- a/tests/test_resources.py +++ b/tests/test_resources.py @@ -36,7 +36,11 @@ def setUp(self): response = self.client.post(url, data=json.dumps(creds), content_type='application/json') self.headers = {'Authorization': 'Bearer ' + response.json['token']} + self.job_dir = '' + def tearDown(self): + if os.path.isdir(self.job_dir): + shutil.rmtree(self.job_dir) # re-enable logging logging.disable(logging.NOTSET) @@ -48,19 +52,15 @@ class TestJobList(ResourceTests): def setUp(self): super().setUp() - self.job_dir = '' with self.app.test_request_context(): self.url = url_for('api.joblist') - def tearDown(self): - if os.path.isdir(self.job_dir): - shutil.rmtree(self.job_dir) - super().tearDown() - def test_get(self): response = self.client.get(self.url, headers=self.headers) self.assertEqual(response.status_code, 200) self.assertTrue('server_version' in response.json) + self.assertFalse(response.json['pfcon_innetwork']) + self.assertEqual(response.json['storage_env'], 'zipfile') def test_post(self): job_id = 'chris-jid-1' @@ -111,7 +111,6 @@ class TestJob(ResourceTests): def setUp(self): super().setUp() - self.job_dir = '' self.compute_data = { 'entrypoint': ['python3', '/usr/local/bin/simplefsapp'], 'args': ['--saveinputmeta', '--saveoutputmeta', '--dir', 'cube'], @@ -125,11 +124,6 @@ def setUp(self): 'type': 'fs' } - def tearDown(self): - if os.path.isdir(self.job_dir): - shutil.rmtree(self.job_dir) - super().tearDown() - def test_get(self): job_id = 'chris-jid-2' self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) @@ -183,29 +177,23 @@ class TestJobFile(ResourceTests): """ Test the JobFile resource. """ - def setUp(self): - super().setUp() - - self.job_dir = '' - - def tearDown(self): - if os.path.isdir(self.job_dir): - shutil.rmtree(self.job_dir) - super().tearDown() def test_get(self): job_id = 'chris-jid-4' self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + with self.app.test_request_context(): url = url_for('api.jobfile', job_id=job_id) outgoing = os.path.join(self.job_dir, 'outgoing') - Path(outgoing).mkdir(parents=True, exist_ok=True) - with open(os.path.join(outgoing, 'test.txt'), 'w') as f: + test_file_path = os.path.join(outgoing, 'out') + Path(test_file_path).mkdir(parents=True, exist_ok=True) + with open(os.path.join(test_file_path, 'test.txt'), 'w') as f: f.write('job input test file') response = self.client.get(url, headers=self.headers) self.assertEqual(response.status_code, 200) + memory_zip_file = io.BytesIO(response.data) with zipfile.ZipFile(memory_zip_file, 'r', zipfile.ZIP_DEFLATED) as job_zip: filenames = job_zip.namelist() self.assertEqual(len(filenames), 1) - self.assertEqual(filenames[0], 'test.txt') + self.assertEqual(filenames[0], 'out/test.txt') diff --git a/tests/test_resources_innetwork.py b/tests/test_resources_innetwork.py new file mode 100755 index 0000000..694af85 --- /dev/null +++ b/tests/test_resources_innetwork.py @@ -0,0 +1,239 @@ + +import logging +from pathlib import Path +import shutil +import os +import io +import time +import zipfile +import json +from unittest import TestCase +from unittest import mock, skip + +from flask import url_for + +from pfcon.app import create_app +from pfcon.services import PmanService, ServiceException +from pfcon.swiftmanager import SwiftManager + + +class ResourceTests(TestCase): + """ + Base class for all the resource tests. + """ + def setUp(self): + # avoid cluttered console output (for instance logging all the http requests) + logging.disable(logging.WARNING) + + self.app = create_app({'PFCON_INNETWORK': True, + 'STORAGE_ENV': 'swift', + 'SWIFT_CONTAINER_NAME': 'users', + 'SWIFT_CONNECTION_PARAMS': { + 'user': 'chris:chris1234', + 'key': 'testing', + 'authurl': 'http://swift_service:8080/auth/v1.0'} + }) + self.client = self.app.test_client() + with self.app.test_request_context(): + # create a header with authorization token + url = url_for('api.auth') + creds = { + 'pfcon_user': self.app.config.get('PFCON_USER'), + 'pfcon_password': self.app.config.get('PFCON_PASSWORD') + } + response = self.client.post(url, data=json.dumps(creds), content_type='application/json') + self.headers = {'Authorization': 'Bearer ' + response.json['token']} + + self.swift_manager = SwiftManager(self.app.config.get('SWIFT_CONTAINER_NAME'), + self.app.config.get('SWIFT_CONNECTION_PARAMS')) + + # upload a file to the Swift storage input path + self.swift_input_path = 'foo/feed/input' + self.swift_output_path = 'foo/feed/output' + with io.StringIO('Test file') as f: + self.swift_manager.upload_obj(self.swift_input_path + '/test.txt', + f.read(), content_type='text/plain') + self.job_dir = '' + + def tearDown(self): + if os.path.isdir(self.job_dir): + shutil.rmtree(self.job_dir) + + # delete files from swift storage + l_ls = self.swift_manager.ls(self.swift_input_path) + for obj_path in l_ls: + self.swift_manager.delete_obj(obj_path) + + l_ls = self.swift_manager.ls(self.swift_output_path) + for obj_path in l_ls: + self.swift_manager.delete_obj(obj_path) + + # re-enable logging + logging.disable(logging.NOTSET) + + +class TestJobList(ResourceTests): + """ + Test the JobList resource. + """ + def setUp(self): + super().setUp() + + with self.app.test_request_context(): + self.url = url_for('api.joblist') + + def test_get(self): + response = self.client.get(self.url, headers=self.headers) + self.assertEqual(response.status_code, 200) + self.assertTrue('server_version' in response.json) + self.assertTrue(response.json['pfcon_innetwork']) + self.assertEqual(response.json['storage_env'], 'swift') + + def test_post(self): + job_id = 'chris-jid-1' + self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + + data = { + 'jid': job_id, + 'entrypoint': ['python3', '/usr/local/bin/simplefsapp'], + 'args': ['--saveinputmeta', '--saveoutputmeta', '--dir', '/share/incoming'], + 'auid': 'cube', + 'number_of_workers': '1', + 'cpu_limit': '1000', + 'memory_limit': '200', + 'gpu_limit': '0', + 'image': 'fnndsc/pl-simplefsapp', + 'type': 'fs', + 'input_dirs': [self.swift_input_path] + } + # make the POST request + response = self.client.post(self.url, data=data, headers=self.headers) + self.assertEqual(response.status_code, 201) + self.assertIn('compute', response.json) + self.assertIn('data', response.json) + self.assertEqual(response.json['data']['nfiles'], 1) + + with self.app.test_request_context(): + pman = PmanService.get_service_obj() + for _ in range(10): + time.sleep(3) + d_compute_response = pman.get_job(job_id) + if d_compute_response['status'] == 'finishedSuccessfully': break + self.assertEqual(d_compute_response['status'], 'finishedSuccessfully') + + # cleanup swarm job + pman.delete_job(job_id) + + +class TestJob(ResourceTests): + """ + Test the Job resource. + """ + def setUp(self): + super().setUp() + + self.compute_data = { + 'entrypoint': ['python3', '/usr/local/bin/simplefsapp'], + 'args': ['--saveinputmeta', '--saveoutputmeta', '--dir', 'cube'], + 'args_path_flags': ['--dir'], + 'auid': 'cube', + 'number_of_workers': '1', + 'cpu_limit': '1000', + 'memory_limit': '200', + 'gpu_limit': '0', + 'image': 'fnndsc/pl-simplefsapp', + 'type': 'fs' + } + + def test_get(self): + job_id = 'chris-jid-2' + self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + incoming = os.path.join(self.job_dir, 'incoming') + Path(incoming).mkdir(parents=True, exist_ok=True) + outgoing = os.path.join(self.job_dir, 'outgoing') + Path(outgoing).mkdir(parents=True, exist_ok=True) + with open(os.path.join(incoming, 'test.txt'), 'w') as f: + f.write('job input test file') + + with self.app.test_request_context(): + # create job + url = url_for('api.job', job_id=job_id) + pman = PmanService.get_service_obj() + pman.run_job(job_id, self.compute_data) + + # make the GET requests + for _ in range(10): + time.sleep(3) + response = self.client.get(url, headers=self.headers) + if response.json['compute']['status'] == 'finishedSuccessfully': break + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json['compute']['status'], 'finishedSuccessfully') + + # cleanup swarm job + pman.delete_job(job_id) + + def test_delete(self): + job_id = 'chris-jid-3' + self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + incoming = os.path.join(self.job_dir, 'incoming') + Path(incoming).mkdir(parents=True, exist_ok=True) + outgoing = os.path.join(self.job_dir, 'outgoing') + Path(outgoing).mkdir(parents=True, exist_ok=True) + with open(os.path.join(incoming, 'test.txt'), 'w') as f: + f.write('job input test file') + + with self.app.test_request_context(): + # create job + url = url_for('api.job', job_id=job_id) + pman = PmanService.get_service_obj() + pman.run_job(job_id, self.compute_data) + + # make the DELETE request + time.sleep(3) + response = self.client.delete(url, headers=self.headers) + self.assertEqual(response.status_code, 204) + + +class TestJobFile(ResourceTests): + """ + Test the JobFile resource. + """ + + def test_get_without_query_parameters(self): + job_id = 'chris-jid-4' + self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + + with self.app.test_request_context(): + url = url_for('api.jobfile', job_id=job_id) + outgoing = os.path.join(self.job_dir, 'outgoing') + test_file_path = os.path.join(outgoing, 'out') + Path(test_file_path).mkdir(parents=True, exist_ok=True) + with open(os.path.join(test_file_path, 'test.txt'), 'w') as f: + f.write('job input test file') + + response = self.client.get(url, headers=self.headers) + self.assertEqual(response.status_code, 200) + memory_zip_file = io.BytesIO(response.data) + with zipfile.ZipFile(memory_zip_file, 'r', zipfile.ZIP_DEFLATED) as job_zip: + filenames = job_zip.namelist() + self.assertEqual(len(filenames), 1) + self.assertEqual(filenames[0], 'out/test.txt') + + def test_get_with_query_parameters(self): + job_id = 'chris-jid-4' + self.job_dir = os.path.join('/var/local/storeBase', 'key-' + job_id) + + with self.app.test_request_context(): + url = url_for('api.jobfile', job_id=job_id) + outgoing = os.path.join(self.job_dir, 'outgoing') + test_file_path = os.path.join(outgoing, 'out') + Path(test_file_path).mkdir(parents=True, exist_ok=True) + with open(os.path.join(test_file_path, 'test.txt'), 'w') as f: + f.write('job input test file') + + response = self.client.get(url, query_string={'job_output_path': self.swift_output_path}, + headers=self.headers) + self.assertEqual(response.status_code, 200) + content = json.loads(response.data.decode()) + self.assertEqual(content['job_output_path'], self.swift_output_path) + self.assertEqual(content['rel_file_paths'], ["out/test.txt"]) diff --git a/unmake.sh b/unmake.sh index a8e799e..9648135 100755 --- a/unmake.sh +++ b/unmake.sh @@ -6,7 +6,7 @@ # # SYNPOSIS # -# unmake.sh [-h] +# unmake.sh [-h] [-N] # [-O ] # [-S ] # @@ -21,6 +21,10 @@ # # unmake.sh # +# Destroy pfcon dev instance operating in-network on Swarm: +# +# unmake.sh -N +# # Destroy pfcon dev instance on Kubernetes: # # unmake.sh -O kubernetes @@ -30,6 +34,11 @@ # # -h # +# -N +# +# Explicitly set pfcon to operate in-network mode (using a swift storage instead of +# a zip file). +# # Optional print usage help. # # -O @@ -49,14 +58,16 @@ declare -i STEP=0 ORCHESTRATOR=swarm print_usage () { - echo "Usage: ./unmake.sh [-h] [-O ] [-S ]" + echo "Usage: ./unmake.sh [-h] [-N] [-O ] [-S ]" exit 1 } -while getopts ":hO:S:" opt; do +while getopts ":hNO:S:" opt; do case $opt in h) print_usage ;; + N) b_pfconInNetwork=1 + ;; O) ORCHESTRATOR=$OPTARG if ! [[ "$ORCHESTRATOR" =~ ^(swarm|kubernetes)$ ]]; then echo "Invalid value for option -- O" @@ -79,18 +90,37 @@ title -d 1 "Setting global exports..." if [ -z ${STOREBASE+x} ]; then STOREBASE=$(pwd)/CHRIS_REMOTE_FS fi + if (( b_pfconInNetwork )) ; then + echo -e "PFCON_INNETWORK=True" | ./boxes.sh + else + echo -e "PFCON_INNETWORK=False" | ./boxes.sh + fi echo -e "ORCHESTRATOR=$ORCHESTRATOR" | ./boxes.sh echo -e "exporting STOREBASE=$STOREBASE " | ./boxes.sh export STOREBASE=$STOREBASE + export SOURCEDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" + echo -e "exporting SOURCEDIR=$SOURCEDIR " | ./boxes.sh windowBottom title -d 1 "Destroying pfcon containerized dev environment on $ORCHESTRATOR" if [[ $ORCHESTRATOR == swarm ]]; then echo "docker stack rm pfcon_dev_stack" | ./boxes.sh ${LightCyan} docker stack rm pfcon_dev_stack + if (( b_pfconInNetwork )) ; then + echo "docker volume rm -f pfcon_dev_stack_swift_storage_dev" + sleep 15 + docker volume rm pfcon_dev_stack_swift_storage_dev + fi elif [[ $ORCHESTRATOR == kubernetes ]]; then - echo "kubectl delete -f kubernetes/pfcon_dev.yaml" | ./boxes.sh ${LightCyan} - kubectl delete -f kubernetes/pfcon_dev.yaml + if (( b_pfconInNetwork )) ; then + echo "kubectl delete -f kubernetes/pfcon_dev_innetwork.yaml" | ./boxes.sh ${LightCyan} + kubectl delete -f kubernetes/pfcon_dev_innetwork.yaml + echo "Removing swift_storage folder $SOURCEDIR/swift_storage" | ./boxes.sh + rm -fr $SOURCEDIR/swift_storage + else + echo "kubectl delete -f kubernetes/pfcon_dev.yaml" | ./boxes.sh ${LightCyan} + kubectl delete -f kubernetes/pfcon_dev.yaml + fi fi echo "Removing STOREBASE tree $STOREBASE" | ./boxes.sh rm -fr $STOREBASE