From 69ce8b8c9b0c068fc71b48c283029bb5d9fd5aff Mon Sep 17 00:00:00 2001 From: Sandip Samal Date: Thu, 12 Sep 2024 14:07:21 -0400 Subject: [PATCH] WIP feed rename changes --- build/lib/control/action.py | 713 ++++++++++++++++++++++++++++++++++++ build/lib/control/filter.py | 80 ++++ build/lib/control/jobber.py | 182 +++++++++ build/lib/dylld.py | 380 +++++++++++++++++++ build/lib/logic/behavior.py | 30 ++ build/lib/state/data.py | 311 ++++++++++++++++ control/action.py | 20 +- dylld.py | 2 +- 8 files changed, 1713 insertions(+), 5 deletions(-) create mode 100644 build/lib/control/action.py create mode 100644 build/lib/control/filter.py create mode 100644 build/lib/control/jobber.py create mode 100644 build/lib/dylld.py create mode 100644 build/lib/logic/behavior.py create mode 100644 build/lib/state/data.py diff --git a/build/lib/control/action.py b/build/lib/control/action.py new file mode 100644 index 0000000..606bfd6 --- /dev/null +++ b/build/lib/control/action.py @@ -0,0 +1,713 @@ +str_about = ''' + The action module provides functionality to run individual + plugins as well as "pipelines" of plugins. + + This module is the contact "surface" between dypi and a CUBE + instance. Control/manipulation of the ChRIS instance is effected + by a set of CLI scripts that this module creates and then executes. + + NOTE: This module is "fragily" dependent on python-chrisclient and + caw! Changes in those modules could break things here rather + completely. +''' + +from . import jobber +from state import data +import os +os.environ['XDG_CONFIG_HOME'] = '/tmp' +import re +import pudb +import json +from argparse import ArgumentParser, Namespace +from chrisclient import client +import time + +class PluginRun: + ''' + A class wrapper about the CLI tool "chrispl-run" that POSTs a pl-shexec + to CUBE. 
+ ''' + def __init__(self, *args, **kwargs): + self.env = None + self.plugin = '' + self.shell : jobber.Jobber = jobber.Jobber({ + 'verbosity' : 1, + 'noJobLogging': True + }) + self.attachToPluginID : str = '' + self.options : Namespace = None + for k, v in kwargs.items(): + if k == 'attachToPluginID' : self.attachToPluginID = v + if k == 'env' : self.env = v + if k == 'options' : self.options = v + + self.l_runCMDresp : list = [] + self.l_branchInstanceID : list = [] + + def PLpfdorun_args(self, str_input : str) -> dict: + ''' + Return the argument string pertinent to the pl-pfdorun plugin + ''' + # pudb.set_trace() + str_filter : str = "" + # Remove any '*' and/or '/' chars from pattern search. This will + # transform a string of '**/*dcm" to just 'dcm', suitable for pl-shexec + str_ff : str = re.subn(r'[*/]', '', self.options.pattern)[0] + if not self.options.inNode: + str_filter = "--fileFilter=%s" % str_input + else: + str_filter = "--dirFilter=%s" % str_input + if len(str_ff): str_filter += ";--fileFilter=%s" % str_ff + + str_args : str = """ + %s; + --exec=cp %%inputWorkingDir/%%inputWorkingFile %%outputWorkingDir/%%inputWorkingFile; + --noJobLogging; + --verbose=5; + --pftelDB=%s; + --title=%s; + --previous_id=%s + """ % (str_filter, self.options.pftelDB, str_input, self.env.CUBE.parentPluginInstanceID) + + str_args = re.sub(r';\n.*--', ';--', str_args) + str_args = str_args.strip() + return { + 'args': str_args + } + + def chrispl_onCUBEargs(self): + ''' + Return a string specifying the CUBE instance + ''' + return { + 'onCUBE': json.dumps(self.env.CUBE.onCUBE()) + } + + def chrispl_run_cmd(self, str_inputData : str) -> dict: + ''' + Return the CLI for the chrispl_run + ''' + str_cmd = """chrispl-run --plugin name=pl-shexec --args="%s" --onCUBE %s""" % ( + self.PLpfdorun_args(str_inputData)['args'], + json.dumps(self.chrispl_onCUBEargs()['onCUBE'], indent = 4) + ) + str_cmd = str_cmd.strip().replace('\n', '') + return { + 'cmd' : str_cmd + } + + def 
__call__(self, str_input : str, **kwargs) ->dict: + ''' + Copy the to the output using pl-pfdorun. If the in-node + self.options.inNode is true, perform a bulk copy of all files in the + passed directory that conform to the filter. + ''' + # Remove the '/incoming/' from the str_input + str_inputTarget : str = str_input.split('/')[-1] + d_PLCmd : dict = self.chrispl_run_cmd(str_inputTarget) + str_PLCmd : str = d_PLCmd['cmd'] + str_PLCmdfile : str = '/tmp/%s.sh' % str_inputTarget + branchID : int = -1 + b_status : bool = False + + str_append : str = "" + for k,v in kwargs.items(): + if k == 'append' : str_append = v + + str_PLCmd += " " + str_append + if self.options: + str_scriptDir : str = '%s/%s' % (self.options.outputdir, str_inputTarget) + os.makedirs(str_scriptDir, exist_ok = True) + str_PLCmdfile = '%s/%s/copy.sh' % (self.options.outputdir, str_inputTarget) + + with open(str_PLCmdfile, 'w') as f: + f.write('#!/bin/bash\n') + f.write(str_PLCmd) + os.chmod(str_PLCmdfile, 0o755) + d_runCMDresp : dict = self.shell.job_run(str_PLCmdfile) + if not d_runCMDresp['returncode']: + b_status = True + self.l_runCMDresp.append(d_runCMDresp) + branchID : int = d_runCMDresp['stdout'].split()[2] + self.l_branchInstanceID.append(branchID) + else: + b_status = False + + return { + 'status' : b_status, + 'run' : d_runCMDresp, + 'input' : str_input, + 'branchInstanceID' : branchID + } + +class LLDcomputeflow: + ''' + A class to create / manage the LLD compute flow + ''' + + def __init__(self, *args, **kwargs): + self.env : data.env = None + self.options : Namespace = None + + for k, v in kwargs.items(): + if k == 'env' : self.env = v + if k == 'options' : self.options = v + + self.cl : client.Client = None + self.cl = client.Client( + self.env.CUBE('url'), + self.env.CUBE('username'), + self.env.CUBE('password') + ) + self.d_pipelines : dict = self.cl.get_pipelines() + self.pltopo : int = self.cl.get_plugins({'name': 'pl-topologicalcopy'}) + self.newTreeID : int = -1 + 
self.ld_workflowhist : list = [] + self.ld_topologicalNode : dict = {'data': []} + + def pluginInstanceID_findWithTitle(self, + d_workflowDetail : dict, + node_title : str + ) -> int: + """ + Determine the plugin instance id in the `d_workflowDetail` that has + title substring . If the d_workflowDetail is simply a plugin + instance, return its id provided it has the . + + Args: + d_workflowDetail (dict): workflow detail data structure + node_title (str): the node to find + + Returns: + int: id of the found node, or -1 + """ + def plugin_hasTitle(d_plinfo : dict, title : str) -> bool: + """ + Does this node (`d_plinfo`) have this `title`? + + Args: + d_plinfo (dict): the plugin data description + title (str): the name of this node + + Returns: + bool: yay or nay + """ + + nonlocal pluginIDwithTitle + if title.lower() in d_plinfo['title'].lower(): + pluginIDwithTitle = d_plinfo['id'] + return True + else: + return False + + pluginIDwithTitle : int = -1 + d_plinfo : dict = {} + if 'data' in d_workflowDetail: + for d_plinfo in d_workflowDetail['data']: + if plugin_hasTitle(d_plinfo, node_title): break + else: + plugin_hasTitle(d_workflowDetail, node_title) + + return pluginIDwithTitle + + def waitForNodeInWorkflow(self, + d_workflowDetail : dict, + node_title : str, + **kwargs + ) -> dict: + """ + Wait for a node in a workflow to transition to a finishedState + + Args: + d_workflowDetail (dict): the workflow in which the node + exists + node_title (str): the title of the node to find + + kwargs: + waitPoll = the polling interval in seconds + totalPolls = total number of polling before abandoning; + if this is 0, then poll forever. 
+ + Future: expand to wait on list of node_titles + + Returns: + dict: _description_ + """ + waitPoll : int = 5 + totalPolls : int = 100 + pollCount : int = 0 + b_finished : bool = False + waitOnPluginID : int = self.pluginInstanceID_findWithTitle( + d_workflowDetail, node_title + ) + str_pluginStatus: str = 'unknown' + d_plinfo : dict = {} + + for k,v in kwargs.items(): + if k == 'waitPoll': waitPoll = v + if k == 'totalPolls': totalPolls = v + + if waitOnPluginID >= 0: + while 'finished' not in str_pluginStatus.lower() and \ + pollCount <= totalPolls : + d_plinfo = self.cl.get_plugin_instance_by_id(waitOnPluginID) + str_pluginStatus = d_plinfo['status'] + time.sleep(waitPoll) + if totalPolls: pollCount += 1 + if 'finished' in d_plinfo['status']: + b_finished = d_plinfo['status'] == 'finishedSuccessfully' + return { + 'finished' : b_finished, + 'status' : str_pluginStatus, + 'workflow' : d_workflowDetail, + 'plinst' : d_plinfo, + 'polls' : pollCount, + 'plid' : waitOnPluginID + } + + def pluginParameters_setInNodes(self, + d_piping : dict, + d_pluginParameters : dict + ) -> dict: + """ + Override default parameters in the `d_piping` + + Args: + d_piping (dict): the current default parameters for the + plugins in a pipeline + d_pluginParameters (dict): a list of plugins and parameters to + set in the response + + Returns: + dict: a new piping structure with changes to some parameter values + if required. If no d_pluginParameters is passed, simply + return the piping unchanged. 
+ + """ + for pluginTitle,d_parameters in d_pluginParameters.items(): + for piping in d_piping: + if pluginTitle in piping.get('title'): + for k,v in d_parameters.items(): + for d_default in piping.get('plugin_parameter_defaults'): + if k in d_default.get('name'): + d_default['default'] = v + return d_piping + + def pipelineWithName_getNodes( + self, + str_pipelineName : str, + d_pluginParameters : dict = {} + ) -> dict : + """ + Find a pipeline that contains the passed name + and if found, return a nodes dictionary. Optionally set relevant + plugin parameters to values described in + + + Args: + str_pipelineName (str): the name of the pipeline to find + d_pluginParameters (dict): a set of optional plugin parameter + overrides + + Returns: + dict: node dictionary (name, compute env, default parameters) + and id of the pipeline + """ + # pudb.set_trace() + id_pipeline : int = -1 + ld_node : list = [] + d_pipeline : dict = self.cl.get_pipelines({'name': str_pipelineName}) + if 'data' in d_pipeline: + id_pipeline : int = d_pipeline['data'][0]['id'] + d_response : dict = self.cl.get_pipeline_default_parameters( + id_pipeline, {'limit': 1000} + ) + if 'data' in d_response: + ld_node = self.pluginParameters_setInNodes( + self.cl.compute_workflow_nodes_info(d_response['data'], True), + d_pluginParameters) + for piping in ld_node: + if piping.get('compute_resource_name'): + del piping['compute_resource_name'] + return { + 'nodes' : ld_node, + 'id' : id_pipeline + } + + def workflow_schedule(self, + inputDataNodeID : str, + str_pipelineName : str, + d_pluginParameters : dict = {} + ) -> dict: + """ + Schedule a workflow that has name off a given node id + of . 
+ + Args: + inputDataNodeID (str): id of parent node + str_pipelineName (str): substring of workflow name to connect + d_pluginParameters (dict): optional structure of default parameter + overrides + + Returns: + dict: result from calling the client `get_workflow_plugin_instances` + """ + d_pipeline : dict = self.pipelineWithName_getNodes( + str_pipelineName, d_pluginParameters + ) + d_workflow : dict = self.cl.create_workflow( + d_pipeline['id'], + { + 'previous_plugin_inst_id' : inputDataNodeID, + 'nodes_info' : json.dumps(d_pipeline['nodes']) + }) + d_workflowInst : dict = self.cl.get_workflow_plugin_instances( + d_workflow['id'], {'limit': 1000} + ) + self.ld_workflowhist.append({ + 'name' : str_pipelineName, + 'pipeline' : d_pipeline, + 'previous_plugin_inst_id' : inputDataNodeID, + 'pipeline_plugins' : d_workflowInst + }) + return d_workflowInst + + def topologicalNode_run(self, + str_nodeTitle : str, + l_nodes : list, + str_filterArgs + ) -> dict: + """ + Perform a toplogical join between nodes + + Args: + str_nodeTitle (str): title of this join node + l_nodes (list): list of node ids to join + (logical parent is node[0]) + str_filterArgs (_type_): CLI filter arguments + + Returns: + dict: the plugin instance creation data structure + """ + idTopo : int = self.pltopo['data'][0]['id'] + d_plInstTopo : dict = self.cl.create_plugin_instance( + idTopo, + { + 'filter' : str_filterArgs, + 'plugininstances' : ','.join(map(str, l_nodes)), + 'title' : str_nodeTitle, + 'previous_id' : l_nodes[0] + } + ) + self.ld_topologicalNode['data'].append(d_plInstTopo) + return d_plInstTopo + + def nodes_join(self, str_title : str, l_nodes : list, str_joinArgs : str): + d_topological_run : dict = self.topologicalNode_run( + str_title, l_nodes, str_joinArgs + ) + d_topological_done : dict = self.waitForNodeInWorkflow( + d_topological_run, + str_title + ) + return d_topological_done + + def parentNode_isFinished(self, *args) -> bool: + """ + Check if the parent node is finished at 
this instance. Return + appropriate bool. + + Returns: + bool: is parent done? True or False + + """ + d_parent : dict = None + b_finished : bool = False + if len(args) : d_parent = args[0] + if not d_parent : b_finished = True + else : b_finished = d_parent['finished'] + return b_finished + + def parentNode_IDappend(self, l_nodes : list, *args) -> list: + """ + Append the node ID of the parent in the *args to l_nodes and + return the new list + + Args: + l_nodes (list): a list of node IDs + + Returns: + list: the parent id appended to the end of the list + """ + d_parent : dict = None + if len(args): + d_parent = args[0] + l_nodes.append(self.parentNode_IDget(*args)) + return l_nodes + + def parentNode_IDget(self, *args) -> int: + """ + + Simply get the plugin instance of the passed parent node + + Returns: + int: parent plugin instance id of passed `d_parent` structure + """ + id : int = -1 + d_parent : dict = None + if len(args): + d_parent = args[0] + id = d_parent['plinst']['id'] + return id + + def pluginID_findInWorkflowDesc(self, tp_workflowAndNode : tuple) -> int : + """ + Given a tuple of (, ) substrings, + return the corresponding plugin instance id of node + in workflow . Handle the special case + when the "workflow" is a "topological" node. 
+ + Args: + tp_workflowAndNode (tuple): two tuple element containing substrings + describing a workflow and a node within + that workflow + + Returns: + int: the plugin instance ID of the found node, otherwise -1 + """ + pluginID : int = -1 + l_hit : list = [] + workflow = None + if type(tp_workflowAndNode) == int: + return tp_workflowAndNode + str_workflow, str_node = tp_workflowAndNode + + # pudb.set_trace() + if str_workflow.lower() == 'topological': + workflow = self.ld_topologicalNode + else: + filterHit = filter(lambda p: str_workflow in p['name'], self.ld_workflowhist) + workflow = list(filterHit)[0]['pipeline_plugins'] + if workflow: + pluginID = self.pluginInstanceID_findWithTitle( + workflow, str_node + ) + return pluginID + + def nodeIDs_verify(self, l_nodeID : list) -> list[int]: + """ + + Verify that a list of contains only int + types. This will map any 'distalNodeIDs' that are string + tuples of (, ) to the corrsponding + plugin instance id + + + Args: + l_nodeID (list): node list to verify + + Returns: + list: list containing only node IDs + """ + l_nodeID: list[int] = [self.pluginID_findInWorkflowDesc(x) for x in l_nodeID] + return l_nodeID + + def flow_executeAndBlockUntilNodeComplete( + self, + *args, + **kwargs, + ) -> dict: + """ + Execute a workflow identified by a (sub string) in its + by anchoring it to in the + feed/compute tree. This can be supplied in the + kwargs, or if omitted, then the "parent" node passed in args[0] + is assumed to be the connector. + + Once attached to a node, the whole workflow is scheduled. This + workflow will have N>=1 compute nodes, each identified by a + title. This method will only "return" to a caller when one of + these nodes with 'waitForNodeWithTitle' enters the finished + state. Note that this state can be 'finishedSuccessfully' or + 'finishedWithError'. 
+ + Possible future extension: block until node _list_ complete + """ + d_prior : dict = None + str_workflowTitle : str = "no workflow title" + attachToNodeID : int = -1 + str_blockNodeTitle : str = "no node title" + b_canFlow : bool = False + d_pluginParameters : dict = {} + d_ret : dict = {} + + for k, v in kwargs.items(): + if k == 'workflowTitle' : str_workflowTitle = v + if k == 'attachToNodeID' : attachToNodeID = v + if k == 'waitForNodeWithTitle' : str_blockNodeTitle = v + if k == 'pluginParameters' : d_pluginParameters = v + + if self.parentNode_isFinished(*args): + if attachToNodeID == -1: + attachToNodeID = self.parentNode_IDget(*args) + d_ret = self.waitForNodeInWorkflow( + self.workflow_schedule( + attachToNodeID, + str_workflowTitle, + d_pluginParameters + ), + str_blockNodeTitle, + **kwargs + ) + if len(args): + d_ret['prior'] = args[0] + else: + d_ret['prior'] = None + return d_ret + + def flows_connect( + self, + *args, + **kwargs) -> dict: + """ + Perform a toplogical join by using the args[0] as logical + parent and connect this parent to a list of distalNodeIDs + + + Returns: + dict: data structure on the nodes_join operation + """ + d_prior : dict = None + str_joinNodeTitle : str = "no title specified for topo node" + l_nodeID : list = [] + str_topoJoinArgs : str = "" + b_canFlow : bool = False + d_ret : dict = {} + b_invertOrder : bool = False + + # pudb.set_trace() + for k, v in kwargs.items(): + if k == 'connectionNodeTitle' : str_joinNodeTitle = v + if k == 'distalNodeIDs' : l_nodeID = v + if k == 'invertIDorder' : b_invert = v + if k == 'topoJoinArgs' : str_topoJoinArgs = v + + if self.parentNode_isFinished(*args): + l_nodeID = self.parentNode_IDappend(l_nodeID, *args) + l_nodeID = self.nodeIDs_verify(l_nodeID) + if b_invertOrder: l_nodeID.reverse() + d_ret = self.nodes_join( + str_joinNodeTitle, + l_nodeID, + str_topoJoinArgs + ) + if len(args): + d_ret['prior'] = args[0] + else: + d_ret['prior'] = None + return d_ret + + def 
computeFlow_build(self) -> dict: + """The main controller for the compute flow logic + + Somewhat pedantically, this method demonstrates how to inject + override parameters for certain plugin parameters in the + workflow. + + Returns: + dict: a composite structure of the last call executed. + """ + + self.env.set_trace() + totalPolls:int = 100 if not self.options.notimeout else 0 + + d_ret : dict = \ + self.flow_executeAndBlockUntilNodeComplete( + self.flows_connect( + self.flow_executeAndBlockUntilNodeComplete( + self.flows_connect( + self.flow_executeAndBlockUntilNodeComplete( + self.flows_connect( + self.flow_executeAndBlockUntilNodeComplete( + attachToNodeID = self.newTreeID, + workflowTitle = 'Leg Length Discrepency inference on DICOM inputs v20230324-1 using CPU', + waitForNodeWithTitle = 'heatmaps', + totalPolls = totalPolls, + pluginParameters = { + 'dcm-to-mha' : { + 'imageName' : 'composite.png', + 'rotate' : '90', + 'pftelDB' : self.options.pftelDB + }, + 'generate-landmark-heatmaps' : { + 'heatmapThreshold' : '0.5', + 'imageType' : 'jpg', + 'compositeWeight' : '0.3,0.7', + 'pftelDB' : self.options.pftelDB + } + } + ), + connectionNodeTitle = 'mergeDICOMSwithInference', + distalNodeIDs = [self.newTreeID], + topoJoinArgs = '\.dcm$,\.csv$' + ), + workflowTitle = 'Leg Length Discrepency prediction formatter v20230324', + waitForNodeWithTitle = 'landmarks-to-json', + totalPolls = totalPolls, + pluginParameters = { + 'landmarks-to-json' : { + 'pftelDB' : self.options.pftelDB + } + } + ), + connectionNodeTitle = 'mergeJPGSwithInference', + distalNodeIDs = [('Leg Length Discrepency inference', 'heatmaps')], + topoJoinArgs = '\.jpg$,\.json$' + ), + workflowTitle = 'Leg Length Discrepency measurements on image v20230324', + waitForNodeWithTitle = 'measure-leg-segments', + totalPolls = 0, + pluginParameters = { + 'measure-leg-segments' : { + 'pftelDB' : self.options.pftelDB + } + } + ), + connectionNodeTitle = 'mergeMarkedJPGSwithDICOMS', + distalNodeIDs = 
[('Topological', 'mergeDICOMSwithInference')], + topoJoinArgs = '\.dcm$,\.*$' + ), + workflowTitle = 'PNG-to-DICOM and push to PACS v20230324', + waitForNodeWithTitle = 'pacs-push', + totalPolls = totalPolls, + pluginParameters = { + 'image-to-DICOM' : { + 'pftelDB' : self.options.pftelDB + }, + 'pacs-push' : { + 'pftelDB' : self.options.pftelDB, + 'orthancUrl' : self.env.orthanc('url'), + 'username' : self.env.orthanc('username'), + 'password' : self.env.orthanc('password'), + 'pushToRemote' : self.env.orthanc('remote') + } + } + ) + # pudb.set_trace() + return d_ret + + def __call__(self, filteredCopyInstanceID : int) -> dict: + """ Execute/manage the LLD compute flow + + + Args: + filteredCopyInstanceID (int): the plugin instance ID in the feed tree + from which to grow the compute flow + + Returns: + dict: the compute flow data structure + """ + self.newTreeID : str = int(filteredCopyInstanceID) + d_computeFlow : dict = self.computeFlow_build() + return d_computeFlow + diff --git a/build/lib/control/filter.py b/build/lib/control/filter.py new file mode 100644 index 0000000..5c13dbe --- /dev/null +++ b/build/lib/control/filter.py @@ -0,0 +1,80 @@ +str_about = ''' + A simple class that provides rudimentary filtering on some input + directory for files conforming to some pattern. The primary purpose + of this class is to provide an alternative to the built-in chris_plugin + 'PathMapper' object. +''' + + +from argparse import ArgumentParser, Namespace +from pathlib import Path +import pfmisc +import glob +import os + +class PathFilter: + ''' + A simple filter class that operates on directories to catalog + some filtered subset of the input filesystem space. 
+ ''' + + def __init__(self, inputdir, outputdir, *args, **kwargs): + """Main constructor + """ + + self.inputdir : Path = inputdir + self.outputdir : Path = outputdir + self.glob : str = '*' + self.l_files : list = [] + self.LOG : pfmisc.debug = None + self.b_filesOnly : bool = False + + for k,v in kwargs.items(): + if k == 'glob' : self.glob = v + if k == 'logger' : self.LOG = v + if k == 'only_files' : self.b_filesOnly = True + + self.inputdir_filter(self.inputdir) + + def __iter__(self): + return PathIterator(self) + + def log(self, message, **kwargs): + if self.LOG: self.LOG(message) + + def inputdir_filter(self, input: Path) -> list: + ''' + Filter the files in Path according to the passed options.pattern -- + mostly for debugging + ''' + + self.LOG("Parent directory contains at root level") + l_ls = [self.LOG(f) for f in os.listdir(str(input))] + self.LOG("Filtering files in %s containing '%s'" % (str(input), self.glob)) + str_glob : str = '%s/%s' % (str(self.inputdir), self.glob) + self.LOG("glob = %s" % str_glob) + + self.l_files = glob.glob(str_glob) + + l_glob = [self.LOG(f) for f in self.l_files] + return self.l_files + +class PathIterator: + ''' + An iterator over the PathFilter class + ''' + + def __init__(self, pathfilter): + + self._pathfilter = pathfilter + self._index = 0 + + def __next__(self): + ''' + Iterate over the PathFilter self.files list + ''' + if self._index < len(self._pathfilter.l_files): + result = (self._pathfilter.l_files[self._index]) + self._index += 1 + return (result, self._pathfilter.outputdir) + raise StopIteration diff --git a/build/lib/control/jobber.py b/build/lib/control/jobber.py new file mode 100644 index 0000000..c064725 --- /dev/null +++ b/build/lib/control/jobber.py @@ -0,0 +1,182 @@ +str_about = ''' + This module provides the Jobber class -- an object designed to simplify/ + abstract running CLI. 
The actual command to run is specified as a string, + and the Jobber class executes the command, returning to the caller a + dictionary structure containing misc info such as , , and + . +''' + +import subprocess +import os +os.environ['XDG_CONFIG_HOME'] = '/tmp' +import pudb +import json + +class Jobber: + + def __init__(self, d_args : dict): + """Constructor for the jobber class. + + Args: + d_args (dict): a dictionary of "arguments" (parameters) for the + object. + """ + self.args = d_args.copy() + if not 'verbosity' in self.args.keys(): self.args['verbosity'] = 0 + if not 'noJobLogging' in self.args.keys(): self.args['noJobLogging'] = False + + def dict2JSONcli(self, d_dict : dict) -> str: + """Convert a dictionary into a CLI conformant JSON string. + + An input dictionary of + + { + 'key1': 'value1', + 'key2': 'value2' + } + + is converted to a string: + + "{\"key1\":\"value1\",\"key2\":\"value2\"}" + + Args: + d_dict (dict): a python dictionary to convert + + Returns: + str: CLI equivalent string. + """ + + str_JSON = json.dumps(d_dict) + str_JSON = str_JSON.replace('"', r'\"') + return str_JSON + + def dict2cli(self, d_dict : dict) -> str: + """Convert a dictionary into a CLI conformant JSON string. + + An input dictionary of + + { + 'key1': 'value1', + 'key2': 'value2', + 'key3': true, + 'key4': false + } + + is converted to a string: + + "--key1 value1 --key2 value2 --key3" + + Args: + d_dict (dict): a python dictionary to convert + + Returns: + str: CLI equivalent string. + """ + str_cli : str = "" + for k,v in d_dict.items(): + if type(v) == bool: + if v: + str_cli += '--%s ' % k + elif len(v): + str_cli += '--%s %s ' % (k, v) + return str_cli + + def job_run(self, str_cmd): + """ + Running some CLI process via python is cumbersome. The typical/easy + path of + + os.system(str_cmd) + + is deprecated and prone to hidden complexity. The preferred + method is via subprocess, which has a cumbersome processing + syntax. 
Still, this method runs the `str_cmd` and returns the + stderr and stdout strings as well as a returncode. + Providing readtime output of both stdout and stderr seems + problematic. The approach here is to provide realtime + output on stdout and only provide stderr on process completion. + """ + d_ret : dict = { + 'stdout': "", + 'stderr': "", + 'cmd': "", + 'cwd': "", + 'returncode': 0 + } + str_stdoutLine : str = "" + str_stdout : str = "" + + p = subprocess.Popen( + str_cmd.split(), + stdout = subprocess.PIPE, + stderr = subprocess.PIPE, + ) + + # Realtime output on stdout + while True: + stdout = p.stdout.readline() + if p.poll() is not None: + break + if stdout: + str_stdoutLine = stdout.decode() + if int(self.args['verbosity']): + print(str_stdoutLine, end = '') + str_stdout += str_stdoutLine + d_ret['cmd'] = str_cmd + d_ret['cwd'] = os.getcwd() + d_ret['stdout'] = str_stdout + d_ret['stderr'] = p.stderr.read().decode() + d_ret['returncode'] = p.returncode + with open('/tmp/job.json', 'w') as f: + json.dump(d_ret, f, indent=4) + if int(self.args['verbosity']) and len(d_ret['stderr']): + print('\nstderr: \n%s' % d_ret['stderr']) + return d_ret + + def job_runbg(self, str_cmd : str) -> dict: + """Run a job in the background + + Args: + str_cmd (str): CLI string to run + + Returns: + dict: a dictionary of exec state + """ + d_ret : dict = { + 'uid' : "", + 'cmd' : "", + 'cwd' : "" + } + # str_stdoutLine : str = "" + # str_stdout : str = "" + + p = subprocess.Popen( + str_cmd.split(), + stdout = subprocess.PIPE, + stderr = subprocess.PIPE, + ) + + d_ret['uid'] = str(os.getuid()) + d_ret['cmd'] = str_cmd + d_ret['cwd'] = os.getcwd() + # d_ret['stdout'] = str_stdout + # d_ret['stderr'] = p.stderr.read().decode() + # d_ret['returncode'] = p.returncode + # if int(self.args['verbosity']) and len(d_ret['stderr']): + # print('\nstderr: \n%s' % d_ret['stderr']) + return d_ret + + def job_stdwrite(self, d_job, str_outputDir, str_prefix = ""): + """ + Capture the d_job 
entries to respective files. + """ + if not self.args['noJobLogging']: + for key in d_job.keys(): + with open( + '%s/%s%s' % (str_outputDir, str_prefix, key), "w" + ) as f: + f.write(str(d_job[key])) + f.close() + return { + 'status': True + } diff --git a/build/lib/dylld.py b/build/lib/dylld.py new file mode 100644 index 0000000..a24b1b5 --- /dev/null +++ b/build/lib/dylld.py @@ -0,0 +1,380 @@ +#!/usr/bin/env python +from collections.abc import Iterator +from pathlib import Path +from argparse import ArgumentParser, Namespace, ArgumentDefaultsHelpFormatter + +from chris_plugin import chris_plugin, PathMapper + +from pathlib import Path + +from io import TextIOWrapper +import os, sys + +os.environ["XDG_CONFIG_HOME"] = "/tmp" +import pudb +from pudb.remote import set_trace + +from loguru import logger +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor +from threading import current_thread, get_native_id + +from typing import Callable, Any + +from datetime import datetime, timezone +import json + +from state import data +from logic import behavior +from control import action +from control.filter import PathFilter +from pftag import pftag +from pflog import pflog + +LOG = logger.debug + +logger_format = ( + "{time:YYYY-MM-DD HH:mm:ss} │ " + "{level: <5} │ " + "{name: >28}::" + "{function: <30} @" + "{line: <4} ║ " + "{message}" +) +logger.remove() +logger.add(sys.stderr, format=logger_format) + +pluginInputDir: Path +pluginOutputDir: Path +ld_forestResult: list = [] + +__version__ = "4.4.40" + +DISPLAY_TITLE = r""" + _ _ _ _ _ + | | | | | | | | | + _ __ | |______ __| |_ _| | | __| | +| '_ \| |______/ _` | | | | | |/ _` | +| |_) | | | (_| | |_| | | | (_| | +| .__/|_| \__,_|\__, |_|_|\__,_| +| | __/ | +|_| |___/ +""" + + +parser: ArgumentParser = ArgumentParser( + description=""" +A ChRIS plugin that dynamically builds a workflow to compute length +discrepencies from extremity X-Rays +""", + formatter_class=ArgumentDefaultsHelpFormatter, +) + + 
+parser.add_argument( + "-V", "--version", action="version", version=f"%(prog)s {__version__}" +) + +parser.add_argument( + "--pattern", + default="**/*dcm", + help=""" + pattern for file names to include (you should quote this!) + (this flag triggers the PathMapper on the inputdir).""", +) +parser.add_argument( + "--pluginInstanceID", + default="", + help="plugin instance ID from which to start analysis", +) +parser.add_argument( + "--CUBEurl", default="http://localhost:8000/api/v1/", help="CUBE URL" +) +parser.add_argument("--CUBEuser", default="chris", help="CUBE/ChRIS username") +parser.add_argument("--CUBEpassword", default="chris1234", help="CUBE/ChRIS password") +parser.add_argument( + "--orthancURL", + default="https://orthanc-chris-public.apps.ocp-prod.massopen.cloud/", + help="IP of the orthanc to receive analysis results", +) +parser.add_argument("--orthancuser", default="fnndsc", help="Orthanc username") +parser.add_argument( + "--orthancpassword", default="Lerkyacyids5", help="Orthanc password" +) +parser.add_argument("--orthancremote", default="", help="remote orthanc modality") +parser.add_argument("--verbosity", default="0", help="verbosity level of app") +parser.add_argument( + "--thread", + help="use threading to branch in parallel", + dest="thread", + action="store_true", + default=False, +) +parser.add_argument( + "--pftelDB", + help="an optional pftel telemetry logger, of form '/api/v1///'", + default="", +) +parser.add_argument( + "--inNode", + help="perform in-node implicit parallelization in conjunction with --thread", + dest="inNode", + action="store_true", + default=False, +) +parser.add_argument( + "--notimeout", + help="if specified, then controller never timesout while waiting on nodes to complete", + dest="notimeout", + action="store_true", + default=False, +) +parser.add_argument( + "--debug", + help="if true, toggle telnet pudb debugging", + dest="debug", + action="store_true", + default=False, +) +parser.add_argument( + 
"--debugTermSize", + help="the terminal 'cols,rows' size for debugging", + default="253,62", +) +parser.add_argument("--debugPort", help="the debugging telnet port", default="7900") +parser.add_argument("--debugHost", help="the debugging telnet host", default="0.0.0.0") + + +def Env_setup( + options: Namespace, inputdir: Path, outputdir: Path, debugPortOffset: int = 0 +) -> data.env: + """ + Setup the environment + + Args: + options (Namespace): options passed from the CLI caller + inputdir (Path): plugin global input directory + outputdir (Path): plugin global output directory + debugPortOffset (int, optional): offset added to debug port -- useful for multithreading. Defaults to 0. + + Returns: + data.env: an instantiated environment object. Note in multithreaded + runs, each thread gets its own object. + """ + Env: data.env = data.env() + Env.CUBE.set(inputdir=str(inputdir)) + Env.CUBE.set(outputdir=str(outputdir)) + Env.CUBE.set(url=str(options.CUBEurl)) + Env.CUBE.set(username=str(options.CUBEuser)) + Env.CUBE.set(password=str(options.CUBEpassword)) + Env.orthanc.set(url=str(options.orthancURL)) + Env.orthanc.set(username=str(options.orthancuser)) + Env.orthanc.set(password=str(options.orthancpassword)) + Env.orthanc.set(remote=str(options.orthancremote)) + Env.set(inputdir=inputdir) + Env.set(outputdir=outputdir) + Env.debug_setup( + debug=options.debug, + termsize=options.debugTermSize, + port=int(options.debugPort) + debugPortOffset, + host=options.debugHost, + ) + return Env + + +def preamble(options: Namespace) -> str: + """ + Just show some preamble "noise" in the output terminal and also process + the --pftelDB if provided. 

    Args:
        options (Namespace): CLI options namespace

    Returns:
        str: the parsed string
    """

    print(DISPLAY_TITLE)
    pftelDB: str = ""

    # Resolve any pftag template tokens in the telemetry DB path (e.g. timestamps).
    if options.pftelDB:
        tagger: pftag.Pftag = pftag.Pftag({})
        pftelDB = tagger(options.pftelDB)["result"]

    LOG("plugin arguments...")
    for k, v in options.__dict__.items():
        LOG("%25s: [%s]" % (k, v))
    LOG("")

    LOG("base environment...")
    for k, v in os.environ.items():
        LOG("%25s: [%s]" % (k, v))
    LOG("")

    LOG("Starting growth cycle...")
    return pftelDB


