diff --git a/assets/topp-workflow-default-params.json b/assets/topp-workflow-default-params.json new file mode 100644 index 0000000..784fc4b --- /dev/null +++ b/assets/topp-workflow-default-params.json @@ -0,0 +1,4 @@ +{ + "mzML_files": [ + ] +} \ No newline at end of file diff --git a/environment.yml b/environment.yml index 452cbfd..ef14bf4 100644 --- a/environment.yml +++ b/environment.yml @@ -11,7 +11,7 @@ dependencies: - mono==6.12.0.90 - pip: # dependencies only available through pip # streamlit dependencies - - streamlit==1.28.0 + - streamlit==1.29.0 - streamlit-plotly-events==0.0.6 - streamlit-aggrid==0.3.4.post3 - captcha==0.5.0 diff --git a/pages/2_Simple_Workflow.py b/pages/2_Simple_Workflow.py index eb3acd3..ec55445 100755 --- a/pages/2_Simple_Workflow.py +++ b/pages/2_Simple_Workflow.py @@ -1,7 +1,7 @@ import streamlit as st from src.common import page_setup, save_params, show_table -from src import workflow +from src import simpleworkflow from src.captcha_ import captcha_control # Page name "workflow" will show mzML file selector in sidebar @@ -40,7 +40,7 @@ # Get a dataframe with x and y dimensions via time consuming (sleep) cached function # If the input has been given before, the function does not run again # Input x from local variable, input y from session state via key -df = workflow.generate_random_table(xdimension, st.session_state["example-y-dimension"]) +df = simpleworkflow.generate_random_table(xdimension, st.session_state["example-y-dimension"]) # Display dataframe via custom show_table function, which will render a download button as well show_table(df, download_name="random-table") diff --git a/pages/5_TOPP-Workflow.py b/pages/5_TOPP-Workflow.py new file mode 100644 index 0000000..0f5f53c --- /dev/null +++ b/pages/5_TOPP-Workflow.py @@ -0,0 +1,26 @@ +import streamlit as st +from src.common import page_setup +from src.Workflow import Workflow + +# The rest of the page can, but does not have to be changed +if __name__ == "__main__": + + 
params = page_setup() + + wf = Workflow() + + st.title(wf.name) + + t = st.tabs(["📁 **File Upload**", "⚙️ **Configure**", "🚀 **Run**", "📊 **Results**"]) + with t[0]: + wf.show_file_upload_section() + + with t[1]: + wf.show_parameter_section() + + with t[2]: + wf.show_execution_section() + + with t[3]: + wf.show_results_section() + diff --git "a/pages/6_\360\237\223\226_TOPP-Workflow_Docs.py" "b/pages/6_\360\237\223\226_TOPP-Workflow_Docs.py" new file mode 100644 index 0000000..e3405f2 --- /dev/null +++ "b/pages/6_\360\237\223\226_TOPP-Workflow_Docs.py" @@ -0,0 +1,399 @@ +import streamlit as st +from src.Workflow import Workflow +from src.workflow.StreamlitUI import StreamlitUI +from src.workflow.FileManager import FileManager +from src.workflow.CommandExecutor import CommandExecutor +from src.common import page_setup +from inspect import getsource + +page_setup() + +wf = Workflow() + +st.title("📖 Workflow Framework Docs") + +st.markdown( +""" +## Features + +- streamlined methods for uploading files, setting parameters, and executing workflows +- automatic parameter handling +- quickly build parameter interface for TOPP tools with all parameters from *ini* files +- automatically create a log file for each workflow run with stdout and stderr +- workflow output updates automatically in short intervalls +- user can leave the app and return to the running workflow at any time +- quickly build a workflow with multiple steps channelling files between steps +# +""" +) + +with st.expander("**Example User Interface**", True): + t = st.tabs(["📁 **File Upload**", "⚙️ **Configure**", "🚀 **Run**", "📊 **Results**"]) + with t[0]: + wf.show_file_upload_section() + + with t[1]: + wf.show_parameter_section() + + with t[2]: + wf.show_execution_section() + + with t[3]: + wf.show_results_section() + +st.markdown( +""" +## Quickstart + +This repository contains a module in `src/workflow` that provides a framework for building and running analysis workflows. 
+ +The `WorkflowManager` class provides the core workflow logic. It uses the `Logger`, `FileManager`, `ParameterManager`, and `CommandExecutor` classes to set up a complete workflow logic. + +To build your own workflow edit the file `src/Workflow.py`. Use any streamlit components such as tabs (as shown in example), columns, or even expanders to organize the helper functions for displaying file upload and parameter widgets. + +> 💡 Simply set a name for the workflow and overwrite the **`upload`**, **`configure`**, **`execution`** and **`results`** methods in your **`Workflow`** class. + +The file `pages/5_TOPP-Workflow.py` displays the workflow content and can, but does not have to be modified. + +The `Workflow` class contains four important members, which you can use to build your own workflow: + +> **`self.params`:** dictionary of parameters stored in a JSON file in the workflow directory. Parameter handling is done automatically. Default values are defined in input widgets and non-default values are stored in the JSON file. + +> **`self.ui`:** object of type `StreamlitUI` contains helper functions for building the parameter and file upload widgets. + +> **`self.executor`:** object of type `CommandExecutor` can be used to run any command line tool alone or in parallel and includes a convenient method for running TOPP tools. + +> **`self.logger`:** object of type `Logger` to write any output to a log file during workflow execution. + +> **`self.file_manager`:** object of type `FileManager` to handle file types and creation of output directories. +""" +) + +with st.expander("**Complete example for custom Workflow class**", expanded=False): + st.code(getsource(Workflow)) + +st.markdown( +""" +## File Upload + +All input files for the workflow will be stored within the workflow directory in the subdirectory `input-files` within its own subdirectory for the file type. 
+ +The subdirectory name will be determined by a **key** that is defined in the `self.ui.upload_widget` method. The uploaded files are available by the specific key for parameter input widgets and accessible while building the workflow. + +Calling this method will create a complete file upload widget section with the following components: + +- file uploader +- list of currently uploaded files with this key (or a warning if there are none) +- button to delete all files + +Fallback files(s) can be specified, which will be used if the user doesn't upload any files. This can be useful for example for database files where a default is provided. +""") + +st.code(getsource(Workflow.upload)) + +st.info("💡 Use the same **key** for parameter widgets, to select which of the uploaded files to use for analysis.") + +with st.expander("**Code documentation:**", expanded=True): + st.help(StreamlitUI.upload_widget) + +st.markdown( + """ +## Parameter Input + +The paramter section is already pre-defined as a form with buttons to **save parameters** and **load defaults** and a toggle to show TOPP tool parameters marked as advanced. + +Generating parameter input widgets is done with the `self.ui.input` method for any parameter and the `self.ui.input_TOPP` method for TOPP tools. + +**1. Choose `self.ui.input_widget` for any paramter not-related to a TOPP tool or `self.ui.select_input_file` for any input file:** + +It takes the obligatory **key** parameter. The key is used to access the parameter value in the workflow parameters dictionary `self.params`. Default values do not need to be specified in a separate file. Instead they are determined from the widgets default value automatically. Widget types can be specified or automatically determined from **default** and **options** parameters. It's suggested to add a **help** text and other parameters for numerical input. + +Make sure to match the **key** of the upload widget when calling `self.ui.input_TOPP`. + +**2. 
Choose `self.ui.input_TOPP` to automatically generate complete input sections for a TOPP tool:** + +It takes the obligatory **topp_tool_name** parameter and generates input widgets for each parameter present in the **ini** file (automatically created) except for input and output file parameters. For all input file parameters a widget needs to be created with `self.ui.select_input_file` with an appropriate **key**. For TOPP tool parameters only non-default values are stored. + +**3. Choose `self.ui.input_python` to automatically generate complete input sections for a custom Python tool:** + +Takes the obligatory **script_file** argument. The default location for the Python script files is in `src/python-tools` (in this case the `.py` file extension is optional in the **script_file** argument), however, any other path can be specified as well. Parameters need to be specified in the Python script in the **DEFAULTS** variable with the mandatory **key** and **value** parameters. + +Here are the options to use as dictionary keys for parameter definitions (see `src/python-tools/example.py` for an example): + +Mandatory keys for each parameter +- **key:** a unique identifier +- **value:** the default value + +Optional keys for each parameter +- **name:** the name of the parameter +- **hide:** don't show the parameter in the parameter section (e.g. for **input/output files**) +- **options:** a list of valid options for the parameter +- **min:** the minimum value for the parameter (int and float) +- **max:** the maximum value for the parameter (int and float) +- **step_size:** the step size for the parameter (int and float) +- **help:** a description of the parameter +- **widget_type:** the type of widget to use for the parameter (default: auto) +- **advanced:** whether or not the parameter is advanced (default: False) + +""") + +st.code( +getsource(Workflow.configure) +) +st.info("💡 Access parameter widget values by their **key** in the `self.params` object, e.g. 
`self.params['mzML-files']` will give all selected mzML files.") + +with st.expander("**Code documentation**", expanded=True): + st.help(StreamlitUI.input_widget) + st.help(StreamlitUI.select_input_file) + st.help(StreamlitUI.input_TOPP) + st.help(StreamlitUI.input_python) +st.markdown( + """ +## Building the Workflow + +Building the workflow involves **calling all (TOPP) tools** using **`self.executor`** with **input and output files** based on the **`FileManager`** class. For TOPP tools non-input-output parameters are handled automatically. Parameters for other processes and workflow logic can be accessed via widget keys (set in the parameter section) in the **`self.params`** dictionary. + +### FileManager + +The `FileManager` class serves as an interface for unified input and output files with useful functionality specific to building workflows, such as **setting a (new) file type** and **subdirectory in the workflows result directory**. + +Use the **`get_files`** method to get a list of all file paths as strings. + +Optionally set the following parameters to modify the files: + +- **set_file_type** (str): set new file types and result subdirectory. +- **set_results_dir** (str): set a new subdirectory in the workflows result directory. +- **collect** (bool): collect all files into a single list. Will return a list with a single entry, which is a list of all files. Useful to pass to tools which can handle multiple input files at once. +""") + +st.code( + """ +# Get all file paths as strings from self.params entry. +mzML_files = self.file_manager.get_files(self.params["mzML-files"]) +# mzML_files = ['../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML', '../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML'] + +# Creating output files for a TOPP tool, setting a new file type and result subdirectory name. 
+feature_detection_out = self.file_manager.get_files(mzML_files, set_file_type="featureXML", set_results_dir="feature-detection") +# feature_detection_out = ['../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML', '../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML'] + +# Setting a name for the output directory automatically (useful if you never plan to access these files in the results section). +feature_detection_out = self.file_manager.get_files(mzML_files, set_file_type="featureXML", set_results_dir="auto") +# feature_detection_out = ['../workspaces-streamlit-template/default/topp-workflow/results/6DUd/Control.featureXML', '../workspaces-streamlit-template/default/topp-workflow/results/6DUd/Treatment.featureXML'] + +# Combining all mzML files to be passed to a TOPP tool in a single run. Using "collected" files as argument for self.file_manager.get_files will "un-collect" them. +mzML_files = self.file_manager.get_files(mzML_files, collect=True) +# mzML_files = [['../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML', '../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML']] + """ +) + +with st.expander("**Code documentation**", expanded=True): + st.help(FileManager.get_files) + +st.markdown( + """ +### Running commands + +It is possible to execute any command line command using the **`self.executor`** object, either a single command or a list of commands in parallel. Furthermore a method to run TOPP tools is included. + +**1. Single command** + +The `self.executor.run_command` method takes a single command as input and optionally logs stdout and stderr to the workflow log (default True). +""") + +st.code(""" +self.executor.run_command(["command", "arg1", "arg2", ...], write_log=True) +""") + +st.markdown( + """ +**2. 
Run multiple commands in parallel** + +The `self.executor.run_multiple_commands` method takes a list of commands as inputs. + +**3. Run TOPP tools** + +The `self.executor.run_topp` method takes a TOPP tool name as input and a dictionary of input and output files as input. The **keys** need to match the actual input and output parameter names of the TOPP tool. The **values** should be of type `FileManager`. All other **non-default parameters (from input widgets)** will be passed to the TOPP tool automatically. + +Depending on the number of input files, the TOPP tool will be run either in parallel or in a single run (using **`FileManager.collect`**). +""") + +st.info("""💡 **Input and output file order** + +In many tools, a single input file is processed to produce a single output file. +When dealing with lists of input or output files, the convention is that +files are paired based on their order. For instance, the n-th input file is +assumed to correspond to the n-th output file, maintaining a structured +relationship between input and output data. +""") +st.code(""" +# e.g. FeatureFinderMetabo takes single input files +in_files = self.file_manager.get_files(["sample1.mzML", "sample2.mzML"]) +out_files = self.file_manager.get_files(in_files, set_file_type="featureXML", set_results_dir="feature-detection") + +# Run FeatureFinderMetabo tool with input and output files in parallel for each pair of input/output files. +self.executor.run_topp("FeatureFinderMetabo", input_output={"in": in_files, "out": out_files}) +# FeaturFinderMetabo -in sample1.mzML -out workspace-dir/results/feature-detection/sample1.featureXML +# FeaturFinderMetabo -in sample2.mzML -out workspace-dir/results/feature-detection/sample2.featureXML + +# Run SiriusExport tool with mutliple input and output files. 
+out_se = self.file_manager.get_files("sirius.ms", set_results_dir="sirius-export") +self.executor.run_topp("SiriusExport", {"in": self.file_manager.get_files(in_files, collect=True), + "in_featureinfo": self.file_manager.get_files(out_files, collect=True), + "out": out_se}) +# SiriusExport -in sample1.mzML sample2.mzML -in_featureinfo sample1.featureXML sample2.featureXML -out sirius.ms + """) + +st.markdown(""" +**4. Run custom Python scripts** + +Sometimes it is useful to run custom Python scripts, for example for extra functionality which is not included in a TOPP tool. + +`self.executor.run_python` works similarly to `self.executor.run_topp`, but takes a single Python script as input instead of a TOPP tool name. The default location for the Python script files is in `src/python-tools` (in this case the `.py` file extension is optional in the **script_file** argument), however, any other path can be specified as well. Input and output file parameters need to be specified in the **input_output** dictionary. +""") + +st.code(""" +# e.g. example Python tool which modifies mzML files in place based on experimental design +self.executor.run_python(script_file="example", input_output={"in": in_mzML, "in_experimental_design": FileManager(["path/to/experimental-design.tsv"])}) + """) + +st.markdown("**Example for a complete workflow section:**") + +st.code( +getsource(Workflow.execution) +) + +with st.expander("**Code documentation**", expanded=True): + st.help(CommandExecutor.run_command) + st.help(CommandExecutor.run_multiple_commands) + st.help(CommandExecutor.run_topp) + st.help(CommandExecutor.run_python) + +with st.expander("**Example output of the complete example workflow**"): + st.code(""" +STARTING WORKFLOW + +Number of input mzML files: 2 + +Running 2 commands in parallel... 
+ +Running command: +FeatureFinderMetabo -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML -out ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML -algorithm:common:chrom_peak_snr 4.0 -algorithm:common:noise_threshold_int 1000.0 +Waiting for command to finish... + +Running command: +FeatureFinderMetabo -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML -out ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML -algorithm:common:chrom_peak_snr 4.0 -algorithm:common:noise_threshold_int 1000.0 +Waiting for command to finish... + +Process finished: +FeatureFinderMetabo -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML -out ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML -algorithm:common:chrom_peak_snr 4.0 -algorithm:common:noise_threshold_int 1000.0 +Total time to run command: 0.55 seconds + +Progress of 'loading mzML': + Progress of 'loading spectra list': + + 89.06 % + -- done [took 0.17 s (CPU), 0.17 s (Wall)] -- + Progress of 'loading chromatogram list': + + -- done [took 0.00 s (CPU), 0.00 s (Wall)] -- + +-- done [took 0.18 s (CPU), 0.18 s (Wall) @ 40.66 MiB/s] -- +Progress of 'mass trace detection': + +-- done [took 0.01 s (CPU), 0.01 s (Wall)] -- +Progress of 'elution peak detection': + +-- done [took 0.07 s (CPU), 0.07 s (Wall)] -- +Progress of 'assembling mass traces to features': +Loading metabolite isotope model with 5% RMS error + +-- done [took 0.04 s (CPU), 0.04 s (Wall)] -- +-- FF-Metabo stats -- +Input traces: 1382 +Output features: 1095 (total trace count: 1382) +FeatureFinderMetabo took 0.47 s (wall), 0.90 s (CPU), 0.43 s (system), 0.47 s (user); Peak Memory Usage: 88 MB. 
+ + +Process finished: +FeatureFinderMetabo -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML -out ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML -algorithm:common:chrom_peak_snr 4.0 -algorithm:common:noise_threshold_int 1000.0 +Total time to run command: 0.60 seconds + +Progress of 'loading mzML': + Progress of 'loading spectra list': + + 77.09 % + -- done [took 0.16 s (CPU), 0.16 s (Wall)] -- + Progress of 'loading chromatogram list': + + -- done [took 0.00 s (CPU), 0.00 s (Wall)] -- + +-- done [took 0.17 s (CPU), 0.17 s (Wall) @ 43.38 MiB/s] -- +Progress of 'mass trace detection': + +-- done [took 0.02 s (CPU), 0.02 s (Wall)] -- +Progress of 'elution peak detection': + +-- done [took 0.07 s (CPU), 0.07 s (Wall)] -- +Progress of 'assembling mass traces to features': +Loading metabolite isotope model with 5% RMS error + +-- done [took 0.05 s (CPU), 0.05 s (Wall)] -- +-- FF-Metabo stats -- +Input traces: 1521 +Output features: 1203 (total trace count: 1521) +FeatureFinderMetabo took 0.51 s (wall), 0.90 s (CPU), 0.45 s (system), 0.45 s (user); Peak Memory Usage: 88 MB. + + +Total time to run 2 commands: 0.60 seconds + +Running command: +python src/python-tools/example.py ../workspaces-streamlit-template/default/topp-workflow/example.json +Waiting for command to finish... + +Process finished: +python src/python-tools/example.py ../workspaces-streamlit-template/default/topp-workflow/example.json +Total time to run command: 0.04 seconds + +Writing stdout which will get logged... 
+Parameters for this example Python tool: +{ + "in": [ + "../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML", + "../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML" + ], + "out": [], + "number-slider": 6, + "selectbox-example": "c", + "adavanced-input": 5, + "checkbox": true +} + + +Running command: +SiriusExport -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML -in_featureinfo ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML -out ../workspaces-streamlit-template/default/topp-workflow/results/sirius-export/sirius.ms +Waiting for command to finish... + +Process finished: +SiriusExport -in ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Control.mzML ../workspaces-streamlit-template/default/topp-workflow/input-files/mzML-files/Treatment.mzML -in_featureinfo ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Control.featureXML ../workspaces-streamlit-template/default/topp-workflow/results/feature-detection/Treatment.featureXML -out ../workspaces-streamlit-template/default/topp-workflow/results/sirius-export/sirius.ms +Total time to run command: 0.65 seconds + +Number of features to be processed: 0 +Number of additional MS2 spectra to be processed: 0 +No MS1 spectrum for this precursor. Occurred 0 times. +0 spectra were skipped due to precursor charge below -1 and above +1. +Mono charge assumed and set to charge 1 with respect to current polarity 0 times. +0 features were skipped due to feature charge below -1 and above +1. +No MS1 spectrum for this precursor. Occurred 0 times. 
+0 spectra were skipped due to precursor charge below -1 and above +1. +Mono charge assumed and set to charge 1 with respect to current polarity 0 times. +0 features were skipped due to feature charge below -1 and above +1. + occurred 2 times +SiriusExport took 0.61 s (wall), 1.71 s (CPU), 1.06 s (system), 0.65 s (user); Peak Memory Usage: 88 MB. + occurred 2 times + + +WORKFLOW FINISHED + """, language="neon") + + + diff --git a/requirements.txt b/requirements.txt index f3cb5f4..67f3b39 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ # the requirements.txt file is intended for deployment on streamlit cloud and if the simple container is built # note that it is much more restricted in terms of installing third-parties / etc. # preferably use the batteries included or simple docker file for local hosting -streamlit==1.28.0 +streamlit==1.29.0 streamlit-plotly-events==0.0.6 streamlit-aggrid==0.3.4.post3 pandas==2.1.2 diff --git a/src/Workflow.py b/src/Workflow.py new file mode 100644 index 0000000..0cc3ccd --- /dev/null +++ b/src/Workflow.py @@ -0,0 +1,79 @@ +import streamlit as st +from .workflow.WorkflowManager import WorkflowManager + +class Workflow(WorkflowManager): + # Setup pages for upload, parameter, execution and results. + # For layout use any streamlit components such as tabs (as shown in example), columns, or even expanders. + def __init__(self) -> None: + # Initialize the parent class with the workflow name. + super().__init__("TOPP Workflow", st.session_state["workspace"]) + + def upload(self)-> None: + t = st.tabs(["MS data", "Example with fallback data"]) + with t[0]: + # Use the upload method from StreamlitUI to handle mzML file uploads. 
+ self.ui.upload_widget(key="mzML-files", name="MS data", file_type="mzML") + with t[1]: + # Example with fallback data (not used in workflow) + self.ui.upload_widget(key="image", file_type="png", fallback="assets/OpenMS.png") + + def configure(self) -> None: + # Allow users to select mzML files for the analysis. + self.ui.select_input_file("mzML-files", multiple=True) + + # Create tabs for different analysis steps. + t = st.tabs( + ["**Feature Detection**", "**Adduct Detection**", "**SIRIUS Export**", "**Python Custom Tool**"] + ) + with t[0]: + # Parameters for FeatureFinderMetabo TOPP tool. + self.ui.input_TOPP("FeatureFinderMetabo") + with t[1]: + # A single checkbox widget for workflow logic. + self.ui.input_widget("run-adduct-detection", False, "Adduct Detection") + # Paramters for MetaboliteAdductDecharger TOPP tool. + self.ui.input_TOPP("MetaboliteAdductDecharger") + with t[2]: + # Paramters for SiriusExport TOPP tool + self.ui.input_TOPP("SiriusExport") + with t[3]: + # Generate input widgets for a custom Python tool, located at src/python-tools. + # Parameters are specified within the file in the DEFAULTS dictionary. + self.ui.input_python("example") + + def execution(self) -> None: + # Get mzML input files from self.params. + # Can be done without file manager, however, it ensures everything is correct. + in_mzML = self.file_manager.get_files(self.params["mzML-files"]) + + # Log any messages. + self.logger.log(f"Number of input mzML files: {len(in_mzML)}") + + # Prepare output files for feature detection. + out_ffm = self.file_manager.get_files(in_mzML, "featureXML", "feature-detection") + + # Run FeatureFinderMetabo tool with input and output files. + self.executor.run_topp( + "FeatureFinderMetabo", input_output={"in": in_mzML, "out": out_ffm} + ) + + # Check if adduct detection should be run. + if self.params["run-adduct-detection"]: + + # Run MetaboliteAdductDecharger for adduct detection, with disabled logs. 
+ # Without a new file list for output, the input files will be overwritten in this case. + self.executor.run_topp( + "MetaboliteAdductDecharger", {"in": out_ffm, "out_fm": out_ffm}, write_log=False + ) + + # Example for a custom Python tool, which is located in src/python-tools. + self.executor.run_python("example", {"in": in_mzML}) + + # Prepare output file for SiriusExport. + out_se = self.file_manager.get_files("sirius.ms", set_results_dir="sirius-export") + self.executor.run_topp("SiriusExport", {"in": self.file_manager.get_files(in_mzML, collect=True), + "in_featureinfo": self.file_manager.get_files(out_ffm, collect=True), + "out": out_se}) + + def results(self) -> None: + st.warning("Not implemented yet.") \ No newline at end of file diff --git a/src/python-tools/.gitignore b/src/python-tools/.gitignore new file mode 100644 index 0000000..ed8ebf5 --- /dev/null +++ b/src/python-tools/.gitignore @@ -0,0 +1 @@ +__pycache__ \ No newline at end of file diff --git a/src/python-tools/example.py b/src/python-tools/example.py new file mode 100644 index 0000000..21431dc --- /dev/null +++ b/src/python-tools/example.py @@ -0,0 +1,65 @@ +import json +import sys + +############################ +# default paramter values # +########################### +# +# Mandatory keys for each parameter +# key: a unique identifier +# value: the default value +# +# Optional keys for each parameter +# name: the name of the parameter +# hide: don't show the parameter in the parameter section (e.g. 
for input/output files) +# options: a list of valid options for the parameter +# min: the minimum value for the parameter (int and float) +# max: the maximum value for the parameter (int and float) +# step_size: the step size for the parameter (int and float) +# help: a description of the parameter +# widget_type: the type of widget to use for the parameter (default: auto) +# advanced: whether or not the parameter is advanced (default: False) + +DEFAULTS = [ + {"key": "in", "value": [], "help": "Input files for Python Script.", "hide": True}, + {"key": "out", "value": [], "help": "Output files for Python Script.", "hide": True}, + { + "key": "number-slider", + "name": "number of features", + "value": 6, + "min": 2, + "max": 10, + "help": "How many features to consider.", + "widget_type": "slider", + "step_size": 2, + }, + { + "key": "selectbox-example", + "value": "a", + "options": ["a", "b", "c"], + }, + { + "key": "adavanced-input", + "value": 5, + "step_size": 5, + "help": "An advanced example parameter.", + "advanced": True, + }, + { + "key": "checkbox", "value": True + } +] + +def get_params(): + if len(sys.argv) > 1: + with open(sys.argv[1], "r") as f: + return json.load(f) + else: + return {} + +if __name__ == "__main__": + params = get_params() + # Add code here: + print("Writing stdout which will get logged...") + print("Parameters for this example Python tool:") + print(json.dumps(params, indent=4)) \ No newline at end of file diff --git a/src/workflow.py b/src/simpleworkflow.py similarity index 100% rename from src/workflow.py rename to src/simpleworkflow.py diff --git a/src/workflow/.gitignore b/src/workflow/.gitignore new file mode 100644 index 0000000..ed8ebf5 --- /dev/null +++ b/src/workflow/.gitignore @@ -0,0 +1 @@ +__pycache__ \ No newline at end of file diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py new file mode 100644 index 0000000..8e33cd7 --- /dev/null +++ b/src/workflow/CommandExecutor.py @@ -0,0 +1,260 @@ 
+import time +import os +import shutil +import subprocess +import threading +from pathlib import Path +from .Logger import Logger +from .ParameterManager import ParameterManager +import sys +import importlib.util +import json + +class CommandExecutor: + """ + Manages the execution of external shell commands such as OpenMS TOPP tools within a Streamlit application. + + This class provides a structured approach to executing shell commands, capturing + their output, and handling errors. It is designed to facilitate running both single + commands and batches of commands in parallel, leveraging Python's subprocess module + for execution. + """ + # Methods for running commands and logging + def __init__(self, workflow_dir: Path, logger: Logger, parameter_manager: ParameterManager): + self.pid_dir = Path(workflow_dir, "pids") + self.logger = logger + self.parameter_manager = parameter_manager + + def run_multiple_commands( + self, commands: list[str], write_log: bool = True + ) -> None: + """ + Executes multiple shell commands concurrently in separate threads. + + This method leverages threading to run each command in parallel, improving + efficiency for batch command execution. Execution time and command results are + logged if specified. + + Args: + commands (list[str]): A list where each element is a list representing + a command and its arguments. + write_log (bool): If True, logs the execution details and outcomes of the commands. 
+ """ + # Log the start of command execution + self.logger.log(f"Running {len(commands)} commands in parallel...") + start_time = time.time() + + # Initialize a list to keep track of threads + threads = [] + + # Start a new thread for each command + for cmd in commands: + thread = threading.Thread(target=self.run_command, args=(cmd, write_log)) + thread.start() + threads.append(thread) + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Calculate and log the total execution time + end_time = time.time() + self.logger.log( + f"Total time to run {len(commands)} commands: {end_time - start_time:.2f} seconds" + ) + + def run_command(self, command: list[str], write_log: bool = True) -> None: + """ + Executes a specified shell command and logs its execution details. + + Args: + command (list[str]): The shell command to execute, provided as a list of strings. + write_log (bool): If True, logs the command's output and errors. + + Raises: + Exception: If the command execution results in any errors. 
+ """ + # Ensure all command parts are strings + command = [str(c) for c in command] + + # Log the execution start + self.logger.log(f"Running command:\n"+' '.join(command)+"\nWaiting for command to finish...") + + start_time = time.time() + + # Execute the command + process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + child_pid = process.pid + + # Record the PID to keep track of running processes associated with this workspace/workflow + # User can close the Streamlit app and return to a running workflow later + pid_file_path = self.pid_dir / str(child_pid) + pid_file_path.touch() + + # Wait for command completion and capture output + stdout, stderr = process.communicate() + + # Cleanup PID file + pid_file_path.unlink() + + end_time = time.time() + execution_time = end_time - start_time + + # Format the logging prefix + self.logger.log(f"Process finished:\n"+' '.join(command)+f"\nTotal time to run command: {execution_time:.2f} seconds") + + # Log stdout if present + if stdout and write_log: + self.logger.log(stdout.decode()) + + # Log stderr and raise an exception if errors occurred + if stderr or process.returncode != 0: + error_message = stderr.decode().strip() + self.logger.log(f"ERRORS OCCURRED:\n{error_message}") + raise Exception(f"Errors occurred while running command: {' '.join(command)}\n{error_message}") + + def run_topp(self, tool: str, input_output: dict, write_log: bool = True) -> None: + """ + Constructs and executes commands for the specified tool OpenMS TOPP tool based on the given + input and output configurations. Ensures that all input/output file lists + are of the same length, or single strings, to maintain consistency in command + execution. + In many tools, a single input file is processed to produce a single output file. + When dealing with lists of input or output files, the convention is that + files are paired based on their order. 
For instance, the n-th input file is + assumed to correspond to the n-th output file, maintaining a structured + relationship between input and output data. + Supports executing commands either as single or multiple processes + based on the input size. + + Args: + tool (str): The executable name or path of the tool. + input_output (dict): A dictionary specifying the input/output parameter names (as key) and their corresponding file paths (as value). + write_log (bool): If True, enables logging of command execution details. + + Raises: + ValueError: If the lengths of input/output file lists are inconsistent, + except for single string inputs. + """ + # check input: any input lists must be same length, other items can be a single string + # e.g. input_mzML : [list of n mzML files], output_featureXML : [list of n featureXML files], input_database : database.tsv + io_lengths = [len(v) for v in input_output.values() if len(v) > 1] + + if len(set(io_lengths)) > 1: + raise ValueError(f"ERROR in {tool} input/output.\nFile list lengths must be 1 and/or the same. 
They are {io_lengths}.") + + if len(io_lengths) == 0: # all inputs/outputs are length == 1 + n_processes = 1 + else: + n_processes = max(io_lengths) + + commands = [] + + # Load parameters for non-defaults + params = self.parameter_manager.get_parameters_from_json() + # Construct commands for each process + for i in range(n_processes): + command = [tool] + # Add input/output files + for k in input_output.keys(): + # add key as parameter name + command += [f"-{k}"] + # get value from input_output dictionary + value = input_output[k] + # when multiple input/output files exist (e.g., multiple mzMLs and featureXMLs), but only one additional input file (e.g., one input database file) + if len(value) == 1: + i = 0 + # when the entry is a list of collected files to be passed as one [["sample1", "sample2"]] + if isinstance(value[i], list): + command += value[i] + # standard case, files was a list of strings, take the file name at index + else: + command += [value[i]] + # Add non-default TOPP tool parameters + if tool in params.keys(): + for k, v in params[tool].items(): + command += [f"-{k}", str(v)] + commands.append(command) + + # Run command(s) + if len(commands) == 1: + self.run_command(commands[0], write_log) + elif len(commands) > 1: + self.run_multiple_commands(commands, write_log) + else: + raise Exception("No commands to execute.") + + def stop(self) -> None: + """ + Terminates all processes initiated by this executor by killing them based on stored PIDs. 
+ """ + self.logger.log("Stopping all running processes...") + pids = [Path(f).stem for f in self.pid_dir.iterdir()] + + for pid in pids: + try: + os.kill(int(pid), 9) + except OSError as e: + self.logger.log(f"Failed to kill process {pid}: {e}") + + shutil.rmtree(self.pid_dir, ignore_errors=True) + self.logger.log("Workflow stopped.") + + def run_python(self, script_file: str, input_output: dict = {}, write_log: bool = True) -> None: + """ + Executes a specified Python script with dynamic input and output parameters, + optionally logging the execution process. The method identifies and loads + parameter defaults from the script, updates them with any user-specified + parameters and file paths, and then executes the script via a subprocess + call. + + This method facilitates the integration of standalone Python scripts into + a larger application or workflow, allowing for the execution of these scripts + with varying inputs and outputs without modifying the scripts themselves. + + Args: + script_file (str): The name or path of the Python script to be executed. + If the path is omitted, the method looks for the script in 'src/python-tools/'. + The '.py' extension is appended if not present. + input_output (dict, optional): A dictionary specifying the input/output parameter names (as key) and their corresponding file paths (as value). Defaults to {}. + write_log (bool, optional): If True, the execution process is logged. This + includes any output generated by the script as well as any errors. Defaults to True. 
+ """ + # Check if script file exists (can be specified without path and extension) + # default location: src/python-tools/script_file + if not script_file.endswith(".py"): + script_file += ".py" + path = Path(script_file) + if not path.exists(): + path = Path("src", "python-tools", script_file) + if not path.exists(): + self.logger.log(f"Script file not found: {script_file}") + + # load DEFAULTS + if path.parent not in sys.path: + sys.path.append(str(path.parent)) + spec = importlib.util.spec_from_file_location(path.stem, path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + defaults = getattr(module, "DEFAULTS", None) + if defaults is None: + self.logger.log(f"WARNING: No DEFAULTS found in {path.name}") + # run command without params + self.run_command(["python", str(path)], write_log) + elif isinstance(defaults, list): + defaults = {entry["key"]: entry["value"] for entry in defaults} + # load paramters from JSON file + params = {k: v for k, v in self.parameter_manager.get_parameters_from_json().items() if path.name in k} + # update defaults + for k, v in params.items(): + defaults[k.replace(f"{path.name}:", "")] = v + for k, v in input_output.items(): + defaults[k] = v + # save parameters to temporary JSON file + tmp_params_file = Path(self.pid_dir.parent, f"{path.stem}.json") + with open(tmp_params_file, "w", encoding="utf-8") as f: + json.dump(defaults, f, indent=4) + # run command + self.run_command(["python", str(path), str(tmp_params_file)], write_log) + # remove tmp params file + tmp_params_file.unlink() \ No newline at end of file diff --git a/src/workflow/FileManager.py b/src/workflow/FileManager.py new file mode 100644 index 0000000..923ff6b --- /dev/null +++ b/src/workflow/FileManager.py @@ -0,0 +1,180 @@ +from pathlib import Path +import string +import random +import shutil +from typing import Union, List + +class FileManager: + """ + Manages file paths for operations such as changing file extensions, organizing 
files + into result directories, and handling file collections for processing tools. Designed + to be flexible for handling both individual files and lists of files, with integration + into a Streamlit workflow. + + Methods: + get_files: Returns a list of file paths as strings for the specified files, optionally with new file type and results subdirectory. + collect: Collects all files in a single list (e.g. to pass to tools which can handle multiple input files at once). + """ + + def __init__( + self, + workflow_dir: Path, + ): + """ + Initializes the FileManager object with a the current workflow results directory. + """ + self.workflow_dir = workflow_dir + + def get_files( + self, + files: Union[List[Union[str, Path]], Path, str, List[List[str]]], + set_file_type: str = None, + set_results_dir: str = None, + collect: bool = False, + ) -> Union[List[str], List[List[str]]]: + """ + Returns a list of file paths as strings for the specified files. + Otionally sets or changes the file extension for all files to the + specified file type and changes the directory to a new subdirectory + in the workflow results directory. + + Args: + files (Union[List[Union[str, Path]], Path, str, List[List[str]]]): The list of file + paths to change the type for. + set_file_type (str): The file extension to set for all files. + set_results_dir (str): The name of a subdirectory in the workflow + results directory to change to. If "auto" or "" a random name will be generated. + collect (bool): Whether to collect all files into a single list. Will return a list + with a single entry, which is a list of all files. Useful to pass to tools which + can handle multiple input files at once. + + Returns: + Union[List[str], List[List[str]]]: The (modified) files list. 
+ """ + # Handle input single string + if isinstance(files, str): + files = [files] + # Handle input single Path object, can be directory or file + elif isinstance(files, Path): + if files.is_dir(): + files = [str(f) for f in files.iterdir()] + else: + files = [str(files)] + # Handle input list + elif isinstance(files, list) and files: + # Can have one entry of strings (e.g. if has been collected before by FileManager) + if isinstance(files[0], list): + files = files[0] + # Make sure ever file path is a string + files = [str(f) for f in files if isinstance(f, Path) or isinstance(f, str)] + # Raise error if no files have been detected + if not files: + raise ValueError( + f"No files found, can not set file type **{set_file_type}**, results_dir **{set_results_dir}** and collect **{collect}**." + ) + # Set new file type if required + if set_file_type is not None: + files = self._set_type(files, set_file_type) + # Set new results subdirectory if required + if set_results_dir is not None: + if set_results_dir == "auto": + set_results_dir = "" + files = self._set_dir(files, set_results_dir) + # Collect files into a single list if required + if collect: + files = [files] + return files + + def _set_type(self, files: List[str], set_file_type: str) -> List[str]: + """ + Sets or changes the file extension for all files in the collection to the + specified file type. + + Args: + files (List[str]): The list of file paths to change the type for. + set_file_type (str): The file extension to set for all files. + + Returns: + List[str]: The files list with new type. + """ + + def change_extension(file_path, new_ext): + return Path(file_path).with_suffix("." 
+ new_ext) + + for i in range(len(files)): + if isinstance(files[i], list): # If the item is a list + files[i] = [ + str(change_extension(file, set_file_type)) for file in files[i] + ] + elif isinstance(files[i], str): # If the item is a string + files[i] = str(change_extension(files[i], set_file_type)) + return files + + def _set_dir(self, files: List[str], subdir_name: str) -> List[str]: + """ + Sets the subdirectory within the results directory to store files. If the + subdirectory name is 'auto' or empty, generates a random subdirectory name. + Warns and overwrites if the subdirectory already exists. + + Args: + files (List[str]): The list of file paths to change the type for. + subdir_name (str): The name of the subdirectory within the results directory. + + Returns: + List[str]: The files list with new directory. + """ + if not subdir_name: + subdir_name = self._create_results_sub_dir(subdir_name) + else: + subdir_name = self._create_results_sub_dir(subdir_name) + + def change_subdir(file_path, subdir): + return Path(subdir, Path(file_path).name) + + for i in range(len(files)): + if isinstance(files[i], list): # If the item is a list + files[i] = [str(change_subdir(file, subdir_name)) for file in files[i]] + elif isinstance(files[i], str): # If the item is a string + files[i] = str(change_subdir(files[i], subdir_name)) + return files + + def _generate_random_code(self, length: int) -> str: + """Generate a random code of the specified length. + + Args: + length (int): Length of the random code. + + Returns: + str: Random code of the specified length. 
+ """ + # Define the characters that can be used in the code + # Includes both letters and numbers + characters = string.ascii_letters + string.digits + + # Generate a random code of the specified length + random_code = "".join(random.choice(characters) for _ in range(length)) + + return random_code + + def _create_results_sub_dir(self, name: str = "") -> str: + """ + Creates a subdirectory within the results directory for storing files. If the + name is not specified or empty, generates a random name for the subdirectory. + + Args: + name (str, optional): The desired name for the subdirectory. + + Returns: + str: The path to the created subdirectory as a string. + """ + # create a directory (e.g. for results of a TOPP tool) within the results directory + # if name is empty string, auto generate a name + if not name: + name = self._generate_random_code(4) + # make sure the subdirectory does not exist in results yet + while Path(self.workflow_dir, "results", name).exists(): + name = self._generate_random_code(4) + path = Path(self.workflow_dir, "results", name) + shutil.rmtree(path, ignore_errors=True) + path.mkdir() + return str(path) diff --git a/src/workflow/Logger.py b/src/workflow/Logger.py new file mode 100644 index 0000000..8529938 --- /dev/null +++ b/src/workflow/Logger.py @@ -0,0 +1,28 @@ +from pathlib import Path + +class Logger: + """ + A simple logging class for writing messages to a log file. This class is designed + to append messages to a log file in the current workflow directory, facilitating + easy tracking of events, errors, or other significant occurrences in processes called + during workflow execution. + + Attributes: + log_file (Path): The file path of the log file where messages will be written. 
+ """ + def __init__(self, workflow_dir: Path) -> None: + self.workflow_dir = workflow_dir + self.log_file = Path(self.workflow_dir, "log.txt") + + def log(self, message: str) -> None: + """ + Appends a given message to the log file, followed by two newline characters + for readability. This method ensures that each logged message is separated + for clear distinction in the log file. + + Args: + message (str): The message to be logged to the file. + """ + # Write the message to the log file. + with open(self.log_file, "a", encoding="utf-8") as f: + f.write(f"{message}\n\n") diff --git a/src/workflow/ParameterManager.py b/src/workflow/ParameterManager.py new file mode 100644 index 0000000..b8106f2 --- /dev/null +++ b/src/workflow/ParameterManager.py @@ -0,0 +1,104 @@ +import pyopenms as poms +import json +import shutil +import streamlit as st +from pathlib import Path + +class ParameterManager: + """ + Manages the parameters for a workflow, including saving parameters to a JSON file, + loading parameters from the file, and resetting parameters to defaults. This class + specifically handles parameters related to TOPP tools in a pyOpenMS context and + general parameters stored in Streamlit's session state. + + Attributes: + ini_dir (Path): Directory path where .ini files for TOPP tools are stored. + params_file (Path): Path to the JSON file where parameters are saved. + param_prefix (str): Prefix for general parameter keys in Streamlit's session state. + topp_param_prefix (str): Prefix for TOPP tool parameter keys in Streamlit's session state. 
+ """ + # Methods related to parameter handling + def __init__(self, workflow_dir: Path): + self.ini_dir = Path(workflow_dir, "ini") + self.ini_dir.mkdir(parents=True, exist_ok=True) + self.params_file = Path(workflow_dir, "params.json") + self.param_prefix = f"{workflow_dir.stem}-param-" + self.topp_param_prefix = f"{workflow_dir.stem}-TOPP-" + + def save_parameters(self) -> None: + """ + Saves the current parameters from Streamlit's session state to a JSON file. + It handles both general parameters and parameters specific to TOPP tools, + ensuring that only non-default values are stored. + """ + # Everything in session state which begins with self.param_prefix is saved to a json file + json_params = { + k.replace(self.param_prefix, ""): v + for k, v in st.session_state.items() + if k.startswith(self.param_prefix) + } + # get a list of TOPP tools which are in session state + current_topp_tools = list( + set( + [ + k.replace(self.topp_param_prefix, "").split(":1:")[0] + for k in st.session_state.keys() + if k.startswith(f"{self.topp_param_prefix}") + ] + ) + ) + # for each TOPP tool, open the ini file + for tool in current_topp_tools: + json_params[tool] = {} + # load the param object + param = poms.Param() + poms.ParamXMLFile().load(str(Path(self.ini_dir, f"{tool}.ini")), param) + # get all session state param keys and values for this tool + for key, value in st.session_state.items(): + if key.startswith(f"{self.topp_param_prefix}{tool}:1:"): + # get ini_key + ini_key = key.replace(self.topp_param_prefix, "").encode() + # get ini (default) value by ini_key + ini_value = param.getValue(ini_key) + # need to convert bool values to string values + if isinstance(value, bool): + value = "true" if value else "false" + # convert strings with newlines to list + if isinstance(value, str): + if "\n" in value: + value = [v.encode() for v in value.split("\n")] + # check if value is different from default + if ini_value != value: + # store non-default value + 
json_params[tool][key.split(":1:")[1]] = value + # Save to json file + with open(self.params_file, "w", encoding="utf-8") as f: + json.dump(json_params, f, indent=4) + + def get_parameters_from_json(self) -> None: + """ + Loads parameters from the JSON file if it exists and returns them as a dictionary. + If the file does not exist, it returns an empty dictionary. + + Returns: + dict: A dictionary containing the loaded parameters. Keys are parameter names, + and values are parameter values. + """ + # Check if parameter file exists + if not Path(self.params_file).exists(): + return {} + else: + # Load parameters from json file + with open(self.params_file, "r", encoding="utf-8") as f: + return json.load(f) + + def reset_to_default_parameters(self) -> None: + """ + Resets the parameters to their default values by deleting the custom parameters + JSON file and the directory containing .ini files for TOPP tools. This method + also triggers a Streamlit rerun to refresh the application state. + """ + # Delete custom params json file + self.params_file.unlink(missing_ok=True) + shutil.rmtree(self.ini_dir) + st.rerun() diff --git a/src/workflow/StreamlitUI.py b/src/workflow/StreamlitUI.py new file mode 100644 index 0000000..f883b00 --- /dev/null +++ b/src/workflow/StreamlitUI.py @@ -0,0 +1,690 @@ +import streamlit as st +import pyopenms as poms +from pathlib import Path +import shutil +import subprocess +from typing import Any, Union, List +import json +import sys +import importlib.util +import time +from io import BytesIO +import zipfile + +class StreamlitUI: + """ + Provides an interface for Streamlit applications to handle file uploads, + input selection, and parameter management for analysis workflows. It includes + methods for uploading files, selecting input files from available ones, and + generating various input widgets dynamically based on the specified parameters. 
+ """ + + # Methods for Streamlit UI components + def __init__(self, workflow_dir, logger, executor, paramter_manager): + self.workflow_dir = workflow_dir + self.logger = logger + self.executor = executor + self.parameter_manager = paramter_manager + self.params = self.parameter_manager.get_parameters_from_json() + + def upload_widget( + self, + key: str, + file_type: str, + name: str = "", + fallback: Union[List, str] = None, + ) -> None: + """ + Handles file uploads through the Streamlit interface, supporting both direct + uploads and local directory copying for specified file types. It allows for + specifying fallback files to ensure essential files are available. + + Args: + key (str): A unique identifier for the upload component. + file_type (str): Expected file type for the uploaded files. + name (str, optional): Display name for the upload component. Defaults to the key if not provided. + fallback (Union[List, str], optional): Default files to use if no files are uploaded. + """ + # streamlit uploader can't handle file types with upper and lower case letters + files_dir = Path(self.workflow_dir, "input-files", key) + + if not name: + name = key.replace("-", " ") + + c1, c2 = st.columns(2) + c1.markdown("**Upload file(s)**") + with c1.form(f"{key}-upload", clear_on_submit=True): + if any(c.isupper() for c in file_type) and (c.islower() for c in file_type): + file_type_for_uploader = None + else: + file_type_for_uploader = [file_type] + files = st.file_uploader( + f"{name}", + accept_multiple_files=(st.session_state.location == "local"), + type=file_type_for_uploader, + label_visibility="collapsed", + ) + if st.form_submit_button( + f"Add **{name}**", use_container_width=True, type="primary" + ): + if files: + files_dir.mkdir(parents=True, exist_ok=True) + for f in files: + if f.name not in [ + f.name for f in files_dir.iterdir() + ] and f.name.endswith(file_type): + with open(Path(files_dir, f.name), "wb") as fh: + fh.write(f.getbuffer()) + 
st.success("Successfully added uploaded files!") + else: + st.error("Nothing to add, please upload file.") + + # Local file upload option: via directory path + if st.session_state.location == "local": + c2.markdown("**OR copy files from local folder**") + with c2.form(f"{key}-local-file-upload"): + local_dir = st.text_input(f"path to folder with **{name}** files") + if st.form_submit_button( + f"Copy **{name}** files from local folder", use_container_width=True + ): + # raw string for file paths + if not any(Path(local_dir).glob(f"*.{file_type}")): + st.warning( + f"No files with type **{file_type}** found in specified folder." + ) + else: + files_dir.mkdir(parents=True, exist_ok=True) + # Copy all mzML files to workspace mzML directory, add to selected files + files = list(Path(local_dir).glob("*.mzML")) + my_bar = st.progress(0) + for i, f in enumerate(files): + my_bar.progress((i + 1) / len(files)) + shutil.copy(f, Path(files_dir, f.name)) + my_bar.empty() + st.success("Successfully copied files!") + + if fallback: + files_dir.mkdir(parents=True, exist_ok=True) + if isinstance(fallback, str): + fallback = [fallback] + for f in fallback: + if not Path(files_dir, f).exists(): + shutil.copy(f, Path(files_dir, Path(f).name)) + st.info(f"Adding default file: **{f}**") + current_files = [ + f.name + for f in files_dir.iterdir() + if f.name not in [Path(f).name for f in fallback] + ] + else: + if files_dir.exists(): + current_files = [f.name for f in files_dir.iterdir()] + else: + current_files = [] + + if files_dir.exists() and not any(files_dir.iterdir()): + shutil.rmtree(files_dir) + + c1, c2 = st.columns(2) + if current_files: + c1.info(f"Current **{name}** files:\n\n" + "\n\n".join(current_files)) + if c2.button( + f"🗑️ Remove all **{name}** files.", + use_container_width=True, + key=f"remove-files-{key}", + ): + shutil.rmtree(files_dir) + del self.params[key] + with open(self.parameter_manager.params_file, "w", encoding="utf-8") as f: + json.dump(self.params, f, 
indent=4) + st.rerun() + elif not fallback: + st.warning(f"No **{name}** files!") + + def select_input_file( + self, + key: str, + name: str = "", + multiple: bool = False, + display_file_path: bool = False, + ) -> None: + """ + Presents a widget for selecting input files from those that have been uploaded. + Allows for single or multiple selections. + + Args: + key (str): A unique identifier related to the specific input files. + name (str, optional): The display name for the selection widget. Defaults to the key if not provided. + multiple (bool, optional): If True, allows multiple files to be selected. + display_file_path (bool, optional): If True, displays the full file path in the selection widget. + """ + if not name: + name = f"**{key}**" + path = Path(self.workflow_dir, "input-files", key) + if not path.exists(): + st.warning(f"No **{name}** files!") + return + options = [str(f) for f in path.iterdir()] + if key in self.params.keys(): + self.params[key] = [f for f in self.params[key] if f in options] + + widget_type = "multiselect" if multiple else "selectbox" + self.input_widget( + key, + name=name, + widget_type=widget_type, + options=options, + display_file_path=display_file_path, + ) + + def input_widget( + self, + key: str, + default: Any = None, + name: str = "input widget", + help: str = None, + widget_type: str = "auto", # text, textarea, number, selectbox, slider, checkbox, multiselect + options: List[str] = None, + min_value: Union[int, float] = None, + max_value: Union[int, float] = None, + step_size: Union[int, float] = 1, + display_file_path: bool = False, + ) -> None: + """ + Creates and displays a Streamlit widget for user input based on specified + parameters. Supports a variety of widget types including text input, number + input, select boxes, and more. Default values will be read in from parameters + if they exist. 
The key is modified to be recognized by the ParameterManager class + as a custom parameter (distinct from TOPP tool parameters). + + Args: + key (str): Unique identifier for the widget. + default (Any, optional): Default value for the widget. + name (str, optional): Display name of the widget. + help (str, optional): Help text to display alongside the widget. + widget_type (str, optional): Type of widget to create ('text', 'textarea', + 'number', 'selectbox', 'slider', 'checkbox', + 'multiselect', 'password', or 'auto'). + options (List[str], optional): Options for select/multiselect widgets. + min_value (Union[int, float], optional): Minimum value for number/slider widgets. + max_value (Union[int, float], optional): Maximum value for number/slider widgets. + step_size (Union[int, float], optional): Step size for number/slider widgets. + display_file_path (bool, optional): Whether to display the full file path for file options. + """ + + def format_files(input: Any) -> List[str]: + if not display_file_path and Path(input).exists(): + return Path(input).name + else: + return input + + if key in self.params.keys(): + value = self.params[key] + else: + value = default + # catch case where options are given but default is None + if options is not None and value is None: + if widget_type == "multiselect": + value = [] + elif widget_type == "selectbox": + value = options[0] + + key = f"{self.parameter_manager.param_prefix}{key}" + + if widget_type == "text": + st.text_input(name, value=value, key=key, help=help) + + elif widget_type == "textarea": + st.text_area(name, value=value, key=key, help=help) + + elif widget_type == "number": + number_type = float if isinstance(value, float) else int + step_size = number_type(step_size) + if min_value is not None: + min_value = number_type(min_value) + if max_value is not None: + max_value = number_type(max_value) + help = str(help) + st.number_input( + name, + min_value=min_value, + max_value=max_value, + value=value, + 
step=step_size, + format=None, + key=key, + help=help, + ) + + elif widget_type == "checkbox": + st.checkbox(name, value=value, key=key, help=help) + + elif widget_type == "selectbox": + if options is not None: + st.selectbox( + name, + options=options, + index=options.index(value) if value in options else 0, + key=key, + format_func=format_files, + help=help, + ) + else: + st.warning(f"Select widget '{name}' requires options parameter") + + elif widget_type == "multiselect": + if options is not None: + st.multiselect( + name, + options=options, + default=value, + key=key, + format_func=format_files, + help=help, + ) + else: + st.warning(f"Select widget '{name}' requires options parameter") + + elif widget_type == "slider": + if min_value is not None and max_value is not None: + slider_type = float if isinstance(value, float) else int + step_size = slider_type(step_size) + if min_value is not None: + min_value = slider_type(min_value) + if max_value is not None: + max_value = slider_type(max_value) + st.slider( + name, + min_value=min_value, + max_value=max_value, + value=value, + step=step_size, + key=key, + format=None, + help=help, + ) + else: + st.warning( + f"Slider widget '{name}' requires min_value and max_value parameters" + ) + + elif widget_type == "password": + st.text_input(name, value=value, type="password", key=key, help=help) + + elif widget_type == "auto": + # Auto-determine widget type based on value + if isinstance(value, bool): + st.checkbox(name, value=value, key=key, help=help) + elif isinstance(value, (int, float)): + self.input_widget( + key, + value, + widget_type="number", + name=name, + min_value=min_value, + max_value=max_value, + step_size=step_size, + help=help, + ) + elif (isinstance(value, str) or value == None) and options is not None: + self.input_widget( + key, + value, + widget_type="selectbox", + name=name, + options=options, + help=help, + ) + elif isinstance(value, list) and options is not None: + self.input_widget( + key, + 
value, + widget_type="multiselect", + name=name, + options=options, + help=help, + ) + elif isinstance(value, bool): + self.input_widget(key, value, widget_type="checkbox", name=name, help=help) + else: + self.input_widget(key, value, widget_type="text", name=name, help=help) + + else: + st.error(f"Unsupported widget type '{widget_type}'") + + def input_TOPP( + self, + topp_tool_name: str, + num_cols: int = 3, + exclude_parameters: List[str] = [], + ) -> None: + """ + Generates input widgets for TOPP tool parameters dynamically based on the tool's + .ini file. Supports excluding specific parameters and adjusting the layout. + File input and output parameters are excluded. + + Args: + topp_tool_name (str): The name of the TOPP tool for which to generate inputs. + num_cols (int, optional): Number of columns to use for the layout. Defaults to 3. + exclude_parameters (List[str], optional): List of parameter names to exclude from the widget. + """ + # write defaults ini files + ini_file_path = Path(self.parameter_manager.ini_dir, f"{topp_tool_name}.ini") + if not ini_file_path.exists(): + subprocess.call([topp_tool_name, "-write_ini", str(ini_file_path)]) + # read into Param object + param = poms.Param() + poms.ParamXMLFile().load(str(ini_file_path), param) + + excluded_keys = [ + "log", + "debug", + "threads", + "no_progress", + "force", + "version", + "test", + ] + exclude_parameters + + param_dicts = [] + for key in param.keys(): + # Determine if the parameter should be included based on the conditions + if ( + b"input file" in param.getTags(key) + or b"output file" in param.getTags(key) + ) or (key.decode().split(":")[-1] in excluded_keys): + continue + entry = param.getEntry(key) + param_dict = { + "name": entry.name.decode(), + "key": key, + "value": entry.value, + "valid_strings": [v.decode() for v in entry.valid_strings], + "description": entry.description.decode(), + "advanced": (b"advanced" in param.getTags(key)), + } + param_dicts.append(param_dict) + + # 
Update parameter values from the JSON parameters file + json_params = self.params + if topp_tool_name in json_params: + for p in param_dicts: + name = p["key"].decode().split(":1:")[1] + if name in json_params[topp_tool_name]: + p["value"] = json_params[topp_tool_name][name] + + # input widgets in n number of columns + cols = st.columns(num_cols) + i = 0 + + # show input widgets + for p in param_dicts: + + # skip avdanced parameters if not selected + if not st.session_state["advanced"] and p["advanced"]: + continue + + key = f"{self.parameter_manager.topp_param_prefix}{p['key'].decode()}" + + try: + # bools + if p["value"] == "true" or p["value"] == "false": + cols[i].markdown("##") + cols[i].checkbox( + p["name"], + value=(p["value"] == "true"), + help=p["description"], + key=key, + ) + + # string options + elif isinstance(p["value"], str) and p["valid_strings"]: + cols[i].selectbox( + p["name"], + options=p["valid_strings"], + index=p["valid_strings"].index(p["value"]), + help=p["description"], + key=key, + ) + + # strings + elif isinstance(p["value"], str): + cols[i].text_input( + p["name"], value=p["value"], help=p["description"], key=key + ) + + # ints + elif isinstance(p["value"], int): + cols[i].number_input( + p["name"], value=int(p["value"]), help=p["description"], key=key + ) + + # floats + elif isinstance(p["value"], float): + cols[i].number_input( + p["name"], + value=float(p["value"]), + step=1.0, + help=p["description"], + key=key, + ) + + # lists + elif isinstance(p["value"], list): + p["value"] = [ + v.decode() if isinstance(v, bytes) else v for v in p["value"] + ] + cols[i].text_area( + p["name"], + value="\n".join(p["value"]), + help=p["description"], + key=key, + ) + + # increment number of columns, create new cols object if end of line is reached + i += 1 + if i == num_cols: + i = 0 + cols = st.columns(num_cols) + except: + cols[i].error(f"Error in parameter **{p['name']}**.") + + def input_python( + self, + script_file: str, + num_cols: int = 
3, + ) -> None: + """ + Dynamically generates and displays input widgets based on the DEFAULTS + dictionary defined in a specified Python script file. + + For each entry in the DEFAULTS dictionary, an input widget is displayed, + allowing the user to specify values for the parameters defined in the + script. The widgets are arranged in a grid with a specified number of + columns. Parameters can be marked as hidden or advanced within the DEFAULTS + dictionary; hidden parameters are not displayed, and advanced parameters + are displayed only if the user has selected to view advanced options. + + Args: + script_file (str): The file name or path to the Python script containing + the DEFAULTS dictionary. If the path is omitted, the method searches in + src/python-tools/'. + num_cols (int, optional): The number of columns to use for displaying input widgets. Defaults to 3. + """ + + # Check if script file exists (can be specified without path and extension) + # default location: src/python-tools/script_file + if not script_file.endswith(".py"): + script_file += ".py" + path = Path(script_file) + if not path.exists(): + path = Path("src", "python-tools", script_file) + if not path.exists(): + st.error("Script file not found.") + # load DEFAULTS from file + if path.parent not in sys.path: + sys.path.append(str(path.parent)) + spec = importlib.util.spec_from_file_location(path.stem, path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + defaults = getattr(module, "DEFAULTS", None) + if defaults is None: + st.error("No DEFAULTS found in script file.") + return + elif isinstance(defaults, list): + # display input widget for every entry in defaults + # input widgets in n number of columns + cols = st.columns(num_cols) + i = 0 + for entry in defaults: + key = f"{path.name}:{entry['key']}" if "key" in entry else None + if key is None: + st.error("Key not specified for parameter.") + continue + value = entry["value"] if "value" in entry else 
None + if value is None: + st.error("Value not specified for parameter.") + continue + hide = entry["hide"] if "hide" in entry else False + # no need to display input and output files widget or hidden parameters + if hide: + continue + advanced = entry["advanced"] if "advanced" in entry else False + # skip avdanced parameters if not selected + if not st.session_state["advanced"] and advanced: + continue + name = entry["name"] if "name" in entry else key + help = entry["help"] if "help" in entry else "" + min_value = entry["min"] if "min" in entry else None + max_value = entry["max"] if "max" in entry else None + step_size = entry["step_size"] if "step_size" in entry else 1 + widget_type = entry["widget_type"] if "widget_type" in entry else "auto" + options = entry["options"] if "options" in entry else None + + with cols[i]: + if isinstance(value, bool): + st.markdown("#") + self.input_widget( + key=key, + default=value, + name=name, + help=help, + widget_type=widget_type, + options=options, + min_value=min_value, + max_value=max_value, + step_size=step_size, + ) + # increment number of columns, create new cols object if end of line is reached + i += 1 + if i == num_cols: + i = 0 + cols = st.columns(num_cols) + + def zip_and_download_files(self, directory: str): + """ + Creates a zip archive of all files within a specified directory, + including files in subdirectories, and offers it as a download + button in a Streamlit application. + + Args: + directory (str): The directory whose files are to be zipped. 
+ """ + # Ensure directory is a Path object and check if directory is empty + directory = Path(directory) + if not any(directory.iterdir()): + st.error("No files to compress.") + return + + bytes_io = BytesIO() + files = list(directory.rglob("*")) # Use list comprehension to find all files + + # Check if there are any files to zip + if not files: + st.error("Directory is empty or contains no files.") + return + + n_files = len(files) + + # Initialize Streamlit progress bar + my_bar = st.progress(0) + + with zipfile.ZipFile(bytes_io, "w", zipfile.ZIP_DEFLATED) as zip_file: + for i, file_path in enumerate(files): + if file_path.is_file(): # Ensure we're only adding files, not directories + # Preserve directory structure relative to the original directory + zip_file.write(file_path, file_path.relative_to(directory.parent)) + my_bar.progress((i + 1) / n_files) # Update progress bar + + my_bar.empty() # Clear progress bar after operation is complete + bytes_io.seek(0) # Reset buffer pointer to the beginning + + # Display a download button for the zip file in Streamlit + st.columns(2)[1].download_button( + label="⬇️ Download Now", + data=bytes_io, + file_name="input-files.zip", + mime="application/zip", + use_container_width=True + ) + + + def file_upload_section(self, custom_upload_function) -> None: + custom_upload_function() + if st.button("⬇️ Download all uploaded files", use_container_width=True): + self.ui.zip_and_download_files(Path(self.workflow_dir, "input-files")) + + def parameter_section(self, custom_paramter_function) -> None: + st.toggle("Show advanced parameters", value=False, key="advanced") + + form = st.form( + key=f"{self.workflow_dir.stem}-input-form", + clear_on_submit=True, + ) + + with form: + cols = st.columns(2) + + cols[0].form_submit_button( + label="Save parameters", + on_click=self.parameter_manager.save_parameters, + type="primary", + use_container_width=True, + ) + + if cols[1].form_submit_button( + label="Load default parameters", 
use_container_width=True + ): + self.parameter_manager.reset_to_default_parameters() + + custom_paramter_function() + # Save parameters + self.parameter_manager.save_parameters() + + def execution_section(self, start_workflow_function) -> None: + if self.executor.pid_dir.exists(): + if st.button("Stop Workflow", type="primary", use_container_width=True): + self.executor.stop() + st.rerun() + else: + st.button( + "Start Workflow", + type="primary", + use_container_width=True, + on_click=start_workflow_function, + ) + + if self.logger.log_file.exists(): + if self.executor.pid_dir.exists(): + with st.spinner("**Workflow running...**"): + with open(self.logger.log_file, "r", encoding="utf-8") as f: + st.code(f.read(), language="neon", line_numbers=True) + time.sleep(2) + st.rerun() + else: + st.markdown("**Workflow log file**") + with open(self.logger.log_file, "r", encoding="utf-8") as f: + st.code(f.read(), language="neon", line_numbers=True) + + def results_section(self, custom_results_function) -> None: + custom_results_function() \ No newline at end of file diff --git a/src/workflow/WorkflowManager.py b/src/workflow/WorkflowManager.py new file mode 100644 index 0000000..3f70097 --- /dev/null +++ b/src/workflow/WorkflowManager.py @@ -0,0 +1,111 @@ +from pathlib import Path +from .Logger import Logger +from .ParameterManager import ParameterManager +from .CommandExecutor import CommandExecutor +from .StreamlitUI import StreamlitUI +from .FileManager import FileManager +import multiprocessing +import shutil + +class WorkflowManager: + # Core workflow logic using the above classes + def __init__(self, name: str, workspace: str): + self.name = name + self.workflow_dir = Path(workspace, name.replace(" ", "-").lower()) + self.file_manager = FileManager(self.workflow_dir) + self.logger = Logger(self.workflow_dir) + self.parameter_manager = ParameterManager(self.workflow_dir) + self.executor = CommandExecutor(self.workflow_dir, self.logger, self.parameter_manager) + 
self.params = self.parameter_manager.get_parameters_from_json() + self.ui = StreamlitUI(self.workflow_dir, self.logger, self.executor, self.parameter_manager) + + def start_workflow(self) -> None: + """ + Starts the workflow process and adds its process id to the pid directory. + The workflow itself needs to be a process, otherwise streamlit will wait for everything to finish before updating the UI again. + """ + # Delete the log file if it already exists + self.logger.log_file.unlink(missing_ok=True) + # Start workflow process + workflow_process = multiprocessing.Process(target=self.workflow_process) + workflow_process.start() + # Add workflow process id to pid dir + self.executor.pid_dir.mkdir() + Path(self.executor.pid_dir, str(workflow_process.pid)).touch() + + def workflow_process(self) -> None: + """ + Workflow process. Logs start and end of the workflow and calls the execution method where all steps are defined. + """ + try: + self.logger.log("STARTING WORKFLOW") + results_dir = Path(self.workflow_dir, "results") + if results_dir.exists(): + shutil.rmtree(results_dir) + results_dir.mkdir(parents=True) + self.execution() + self.logger.log("WORKFLOW FINISHED") + except Exception as e: + self.logger.log(f"ERROR: {e}") + # Delete pid dir path to indicate workflow is done + shutil.rmtree(self.executor.pid_dir, ignore_errors=True) + + def show_file_upload_section(self) -> None: + """ + Shows the file upload section of the UI with content defined in self.upload(). + """ + self.ui.file_upload_section(self.upload) + + def show_parameter_section(self) -> None: + """ + Shows the parameter section of the UI with content defined in self.configure(). + """ + self.ui.parameter_section(self.configure) + + def show_execution_section(self) -> None: + """ + Shows the execution section of the UI with content defined in self.execution(). 
+ """ + self.ui.execution_section(self.start_workflow) + + def show_results_section(self) -> None: + """ + Shows the results section of the UI with content defined in self.results(). + """ + self.ui.results_section(self.results) + + def upload(self) -> None: + """ + Add your file upload widgets here + """ + ################################### + # Add your file upload widgets here + ################################### + pass + + def configure(self) -> None: + """ + Add your input widgets here + """ + ################################### + # Add your input widgets here + ################################### + pass + + def execution(self) -> None: + """ + Add your workflow steps here + """ + ################################### + # Add your workflow steps here + ################################### + pass + + def results(self) -> None: + """ + Display results here + """ + ################################### + # Display results here + ################################### + pass \ No newline at end of file diff --git a/src/workflow/__init__.py b/src/workflow/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test.py b/test.py index 4981c50..58e36ca 100644 --- a/test.py +++ b/test.py @@ -2,7 +2,7 @@ import unittest from urllib.request import urlretrieve -from src.workflow import generate_random_table +from src.simpleworkflow import generate_random_table from src.complexworkflow import mzML_file_get_num_spectra from pathlib import Path