From bb9370d266e5e9e5e5643cd76bdee58f7e68214f Mon Sep 17 00:00:00 2001 From: Juraj Smiesko <34742917+kjvbrt@users.noreply.github.com> Date: Sun, 28 Jan 2024 21:20:59 +0100 Subject: [PATCH] Generate computational graph of the analysis (#341) * Generate computational graph of the analysis * Generating full graph for histmaker * Checked with mypy --- .gitignore | 4 +++ man/man1/fccanalysis-run.1 | 11 ++++++- man/man7/fccanalysis-script.7 | 11 +++++++ python/anascript.py | 6 ++++ python/frame.py | 58 +++++++++++++++++++++++++++++++++++ python/frame.pyi | 7 +++++ python/parsers.py | 10 ++++++ python/process.py | 13 ++++---- python/process.pyi | 2 +- python/run_analysis.py | 45 +++++++++++++++++++-------- python/run_final_analysis.py | 27 ++++++++++------ 11 files changed, 165 insertions(+), 29 deletions(-) create mode 100644 python/frame.py create mode 100644 python/frame.pyi diff --git a/.gitignore b/.gitignore index eed87bf..c0d7979 100644 --- a/.gitignore +++ b/.gitignore @@ -100,3 +100,7 @@ benchmark*json # Local configuration .fccana/* + +# Graphviz graphs +*.dot +*.png diff --git a/man/man1/fccanalysis-run.1 b/man/man1/fccanalysis-run.1 index f6b416b..8bcb7bc 100644 --- a/man/man1/fccanalysis-run.1 +++ b/man/man1/fccanalysis-run.1 @@ -12,6 +12,8 @@ [\fB\-\-test\fR] [\fB\-\-bench\fR] [\fB\-\-ncpus\fR \fINCPUS\fR] +[\fB\-g\fR] +[\fB\-\-graph\-path\fR \fIGRAPH_PATH\fR] .I analysis-script .SH DESCRIPTION .B fccanalysis-run @@ -47,8 +49,15 @@ Run over the test file\&. .B \-\-bench Output benchmark results to a JSON file\&. .TP -\-\-ncpus \fINCPUS\fR +\fB\-\-ncpus\fR \fINCPUS\fR Set number of threads\&. +.TP +.BR \-g ", " \-\-graph +The computational graph of the analysis will be generated\&. +.TP +\fB\-\-graph\-path\fR \fIGRAPH_PATH\fR +Location where the computational graph of the analysis should be stored. Only +paths with \fI.dot\fR and \fI.png\fR extensions are accepted. .SH ENVIRONMENT VARIABLES .TP .B FCCDICTSDIR diff --git a/man/man7/fccanalysis-script.7 b/man/man7/fccanalysis-script.7 index eb68ef1..6addd93 100644 --- a/man/man7/fccanalysis-script.7 +++ b/man/man7/fccanalysis-script.7 @@ -160,6 +160,17 @@ Location of the test file. .br Default value: empty string .TP +\fBgraph\fR (optional) +The computational graph of the analysis will be generated. +.br +Default value: False +.TP +\fBgraphPath\fR (optional) +Location where the computational graph of the analysis should be stored. Only +paths with \fI.dot\fR and \fI.png\fR extensions are accepted. +.br +Default value: empty string +.TP .B procDict This variable controls which process dictionary will be used. It can be either simple file name, absolute path or url. In the case of simple filename, the file diff --git a/python/anascript.py b/python/anascript.py index dc7ed2c..f06ed07 100644 --- a/python/anascript.py +++ b/python/anascript.py @@ -225,6 +225,12 @@ def get_element(rdf_module, element: str, is_final: bool = False): 'stage of the analysis.', element) return '' + elif element == 'graph': + return False + + elif element == 'graphPath': + return '' + return None diff --git a/python/frame.py b/python/frame.py new file mode 100644 index 0000000..2930046 --- /dev/null +++ b/python/frame.py @@ -0,0 +1,58 @@ +''' +RDataFrame helpers. 
+''' + +import os +import pathlib +import shutil +import logging +import ROOT # type: ignore + + +ROOT.gROOT.SetBatch(True) + +LOGGER: logging.Logger = logging.getLogger('FCCAnalyses.frame') + + +# _____________________________________________________________________________ +def generate_graph(dframe, args, suffix: str | None = None) -> None: + ''' + Generate computational graph of the analysis + ''' + # Check if output file path is provided + graph_path: pathlib.PurePath = pathlib.PurePath(args.graph_path) + if args.graph_path == '': + graph_path = pathlib.PurePath(args.anascript_path).with_suffix('.dot') + + # check if file path ends with "correct" extension + if graph_path.suffix not in ('.dot', '.png'): + LOGGER.warning('Graph output file extension not recognized!\n' + 'Using analysis script name...') + graph_path = pathlib.PurePath(args.anascript_path).with_suffix('.dot') + + # Add optional suffix to the output file path + if suffix is not None: + graph_path = graph_path.with_name(graph_path.stem + + suffix + + graph_path.suffix) # extension + + # Announce to which files graph will be saved + if shutil.which('dot') is None: + LOGGER.info('Analysis computational graph will be saved into:\n - %s', + graph_path.with_suffix('.dot')) + else: + LOGGER.info('Analysis computational graph will be saved into:\n - %s\n - %s', + graph_path.with_suffix('.dot'), + graph_path.with_suffix('.png')) + + # Generate graph in .dot format + ROOT.RDF.SaveGraph(dframe, str(graph_path.with_suffix('.dot'))) + + if shutil.which('dot') is None: + LOGGER.warning('PNG version of the computational graph will not be ' + 'generated.\nGraphviz library not found!') + return + + # Convert .dot file into .png + os.system(f'dot -Tpng {graph_path.with_suffix(".dot")} ' + f'-o {graph_path.with_suffix(".png")}') diff --git a/python/frame.pyi b/python/frame.pyi new file mode 100644 index 0000000..b9939a2 --- /dev/null +++ b/python/frame.pyi @@ -0,0 +1,7 @@ +# generated with `stubgen frame.py` + +import logging + +LOGGER: logging.Logger + +def generate_graph(dframe, args, suffix: str | None = None) -> None: ... 
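
For context, here is a minimal standalone sketch of how the new generate_graph() helper behaves, assuming a PyROOT installation and an argparse-style namespace carrying the graph_path and anascript_path attributes that the run parsers provide; the dataframe and file names are purely illustrative:

    # Illustrative only: drives frame.generate_graph() outside the framework.
    from argparse import Namespace

    import ROOT  # type: ignore

    from frame import generate_graph

    # Toy dataframe with a couple of nodes so the graph is non-trivial.
    dframe = (ROOT.RDataFrame(100)
              .Define('x', 'rdfentry_')
              .Filter('x % 2 == 0'))

    # An empty graph_path makes the helper fall back to the analysis script
    # name with a '.dot' suffix.
    args = Namespace(graph_path='', anascript_path='analysis_stage1.py')

    # Writes 'analysis_stage1_test.dot' and, if the Graphviz 'dot' executable
    # is on PATH, converts it to 'analysis_stage1_test.png'.
    generate_graph(dframe, args, suffix='_test')

Since only ROOT.RDF.SaveGraph() is called, no event loop is triggered; the graph is produced from the booked nodes alone.
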
diff --git a/python/parsers.py b/python/parsers.py index ce10776..03df576 100644 --- a/python/parsers.py +++ b/python/parsers.py @@ -114,6 +114,11 @@ def setup_run_parser(parser): help='output benchmark results to a JSON file') parser.add_argument('--ncpus', type=int, default=-1, help='set number of threads') + parser.add_argument('-g', '--graph', action='store_true', default=False, + help='generate computational graph of the analysis') + parser.add_argument('--graph-path', type=str, default='', + help='analysis graph save path, should end with ' + '\'.dot\' or \'.png\'') # Internal argument, not to be used by the users parser.add_argument('--batch', action='store_true', default=False, @@ -126,6 +131,11 @@ def setup_run_parser_final(parser): ''' parser.add_argument('anascript_path', help='path to analysis_final script') + parser.add_argument('-g', '--graph', action='store_true', default=False, + help='generate computational graph of the analysis') + parser.add_argument('--graph-path', type=str, default='', + help='analysis graph save path, should end with ' + '\'.dot\' or \'.png\'') def setup_run_parser_plots(parser): diff --git a/python/process.py b/python/process.py index cecbeab..addb708 100644 --- a/python/process.py +++ b/python/process.py @@ -84,8 +84,9 @@ def get_process_info_files(process: str, input_dir: str) -> tuple[list[str], return filelist, eventlist -def get_process_info_yaml(process: str, prod_tag: str) -> tuple[list[str], - list[int]]: +def get_process_info_yaml(process_name: str, + prod_tag: str) -> tuple[list[str], + list[int]]: ''' Get list of files and events from the YAML file ''' @@ -93,13 +94,13 @@ def get_process_info_yaml(process: str, prod_tag: str) -> tuple[list[str], proc_dict_dirs = get_process_dict_dirs() yamlfilepath = None for path in proc_dict_dirs: - yamlfilepath = os.path.join(path, 'yaml', prod_tag, process, + yamlfilepath = os.path.join(path, 'yaml', prod_tag, process_name, 'merge.yaml') if not os.path.isfile(yamlfilepath): continue - if not yamlfilepath: - LOGGER.error('Can\'t find the YAML file with process info!\n' - 'Aborting...') + if not os.path.isfile(yamlfilepath): + LOGGER.error('Can\'t find the YAML file with process info for process ' + '"%s"!\nAborting...', process_name) sys.exit(3) with open(yamlfilepath, 'r', encoding='utf-8') as ftmp: diff --git a/python/process.pyi b/python/process.pyi index b3b12a9..1a74ac0 100644 --- a/python/process.pyi +++ b/python/process.pyi @@ -7,6 +7,6 @@ LOGGER: logging.Logger def get_entries(inpath: str) -> int: ... def get_process_info(process: str, prod_tag: str, input_dir: str) -> tuple[list[str], list[int]]: ... def get_process_info_files(process: str, input_dir: str) -> tuple[list[str], list[int]]: ... -def get_process_info_yaml(process: str, prod_tag: str) -> tuple[list[str], list[int]]: ... +def get_process_info_yaml(process_name: str, prod_tag: str) -> tuple[list[str], list[int]]: ... def get_process_dict(proc_dict_location: str) -> dict: ... def get_process_dict_dirs() -> list[str]: ... 
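
To show how the two new options surface on the command line, here is a small self-contained argparse sketch; the parser object itself is a stand-in, and only the option names, defaults and help strings are taken from the additions to setup_run_parser() above:

    # Stand-in parser reproducing only the options added by this patch.
    import argparse

    parser = argparse.ArgumentParser(prog='fccanalysis run')
    parser.add_argument('anascript_path',
                        help='path to analysis script')
    parser.add_argument('-g', '--graph', action='store_true', default=False,
                        help='generate computational graph of the analysis')
    parser.add_argument('--graph-path', type=str, default='',
                        help='analysis graph save path, should end with '
                             '\'.dot\' or \'.png\'')

    args = parser.parse_args(['analysis_stage1.py', '-g',
                              '--graph-path', 'graphs/stage1.png'])
    print(args.graph)       # True
    print(args.graph_path)  # 'graphs/stage1.png'

Note that argparse maps --graph-path onto the graph_path attribute, which is exactly the name frame.generate_graph() reads from the namespace.
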
diff --git a/python/run_analysis.py b/python/run_analysis.py index 9ac5355..c65d36a 100644 --- a/python/run_analysis.py +++ b/python/run_analysis.py @@ -5,8 +5,8 @@ import os import sys import time -import json import shutil +import json import logging import subprocess import importlib.util @@ -16,6 +16,7 @@ import ROOT # type: ignore from anascript import get_element, get_element_dict from process import get_process_info, get_process_dict +from frame import generate_graph LOGGER = logging.getLogger('FCCAnalyses.run') @@ -158,24 +159,26 @@ def create_subjob_script(local_dir: str, # _____________________________________________________________________________ -def get_subfile_list(in_file_list, event_list, fraction): +def get_subfile_list(in_file_list: list[str], + event_list: list[int], + fraction: float) -> list[str]: ''' Obtain list of files roughly containing the requested fraction of events. ''' - nevts_total = sum(event_list) - nevts_target = int(nevts_total * fraction) + nevts_total: int = sum(event_list) + nevts_target: int = int(nevts_total * fraction) if nevts_target <= 0: LOGGER.error('The reduction fraction %f too stringent, no events ' 'left!\nAborting...', fraction) sys.exit(3) - nevts_real = 0 - out_file_list = [] - for i in enumerate(event_list): + nevts_real: int = 0 + out_file_list: list[str] = [] + for i, nevts in enumerate(event_list): if nevts_real >= nevts_target: break - nevts_real += event_list[i] + nevts_real += nevts out_file_list.append(in_file_list[i]) info_msg = f'Reducing the input file list by fraction "{fraction}" of ' @@ -325,13 +328,19 @@ def run_rdf(rdf_module, for bname in blist: branch_list.push_back(bname) + evtcount = df2.Count() + + # Generate computational graph of the analysis + if args.graph: + generate_graph(df2, args) + df2.Snapshot("events", out_file, branch_list) except Exception as excp: LOGGER.error('During the execution of the analysis file exception ' 'occurred:\n%s', excp) sys.exit(3) - return df2.Count() + return evtcount # _____________________________________________________________________________ @@ -749,13 +758,18 @@ def run_histmaker(args, rdf_module, anapath): info_msg += f'\n\toutput = {output}\n\tchunks = {chunks}' LOGGER.info(info_msg) - df = ROOT.ROOT.RDataFrame("events", file_list_root) - evtcount = df.Count() - res, hweight = graph_function(df, process) + dframe = ROOT.ROOT.RDataFrame("events", file_list_root) + evtcount = dframe.Count() + + res, hweight = graph_function(dframe, process) results.append(res) hweights.append(hweight) evtcounts.append(evtcount) + # Generate computational graph of the analysis + if args.graph: + generate_graph(dframe, args) + LOGGER.info('Starting the event loop...') start_time = time.time() ROOT.ROOT.RDF.RunGraphs(evtcounts) @@ -906,6 +920,13 @@ def run(parser): rdf_module = importlib.util.module_from_spec(rdf_spec) rdf_spec.loader.exec_module(rdf_module) + # Merge configuration from analysis script file with command line arguments + if get_element(rdf_module, 'graph'): + args.graph = True + + if get_element(rdf_module, 'graphPath') != '': + args.graph_path = get_element(rdf_module, 'graphPath') + if hasattr(rdf_module, "build_graph") and \ hasattr(rdf_module, "RDFanalysis"): LOGGER.error('Analysis file ambiguous!\nBoth "RDFanalysis" ' diff --git a/python/run_final_analysis.py b/python/run_final_analysis.py index d3874ee..582803e 100644 --- a/python/run_final_analysis.py +++ b/python/run_final_analysis.py @@ -12,6 +12,7 @@ import ROOT # type: ignore from anascript import get_element, 
get_element_dict from process import get_process_dict +from frame import generate_graph LOGGER = logging.getLogger('FCCAnalyses.run_final') @@ -70,7 +71,7 @@ def testfile(f: str) -> bool: # __________________________________________________________ -def run(rdf_module): +def run(rdf_module, args): ''' Main loop. ''' @@ -117,7 +118,7 @@ def run(rdf_module): if not os.path.exists(output_dir) and output_dir != '': os.system(f'mkdir -p {output_dir}') - cut_list = get_element(rdf_module, "cutList", True) + cut_list: dict[str, str] = get_element(rdf_module, "cutList", True) length_cuts_names = max(len(cut) for cut in cut_list) cut_labels = get_element(rdf_module, "cutLabels", True) @@ -206,12 +207,13 @@ def run(rdf_module): # Define all histos, snapshots, etc... LOGGER.info('Defining snapshots and histograms') - for cut in cut_list: + for cut_name, cut_definition in cut_list.items(): # output file for tree - fout = output_dir + process_name + '_' + cut + '.root' + fout = output_dir + process_name + '_' + cut_name + '.root' fout_list.append(fout) - df_cut = df.Filter(cut_list[cut]) + df_cut = df.Filter(cut_definition) + count_list.append(df_cut.Count()) histos = [] @@ -270,6 +272,9 @@ def run(rdf_module): # the snapshot tdf_list.append(snapshot_tdf) + if args.graph: + generate_graph(df, args) + # Now perform the loop and evaluate everything at once. LOGGER.info('Evaluating...') all_events = df.Count().GetValue() @@ -407,11 +412,8 @@ def run(rdf_module): ' \\begin{tabular}{|l||') outfile.write('c|' * (len(cuts_list)-1)) outfile.write('} \\hline\n') - print(eff_list) for i in range(len(eff_list)): outfile.write(' ') - print('i:', i) - print(efficiency_list) v = [row[i] for row in efficiency_list] outfile.write(' & '.join(str(v))) outfile.write(' \\\\\n') @@ -493,4 +495,11 @@ def run_final(parser): rdf_module = importlib.util.module_from_spec(rdf_spec) rdf_spec.loader.exec_module(rdf_module) - run(rdf_module) + # Merge configuration from analysis script file with command line arguments + if get_element(rdf_module, 'graph'): + args.graph = True + + if get_element(rdf_module, 'graphPath') != '': + args.graph_path = get_element(rdf_module, 'graphPath') + + run(rdf_module, args)
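
Because run() and run_final() now merge the script-level configuration into the command-line arguments, the same behaviour can be requested from the analysis script itself, matching the new graph and graphPath entries in fccanalysis-script(7). A hypothetical excerpt of a final-stage script is shown below; only graph and graphPath are introduced by this patch, the remaining attributes are placeholders for a typical configuration:

    # analysis_final.py (hypothetical excerpt)
    inputDir = 'outputs/stage2/'    # placeholder, not part of this patch
    outputDir = 'outputs/final/'    # placeholder, not part of this patch
    cutList = {
        'sel0': 'Zcand_m > 80 && Zcand_m < 100',   # illustrative cut
    }

    # Added by this patch: equivalent to '-g --graph-path graphs/final.png'
    # on the command line; a non-empty graphPath takes precedence over the
    # --graph-path option.
    graph = True
    graphPath = 'graphs/final.png'

With graphPath left as an empty string the graph is written next to the analysis script with a .dot extension, and the .png rendering is skipped with a warning when Graphviz is not installed.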