def ground_prep(options: Namespace, Env: data.env) -> action.PluginRun:
    """
    Do some per-tree setup -- prepare the ground!

    Args:
        options (Namespace): options namespace
        Env (data.env): the environment for this tree

    Returns:
        action.PluginRun: A filter specific to this tree that will
                          filter a study of interest in the parent
                          space -- analogously akin to choosing a
                          seed.
    """

    LOG("Prepping ground for tree in thread %s..." % get_native_id())
    LOG("Constructing object to filter parent field")
    PLinputFilter: action.PluginRun = action.PluginRun(env=Env, options=options)

    # Prefer an explicitly supplied parent plugin instance; otherwise discover
    # it from the CUBE-provided environment variables.
    if len(options.pluginInstanceID):
        Env.CUBE.parentPluginInstanceID = options.pluginInstanceID
    else:
        Env.CUBE.parentPluginInstanceID = Env.CUBE.parentPluginInstanceID_discover()[
            "parentPluginInstanceID"
        ]
    return PLinputFilter


def replantSeed_catchError(PLseed: action.PluginRun, input: Path) -> dict:
    """
    Re-run a failed filter (pl-shexec) with explicit error catching

    Args:
        PLseed (action.PluginRun): the plugin run object to re-execute
        input (Path): the input on which the seed failed

    Returns:
        dict: the detailed error log from the failed run
    """
    global LOG
    LOG("Some error was returned when planting the seed!")
    LOG("Replanting seed with error catching on...")
    d_seedreplant: dict = PLseed(str(input), append="--jsonReturn")
    return d_seedreplant


