Skip to content

Commit

Permalink
Merge pull request #140 from jbernal0019/master
Browse files Browse the repository at this point in the history
Implement PFCON in-network with shared filesystem storage
  • Loading branch information
jbernal0019 authored Aug 31, 2023
2 parents 02882b5 + 4e236fb commit 6ecd0c0
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 179 deletions.
4 changes: 4 additions & 0 deletions kubernetes/pfcon_dev_innetwork.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ spec:
env:
- name: APPLICATION_MODE
value: development
- name: PFCON_INNETWORK
value: true
- name: STORAGE_ENV
value: swift
command: ["python"]
args: ["-m", "pfcon"]
volumeMounts:
Expand Down
133 changes: 133 additions & 0 deletions kubernetes/pfcon_dev_innetwork_fs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
apiVersion: v1
kind: Service
metadata:
name: pfcon
labels:
app: pfcon
env: development
spec:
type: NodePort
selector:
app: pfcon
env: development
ports:
- port: 30006
targetPort: 5005
nodePort: 30006

---

apiVersion: apps/v1
kind: Deployment
metadata:
name: pfcon
labels:
app: pfcon
env: development
spec:
replicas: 1
selector:
matchLabels:
app: pfcon
env: development
template:
metadata:
name: pfcon
labels:
app: pfcon
env: development
spec:
initContainers:
- name: init-pfcon
image: busybox:1.32
command: [ 'sh', '-c', "until wget --spider -S -T 2 http://pman:5010/api/v1/ 2>&1 | grep '200 OK'; do echo waiting for pman; done" ]
containers:
- image: localhost:5000/fnndsc/pfcon:dev
name: pfcon
stdin: true
tty: true
ports:
- containerPort: 5005
env:
- name: APPLICATION_MODE
value: development
- name: PFCON_INNETWORK
value: true
- name: STORAGE_ENV
value: filesystem
command: ["python"]
args: ["-m", "pfcon"]
volumeMounts:
- mountPath: "/var/local/storeBase"
name: "storebase"
- mountPath: "/app/pfcon"
name: "pfcon-source"
- mountPath: "/app/tests"
name: "pfcon-tests"
volumes:
- name: "storebase"
hostPath:
path: ${STOREBASE}
- name: "pfcon-source"
hostPath:
path: ${SOURCEDIR}/pfcon
- name: "pfcon-tests"
hostPath:
path: ${SOURCEDIR}/tests

---

apiVersion: v1
kind: Service
metadata:
name: pman
labels:
app: pman
env: production
spec:
selector:
app: pman
env: production
ports:
- port: 5010
targetPort: 5010

---

apiVersion: apps/v1
kind: Deployment
metadata:
name: pman
labels:
app: pman
env: production
spec:
replicas: 1
selector:
matchLabels:
app: pman
env: production
template:
metadata:
name: pman
labels:
app: pman
env: production
spec:
containers:
- image: fnndsc/pman
name: pman
ports:
- containerPort: 5010
# Since pman spins off containers of its own it needs to mount storeBase dir
# (where pfcon shares the data) into the spawned container. This directory is
# passed in the STOREBASE env variable.
env:
- name: STORAGE_TYPE
value: host
- name: SECRET_KEY
value: "anysu^l=@pnsf!5piqz6!!5kdcdpo79y6jebbp+2244yjm*#+k"
- name: STOREBASE
value: ${STOREBASE}
- name: CONTAINER_ENV
value: kubernetes
5 changes: 2 additions & 3 deletions pfcon/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

from logging.config import dictConfig
from environs import Env
import os

from importlib.metadata import Distribution
from .swiftmanager import SwiftManager
Expand Down Expand Up @@ -34,10 +35,8 @@ def __init__(self):
if self.STORAGE_ENV != 'zipfile':
raise ValueError(f"Unsupported value '{self.STORAGE_ENV}' for STORAGE_ENV")

if self.STORAGE_ENV == 'filesystem':
self.FILESYSTEM_BASEDIR = env('FILESYSTEM_BASEDIR', '/filesystem')
self.STOREBASE_MOUNT = env('STOREBASE_MOUNT', '/var/local/storeBase')

