Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logic for some CUBE 'env' information to be added as a file #380

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
73 changes: 64 additions & 9 deletions chris_backend/plugininstances/services/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
if settings.DEBUG:
import pdb, pudb, rpudb
from celery.contrib import rdb
from pudb.remote import set_trace


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -435,6 +436,7 @@ def create_zip_file(self, swift_paths):
"""
job_id = self.str_job_id
memory_zip_file = io.BytesIO()
# set_trace(host = "0.0.0.0", port = 6900, term_size = (252, 72))
with zipfile.ZipFile(memory_zip_file, 'w', zipfile.ZIP_DEFLATED) as job_data_zip:
for swift_path in swift_paths:
try:
Expand All @@ -452,27 +454,65 @@ def create_zip_file(self, swift_paths):
f'{obj_path} from swift storage, detail: {str(e)}')
self.c_plugin_inst.error_code = 'CODE08'
raise
if 'pl.meta.json' in obj_path: obj_path = obj_path.replace(
'pl.meta.json',
'pl-parent.meta.json')
zip_path = obj_path.replace(swift_path, '', 1).lstrip('/')
job_data_zip.writestr(zip_path, contents)
memory_zip_file.seek(0)
return memory_zip_file

def chrisEnvMetaFile_contents(self):
'''
Generate a "pl.meta.json" meta file, read contents into memory, and
return the contents
'''
bytes = None
# Meta data to capture...
d_metaInfo : dict = {
'jid' : self.str_job_id,
'previous_id' : self.c_plugin_inst.previous_id,
'id' : self.c_plugin_inst.id
}
str_metaFile : str = '/tmp/%s.json' % self.str_job_id
# First, save to file
with open(str_metaFile, 'w') as fw:
json.dump(d_metaInfo, fw, indent = 4)
fw.close()
# then, read the file into bytes
fr = open(str_metaFile, 'rb')
if fr:
bytes = fr.read()
os.remove(str_metaFile)
return(bytes)

def unpack_zip_file(self, zip_file_content):
"""
Unpack job zip file from the remote into swift storage and register the
extracted files with the DB.

Inject at this point into the unpack file payload, a file called "chris.env"
that contains some CUBE environmental information. This provides a test
mechanism for communicating to a downstream plugin information about its
parent, such as the parent's pluginInstanceID.
"""
job_id = self.str_job_id
swift_filenames = []
# set_trace(host = "0.0.0.0", port = 6900, term_size = (252, 72))
try:
memory_zip_file = io.BytesIO(zip_file_content)
with zipfile.ZipFile(memory_zip_file, 'r', zipfile.ZIP_DEFLATED) as job_zip:
filenames = job_zip.namelist()
filenames.append('pl.meta.json')
logger.info(f'{len(filenames)} files to decompress for job {job_id}')
output_path = self.c_plugin_inst.get_output_path() + '/'
for fname in filenames:
content = job_zip.read(fname)
if fname != 'pl.meta.json':
content = job_zip.read(fname)
else:
content = self.chrisEnvMetaFile_contents()
swift_fname = output_path + fname.lstrip('/')
# logger.info(f'fname {fname}; swift_fname {swift_fname}')
try:
self.swift_manager.upload_obj(swift_fname, content)
except ClientException as e:
Expand Down Expand Up @@ -502,12 +542,22 @@ def save_plugin_instance_final_status(self):

def _handle_app_unextpath_parameters(self, unextpath_parameters_dict):
"""
Internal method to handle parameters of type 'unextpath' passed to the plugin
instance app.
Internal method to handle parameters of type 'unextpath' passed to the
plugin instance app.

NOTE:

Full swift path names are now preserved in the copy process, allowing
for each copy argument to be preserved in its own directory tree in the
destination.

NB: This preservation could exhaust DB string lengths!
"""
job_id = self.str_job_id
outputdir = self.c_plugin_inst.get_output_path()
obj_output_path_list = []

str_sourceTraceDir : str = ''
job_id : str = self.str_job_id
outputdir : str = self.c_plugin_inst.get_output_path()
obj_output_path_list : list = []
for param_flag in unextpath_parameters_dict:
# each parameter value is a string of one or more paths separated by comma
path_list = unextpath_parameters_dict[param_flag].split(',')
Expand All @@ -520,9 +570,14 @@ def _handle_app_unextpath_parameters(self, unextpath_parameters_dict):
self.c_plugin_inst.error_code = 'CODE06'
raise
for obj in obj_list:
obj_output_path = obj.replace(path.rstrip('/'), outputdir, 1)
if not obj_output_path.startswith(outputdir + '/'):
obj_output_path = outputdir + '/' + obj.split('/')[-1]
# Uncomment the following to fire up a trace event, accessible via
# telnet localhost 6900
# Note, you might need to change the term_size on an ad-hoc manner
# set_trace(host = "0.0.0.0", port = 6900, term_size = (223, 60))
str_sourceTraceDir = path.rstrip('/').replace('/', '_')

obj_output_path = outputdir + '/' + str_sourceTraceDir + '/' + '/'.join(obj.split('/')[2:])

try:
if not self.swift_manager.obj_exists(obj_output_path):
self.swift_manager.copy_obj(obj, obj_output_path)
Expand Down
2 changes: 1 addition & 1 deletion chris_backend/plugininstances/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ def test_integration_ts_plugin_instance_create_success(self):
b_checkAgain = False
currentLoop += 1
self.assertEqual(pl_inst.status, 'finishedSuccessfully')
self.assertEqual(pl_inst.files.count(), 3)
self.assertEqual(pl_inst.files.count(), 4)

# delete files from swift storage
self.swift_manager.delete_obj(path)
Expand Down
3 changes: 2 additions & 1 deletion postscript.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
fnndsc/pl-pfdorun, \
fnndsc/pl-mgz2imageslices, \
fnndsc/pl-multipass, \
fnndsc/pl-heatmap
fnndsc/pl-heatmap, \
fnndsc/pl-pfdo_med2img
"

#
Expand Down