def tree_grow(options: Namespace, input: Path, output: Path = None) ->
dict:
    """
    Based on some conditional applied to the file space, direct the
    dynamic "growth" of this feed tree from the parent node of *this* plugin.

    Args:
        options (Namespace): CLI options
        input (Path): input path returned by mapper
        output (Path, optional): output path returned by mapper. Defaults to None.

    Returns:
        dict: resultant object dictionary of this (threaded) growth
    """
    global pluginInputDir, pluginOutputDir, LOG, ld_forestResult

    # set_trace(term_size=(253, 62), host = '0.0.0.0', port = 7900)

    # Each (possibly threaded) growth gets its own env; the native thread id
    # is used as the debug-port offset.
    Env: data.env = Env_setup(options, pluginInputDir, pluginOutputDir, get_native_id())
    Env.set_telnet_trace_if_specified()

    timenow: Callable[[], str] = (
        lambda: datetime.now(timezone.utc).astimezone().isoformat()
    )
    conditional: behavior.Filter = behavior.Filter()
    # obj_pass is monkey-patched to the always-True filter; real conditionals
    # would be assigned here instead.
    conditional.obj_pass = behavior.unconditionalPass
    PLinputFilter: action.PluginRun = ground_prep(options, Env)
    LLD: action.LLDcomputeflow = action.LLDcomputeflow(env=Env, options=options)
    # NOTE(review): Thread.getName() is deprecated -- current_thread().name is
    # the modern spelling.
    str_threadName: str = current_thread().getName()
    d_seedGet: dict = {"status": False, "message": "unable to plant seed"}
    d_treeGrow: dict = {"status": False, "message": "unable to grow tree"}
    d_ret: dict = {"seed": {}, "tree": {}}

    # NOTE(review): Path.touch() returns None, so this builds Path("None/start-...")
    # and then discards it without touching anything on disk. Presumably
    # Env.outputdir.joinpath(...).touch() was intended -- confirm and fix.
    Path("%s/start-%s.touch" % (Env.outputdir.touch(), str_threadName))
    LOG("Growing a new tree in thread %s..."
        % str_threadName)
    str_heartbeat: str = str(
        Env.outputdir.joinpath("heartbeat-%s.log" % str_threadName)
    )
    # NOTE(review): heartbeat file is opened without a context manager -- if an
    # exception fires between open() and close() the handle leaks; consider
    # wrapping this section in `with open(...)`.
    fl: TextIOWrapper = open(str_heartbeat, "w")
    fl.write("Start time: {}\n".format(timenow()))
    if conditional.obj_pass(str(input)):
        LOG("Planting seed off %s" % str(input))
        d_seedGet = PLinputFilter(str(input))
        if d_seedGet["status"]:
            # Seed planted successfully: grow the full LLD compute flow off it.
            d_treeGrow = LLD(d_seedGet["branchInstanceID"])
        else:
            # Seed failed: re-run once with error catching to record diagnostics.
            d_seedGet["failed"] = replantSeed_catchError(PLinputFilter, input)
    fl.write("End time: {}\n".format(timenow()))
    fl.close()
    d_ret["seed"] = d_seedGet
    d_ret["tree"] = d_treeGrow
    ld_forestResult.append(d_ret)
    return d_ret