self.STORE_BASE = env('STOREBASE', '/var/local/storeBase')
self.env = env


Expand Down
64 changes: 23 additions & 41 deletions pfcon/filesystem_storage.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
"""
Handle filesystem-based (eg. mount directory) storage. This is used when pfcon is
in-network and configured to directly copy the data from a filesystem.
Handle filesystem-based (eg. mounted directory) storage. This is used when pfcon is
in-network and configured to directly access the data from a filesystem.
"""

import logging
import datetime
import os
import json
import io
import shutil


from .base_storage import BaseStorage
Expand All @@ -23,33 +22,17 @@ def __init__(self, config):

super().__init__(config)

self.base_dir = config.get('FILESYSTEM_BASEDIR')
self.fs_mount_base_dir = config.get('STOREBASE_MOUNT')


def store_data(self, job_id, job_incoming_dir, data, **kwargs):
def store_data(self, job_id, job_incoming_dir, data=None, **kwargs):
"""
Copy all the files/folders under each input folder in the specified data list
into the specified incoming directory.
Count the number of files in the specified job incoming directory.
"""
nfiles = 0
for rel_path in data:
abs_path = os.path.join(self.base_dir, rel_path.strip('/'))

for root, dirs, files in os.walk(abs_path):
local_path = root.replace(abs_path, job_incoming_dir, 1)
os.makedirs(local_path, exist_ok=True)

for filename in files:
fs_file_path = os.path.join(root, filename)
try:
shutil.copy(fs_file_path, local_path)
except Exception as e:
logger.error(f'Failed to copy file {fs_file_path} for '
f'job {job_id}, detail: {str(e)}')
raise
nfiles += 1

logger.info(f'{nfiles} files copied from file system for job {job_id}')
for root, dirs, files in os.walk(job_incoming_dir):
nfiles += len(files)

logger.info(f'{nfiles} files found in file system for job {job_id}')
return {
'jid': job_id,
'nfiles': nfiles,
Expand All @@ -59,33 +42,32 @@ def store_data(self, job_id, job_incoming_dir, data, **kwargs):

def get_data(self, job_id, job_outgoing_dir, **kwargs):
"""
Copy output files/folders from the specified outgoing directory into the folder
specified by job_output_path keyword argument (relative to the FS base dir).
List the output files' relative paths from the folder specified by
the job_output_path keyword argument which in turn is relative to the filesystem
base directory (assumed to be the storebase mount directory).
Then create job json file ready for transmission to a remote origin. The json
file contains the job_output_path prefix and the list of relative file paths.
"""
job_output_path = kwargs['job_output_path']
fs_output_path = os.path.join(self.base_dir, job_output_path)
fs_rel_file_paths = []
job_output_path = kwargs['job_output_path']
abs_path = os.path.join(self.fs_mount_base_dir, job_output_path)

for root, dirs, files in os.walk(job_outgoing_dir):
rel_path = os.path.relpath(root, job_outgoing_dir)
for root, dirs, files in os.walk(abs_path):
rel_path = os.path.relpath(root, abs_path)
if rel_path == '.':
rel_path = ''
fs_path = os.path.join(fs_output_path, rel_path)
os.makedirs(fs_path, exist_ok=True)

for filename in files:
local_file_path = os.path.join(root, filename)
if not os.path.islink(local_file_path):
try:
shutil.copy(local_file_path, fs_path)
except Exception as e:
logger.error(f'Failed to copy file {local_file_path} for '
f'job {job_id}, detail: {str(e)}')
raise
fs_rel_file_paths.append(os.path.join(rel_path, filename))

data = {'job_output_path': job_output_path,
data = {'job_output_path': kwargs['job_output_path'],
'rel_file_paths': fs_rel_file_paths}
return io.BytesIO(json.dumps(data).encode())

def delete_data(self, job_dir):
"""
Delete job data from the local storage.
"""
pass
Loading

0 comments on commit 6ecd0c0

Please sign in to comment.