From febf3b712499d9bc7d60358ce26804d894bf9a4f Mon Sep 17 00:00:00 2001 From: Acribbs Date: Fri, 25 Oct 2024 15:21:01 +0100 Subject: [PATCH] tests passing for removal of imp, need to fix individual script tests --- cgat/tools/cgat2dot.py | 393 --------------------------------- cgat/tools/cgat_get_options.py | 18 +- tests/test_commandline.py | 37 +--- tests/test_import.py | 79 ++----- 4 files changed, 41 insertions(+), 486 deletions(-) delete mode 100644 cgat/tools/cgat2dot.py diff --git a/cgat/tools/cgat2dot.py b/cgat/tools/cgat2dot.py deleted file mode 100644 index 9525cba71..000000000 --- a/cgat/tools/cgat2dot.py +++ /dev/null @@ -1,393 +0,0 @@ -'''cgat2dot.py - create a graph between cgat scripts -==================================================== - -:Tags: Python - -Purpose -------- - -This script creates an rdf description of a cgat script. - -Optionally, the script outputs also a galaxy xml description of the -scripts' interface. - -Usage ------ - -Example:: - - python cgat2dot.py scripts/*.py - -Type:: - - python cgat2dot.py --help - -for command line help. - -Documentation -------------- - -Command line options --------------------- - -''' - -import os -import sys -import re -import imp - -import cgatcore.experiment as E - -BASE_URL = "https://www.cgat.org/downloads/public/cgat/documentation/" - -ORIGINAL_START = None - -PARSER = None - - -def _e(string): - return string.replace(' ', '_') - - -MAP_FORMATS = { - 'tsv': 'table', - 'table': 'table', - 'stats': 'table', - 'csv': 'table', -} - -PRINCIPAL_FORMATS = ('bam', - 'gff', - 'gtf', - 'bed', - 'wiggle', - 'fasta', - 'fastq', - 'fastqs') - -BREAK_FORMATS = {'table': 0} -MAP_TYPE2FORMAT = { - 'gff': 'gff,gtf', - 'gtf': 'gff,gtf', - 'bam': 'bam', - 'sam': 'sam', - 'bigwig': 'bigWig', - 'bed': 'bed', -} - -NODE_STYLE_DEFAULT = 'color="#A5BB00",style="filled"' -NODE_STYLE_FORMAT = 'color="#7577B8",style="filled"' - -EDGE_STYLE_CONVERSION = 'color="#7577B8",penwidth=2' -EDGE_STYLE_DEFAULT = 'color="#A5BB00",penwidth=1' - - -class DummyError(Exception): - pass - - -def LocalStart(parser, *args, **kwargs): - '''stub for E.start - set return_parser argument to true''' - global PARSER - PARSER = ORIGINAL_START(parser, - return_parser=True, - **kwargs - ) - raise DummyError() - - -def getDescription(scriptname, docstring): - '''get script description from docstring.''' - - description = scriptname - for line in docstring.split("\n"): - if line.startswith(scriptname): - description = line[line.index("-") + 1:].strip() - break - - return description - - -def guessFormats(scriptname, docstring): - '''guess the input/output format of a script.''' - - input_format, output_format = "tsv", "tsv" - - if "2" in scriptname: - input_format, output_format = scriptname.split("2") - - # map cgat format names to GALAXY ones - input_format = MAP_FORMATS.get(input_format, input_format) - output_format = MAP_FORMATS.get(output_format, output_format) - - return input_format, output_format - - -def buildParam(**kwargs): - '''return a parameter with default values. - - Specific fields can be set by providing keyword arguments. - ''' - - param = {} - - param['label'] = "label" - param['description'] = "description" - param['rank'] = 1 - param['display'] = 'show' - param['min_occurrence'] = 0 - param['max_occurrence'] = 1 - - # get default value - param['value'] = "value" - param['type'] = "text" - param['dependencies'] = {} - param['property_bag'] = {} - param['arg_long'] = '--long-argument' - - param.update(kwargs) - return param - - -def processScript(script_name, outfile, args): - '''process one script.''' - - # call other script - prefix, suffix = os.path.splitext(script_name) - - dirname = os.path.dirname(script_name) - basename = os.path.basename(script_name)[:-3] - - if args.src_dir: - dirname = args.src_dir - script_name = os.path.join(dirname, basename) + ".py" - - if os.path.exists(prefix + ".pyc"): - os.remove(prefix + ".pyc") - - pyxfile = os.path.join(dirname, "_") + basename + ".pyx" - if os.path.exists(pyxfile): - pass - - try: - module = imp.load_source(basename, script_name) - except ImportError as msg: - E.warn('could not import %s - skipped: %s' % (basename, msg)) - return - - E.info("loaded module %s" % module) - - E.start = LocalStart - try: - module.main(argv=["--help"]) - except TypeError as msg: - E.warn('could not import %s: %s' % (basename, msg)) - return - except DummyError: - pass - - # get script's docstring - docstring = module.__doc__ - - input_format, output_format = guessFormats(basename, docstring) - - if output_format in BREAK_FORMATS: - nodename = '%s%i' % (output_format, BREAK_FORMATS[output_format]) - outfile.write('%s [label="%s"];\n' % - (nodename, - output_format)) - BREAK_FORMATS[output_format] += 1 - output_format = nodename - - url = BASE_URL + "scripts/%s.html" % basename - - # Note that URL needs to be uppercase! - if input_format in PRINCIPAL_FORMATS and \ - output_format in PRINCIPAL_FORMATS: - edge_style = EDGE_STYLE_CONVERSION - else: - edge_style = EDGE_STYLE_DEFAULT - outfile.write('"%s" -> "%s" [label="%s",URL="%s",%s];\n' % - (input_format, output_format, basename, url, - edge_style)) - - return - - # for k in dir(PARSER): - # print k, getattr(PARSER, k) - # for option in PARSER.option_list: - # print option, option.type, option.help, option._short_opts, - # option._long_opts, option.default - - # @prefix clp: . - # @prefix co: . - # @prefix dcterms: . - - defaults = PARSER.get_default_values() - - for option in PARSER.option_list: - # ignore options added by optparse - if option.dest is None: - continue - - # ignore benchmarking options - if option.dest.startswith("timeit"): - continue - - # ignore options related to forcing output - if "force" in option.dest: - continue - - # ignore some special options: - # if option.dest in ("output_filename_pattern", ): - # continue - - # ignore output options - if option.dest in ("stdin", "stdout", "stdlog", "stderr", "loglevel"): - continue - - # remove default from help string - option.help = re.sub("\[[^\]]*%default[^\]]*\]", "", option.help) - - param = buildParam() - - # get command line option call (long/short option) - try: - param['arg'] = option._short_opts[0] - except IndexError: - pass - - try: - param['arg_long'] = option._long_opts[0] - except IndexError: - pass - - assert 'arg' in param or 'arg_long' in param - - # print "----------------------------------" - # print [(x,getattr(option,x)) for x in dir( option )] - - param['name'] = option.dest - param['ns_name'] = option.dest - if option.type == "int": - param['type'] = "integer" - elif option.type == "float": - param['type'] = "float" - elif option.type == "string": - param['type'] = "text" - if option.metavar: - mvar = option.metavar.lower() - if mvar in MAP_TYPE2FORMAT: - param['format'] = MAP_TYPE2FORMAT[mvar] - param['type'] = "data" - if mvar == "bam": - pass - - elif option.type == "choice": - param['type'] = "select" - param['choices'] = option.choices - if option.action == "append": - param['multiple'] = True - elif option.action.startswith("store"): - param['type'] = "boolean" - else: - raise ValueError("unknown type for %s" % str(option)) - - param['label'] = option.dest - param['description'] = option.help - param['rank'] = 1 - param['display'] = 'show' - param['min_occurrence'] = 0 - param['max_occurrence'] = 1 - - # get default value - param['value'] = getattr(defaults, option.dest) - - -def main(argv=None): - """script main. - - parses command line options in sys.argv, unless *argv* is given. - """ - - if not argv: - argv = sys.argv - - # setup command line parser - parser = E.ArgumentParser(description=__doc__) - - parser.add_argument("-f", "--format", dest="output_format", type=str, - choices=("rdf", "galaxy"), - help="output format . ") - - parser.add_argument("-l", "--list", dest="filename_list", type=str, - help="filename with list of files to export " - ". ") - - parser.add_argument("-s", "--source-dir", dest="src_dir", type=str, - help="directory to look for scripts . ") - - parser.add_argument("-r", "--input-regex", dest="input_regex", type=str, - help="regular expression to extract script name " - ". ") - - parser.add_argument("-p", "--output-filename-pattern", dest="output_pattern", - type=str, - help="pattern to build output filename. Should contain " - "an '%s' . ") - - parser.set_defaults(output_format="rdf", - src_dir=None, - input_regex=None, - output_pattern=None, - filename_list=None) - - # add common options (-h/--help, ...) and parse command line - (args) = E.start(parser, argv=argv) - - if len(args) == 0: - E.info("reading script names from stdin") - for line in args.stdin: - if line.startswith("#"): - continue - args.append(line[:-1].split("\t")[0]) - - # start script in order to build the command line parser - global ORIGINAL_START - ORIGINAL_START = E.start - - if args.output_pattern and not args.input_regex: - raise ValueError( - "please specify --input-regex when using --output-filename-pattern") - - outfile = args.stdout - outfile.write("""digraph cgat { - size="10,20"; - # scale graph so that there are no overlaps - overlap=scale; - splines=True; -\n""") - - # set node format for principal genomic formats - for format in PRINCIPAL_FORMATS: - outfile.write('"%s" [shape=box,%s];\n' % (format, NODE_STYLE_FORMAT)) - - # general node format - outfile.write('node [%s];\n' % NODE_STYLE_DEFAULT) - - # go through script to provide edges - for script_name in args: - if not script_name.endswith(".py"): - raise ValueError("expected a python script ending in '.py'") - - E.info("input=%s, output=%s" % (script_name, outfile)) - processScript(script_name, outfile, args) - - outfile.write("}\n") - - E.stop() - - -if __name__ == "__main__": - sys.exit(main(sys.argv)) diff --git a/cgat/tools/cgat_get_options.py b/cgat/tools/cgat_get_options.py index 9af305356..cb6fd5827 100644 --- a/cgat/tools/cgat_get_options.py +++ b/cgat/tools/cgat_get_options.py @@ -34,11 +34,10 @@ -------------------- ''' - import sys import os import glob -import imp +import importlib.util # Use importlib instead of imp import collections import pandas import cgatcore.experiment as E @@ -72,7 +71,6 @@ def LocalStart(parser, *args, **kwargs): def collectOptionsFromScript(script_name): '''collect options used in script *script_name*.''' - # call other script prefix, suffix = os.path.splitext(script_name) dirname = os.path.dirname(script_name) @@ -81,14 +79,16 @@ def collectOptionsFromScript(script_name): if os.path.exists(prefix + ".pyc"): os.remove(prefix + ".pyc") - # check if script contains getopt with iotools.open_file(script_name) as inf: if "getopt" in inf.read(): E.warn("script %s uses getopt directly" % script_name) return [] try: - module = imp.load_source(basename, script_name) + # Using importlib to load the module dynamically + spec = importlib.util.spec_from_file_location(basename, script_name) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) except ImportError as msg: E.warn('could not import %s - skipped: %s' % (basename, msg)) return [] @@ -108,7 +108,6 @@ def collectOptionsFromScript(script_name): result = [] for option in PARSER.option_list: - # ignore options added by optparse if option.dest is None: continue @@ -128,7 +127,6 @@ def main(argv=None): if argv is None: argv = sys.argv - # setup command line parser parser = E.ArgumentParser(description=__doc__) parser.add_argument( @@ -146,8 +144,7 @@ def main(argv=None): inplace=False, tsv_file=None) - # add common options (-h/--help, ...) and parse command line - (args) = E.start(parser, argv=argv) + args = E.start(parser, argv=argv) old_options = None if args.tsv_file: @@ -183,7 +180,6 @@ def main(argv=None): for o in collected_options: all_options[o].append(f) - # add old options for x in old_options.index: if x not in all_options: all_options[x].append("--") @@ -198,7 +194,6 @@ def main(argv=None): for o, v in sorted(all_options.items()): try: action, comment, alternative, ff = old_options.xs(o) - except KeyError: action, comment, alternative, ff = "", "", "", "" @@ -213,7 +208,6 @@ def main(argv=None): if outfile != args.stdout: outfile.close() - # write footer and output benchmark information. E.stop() if __name__ == "__main__": diff --git a/tests/test_commandline.py b/tests/test_commandline.py index 0feb1d953..78219da56 100644 --- a/tests/test_commandline.py +++ b/tests/test_commandline.py @@ -2,7 +2,7 @@ test_commandline - Tests coding style conformity of CGAT code collection. ========================================================================== -:Author: Andreas Heger +:Author: Adam Cribbs :Release: $Id$ :Date: |today| :Tags: Python @@ -20,7 +20,6 @@ to make all package scripts available for import and testing. ''' - import glob import os import importlib @@ -30,7 +29,6 @@ import copy import argparse -from nose.tools import ok_ import cgatcore.experiment as E import cgatcore.iotools as iotools import TestUtils @@ -52,9 +50,9 @@ "__init__.py", "version.py", "cgat.py", - "gtf2table.py", # Fails with pysam include issue - "bed2table.py", # Fails with pysam include issue - "fasta2bed.py", # Fails due to pybedtools rebuild requirements + "gtf2table.py", + "bed2table.py", + "fasta2bed.py", ] # Filename for the black/white list of options @@ -101,7 +99,6 @@ def load_script(script_name): script_dir, script_base = os.path.split(script_path) module_name = ".".join(filter(None, [script_dir.replace(os.sep, '.'), script_base])) - # Remove compiled files to ensure fresh import compiled_script = script_path + ".pyc" if os.path.exists(compiled_script): os.remove(compiled_script) @@ -139,9 +136,7 @@ def test_cmdline(): script_name = os.path.abspath(script) module, module_name = load_script(script) - if not module: - yield fail_, f"Module {script_name} could not be imported." - continue + assert module is not None, f"Module {script_name} could not be imported." # Replace the start function to capture parser E.start = LocalStart @@ -153,20 +148,19 @@ def test_cmdline(): # Expected flow interruption by LocalStart pass except Exception as e: - yield fail_, f"Error invoking main of {script_name}: {e}" - continue + assert False, f"Error invoking main of {script_name}: {e}" if PARSER: - for action in PARSER._actions: # Iterate through the actions stored in the parser - if isinstance(action, argparse._HelpAction): # Skip help actions + for action in PARSER._actions: + if isinstance(action, argparse._HelpAction): continue - opt_strings = action.option_strings # Get the list of CLI flags - if not opt_strings: # This skips positional arguments + opt_strings = action.option_strings + if not opt_strings: continue for opt_string in opt_strings: if opt_string.startswith("--"): opt_string = opt_string[2:] - yield check_option, opt_string, script_name, option_actions + check_option(opt_string, script_name, option_actions) # Reset module to avoid conflicts if module_name in sys.modules: @@ -174,14 +168,5 @@ def test_cmdline(): def check_option(option, script_name, option_actions): - print(f"Checking option: {option} in script: {script_name}") # Diagnostic print assert option in option_actions, f"Option {option} in script {script_name} is unknown or not allowed." assert option_actions[option] == "ok", f"Option {option} in script {script_name} is not allowed." - - -def fail_(msg): - '''Generate a failing test with the provided message.''' - ok_(False, msg) - -# Reset E.start to its original function after testing -E.start = ORIGINAL_START diff --git a/tests/test_import.py b/tests/test_import.py index 4e5ab5cee..4f9c80c66 100644 --- a/tests/test_import.py +++ b/tests/test_import.py @@ -1,7 +1,7 @@ '''test_import - test importing all modules and pipelines ========================================================= -:Author: Andreas Heger +:Author: Adam Cribbs :Release: $Id$ :Date: |today| :Tags: Python @@ -22,13 +22,10 @@ ''' - import os import glob import traceback -import imp - -from nose.tools import ok_ +import importlib.util # DIRECTORIES to examine for python modules/scripts EXPRESSIONS = ( @@ -38,27 +35,12 @@ # Scripts to exclude as they fail imports. EXCLUDE = ( - # The following fail because of pybedtools - # compilation fails. Reason why it triggers - # recompilation or why it fails is unknown - # (it seems using C compiler for C++ code). - 'pipeline_intervals', - 'PipelinePeakcalling', - 'IndexedFasta', # fails with relative import error in py2 - 'pipeline_peakcalling', - 'bam2transcriptContribution', - 'beds2counts', - 'fasta2bed', - # The following fail because of pyximport - # problems - 'bed2table', - # The following fail because of version imports from cgat-core table(s) - "table2table", - "combine_tables") + 'pipeline_intervals', 'PipelinePeakcalling', 'IndexedFasta', + 'pipeline_peakcalling', 'bam2transcriptContribution', 'beds2counts', + 'fasta2bed', 'bed2table', 'table2table', 'combine_tables') def check_import(filename, outfile): - prefix, suffix = os.path.splitext(filename) dirname, basename = os.path.split(prefix) @@ -71,53 +53,40 @@ def check_import(filename, outfile): except OSError: pass - # ignore script with pyximport for now, something does not work - # which can lead to errors in downstream files. Issues for - # example: - # When a pyximport script is imported before one that imports a module - # with a cython extension is being re-compiled, but without the proper - # flags. + # ignore scripts with pyximport for now blob = open(filename).read() if "import pyximport" in blob: return try: - imp.load_source(basename, filename) + # Use importlib to load the module dynamically + spec = importlib.util.spec_from_file_location(basename, filename) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) except ImportError as msg: - outfile.write("FAIL %s\n%s\n" % (basename, msg)) + outfile.write(f"FAIL {basename}\n{msg}\n") outfile.flush() traceback.print_exc(file=outfile) - ok_(False, '%s scripts/modules - ImportError: %s' % - (basename, msg)) + assert False, f'{basename} scripts/modules - ImportError: {msg}' except Exception as msg: - outfile.write("FAIL %s\n%s\n" % (basename, msg)) + outfile.write(f"FAIL {basename}\n{msg}\n") outfile.flush() - traceback.print_exc(file=outfile) - ok_(False, '%s scripts/modules - Exception: %s' % - (basename, msg)) + assert False, f'{basename} scripts/modules - Exception: {msg}' - ok_(True) + assert True def test_imports(): - '''test importing - - Relative imports will cause a failure because imp.load_source does - not import modules that are in the same directory as the module - being loaded from source. - - ''' - outfile = open('test_import.log', 'a') - for label, expression in EXPRESSIONS: - - files = glob.glob(expression) - files.sort() + '''test importing modules and scripts''' - for f in files: + with open('test_import.log', 'a') as outfile: + for label, expression in EXPRESSIONS: + files = glob.glob(expression) + files.sort() - if os.path.isdir(f): - continue - check_import.description = os.path.abspath(f) - yield(check_import, os.path.abspath(f), outfile) + for f in files: + if os.path.isdir(f): + continue + check_import(os.path.abspath(f), outfile)