def treeGrowth_savelog(outputdir: Path) -> None:
    """
    Write the global log file on the tree growth to the passed
    output directory.

    Args:
        outputdir (Path): the plugin base output directory
    """
    global ld_forestResult

    with open(str(outputdir.joinpath("treeLog.json")), "w") as f:
        f.write(json.dumps(ld_forestResult, indent=4))
        # NOTE(review): close() is redundant inside the `with` block.
        f.close()


# documentation: https://fnndsc.github.io/chris_plugin/chris_plugin.html#chris_plugin
@chris_plugin(
    parser=parser,
    title="Leg-Length Discrepency - Dynamic Compute Flow",
    category="",  # ref. https://chrisstore.co/plugins
    min_memory_limit="100Mi",  # supported units: Mi, Gi
    min_cpu_limit="1000m",  # millicores, e.g.
"1000m" = 1 CPU core
    min_gpu_limit=0,  # set min_gpu_limit=1 to enable GPU
)
@pflog.tel_logTime(
    event="dylld", log="Leg Length Discepency Dynamic Workflow controller"
)
def main(options: Namespace, inputdir: Path, outputdir: Path):
    """
    :param options: non-positional arguments parsed by the parser given to @chris_plugin
    :param inputdir: directory containing input files (read-only)
    :param outputdir: directory where to write output files
    """
    # set_trace(term_size=(253, 62), host = '0.0.0.0', port = 7900)
    global pluginInputDir, pluginOutputDir
    pluginInputDir = inputdir
    pluginOutputDir = outputdir

    options.pftelDB = preamble(options)

    output: Path
    # --inNode switches from per-file mapping to deep per-directory mapping.
    if not options.inNode:
        mapper: PathMapper = PathMapper.file_mapper(
            inputdir, outputdir, glob=options.pattern
        )
    else:
        mapper: PathMapper = PathMapper.dir_mapper_deep(inputdir, outputdir)
    # NOTE(review): options.thread is a store_true bool; int() is redundant.
    if int(options.thread):
        with ThreadPoolExecutor(max_workers=len(os.sched_getaffinity(0))) as pool:
            results: Iterator = pool.map(lambda t: tree_grow(options, *t), mapper)

            # raise any Exceptions which happened in threads
            for _ in results:
                pass
    else:
        for input, output in mapper:
            d_results: dict = tree_grow(options, input, output)

    LOG("Ending growth cycle...")
    treeGrowth_savelog(outputdir)


if __name__ == "__main__":
    # @chris_plugin wraps main() so it parses the CLI itself when called bare.
    main()
diff --git a/build/lib/logic/behavior.py b/build/lib/logic/behavior.py
new file mode 100644
index 0000000..8901d1a
--- /dev/null
+++ b/build/lib/logic/behavior.py
@@ -0,0 +1,30 @@
str_about = '''
    The behavior module codifies the operational/dynamic behavior of this
    plugin.

    Primary behaviors include some conditional logic over the space of
    input objects, and triggering a resultant operation -- usually this means
    copying the object into a child plugin, and then running a pipeline
    on the result of that copy-to-child.

    For now, this class is mostly dummy filler.
'''

def unconditionalPass(str_object: str) -> bool:
    '''
    A dummy fall through function that always returns True.
    '''
    return True

class Filter:
    '''
    An abstraction for evaluating a "condition" on some "object".
    '''

    def __init__(self, *args, **kwargs):
        # point this to a function with signature
        # ( : str) : bool
        # NOTE(review): defaults to None, so obj_pass() raises TypeError until
        # a callable is assigned; dylld assigns obj_pass directly instead of
        # filterOp -- confirm which attribute is the intended hook.
        self.filterOp = None

    def obj_pass(self, str_object: str) -> bool:
        # Delegate the pass/fail decision to the injected filter callable.
        return self.filterOp(str_object)
diff --git a/build/lib/state/data.py b/build/lib/state/data.py
new file mode 100644
index 0000000..7008373
--- /dev/null
+++ b/build/lib/state/data.py
@@ -0,0 +1,311 @@
str_about = '''
    This module is responsible for handling some state related information
    which is mostly information about the ChRIS/CUBE instance.

    Core data includes information on the ChRIS/CUBE instances as well as
    information relevant to the pipeline to be scheduled.
'''
import os
os.environ['XDG_CONFIG_HOME'] = '/tmp'
from pudb.remote import set_trace
# NOTE(review): `from curses import meta` looks accidental/unused -- verify
# before removing.
from curses import meta
from pathlib import Path
import pudb
import json
from urllib.parse import urlparse
import logging
logging.basicConfig(level=logging.CRITICAL)

class env:
    '''
    A class that contains environmental data -- mostly information about CUBE
    as well as data pertaining to the orthanc instance
    '''

    def __init__(self, *args, **kwargs):
        '''
        Constructor
        '''
        self.inputdir   : Path              = None
        self.outputdir  : Path              = None
        self.CUBE       : CUBEinstance      = CUBEinstance()
        self.orthanc    : Orthancinstance   = Orthancinstance()
        # Telnet-pudb debug configuration; populated by debug_setup().
        self.debug      : dict = {
            'do'        : False,
            'termsize'  : (80,25),
            'port'      : 7900,
            'host'      : '0.0.0.0'
        }

    def debug_setup(self, **kwargs) -> dict:
        """
        Setup the debugging structure based on

        Returns:
            dict: the debug structure
        """
        str_termsize    : str   = ""
        str_port        : str   = ""
        str_host        : str   = "0.0.0.0"
        b_debug         : bool  = False
        for k,v in kwargs.items():
            if k == 'debug'     : b_debug = v
            if k == 'termsize'  :
str_termsize = v
            if k == 'port'      : str_port = v
            if k == 'host'      : str_host = v

        # NOTE(review): if 'termsize' (or 'port') is not passed, "".split(',')
        # yields a single element and this unpack raises ValueError (and
        # int("") raises too). Callers appear to always supply both -- confirm.
        cols, rows = str_termsize.split(',')
        self.debug['do']        = b_debug
        self.debug['termsize']  = (int(cols), int(rows))
        self.debug['port']      = int(str_port)
        self.debug['host']      = str_host
        return self.debug

    def set_telnet_trace_if_specified(self):
        """
        If specified in the env, pause for a telnet debug.

        If you are debugging, just "step" to return to the location
        in your code where you specified to break!
        """
        if self.debug['do']:
            set_trace(
                term_size   = self.debug['termsize'],
                host        = self.debug['host'],
                port        = self.debug['port']
            )

    def set_trace(self):
        """
        Simple "override" for setting a trace. If the Env is configured
        for debugging, then this set_trace will be called. Otherwise it
        will be skipped.

        This is useful for leaving debugging set_traces in the code, and
        being able to at runtime choose to debug or not.

        If you are debugging, just "step" to return to the location
        in your code where you specified to break!

        Returns:
            _type_: _description_
        """
        if self.debug['do']:
            pudb.set_trace()

    def __call__(self, str_key) -> str | None:
        '''
        get a value for a str_key
        '''
        if str_key in ['inputdir', 'outputdir']:
            return getattr(self, str_key)
        else:
            return None

    def set(self, **kwargs) -> bool:
        """
        A custom setter -- the normal python getter/setter approach suffers
        IMHO from implicitly adding members directly to the object.

        Returns:
            bool: True or False on setting
        """
        b_status:bool = False
        for k,v in kwargs.items():
            if k == 'inputdir'  : self.inputdir     = v ; b_status = True
            if k == 'outputdir' : self.outputdir    = v ; b_status = True
        return b_status

class CUBEinstance:
    '''
    A class that contains data pertinent to a specific CUBE instance
    '''

    def __init__(self, *args, **kwargs):
        # Connection/credential defaults for a CUBE instance; 'url' (when set)
        # is decomposed back into the other fields by url_decompose().
        self.d_CUBE = {
            'username'  : 'chris',
            'password'  : 'chris1234',
            'address'   : '192.168.1.200',
            'port'      : '8000',
            'route'     : '/api/v1/',
            'protocol'  : 'http',
            'url'       : ''
        }
        self.parentPluginInstanceID : str   = ''
        self.str_inputdir           : str   = None
        self.str_outputdir          : str   = None

    def setCUBE(self, str_key, str_val):
        '''
        set str_key to str_val
        '''
        if str_key in self.d_CUBE.keys():
            self.d_CUBE[str_key] = str_val

    def __call__(self, str_key) -> str | None:
        '''
        get a value for a str_key
        '''
        if str_key in self.d_CUBE.keys():
            return self.d_CUBE[str_key]
        else:
            # NOTE(review): the attributes are named str_inputdir/str_outputdir,
            # so getattr(self, 'inputdir') raises AttributeError -- this branch
            # looks like a latent bug; confirm intended attribute names.
            if str_key in ['inputdir', 'outputdir']:
                return getattr(self, str_key)
            else:
                return None

    def set(self, **kwargs) -> bool:
        """
        A custom setter -- the normal python getter/setter approach suffers
        IMHO from implicitly adding members directly to the object.

        Returns:
            bool: True or False on setting
        """
        b_status:bool = False
        for k,v in kwargs.items():
            if k == 'inputdir'  : self.str_inputdir     = v ; b_status = True
            if k == 'outputdir' : self.str_outputdir    = v ; b_status = True
            if k == 'parentPluginInstanceID' :
                self.parentPluginInstanceID = v ; b_status = True
            if k == 'username'  : self.setCUBE(k, v) ; b_status = True
            if k == 'password'  : self.setCUBE(k, v) ; b_status = True
            # NOTE(review): 'addreses' is misspelled -- set(address=...) can
            # never update d_CUBE['address'] through this path. Confirm and
            # rename to 'address'.
            if k == 'addreses'  : self.setCUBE(k, v) ; b_status = True
            if k == 'port'      : self.setCUBE(k, v) ; b_status = True
            if k == 'route'     : self.setCUBE(k, v) ; b_status = True
            if k == 'protocol'  : self.setCUBE(k, v) ; b_status = True
            if k == 'url'       :
                # Setting the full URL also re-derives protocol/address/port/route.
                self.setCUBE(k, v)
                self.url_decompose()
                b_status = True
        return b_status

    def onCUBE(self) -> dict:
        '''
        Return a dictionary that is a subset of self.d_CUBE
        suitable for using in calls to the CLI tool 'chrispl-run'
        '''
        return {
            'protocol': self('protocol'),
            'port': self('port'),
            'address': self('address'),
            'user': self('username'),
            'password': self('password')
        }

    def parentPluginInstanceID_discover(self) -> dict:
        '''
        Determine the pluginInstanceID of the parent plugin.
CUBE provides
        several environment variables:

            CHRIS_JID
            CHRIS_PLG_INST_ID
            CHRIS_PREV_JID
            CHRIS_PREV_PLG_INST_ID

        '''

        # NOTE(review): raises KeyError when run outside CUBE (env var absent);
        # consider os.environ.get with an explicit error -- confirm desired
        # failure mode.
        self.parentPluginInstanceID = os.environ['CHRIS_PREV_PLG_INST_ID']

        return {
            'parentPluginInstanceID': self.parentPluginInstanceID
        }

    def url_decompose(self, *args):
        '''
        Decompose the internal URL into constituent parts
        '''

        o = urlparse(self.d_CUBE['url'])
        self.d_CUBE['protocol'] = o.scheme
        self.d_CUBE['address']  = o.hostname
        # NOTE(review): urlparse.port is an int (or None); this overwrites the
        # str-typed 'port' entry with an int when a port is present.
        self.d_CUBE['port']     = o.port
        if not self.d_CUBE['port']:
            self.d_CUBE['port'] = ''
        self.d_CUBE['route']    = o.path
        return self.d_CUBE['url']

class Orthancinstance:
    '''
    A class that contains data pertinent to a specific Orthanc instance
    '''

    def __init__(self, *args, **kwargs) -> None:
        # Connection/credential defaults for the Orthanc PACS endpoint.
        self.d_orthanc: dict[str, str] = {
            'username'  : 'orthanc',
            'password'  : 'orthanc',
            'IP'        : '192.168.1.200',
            'port'      : '4242',
            'remote'    : '',
            'protocol'  : 'http',
            'route'     : '',
            'url'       : ''
        }

    def setOrthanc(self, str_key, str_val) -> bool:
        '''
        set str_key to str_val
        '''
        b_status:bool = False
        if str_key in self.d_orthanc.keys():
            b_status = True
            self.d_orthanc[str_key] = str_val
        return b_status

    def __call__(self, str_key) -> str | None:
        '''
        get a value for a str_key
        '''
        if str_key in self.d_orthanc.keys():
            return self.d_orthanc[str_key]
        else:
            return None

    def set(self, **kwargs) -> bool:
        """
        A custom setter -- the normal python getter/setter approach suffers
        IMHO from implicitly adding members directly to the object.

        Returns:
            bool: True or False on setting
        """
        b_status:bool = False
        for k,v in kwargs.items():
            if k == 'username'  : self.setOrthanc(k, v) ; b_status = True
            if k == 'password'  : self.setOrthanc(k, v) ; b_status = True
            if k == 'IP'        : self.setOrthanc(k, v) ; b_status = True
            if k == 'port'      : self.setOrthanc(k, v) ; b_status = True
            if k == 'remote'    : self.setOrthanc(k, v) ; b_status = True
            if k == 'protocol'  : self.setOrthanc(k, v) ; b_status = True
            if k == 'route'     : self.setOrthanc(k, v) ; b_status = True
            if k == 'url'       :
                # Setting the full URL also re-derives protocol/IP/port/route.
                self.setOrthanc(k, v)
                self.url_decompose()
                b_status = True
        return b_status

    def url_decompose(self, *args):
        '''
        Decompose the internal URL into constituent parts
        '''
        o = urlparse(self.d_orthanc['url'])

        self.d_orthanc['protocol']  = o.scheme
        self.d_orthanc['IP']        = o.hostname
        self.d_orthanc['port']      = o.port
        if not self.d_orthanc['port']:
            self.d_orthanc['port'] = ''
        self.d_orthanc['route']     = o.path
        return self.d_orthanc['url']

class Pipeline:
    '''
    Information pertinent to the pipeline being scheduled. This is
    encapsulated with a class object to allow for possible future
    expansion.
    '''

    def __init__(self, *args, **kwargs):

        self.str_pipelineName = ''

diff --git a/control/action.py b/control/action.py
index 2a3b26b..e0c11d8 100644
--- a/control/action.py
+++ b/control/action.py
@@ -260,7 +260,7 @@ def waitForNodeInWorkflow(self,
                 if totalPolls: pollCount += 1
                 if 'finished' in d_plinfo['status']:
                     b_finished  = d_plinfo['status'] == 'finishedSuccessfully'
-                    #self.QA_check(d_plinfo)
+                    self.QA_check(d_plinfo)
             return {
                 'finished'      : b_finished,
                 'status'        : str_pluginStatus,
@@ -270,9 +270,20 @@
                 'plid'          : waitOnPluginID
             }
     def QA_check(self,plugin_info):
-        print(plugin_info)
+        """
+        A method to check the QA of the output of LLD analysis. This method checks
+        the output of the QA plugin `pl-lld_chxr` and determines if a QA failure has occurred.
+        As an effect of this check, the name of the feed of which this plugin, `pl-lld_chxr` is
+        a part, is changed. The updated name becomes `QA-failed-`.
+        Args:
+            plugin_info: A dictionary representing a plugin instance of CUBE
+
+        Returns:
+            None. Side effect only: the containing feed may be renamed.
+        """
         QA_plugin = 'pl-lld_chxr'
-        if QA_plugin in plugin_info['plugin_name']:
+        error_code = ''
+        if QA_plugin in plugin_info['plugin_name'] and error_code in plugin_info['summary']:
             feed_id = plugin_info['feed_id']
             feed = self.cl.get_feeds({'feed_id': feed_id})
             name = feed['data'][0]['name']
@@ -691,7 +702,7 @@ def computeFlow_build(self) -> dict:
                     topoJoinArgs    = '\.dcm$,\.*$'
                 ),
                 workflowTitle       = 'PNG-to-DICOM and push to PACS v20230324',
-                waitForNodeWithTitle = 'pacs-push',
+                waitForNodeWithTitle = 'QA-Check',
                 totalPolls          = totalPolls,
                 pluginParameters    = {
                     'image-to-DICOM'    : {
@@ -706,6 +717,7 @@
                     }
                 }
             )
+        # pudb.set_trace()
         return d_ret
diff --git a/dylld.py b/dylld.py
index a24b1b5..5d1e69d 100644
--- a/dylld.py
+++ b/dylld.py
@@ -271,7 +271,7 @@ def tree_grow(options: Namespace, input: Path, output: Path = None) -> dict:
         output (Path, optional): ouptut path returned by mapper. Defaults to None.
 
     Returns:
-        dict: resulant object dictionary of this (threaded) growth
+        dict: resultant object dictionary of this (threaded) growth
     """
     global pluginInputDir, pluginOutputDir, LOG, ld_forestResult