diff --git a/bin/fccanalysis b/bin/fccanalysis index 731a17fdcd9..62231d89990 100755 --- a/bin/fccanalysis +++ b/bin/fccanalysis @@ -5,18 +5,21 @@ Starting point (executable) for fccanalysis command import argparse import logging +import sys from parsers import setup_subparsers from init_analysis import init_analysis from build_analysis import build_analysis from test_fccanalyses import test_fccanalyses from pin_analysis import PinAnalysis +from submit import submit_analysis from run_analysis import run from run_final_analysis import run_final from do_plots import do_plots from do_combine import do_combine +# _____________________________________________________________________________ class MultiLineFormatter(logging.Formatter): ''' Multi-line formatter. @@ -42,6 +45,25 @@ class MultiLineFormatter(logging.Formatter): return head + ''.join(indent + line for line in trailing) +# _____________________________________________________________________________ +class MaxLevelFilter(logging.Filter): + ''' + Filters (lets through) all messages with level < LEVEL + + Found here: + https://stackoverflow.com/questions/1383254/ + ''' + def __init__(self, level): + super().__init__() + self.level = level + + def filter(self, record): + # "<" instead of "<=": since logger.setLevel is inclusive, + # this should be exclusive + return record.levelno < self.level + + +# _____________________________________________________________________________ def main(): ''' Starting point for fccanalysis command @@ -78,10 +100,16 @@ def main(): logger = logging.getLogger('FCCAnalyses') logger.setLevel(verbosity_level) formatter = MultiLineFormatter(fmt='----> %(levelname)s: %(message)s') - stream_handler = logging.StreamHandler() - stream_handler.setLevel(verbosity_level) - stream_handler.setFormatter(formatter) - logger.addHandler(stream_handler) + stdout_stream_handler = logging.StreamHandler(sys.stdout) + stderr_stream_handler = logging.StreamHandler(sys.stderr) + lower_than_warning_filter = MaxLevelFilter(logging.WARNING) + stdout_stream_handler.addFilter(lower_than_warning_filter) + stdout_stream_handler.setLevel(verbosity_level) + stderr_stream_handler.setLevel(logging.WARNING) + stdout_stream_handler.setFormatter(formatter) + stderr_stream_handler.setFormatter(formatter) + logger.addHandler(stdout_stream_handler) + logger.addHandler(stderr_stream_handler) if args.command == 'init': init_analysis(parser) @@ -91,6 +119,8 @@ def main(): test_fccanalyses(parser) elif args.command == 'pin': PinAnalysis(parser) + elif args.command == 'submit': + submit_analysis(parser) elif args.command == 'final': run_final(parser) elif args.command == 'plots': @@ -101,5 +131,6 @@ def main(): run(parser) +# _____________________________________________________________________________ if __name__ == "__main__": main() diff --git a/examples/FCCee/higgs/mH-recoil/ee/analysis_stage1_batch.py b/examples/FCCee/higgs/mH-recoil/ee/analysis_stage1_batch.py index 7b1f5cbcf11..197ea86e6eb 100644 --- a/examples/FCCee/higgs/mH-recoil/ee/analysis_stage1_batch.py +++ b/examples/FCCee/higgs/mH-recoil/ee/analysis_stage1_batch.py @@ -1,4 +1,4 @@ -electron#Mandatory: List of processes +# Mandatory: List of processes processList = { 'p8_ee_ZZ_ecm240':{'chunks':20},#Run the full statistics in 10 jobs in output dir /p8_ee_ZZ_ecm240/chunk.root 'p8_ee_WW_ecm240':{'chunks':20},#Run the full statistics in 10 jobs in output dir /p8_ee_WW_ecm240/chunk.root diff --git a/examples/FCCee/higgs/mH-recoil/mumu/analysis_stage1_batch.py 
b/examples/FCCee/higgs/mH-recoil/mumu/analysis_stage1_batch.py index 25a29c0ea87..2e935ce45a4 100644 --- a/examples/FCCee/higgs/mH-recoil/mumu/analysis_stage1_batch.py +++ b/examples/FCCee/higgs/mH-recoil/mumu/analysis_stage1_batch.py @@ -12,15 +12,15 @@ class Analysis(): Higgs mass recoil analysis in Z(mumu)H. ''' def __init__(self, cmdline_args): + # Parse additional arguments not known to the FCCAnalyses parsers. + # All command line arguments are provided in the `cmdline_arg` + # dictionary and arguments after "--" are stored under "remaining" key. parser = ArgumentParser( description='Additional analysis arguments', - usage='Provide additional arguments after analysis script path') + usage='Provided after "--"') parser.add_argument('--muon-pt', default='10.', type=float, help='Minimal pT of the mouns.') - # Parse additional arguments not known to the FCCAnalyses parsers - # All command line arguments know to fccanalysis are provided in the - # `cmdline_arg` dictionary. - self.ana_args, _ = parser.parse_known_args(cmdline_args['unknown']) + self.ana_args, _ = parser.parse_known_args(cmdline_args['remaining']) # Mandatory: List of processes to run over self.process_list = { @@ -42,14 +42,11 @@ def __init__(self, cmdline_args): # self.analysis_name = 'My Analysis' # Optional: number of threads to run on, default is 'all available' - # self.n_threads = 4 - - # Optional: running on HTCondor, default is False - self.run_batch = True + self.n_threads = 4 # Optional: batch queue name when running on HTCondor, default is - # 'workday' - self.batch_queue = 'workday' + # 'longlunch' + # self.batch_queue = 'workday' # Optional: computing account when running on CERN's HTCondor, default # is 'group_u_FCC.local_gen' @@ -57,12 +54,13 @@ def __init__(self, cmdline_args): # Optional: output directory on eos, if specified files will be copied # there once the batch job is done, default is empty - self.output_dir_eos = '/eos/experiment/fcc/ee/analyses/case-studies/' \ - f'higgs/mH-recoil/stage1_{self.ana_args.muon_pt}' + # self.output_dir_eos = '/eos/experiment/fcc/ee/analyses/case-studies/' \ + # f'higgs/mH-recoil/stage1_{self.ana_args.muon_pt}' + self.output_dir_eos = '/eos/user/j/jsmiesko/FCCAnalyses/output' # Optional: type for eos, needed when is specified. The # default is FCC EOS, which is eospublic - self.eos_type = 'eospublic' + # self.eos_type = 'eospublic' # Optional: test file self.test_file = 'root://eospublic.cern.ch//eos/experiment/fcc/ee/' \ diff --git a/man/man7/fccanalysis-script.7 b/man/man7/fccanalysis-script.7 index fc35fe9435c..0f6495e22d7 100644 --- a/man/man7/fccanalysis-script.7 +++ b/man/man7/fccanalysis-script.7 @@ -125,12 +125,7 @@ Default value: empty string \fBnCPUS\fR (optional) Number of threads the RDataFrame will use\&. .br -Default value: 4 -.TP -\fBrunBatch\fR (optional) -Run the analysis on the HTCondor batch system. -.br -Default value: False +Default value: 1 .TP \fBbatchQueue\fR (optional) Batch queue name when running on HTCondor. diff --git a/python/batch.py b/python/batch.py new file mode 100644 index 00000000000..f4176c5d3ff --- /dev/null +++ b/python/batch.py @@ -0,0 +1,427 @@ +''' +Submitting to the HTCondor batch system. 
+''' + +import os +import sys +import time +import logging +import subprocess +import datetime +import argparse +from typing import Any + +from process import get_process_info +from process import get_subfile_list, get_chunk_list + + +LOGGER = logging.getLogger('FCCAnalyses.batch') + + +# _____________________________________________________________________________ +def determine_os(fccana_dir: str) -> str | None: + ''' + Determines platform on which FCCAnalyses was compiled + ''' + cmake_config_path = fccana_dir + '/build/CMakeFiles/CMakeConfigureLog.yaml' + if not os.path.isfile(cmake_config_path): + LOGGER.warning('CMake configuration file was not found!\n' + 'Was FCCAnalyses properly build?') + return None + + with open(cmake_config_path, 'r', encoding='utf-8') as cmake_config_file: + cmake_config = cmake_config_file.read() + if 'centos7' in cmake_config: + return 'centos7' + if 'almalinux9' in cmake_config: + return 'almalinux9' + + return None + + +# _____________________________________________________________________________ +def create_condor_config(config: dict[str, Any], + log_dir: str, + sample_name: str, + subjob_scripts: list[str]) -> str: + ''' + Creates contents of HTCondor submit description file. + ''' + cfg = 'executable = $(scriptfile)\n' + + cfg += f'log = {log_dir}/condor_job.{sample_name}.$(ClusterId).log\n' + + cfg += f'output = {log_dir}/condor_job.{sample_name}.$(ClusterId).' + cfg += '$(ProcId).out\n' + + cfg += f'error = {log_dir}/condor_job.{sample_name}.$(ClusterId).' + cfg += '$(ProcId).error\n' + + cfg += 'getenv = False\n' + + build_os = determine_os(config['fccana-dir']) + if build_os == 'centos7': + cfg += 'MY.WantOS = el7\n' + elif build_os == 'almalinux9': + cfg += 'MY.WantOS = el9\n' + else: + LOGGER.warning('OS of the machine the jobs are submitted from is ' + 'unknown!\nThere may be compatibility issues...') + + # cfg += '(Machine =!= LastRemoteHost) && (TARGET.has_avx2 =?= True) )\n' + + # cfg += 'on_exit_remove = (ExitBySignal == False) && (ExitCode == 0)\n' + + cfg += 'max_retries = 3\n' + + cfg += f'+JobFlavour = "{config["batch-queue"]}"\n' + + cfg += f'+AccountingGroup = "{config["accounting-group"]}"\n' + + cfg += f'RequestCpus = {config["n-threads"]}\n\n' + + # cfg += 'should_transfer_files = yes\n' + # cfg += 'when_to_transfer_output = on_exit\n' + + # cfg += 'transfer_output_files = $(outfile)\n' + + if config['output-dir-eos']: + cfg += 'output_destination = ' + cfg += f'root://eosuser.cern.ch/{config["output-dir-eos"]}\n' + cfg += 'MY.XRDCP_CREATE_DIR = True\n\n' + + # Add user batch configuration if any. + if config['user-batch-config'] is not None: + with open(config['user-batch-config'], 'r', encoding='utf-8') as cfile: + for line in cfile: + cfg += line + '\n' + cfg += '\n\n' + + # sample_output_dir = os.path.join(config['output-dir'], sample_name) + # cfg += 'queue scriptfile, outfile from (\n' + # for idx, scriptfile in enumerate(subjob_scripts): + # cfg += f' {scriptfile}, {sample_output_dir}/chunk_{idx}.root\n' + # cfg += ')\n' + + cfg += 'queue scriptfile from (\n' + for scriptfile in subjob_scripts: + cfg += f' {scriptfile}\n' + cfg += ')\n' + + return cfg + + +# _____________________________________________________________________________ +def create_subjob_script(config: dict[str, Any], + sample_name: str, + chunk_list: list[list[str]], + chunk_num: int) -> str: + ''' + Creates sub-job script to be run. 
+ ''' + + sample_output_filepath = os.path.join(config['output-dir'], + sample_name, + f'chunk_{chunk_num}.root') + + scr = '#!/bin/bash\n\n' + + scr += f'source {config["fccana-dir"]}/setup.sh\n\n' + + scr += 'which fccanalysis\n\n' + + scr += f'fccanalysis run {config["full-analysis-path"]}' + scr += f' --output {sample_output_filepath}' + scr += f' --n-threads {config["n-threads"]}' + scr += ' --files-list' + for file_path in chunk_list[chunk_num]: + scr += f' {file_path}' + if len(config['cli-arguments']['remaining']) > 0: + scr += ' -- ' + ' '.join(config['cli-arguments']['remaining']) + scr += '\n' + + # output_dir_eos = get_attribute(analysis, 'output_dir_eos', None) + # if not os.path.isabs(output_dir) and output_dir_eos is None: + # final_dest = os.path.join(fccana_dir, output_dir, sample_name, + # f'chunk_{chunk_num}.root') + # scr += f'cp {output_path} {final_dest}\n' + + # if output_dir_eos is not None: + # eos_type = get_attribute(analysis, 'eos_type', 'eospublic') + + # final_dest = os.path.join(output_dir_eos, + # sample_name, + # f'chunk_{chunk_num}.root') + # final_dest = f'root://{eos_type}.cern.ch/' + final_dest + # scr += f'xrdcp {output_path} {final_dest}\n' + + return scr + + +# _____________________________________________________________________________ +def submit_job(cmd: str, max_trials: int) -> bool: + ''' + Submit job to condor, retry `max_trials` times. + ''' + for i in range(max_trials): + with subprocess.Popen(cmd, shell=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) as proc: + (stdout, stderr) = proc.communicate() + + if proc.returncode == 0 and len(stderr) == 0: + LOGGER.info(stdout) + LOGGER.info('GOOD SUBMISSION') + return True + + LOGGER.warning('Error while submitting, retrying...\n ' + 'Trial: %i / %i\n Error: %s', + i, max_trials, stderr) + time.sleep(10) + + LOGGER.error('Failed submitting after: %i trials!', max_trials) + return False + + +# _____________________________________________________________________________ +def merge_config_analysis_class(config: dict[str, Any], + args: argparse.Namespace, + analysis_module: Any) -> dict[str, Any]: + ''' + Merge configuration from the analysis script class and the command-line + arguments. + ''' + + analysis_class = analysis_module.Analysis(vars(args)) + config['analysis-class'] = analysis_class + + # Check if there are any processes defined. + if not hasattr(analysis_class, 'process_list'): + LOGGER.error('Analysis does not define any processes!\n' + 'Aborting...') + sys.exit(3) + config['sample-list'] = analysis_class.process_list + + # Check if there is production tag or input directory defined. + if not hasattr(analysis_class, 'prod_tag') and \ + not hasattr(analysis_class, 'input_dir'): + LOGGER.error('Analysis does not define production tag or input ' + 'directory!\nAborting...') + sys.exit(3) + + if hasattr(analysis_class, 'prod_tag') and \ + hasattr(analysis_class, 'input_dir'): + LOGGER.error('Analysis defines both production tag and input ' + 'directory!\nAborting...') + sys.exit(3) + + # Determine input. + if hasattr(analysis_class, 'prod_tag'): + config['production-tag'] = analysis_class.prod_tag + else: + config['production-tag'] = None + + if hasattr(analysis_class, 'input_dir'): + config['input-directory'] = analysis_class.input_dir + else: + config['input-directory'] = None + + # Determine number of threads to run in. 
+ if hasattr(analysis_class, 'n_threads'): + config['n-threads'] = analysis_class.n_threads + else: + config['n-threads'] = 1 + + # Determine batch queue. + if hasattr(analysis_class, 'batch_queue'): + config['batch-queue'] = analysis_class.batch_queue + else: + config['batch-queue'] = 'longlunch' + + # Determine accounting group. + if hasattr(analysis_class, 'comp_group'): + config['accounting-group'] = analysis_class.comp_group + else: + config['accounting-group'] = 'group_u_FCC.local_gen' + + # Check if user provided additional job description parameters. + config['user-batch-config'] = None + if hasattr(analysis_class, 'user_batch_config'): + if not os.path.isfile(analysis_class.user_batch_config): + LOGGER.warning('Provided file with additional job description ' + 'parameters can\'t be found!\nFile path: %s\n' + 'Continuing...', analysis_class.user_batch_config) + else: + config['user-batch-config'] = analysis_class.user_batch_config + + # Check for global output directory. + config['output-dir'] = None + if hasattr(analysis_class, 'output_dir'): + config['output-dir'] = analysis_class.output_dir + os.system(f'mkdir -p {config["output-dir"]}') + + # Check if EOS output dir is defined. + config['output-dir-eos'] = None + if hasattr(analysis_class, 'output_dir_eos'): + config['output-dir-eos'] = analysis_class.output_dir_eos + else: + if config['submission-filesystem-type'] == 'eos': + LOGGER.error('Submission to CERN\'s HTCondor from EOS requires ' + '"output_dir_eos" analysis attribute defined!\n' + 'Aborting...') + sys.exit(3) + + return config + + +# _____________________________________________________________________________ +def send_sample(config: dict[str, Any], + sample_name: str) -> None: + ''' + Send sample to HTCondor batch system. + ''' + sample_dict = config['sample-list'][sample_name] + + # Create log directory + current_date = datetime.datetime.fromtimestamp( + datetime.datetime.now().timestamp()).strftime('%Y-%m-%d_%H-%M-%S') + log_dir = os.path.join('BatchOutputs', current_date, sample_name) + if not os.path.exists(log_dir): + os.system(f'mkdir -p {log_dir}') + + # Making sure the FCCAnalyses libraries are compiled and installed + try: + subprocess.check_output(['make', 'install'], + cwd=config['fccana-dir']+'/build', + stderr=subprocess.DEVNULL + ) + except subprocess.CalledProcessError: + LOGGER.error('The FCCanalyses libraries are not properly build and ' + 'installed!\nAborting job submission...') + sys.exit(3) + + # Determine the fraction of the input to be processed + fraction = 1. 
+ if 'fraction' in sample_dict: + fraction = sample_dict['fraction'] + + # Determine the number of chunks the output will be split into + chunks = 1 + if 'chunks' in sample_dict: + chunks = sample_dict['chunks'] + + file_list, event_list = get_process_info(sample_name, + config['production-tag'], + config['input-directory']) + + if len(file_list) <= 0: + LOGGER.error('No files to process!\nContinuing...') + return + + # Adjust number of input files according to the fraction requirement + if fraction < 1.: + file_list = get_subfile_list(file_list, event_list, fraction) + + # Adjust number of output files according to the chunk requirement + chunk_list = [file_list] + if chunks > 1: + chunk_list = get_chunk_list(file_list, chunks) + else: + LOGGER.warning('Number of requested output chunks for the sample ' + '"%s" is suspiciously low!', sample_name) + + subjob_scripts = [] + for chunk_num in range(len(chunk_list)): + subjob_script_path = os.path.join( + log_dir, + f'job_{sample_name}_chunk_{chunk_num}.sh') + subjob_scripts.append(subjob_script_path) + + for i in range(3): + try: + with open(subjob_script_path, 'w', encoding='utf-8') as ofile: + subjob_script = create_subjob_script(config, + sample_name, + chunk_list, + chunk_num) + ofile.write(subjob_script) + except IOError as err: + if i < 2: + LOGGER.warning('I/O error(%i): %s', + err.errno, err.strerror) + else: + LOGGER.error('I/O error(%i): %s', err.errno, err.strerror) + sys.exit(3) + else: + break + time.sleep(10) + subprocess.getstatusoutput(f'chmod u+x {subjob_script_path}') + + LOGGER.debug('Sub-job scripts to be run:\n - %s', + '\n - '.join(subjob_scripts)) + + condor_config_path = f'{log_dir}/job_desc_{sample_name}.cfg' + + for i in range(3): + try: + with open(condor_config_path, 'w', encoding='utf-8') as cfgfile: + condor_config = create_condor_config(config, + log_dir, + sample_name, + subjob_scripts) + cfgfile.write(condor_config) + except IOError as err: + LOGGER.warning('I/O error(%i): %s', err.errno, err.strerror) + if i == 2: + sys.exit(3) + else: + break + time.sleep(10) + # subprocess.getstatusoutput(f'chmod u+x {condor_config_path}') + + if config['submission-filesystem-type'] == 'eos': + batch_cmd = f'condor_submit -spool {condor_config_path}' + LOGGER.warning('To download the log files use "condor_transfer_data" ' + 'command!') + else: + batch_cmd = f'condor_submit {condor_config_path}' + LOGGER.info('Batch command:\n %s', batch_cmd) + success = submit_job(batch_cmd, 10) + if not success: + sys.exit(3) + + +# _____________________________________________________________________________ +def send_to_batch(args: argparse.Namespace, + analysis_module: Any) -> None: + ''' + Send jobs to HTCondor batch system. 
+ ''' + + config: dict[str, Any] = {} + config['cli-arguments'] = vars(args) + config['analysis-path'] = args.anascript_path + config['full-analysis-path'] = os.path.abspath(args.anascript_path) + + # Find location of the FCCanalyses directory + # TODO: Rename LOCAL_DIR to FCCANA_DIR + config['fccana-dir'] = os.environ['LOCAL_DIR'] + + # Get current working directory + config['current-working-directory'] = os.getcwd() + + # Determine type of filesystem the submission is made from + if os.getcwd().startswith('/eos/'): + config['submission-filesystem-type'] = 'eos' + elif os.getcwd().startswith('/afs/'): + config['submission-filesystem-type'] = 'afs' + else: + config['submission-filesystem-type'] = 'unknown' + + if hasattr(analysis_module, "Analysis"): + config = merge_config_analysis_class(config, args, analysis_module) + + for sample_name in config['sample-list'].keys(): + LOGGER.info('Submitting sample "%s"', sample_name) + + send_sample(config, sample_name) diff --git a/python/parsers.py b/python/parsers.py index 75db7975afc..3e2fc03ac73 100644 --- a/python/parsers.py +++ b/python/parsers.py @@ -83,6 +83,7 @@ def setup_test_parser(parser): ) +# _____________________________________________________________________________ def setup_pin_parser(parser): ''' Arguments for the pin sub-command @@ -102,16 +103,33 @@ def setup_pin_parser(parser): help='show pinned stack') +# _____________________________________________________________________________ +def setup_submit_parser(parser): + ''' + Define command line arguments for the submit sub-command. + ''' + parser.add_argument('anascript_path', + type=str, + help='path to the analysis script') + parser.add_argument('-w', '--where', + type=str, + choices=['ht-condor', 'slurm', 'grid'], + default='ht-condor', + help='where to submit the analysis') + parser.add_argument('remaining', nargs=argparse.REMAINDER) + + +# _____________________________________________________________________________ def setup_run_parser(parser): ''' Define command line arguments for the run sub-command. ''' parser.add_argument('anascript_path', help='path to analysis script') - parser.add_argument('--files-list', default=[], nargs='+', + parser.add_argument('--files-list', nargs='+', default=[], help='specify input file(s) to bypass the processList') parser.add_argument( - '--output', + '-o', '--output', type=str, default='output.root', help='specify output file name to bypass the processList and or ' @@ -122,7 +140,7 @@ def setup_run_parser(parser): help='run over the test input file') parser.add_argument('--bench', action='store_true', default=False, help='output benchmark results to a JSON file') - parser.add_argument('-j', '--ncpus', type=int, default=-1, + parser.add_argument('-j', '--ncpus', '--n-threads', type=int, help='set number of threads') parser.add_argument('-g', '--graph', action='store_true', default=False, help='generate computational graph of the analysis') @@ -133,11 +151,8 @@ def setup_run_parser(parser): '--use-data-source', action='store_true', default=False, help='use EDM4hep RDataSource to construct dataframe') - # Internal argument, not to be used by the users - parser.add_argument('--batch', action='store_true', default=False, - help=argparse.SUPPRESS) - +# _____________________________________________________________________________ def setup_run_parser_final(parser): ''' Define command line arguments for the final sub-command. 
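Note on the new argument handling: both the run and submit sub-commands now forward everything that follows a literal "--" on the command line to the user's Analysis class. A minimal, self-contained sketch of that splitting pattern (it mirrors the sys.argv handling added to run() further below; the command line shown here is invented for illustration):

    # Standalone illustration: run() applies the same split to sys.argv.
    argv = ['fccanalysis', 'run', 'analysis.py', '--n-threads', '4',
            '--', '--muon-pt', '20']
    try:
        sep = argv.index('--')
        fcc_args, remaining = argv[1:sep], argv[sep + 1:]
    except ValueError:
        # No "--" given: nothing is forwarded to the analysis script.
        fcc_args, remaining = argv[1:], []

    print(fcc_args)   # ['run', 'analysis.py', '--n-threads', '4']
    print(remaining)  # ['--muon-pt', '20']

Inside the analysis script the forwarded arguments arrive under the "remaining" key, which is what the updated mH-recoil example parses with its own ArgumentParser.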
@@ -151,6 +166,7 @@ def setup_run_parser_final(parser): '\'.dot\' or \'.png\'') +# _____________________________________________________________________________ def setup_run_parser_plots(parser): ''' Define command line arguments for the plots sub-command. @@ -172,6 +188,7 @@ def setup_run_parser_plots(parser): help='maximal y position of the legend') +# _____________________________________________________________________________ def setup_run_parser_combine(parser): ''' Define command line arguments for the combine sub-command. @@ -180,34 +197,37 @@ def setup_run_parser_combine(parser): # _____________________________________________________________________________ -def setup_subparsers(subparsers): +def setup_subparsers(topparser): ''' Sets all sub-parsers for all sub-commands ''' - # Create sub-parsers - parser_init = subparsers.add_parser( + # Instantiate sub-parsers + parser_init = topparser.add_parser( 'init', help="generate a RDataFrame based FCC analysis") - parser_build = subparsers.add_parser( + parser_build = topparser.add_parser( 'build', help='build and install local analysis') - parser_test = subparsers.add_parser( + parser_test = topparser.add_parser( 'test', help='test whole or a part of the analysis framework') - parser_pin = subparsers.add_parser( + parser_pin = topparser.add_parser( 'pin', help='pin fccanalyses to the current version of Key4hep stack') - parser_run = subparsers.add_parser( + parser_submit = topparser.add_parser( + 'submit', + help="submit the analysis to be run on a remote machine(s)") + parser_run = topparser.add_parser( 'run', help="run a RDataFrame based FCC analysis") - parser_run_final = subparsers.add_parser( + parser_run_final = topparser.add_parser( 'final', help="run a RDataFrame based FCC analysis final configuration") - parser_run_plots = subparsers.add_parser( + parser_run_plots = topparser.add_parser( 'plots', help="run a RDataFrame based FCC analysis plot configuration") - parser_run_combine = subparsers.add_parser( + parser_run_combine = topparser.add_parser( 'combine', help="prepare combine cards to run basic template fits") @@ -216,6 +236,7 @@ def setup_subparsers(subparsers): setup_build_parser(parser_build) setup_test_parser(parser_test) setup_pin_parser(parser_pin) + setup_submit_parser(parser_submit) setup_run_parser(parser_run) setup_run_parser_final(parser_run_final) setup_run_parser_plots(parser_run_plots) diff --git a/python/process.py b/python/process.py index c510eb440eb..2a69455d2a8 100644 --- a/python/process.py +++ b/python/process.py @@ -12,13 +12,15 @@ import yaml # type: ignore import ROOT # type: ignore import cppyy +import numpy as np -ROOT.gROOT.SetBatch(True) +ROOT.gROOT.SetBatch(True) LOGGER: logging.Logger = logging.getLogger('FCCAnalyses.process_info') +# _____________________________________________________________________________ def get_entries(inpath: str) -> int | None: ''' Retrieves number of entries in the "events" TTree from the provided ROOT @@ -196,6 +198,7 @@ def get_process_info_yaml(process_name: str, return filelist, eventlist +# _____________________________________________________________________________ def get_process_dict(proc_dict_location: str) -> dict: ''' Pick up the dictionary with process information @@ -241,6 +244,7 @@ def get_process_dict(proc_dict_location: str) -> dict: return proc_dict +# _____________________________________________________________________________ def get_process_dict_dirs() -> list[str]: ''' Get search directories for the process dictionaries @@ -255,3 +259,48 @@ 
def get_process_dict_dirs() -> list[str]: dirs[:] = [d for d in dirs if d] return dirs + + +# _____________________________________________________________________________ +def get_subfile_list(in_file_list: list[str], + event_list: list[int], + fraction: float) -> list[str]: + ''' + Obtain list of files roughly containing the requested fraction of events. + ''' + nevts_total: int = sum(event_list) + nevts_target: int = int(nevts_total * fraction) + + if nevts_target <= 0: + LOGGER.error('The reduction fraction %f too stringent, no events ' + 'left!\nAborting...', fraction) + sys.exit(3) + + nevts_real: int = 0 + out_file_list: list[str] = [] + for i, nevts in enumerate(event_list): + if nevts_real >= nevts_target: + break + nevts_real += nevts + out_file_list.append(in_file_list[i]) + + info_msg = f'Reducing the input file list by fraction "{fraction}" of ' + info_msg += 'total events:\n\t' + info_msg += f'- total number of events: {nevts_total:,}\n\t' + info_msg += f'- targeted number of events: {nevts_target:,}\n\t' + info_msg += '- number of events in the resulting file list: ' + info_msg += f'{nevts_real:,}\n\t' + info_msg += '- number of files after reduction: ' + info_msg += str((len(out_file_list))) + LOGGER.info(info_msg) + + return out_file_list + + +# _____________________________________________________________________________ +def get_chunk_list(file_list: str, chunks: int): + ''' + Get list of input file paths arranged into chunks. + ''' + chunk_list = list(np.array_split(file_list, chunks)) + return [chunk for chunk in chunk_list if chunk.size > 0] diff --git a/python/run_analysis.py b/python/run_analysis.py index 52e3a9b6605..6dc7b10a4b9 100644 --- a/python/run_analysis.py +++ b/python/run_analysis.py @@ -5,247 +5,21 @@ import os import sys import time -import shutil -import json import logging -import subprocess import importlib.util -import datetime -import numpy as np import ROOT # type: ignore import cppyy from anascript import get_element, get_element_dict from process import get_process_info, get_process_dict -from frame import generate_graph +from process import get_subfile_list, get_chunk_list +from utils import generate_graph, save_benchmark +from run_fccanalysis import run_fccanalysis -LOGGER = logging.getLogger('FCCAnalyses.run') ROOT.gROOT.SetBatch(True) - -# _____________________________________________________________________________ -def determine_os(local_dir: str) -> str | None: - ''' - Determines platform on which FCCAnalyses was compiled - ''' - cmake_config_path = local_dir + '/build/CMakeFiles/CMakeConfigureLog.yaml' - if not os.path.isfile(cmake_config_path): - LOGGER.warning('CMake configuration file was not found!\n' - 'Was FCCAnalyses properly build?') - return None - - with open(cmake_config_path, 'r', encoding='utf-8') as cmake_config_file: - cmake_config = cmake_config_file.read() - if 'centos7' in cmake_config: - return 'centos7' - if 'almalinux9' in cmake_config: - return 'almalinux9' - - return None - - -# _____________________________________________________________________________ -def create_condor_config(log_dir: str, - process_name: str, - build_os: str | None, - rdf_module, - subjob_scripts: list[str]) -> str: - ''' - Creates contents of condor configuration file. - ''' - cfg = 'executable = $(filename)\n' - - cfg += f'Log = {log_dir}/condor_job.{process_name}.' - cfg += '$(ClusterId).$(ProcId).log\n' - - cfg += f'Output = {log_dir}/condor_job.{process_name}.' 
- cfg += '$(ClusterId).$(ProcId).out\n' - - cfg += f'Error = {log_dir}/condor_job.{process_name}.' - cfg += '$(ClusterId).$(ProcId).error\n' - - cfg += 'getenv = False\n' - - cfg += 'environment = "LS_SUBCWD={log_dir}"\n' # not sure - - cfg += 'requirements = ( ' - if build_os == 'centos7': - cfg += '(OpSysAndVer =?= "CentOS7") && ' - if build_os == 'almalinux9': - cfg += '(OpSysAndVer =?= "AlmaLinux9") && ' - if build_os is None: - LOGGER.warning('Submitting jobs to default operating system. There ' - 'may be compatibility issues.') - cfg += '(Machine =!= LastRemoteHost) && (TARGET.has_avx2 =?= True) )\n' - - cfg += 'on_exit_remove = (ExitBySignal == False) && (ExitCode == 0)\n' - - cfg += 'max_retries = 3\n' - - cfg += '+JobFlavour = "%s"\n' % get_element(rdf_module, 'batchQueue') - - cfg += '+AccountingGroup = "%s"\n' % get_element(rdf_module, 'compGroup') - - cfg += 'RequestCpus = %i\n' % get_element(rdf_module, "nCPUS") - - cfg += 'queue filename matching files' - for script in subjob_scripts: - cfg += ' ' + script - cfg += '\n' - - return cfg - - -# _____________________________________________________________________________ -def create_subjob_script(local_dir: str, - rdf_module, - process_name: str, - chunk_num: int, - chunk_list: list[list[str]], - anapath: str) -> str: - ''' - Creates sub-job script to be run. - ''' - - output_dir = get_element(rdf_module, "outputDir") - output_dir_eos = get_element(rdf_module, "outputDirEos") - eos_type = get_element(rdf_module, "eosType") - user_batch_config = get_element(rdf_module, "userBatchConfig") - - scr = '#!/bin/bash\n\n' - scr += 'source ' + local_dir + '/setup.sh\n\n' - - # add userBatchConfig if any - if user_batch_config != '': - if not os.path.isfile(user_batch_config): - LOGGER.warning('userBatchConfig file can\'t be found! Will not ' - 'add it to the default config.') - else: - with open(user_batch_config, 'r', encoding='utf-8') as cfgfile: - for line in cfgfile: - scr += line + '\n' - scr += '\n\n' - - scr += f'mkdir job_{process_name}_chunk_{chunk_num}\n' - scr += f'cd job_{process_name}_chunk_{chunk_num}\n\n' - - if not os.path.isabs(output_dir): - output_path = os.path.join(output_dir, f'chunk_{chunk_num}.root') - else: - output_path = os.path.join(output_dir, process_name, - f'chunk_{chunk_num}.root') - - scr += local_dir - scr += f'/bin/fccanalysis run {anapath} --batch ' - scr += f'--output {output_path} ' - scr += '--files-list' - for file_path in chunk_list[chunk_num]: - scr += f' {file_path}' - scr += '\n\n' - - if not os.path.isabs(output_dir) and output_dir_eos == '': - final_dest = os.path.join(local_dir, output_dir, process_name, - f'chunk_{chunk_num}.root') - scr += f'cp {output_path} {final_dest}\n' - - if output_dir_eos != '': - final_dest = os.path.join(output_dir_eos, - process_name, - f'chunk_{chunk_num}.root') - final_dest = f'root://{eos_type}.cern.ch/' + final_dest - scr += f'xrdcp {output_path} {final_dest}\n' - - return scr - - -# _____________________________________________________________________________ -def get_subfile_list(in_file_list: list[str], - event_list: list[int], - fraction: float) -> list[str]: - ''' - Obtain list of files roughly containing the requested fraction of events. 
- ''' - nevts_total: int = sum(event_list) - nevts_target: int = int(nevts_total * fraction) - - if nevts_target <= 0: - LOGGER.error('The reduction fraction %f too stringent, no events ' - 'left!\nAborting...', fraction) - sys.exit(3) - - nevts_real: int = 0 - out_file_list: list[str] = [] - for i, nevts in enumerate(event_list): - if nevts_real >= nevts_target: - break - nevts_real += nevts - out_file_list.append(in_file_list[i]) - - info_msg = f'Reducing the input file list by fraction "{fraction}" of ' - info_msg += 'total events:\n\t' - info_msg += f'- total number of events: {nevts_total:,}\n\t' - info_msg += f'- targeted number of events: {nevts_target:,}\n\t' - info_msg += '- number of events in the resulting file list: ' - info_msg += f'{nevts_real:,}\n\t' - info_msg += '- number of files after reduction: ' - info_msg += str((len(out_file_list))) - LOGGER.info(info_msg) - - return out_file_list - - -# _____________________________________________________________________________ -def get_chunk_list(file_list: str, chunks: int): - ''' - Get list of input file paths arranged into chunks. - ''' - chunk_list = list(np.array_split(file_list, chunks)) - return [chunk for chunk in chunk_list if chunk.size > 0] - - -# _____________________________________________________________________________ -def save_benchmark(outfile, benchmark): - ''' - Save benchmark results to a JSON file. - ''' - benchmarks = [] - try: - with open(outfile, 'r', encoding='utf-8') as benchin: - benchmarks = json.load(benchin) - except OSError: - pass - - benchmarks = [b for b in benchmarks if b['name'] != benchmark['name']] - benchmarks.append(benchmark) - - with open(outfile, 'w', encoding='utf-8') as benchout: - json.dump(benchmarks, benchout, indent=2) - - -# _____________________________________________________________________________ -def submit_job(cmd: str, max_trials: int) -> bool: - ''' - Submit job to condor, retry `max_trials` times. - ''' - for i in range(max_trials): - with subprocess.Popen(cmd, shell=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True) as proc: - (stdout, stderr) = proc.communicate() - - if proc.returncode == 0 and len(stderr) == 0: - LOGGER.info(stdout) - LOGGER.info('GOOD SUBMISSION') - return True - - LOGGER.warning('Error while submitting, retrying...\n ' - 'Trial: %i / %i\n Error: %s', - i, max_trials, stderr) - time.sleep(10) - - LOGGER.error('Failed submitting after: %i trials!', max_trials) - return False +LOGGER = logging.getLogger('FCCAnalyses.run') # _____________________________________________________________________________ @@ -254,6 +28,13 @@ def initialize(args, rdf_module, anapath: str): Common initialization steps. ''' + # Put runBatch deprecation warning + if hasattr(rdf_module, 'runBatch'): + if rdf_module.runBatch: + LOGGER.error('runBatch script attribute is no longer supported, ' + 'use "fccanalysis submit" instead!\nAborting...') + sys.exit(3) + # for convenience and compatibility with user code if args.use_data_source: ROOT.gInterpreter.Declare("using namespace FCCAnalyses::PodioSource;") @@ -314,7 +95,7 @@ def initialize(args, rdf_module, anapath: str): # _____________________________________________________________________________ def run_rdf(rdf_module, input_list: list[str], - out_file: str, + outfile_path: str, args) -> int: ''' Create RDataFrame and snapshot it. 
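With the duplicated batch helpers removed, run_analysis.py now imports get_subfile_list and get_chunk_list from process.py instead of carrying its own copies. For orientation, a small self-contained sketch of the np.array_split-based splitting those helpers rely on (the file names are invented; like get_chunk_list, it drops empty chunks):

    import numpy as np

    file_list = [f'events_{i:03d}.root' for i in range(10)]
    chunks = 4

    # np.array_split distributes the files as evenly as possible and tolerates
    # a chunk count larger than the number of files; empty chunks are dropped.
    chunk_list = [c for c in np.array_split(file_list, chunks) if c.size > 0]

    for index, chunk in enumerate(chunk_list):
        print(index, [str(path) for path in chunk])
    # 0 ['events_000.root', 'events_001.root', 'events_002.root']
    # 1 ['events_003.root', 'events_004.root', 'events_005.root']
    # 2 ['events_006.root', 'events_007.root']
    # 3 ['events_008.root', 'events_009.root']

The chunks come back as numpy arrays of numpy strings rather than plain lists, which is harmless here since the f-string formatting used when the sub-job scripts are assembled renders them as ordinary paths.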
@@ -343,7 +124,7 @@ def run_rdf(rdf_module, if args.graph: generate_graph(dframe, args) - dframe3.Snapshot("events", out_file, branch_list) + dframe3.Snapshot("events", outfile_path, branch_list) except cppyy.gbl.std.runtime_error as err: LOGGER.error('%s\nDuring the execution of the analysis script an ' 'exception occurred!\nAborting...', err) @@ -352,86 +133,6 @@ def run_rdf(rdf_module, return evtcount_init.GetValue(), evtcount_final.GetValue() -# _____________________________________________________________________________ -def send_to_batch(rdf_module, chunk_list, process, anapath: str): - ''' - Send jobs to HTCondor batch system. - ''' - local_dir = os.environ['LOCAL_DIR'] - current_date = datetime.datetime.fromtimestamp( - datetime.datetime.now().timestamp()).strftime('%Y-%m-%d_%H-%M-%S') - log_dir = os.path.join(local_dir, 'BatchOutputs', current_date, process) - if not os.path.exists(log_dir): - os.system(f'mkdir -p {log_dir}') - - # Making sure the FCCAnalyses libraries are compiled and installed - try: - subprocess.check_output(['make', 'install'], - cwd=local_dir+'/build', - stderr=subprocess.DEVNULL - ) - except subprocess.CalledProcessError: - LOGGER.error('The FCCAnalyses libraries are not properly build and ' - 'installed!\nAborting job submission...') - sys.exit(3) - - subjob_scripts = [] - for ch in range(len(chunk_list)): - subjob_script_path = os.path.join(log_dir, - f'job_{process}_chunk_{ch}.sh') - subjob_scripts.append(subjob_script_path) - - for i in range(3): - try: - with open(subjob_script_path, 'w', encoding='utf-8') as ofile: - subjob_script = create_subjob_script(local_dir, - rdf_module, - process, - ch, - chunk_list, - anapath) - ofile.write(subjob_script) - except IOError as e: - if i < 2: - LOGGER.warning('I/O error(%i): %s', e.errno, e.strerror) - else: - LOGGER.error('I/O error(%i): %s', e.errno, e.strerror) - sys.exit(3) - else: - break - time.sleep(10) - subprocess.getstatusoutput(f'chmod 777 {subjob_script_path}') - - LOGGER.debug('Sub-job scripts to be run:\n - %s', - '\n - '.join(subjob_scripts)) - - condor_config_path = f'{log_dir}/job_desc_{process}.cfg' - - for i in range(3): - try: - with open(condor_config_path, 'w', encoding='utf-8') as cfgfile: - condor_config = create_condor_config(log_dir, - process, - determine_os(local_dir), - rdf_module, - subjob_scripts) - cfgfile.write(condor_config) - except IOError as e: - LOGGER.warning('I/O error(%i): %s', e.errno, e.strerror) - if i == 2: - sys.exit(3) - else: - break - time.sleep(10) - subprocess.getstatusoutput(f'chmod 777 {condor_config_path}') - - batch_cmd = f'condor_submit {condor_config_path}' - LOGGER.info('Batch command:\n %s', batch_cmd) - success = submit_job(batch_cmd, 10) - if not success: - sys.exit(3) - - # _____________________________________________________________________________ def apply_filepath_rewrites(filepath: str) -> str: ''' @@ -512,14 +213,7 @@ def run_local(rdf_module, infile_list, args): else: LOGGER.info('Number of local events: %s', f'{nevents_local:,}') - output_dir = get_element(rdf_module, "outputDir") - if not args.batch: - if os.path.isabs(args.output): - LOGGER.warning('Provided output path is absolute, "outputDir" ' - 'from analysis script will be ignored!') - outfile_path = os.path.join(output_dir, args.output) - else: - outfile_path = args.output + outfile_path = args.output LOGGER.info('Output file path:\n%s', outfile_path) # Run RDF @@ -594,7 +288,7 @@ def run_stages(args, rdf_module, anapath): if not os.path.exists(output_dir) and output_dir: 
os.system(f'mkdir -p {output_dir}') - # Check if outputDir exist and if not create it + # Check if EOS outputDir exist and if not create it output_dir_eos = get_element(rdf_module, "outputDirEos") if not os.path.exists(output_dir_eos) and output_dir_eos: os.system(f'mkdir -p {output_dir_eos}') @@ -620,12 +314,6 @@ def run_stages(args, rdf_module, anapath): run_local(rdf_module, args.files_list, args) sys.exit(0) - # Check if batch mode is available - run_batch = get_element(rdf_module, 'runBatch') - if run_batch and shutil.which('condor_q') is None: - LOGGER.error('HTCondor tools can\'t be found!\nAborting...') - sys.exit(3) - # Check if the process list is specified process_list = get_element(rdf_module, 'processList') @@ -640,24 +328,37 @@ def run_stages(args, rdf_module, anapath): sys.exit(3) # Determine the fraction of the input to be processed - fraction = 1 + fraction = 1. if get_element_dict(process_list[process_name], 'fraction'): fraction = get_element_dict(process_list[process_name], 'fraction') + + # Determine the number of chunks the output will be split into + chunks = 1 + if get_element_dict(process_list[process_name], 'chunks'): + chunks = get_element_dict(process_list[process_name], 'chunks') + # Put together output path output_stem = process_name if get_element_dict(process_list[process_name], 'output'): output_stem = get_element_dict(process_list[process_name], 'output') - # Determine the number of chunks the output will be split into - chunks = 1 - if get_element_dict(process_list[process_name], 'chunks'): - chunks = get_element_dict(process_list[process_name], 'chunks') + output_dir = get_attribute(rdf_module, 'outputDir', '') + + if chunks == 1: + output_filepath = os.path.join(output_dir, output_stem+'.root') + output_dir = None + else: + output_filepath = None + output_dir = os.path.join(output_dir, output_stem) info_msg = f'Adding process "{process_name}" with:' if fraction < 1: info_msg += f'\n\t- fraction: {fraction}' info_msg += f'\n\t- number of files: {len(file_list):,}' - info_msg += f'\n\t- output stem: {output_stem}' + if output_dir: + info_msg += f'\n\t- output directory: {output_dir}' + if output_filepath: + info_msg += f'\n\t- output file path: {output_dir}' if chunks > 1: info_msg += f'\n\t- number of chunks: {chunks}' @@ -676,25 +377,15 @@ def run_stages(args, rdf_module, anapath): if not os.path.exists(output_directory): os.system(f'mkdir -p {output_directory}') - if run_batch: - # Sending to the batch system - LOGGER.info('Running on the batch...') - if len(chunk_list) == 1: - LOGGER.warning('\033[4m\033[1m\033[91mRunning on batch with ' - 'only one chunk might not be optimal\033[0m') - - send_to_batch(rdf_module, chunk_list, process_name, anapath) - + # Running locally + LOGGER.info('Running locally...') + if len(chunk_list) == 1: + args.output = output_filepath + run_local(rdf_module, chunk_list[0], args) else: - # Running locally - LOGGER.info('Running locally...') - if len(chunk_list) == 1: - args.output = f'{output_stem}.root' - run_local(rdf_module, chunk_list[0], args) - else: - for index, chunk in enumerate(chunk_list): - args.output = f'{output_stem}/chunk{index}.root' - run_local(rdf_module, chunk, args) + for index, chunk in enumerate(chunk_list): + args.output = os.path.join(output_dir, f'chunk{index}.root') + run_local(rdf_module, chunk, args) def run_histmaker(args, rdf_module, anapath): @@ -925,11 +616,26 @@ def run(parser): Set things in motion. 
''' - args, unknown_args = parser.parse_known_args() - # Add unknown arguments including unknown input files - unknown_args += [x for x in args.files_list if not x.endswith('.root')] - args.unknown = unknown_args - args.files_list = [x for x in args.files_list if x.endswith('.root')] + try: + dash_dash_index = sys.argv.index('--') + print(sys.argv[1:dash_dash_index]) + args = parser.parse_args(sys.argv[1:dash_dash_index]) + print(sys.argv[dash_dash_index+1:]) + args.remaining = sys.argv[dash_dash_index+1:] + except ValueError: + args = parser.parse_args() + args.remaining = [] + # print(dash_dash_index) + # print(sys.argv[:dash_dash_index]) + # args = parser.parse_args() + # print(args) + # if len(args.remaining) > 0: + # try: + # print(args.remaining.index('--')) + # dash_dash_index = args.remaining.index('--') + # except ValueError: + # args = parser.parse_args(args.remaining) + print(args) if not hasattr(args, 'command'): LOGGER.error('Error occurred during subcommand routing!\nAborting...') @@ -943,7 +649,6 @@ def run(parser): anapath = os.path.abspath(args.anascript_path) # Check that the analysis file exists - anapath = args.anascript_path if not os.path.isfile(anapath): LOGGER.error('Analysis script %s not found!\nAborting...', anapath) @@ -1020,7 +725,6 @@ def run(parser): sys.exit(3) if hasattr(rdf_module, "Analysis"): - from run_fccanalysis import run_fccanalysis run_fccanalysis(args, rdf_module) if hasattr(rdf_module, "RDFanalysis"): run_stages(args, rdf_module, anapath) diff --git a/python/run_fccanalysis.py b/python/run_fccanalysis.py index 97dd0d73ac8..f5fb621ab3b 100644 --- a/python/run_fccanalysis.py +++ b/python/run_fccanalysis.py @@ -5,259 +5,36 @@ import os import sys import time -import shutil -import json import logging -import subprocess -import datetime -import numpy as np +import argparse +from typing import Any import ROOT # type: ignore -from anascript import get_element, get_element_dict, get_attribute +from anascript import get_element_dict, get_attribute from process import get_process_info, get_entries_sow -from frame import generate_graph +from process import get_subfile_list, get_chunk_list +from utils import generate_graph, save_benchmark -LOGGER = logging.getLogger('FCCAnalyses.run') ROOT.gROOT.SetBatch(True) - -# _____________________________________________________________________________ -def determine_os(local_dir: str) -> str | None: - ''' - Determines platform on which FCCAnalyses was compiled - ''' - cmake_config_path = local_dir + '/build/CMakeFiles/CMakeConfigureLog.yaml' - if not os.path.isfile(cmake_config_path): - LOGGER.warning('CMake configuration file was not found!\n' - 'Was FCCAnalyses properly build?') - return None - - with open(cmake_config_path, 'r', encoding='utf-8') as cmake_config_file: - cmake_config = cmake_config_file.read() - if 'centos7' in cmake_config: - return 'centos7' - if 'almalinux9' in cmake_config: - return 'almalinux9' - - return None - - -# _____________________________________________________________________________ -def create_condor_config(log_dir: str, - process_name: str, - build_os: str | None, - rdf_module, - subjob_scripts: list[str]) -> str: - ''' - Creates contents of condor configuration file. - ''' - cfg = 'executable = $(filename)\n' - - cfg += f'Log = {log_dir}/condor_job.{process_name}.' - cfg += '$(ClusterId).$(ProcId).log\n' - - cfg += f'Output = {log_dir}/condor_job.{process_name}.' - cfg += '$(ClusterId).$(ProcId).out\n' - - cfg += f'Error = {log_dir}/condor_job.{process_name}.' 
- cfg += '$(ClusterId).$(ProcId).error\n' - - cfg += 'getenv = False\n' - - cfg += 'environment = "LS_SUBCWD={log_dir}"\n' # not sure - - cfg += 'requirements = ( ' - if build_os == 'centos7': - cfg += '(OpSysAndVer =?= "CentOS7") && ' - if build_os == 'almalinux9': - cfg += '(OpSysAndVer =?= "AlmaLinux9") && ' - if build_os is None: - LOGGER.warning('Submitting jobs to default operating system. There ' - 'may be compatibility issues.') - cfg += '(Machine =!= LastRemoteHost) && (TARGET.has_avx2 =?= True) )\n' - - cfg += 'on_exit_remove = (ExitBySignal == False) && (ExitCode == 0)\n' - - cfg += 'max_retries = 3\n' - - cfg += '+JobFlavour = "%s"\n' % get_element(rdf_module, 'batchQueue') - - cfg += '+AccountingGroup = "%s"\n' % get_element(rdf_module, 'compGroup') - - cfg += 'RequestCpus = %i\n' % get_element(rdf_module, "nCPUS") - - cfg += 'queue filename matching files' - for script in subjob_scripts: - cfg += ' ' + script - cfg += '\n' - - return cfg - - -# _____________________________________________________________________________ -def create_subjob_script(local_dir: str, - analysis, - process_name: str, - chunk_num: int, - chunk_list: list[list[str]], - anapath: str, - cmd_args) -> str: - ''' - Creates sub-job script to be run. - ''' - - output_dir = get_attribute(analysis, 'output_dir', None) - - scr = '#!/bin/bash\n\n' - scr += 'source ' + local_dir + '/setup.sh\n\n' - - # add user batch configuration if any - user_batch_config = get_attribute(analysis, 'user_batch_config', None) - if user_batch_config is not None: - if not os.path.isfile(user_batch_config): - LOGGER.warning('userBatchConfig file can\'t be found! Will not ' - 'add it to the default config.') - else: - with open(user_batch_config, 'r', encoding='utf-8') as cfgfile: - for line in cfgfile: - scr += line + '\n' - scr += '\n\n' - - scr += f'mkdir job_{process_name}_chunk_{chunk_num}\n' - scr += f'cd job_{process_name}_chunk_{chunk_num}\n\n' - - if not os.path.isabs(output_dir): - output_path = os.path.join(output_dir, f'chunk_{chunk_num}.root') - else: - output_path = os.path.join(output_dir, process_name, - f'chunk_{chunk_num}.root') - - scr += local_dir - scr += f'/bin/fccanalysis run {anapath} --batch' - scr += f' --output {output_path}' - if cmd_args.ncpus > 0: - scr += f' --ncpus {cmd_args.ncpus}' - if len(cmd_args.unknown) > 0: - scr += ' ' + ' '.join(cmd_args.unknown) - scr += ' --files-list' - for file_path in chunk_list[chunk_num]: - scr += f' {file_path}' - scr += '\n\n' - - output_dir_eos = get_attribute(analysis, 'output_dir_eos', None) - if not os.path.isabs(output_dir) and output_dir_eos is None: - final_dest = os.path.join(local_dir, output_dir, process_name, - f'chunk_{chunk_num}.root') - scr += f'cp {output_path} {final_dest}\n' - - if output_dir_eos is not None: - eos_type = get_attribute(analysis, 'eos_type', 'eospublic') - - final_dest = os.path.join(output_dir_eos, - process_name, - f'chunk_{chunk_num}.root') - final_dest = f'root://{eos_type}.cern.ch/' + final_dest - scr += f'xrdcp {output_path} {final_dest}\n' - - return scr - - -# _____________________________________________________________________________ -def get_subfile_list(in_file_list: list[str], - event_list: list[int], - fraction: float) -> list[str]: - ''' - Obtain list of files roughly containing the requested fraction of events. 
- ''' - nevts_total: int = sum(event_list) - nevts_target: int = int(nevts_total * fraction) - - if nevts_target <= 0: - LOGGER.error('The reduction fraction %f too stringent, no events ' - 'left!\nAborting...', fraction) - sys.exit(3) - - nevts_real: int = 0 - out_file_list: list[str] = [] - for i, nevts in enumerate(event_list): - if nevts_real >= nevts_target: - break - nevts_real += nevts - out_file_list.append(in_file_list[i]) - - info_msg = f'Reducing the input file list by fraction "{fraction}" of ' - info_msg += 'total events:\n\t' - info_msg += f'- total number of events: {nevts_total:,}\n\t' - info_msg += f'- targeted number of events: {nevts_target:,}\n\t' - info_msg += '- number of events in the resulting file list: ' - info_msg += f'{nevts_real:,}\n\t' - info_msg += '- number of files after reduction: ' - info_msg += str((len(out_file_list))) - LOGGER.info(info_msg) - - return out_file_list - - -# _____________________________________________________________________________ -def get_chunk_list(file_list: str, chunks: int): - ''' - Get list of input file paths arranged into chunks. - ''' - chunk_list = list(np.array_split(file_list, chunks)) - return [chunk for chunk in chunk_list if chunk.size > 0] - - -# _____________________________________________________________________________ -def save_benchmark(outfile, benchmark): - ''' - Save benchmark results to a JSON file. - ''' - benchmarks = [] - try: - with open(outfile, 'r', encoding='utf-8') as benchin: - benchmarks = json.load(benchin) - except OSError: - pass - - benchmarks = [b for b in benchmarks if b['name'] != benchmark['name']] - benchmarks.append(benchmark) - - with open(outfile, 'w', encoding='utf-8') as benchout: - json.dump(benchmarks, benchout, indent=2) - - -# _____________________________________________________________________________ -def submit_job(cmd: str, max_trials: int) -> bool: - ''' - Submit job to condor, retry `max_trials` times. - ''' - for i in range(max_trials): - with subprocess.Popen(cmd, shell=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True) as proc: - (stdout, stderr) = proc.communicate() - - if proc.returncode == 0 and len(stderr) == 0: - LOGGER.info(stdout) - LOGGER.info('GOOD SUBMISSION') - return True - - LOGGER.warning('Error while submitting, retrying...\n ' - 'Trial: %i / %i\n Error: %s', - i, max_trials, stderr) - time.sleep(10) - - LOGGER.error('Failed submitting after: %i trials!', max_trials) - return False +LOGGER = logging.getLogger('FCCAnalyses.run') # _____________________________________________________________________________ -def merge_config(args: object, analysis: object) -> dict[str, any]: +def merge_config(args: argparse.Namespace, analysis: Any) -> dict[str, Any]: ''' Merge configuration from command line arguments and analysis class. ''' - config: dict[str, any] = {} + config: dict[str, Any] = {} + + # Put runBatch deprecation warning + if hasattr(analysis, 'run_batch'): + if analysis.run_batch: + LOGGER.error('run_batch analysis attribute is no longer ' + 'supported, use "fccanalysis submit" instead!\n' + 'Aborting...') + sys.exit(3) # Check whether to use PODIO DataSource to load the events config['use_data_source'] = False @@ -265,11 +42,16 @@ def merge_config(args: object, analysis: object) -> dict[str, any]: config['use_data_source'] = True if get_attribute(analysis, 'use_data_source', False): config['use_data_source'] = True - # Check whether to use event weights (only supported as analysis config file option, not command line!) 
+ # Check whether to use event weights (only supported as analysis config + # file option, not command line!) config['do_weighted'] = False if get_attribute(analysis, 'do_weighted', False): config['do_weighted'] = True + # Check the output path + # config['output_file_path'] = None + # if args.output + return config @@ -325,7 +107,7 @@ def initialize(config, args, analysis): # _____________________________________________________________________________ -def run_rdf(config: dict[str, any], +def run_rdf(config: dict[str, Any], args, analysis, input_list: list[str], @@ -388,90 +170,6 @@ def run_rdf(config: dict[str, any], return evtcount_init.GetValue(), evtcount_final.GetValue(), sow_init.GetValue(), sow_final.GetValue() -# _____________________________________________________________________________ -def send_to_batch(args, analysis, chunk_list, sample_name, anapath: str): - ''' - Send jobs to HTCondor batch system. - ''' - local_dir = os.environ['LOCAL_DIR'] - current_date = datetime.datetime.fromtimestamp( - datetime.datetime.now().timestamp()).strftime('%Y-%m-%d_%H-%M-%S') - log_dir = os.path.join(local_dir, 'BatchOutputs', current_date, - sample_name) - if not os.path.exists(log_dir): - os.system(f'mkdir -p {log_dir}') - - # Making sure the FCCAnalyses libraries are compiled and installed - try: - subprocess.check_output(['make', 'install'], - cwd=local_dir+'/build', - stderr=subprocess.DEVNULL - ) - except subprocess.CalledProcessError: - LOGGER.error('The FCCanalyses libraries are not properly build and ' - 'installed!\nAborting job submission...') - sys.exit(3) - - subjob_scripts = [] - for ch_num in range(len(chunk_list)): - subjob_script_path = os.path.join( - log_dir, - f'job_{sample_name}_chunk_{ch_num}.sh') - subjob_scripts.append(subjob_script_path) - - for i in range(3): - try: - with open(subjob_script_path, 'w', encoding='utf-8') as ofile: - subjob_script = create_subjob_script(local_dir, - analysis, - sample_name, - ch_num, - chunk_list, - anapath, - args) - ofile.write(subjob_script) - except IOError as err: - if i < 2: - LOGGER.warning('I/O error(%i): %s', - err.errno, err.strerror) - else: - LOGGER.error('I/O error(%i): %s', err.errno, err.strerror) - sys.exit(3) - else: - break - time.sleep(10) - subprocess.getstatusoutput(f'chmod 777 {subjob_script_path}') - - LOGGER.debug('Sub-job scripts to be run:\n - %s', - '\n - '.join(subjob_scripts)) - - condor_config_path = f'{log_dir}/job_desc_{sample_name}.cfg' - - for i in range(3): - try: - with open(condor_config_path, 'w', encoding='utf-8') as cfgfile: - condor_config = create_condor_config(log_dir, - sample_name, - determine_os(local_dir), - analysis, - subjob_scripts) - cfgfile.write(condor_config) - except IOError as err: - LOGGER.warning('I/O error(%i): %s', err.errno, err.strerror) - if i == 2: - sys.exit(3) - else: - break - time.sleep(10) - subprocess.getstatusoutput(f'chmod 777 {condor_config_path}') - - batch_cmd = f'condor_submit {condor_config_path}' - LOGGER.info('Batch command:\n %s', batch_cmd) - success = submit_job(batch_cmd, 10) - if not success: - sys.exit(3) - - # _____________________________________________________________________________ def apply_filepath_rewrites(filepath: str) -> str: ''' @@ -501,7 +199,7 @@ def apply_filepath_rewrites(filepath: str) -> str: # _____________________________________________________________________________ -def run_local(config: dict[str, any], +def run_local(config: dict[str, Any], args: object, analysis: object, infile_list): @@ -531,11 +229,13 @@ def 
         info_msg += f'- {filepath}\t\n'
 
         if config['do_weighted']:
-            # Adjust number of events in case --nevents was specified 
+            # Adjust number of events in case --nevents was specified
            if args.nevents > 0:
-                nevts_param, nevts_tree, sow_param, sow_tree = get_entries_sow(filepath, args.nevents)
+                nevts_param, nevts_tree, sow_param, sow_tree = \
+                    get_entries_sow(filepath, args.nevents)
            else:
-                nevts_param, nevts_tree, sow_param, sow_tree = get_entries_sow(filepath)
+                nevts_param, nevts_tree, sow_param, sow_tree = \
+                    get_entries_sow(filepath)
 
            nevents_orig += nevts_param
            nevents_local += nevts_tree
@@ -558,14 +258,12 @@ def run_local(config: dict[str, any],
             sys.exit(3)
 
         infile.Close()
 
-    # Adjust number of events in case --nevents was specified 
+    # Adjust number of events in case --nevents was specified
     if args.nevents > 0 and args.nevents < nevents_local:
         nevents_local = args.nevents
 
-    LOGGER.info(info_msg)
-
     if nevents_orig > 0:
         LOGGER.info('Number of events:\n\t- original: %s\n\t- local: %s',
                     f'{nevents_orig:,}', f'{nevents_local:,}')
@@ -577,20 +275,13 @@ def run_local(config: dict[str, any],
     if config['do_weighted']:
         LOGGER.info('Local sum of weights: %s', f'{sow_local:0,.2f}')
 
-
-    output_dir = get_attribute(analysis, 'output_dir', '')
-    if not args.batch:
-        if os.path.isabs(args.output):
-            LOGGER.warning('Provided output path is absolute, "outputDir" '
-                           'from analysis script will be ignored!')
-        outfile_path = os.path.join(output_dir, args.output)
-    else:
-        outfile_path = args.output
+    outfile_path = args.output
     LOGGER.info('Output file path:\n%s', outfile_path)
 
     # Run RDF
     start_time = time.time()
-    inn, outn, in_sow, out_sow = run_rdf(config, args, analysis, file_list, outfile_path)
+    inn, outn, in_sow, out_sow = run_rdf(config, args, analysis, file_list,
+                                         outfile_path)
     elapsed_time = time.time() - start_time
 
     # replace nevents_local by inn = the amount of processed events
@@ -625,15 +316,16 @@ def run_local(config: dict[str, any],
         'eventsProcessed',
         nevents_orig if nevents_orig != 0 else inn)
     param.Write()
-    param = ROOT.TParameter(int)('eventsSelected', outn) 
+    param = ROOT.TParameter(int)('eventsSelected', outn)
     param.Write()
 
     if config['do_weighted']:
-        param_sow = ROOT.TParameter(float)(
-            'SumOfWeights',
-            sow_orig if sow_orig != 0 else in_sow )
+        param_sow = ROOT.TParameter(float)(
+            'SumOfWeights',
+            sow_orig if sow_orig != 0 else in_sow)
         param_sow.Write()
-        param_sow = ROOT.TParameter(float)('SumOfWeightsSelected', out_sow)  # No of weighted, selected events
+        # No of weighted, selected events
+        param_sow = ROOT.TParameter(float)('SumOfWeightsSelected', out_sow)
         param_sow.Write()
 
     outfile.Write()
@@ -667,11 +359,10 @@ def run_fccanalysis(args, analysis_module):
     '''
 
     # Get analysis class out of the module
-    analysis_args = vars(args)
-    analysis = analysis_module.Analysis(analysis_args)
+    analysis = analysis_module.Analysis(vars(args))
 
     # Merge configuration from command line arguments and analysis class
-    config: dict[str, any] = merge_config(args, analysis)
+    config: dict[str, Any] = merge_config(args, analysis)
 
     # Set number of threads, load header files, custom dicts, ...
     initialize(config, args, analysis_module)
@@ -681,7 +372,7 @@ def run_fccanalysis(args, analysis_module):
     if output_dir is not None and not os.path.exists(output_dir):
         os.system(f'mkdir -p {output_dir}')
 
-    # Check if eos output directory exist and if not create it
+    # Check if the EOS output directory exists and if not create it
     output_dir_eos = get_attribute(analysis, 'output_dir_eos', None)
     if output_dir_eos is not None and not os.path.exists(output_dir_eos):
         os.system(f'mkdir -p {output_dir_eos}')
@@ -690,7 +381,7 @@
         LOGGER.info('Using generator weights')
 
     # Check if test mode is specified, and if so run the analysis on it (this
-    # will exit after)
+    # will exit afterwards)
     if args.test:
         LOGGER.info('Running over test file...')
         testfile_path = getattr(analysis, "test_file")
@@ -700,8 +391,8 @@
         run_local(config, args, analysis, [testfile_path])
         sys.exit(0)
 
-    # Check if files are specified, and if so run the analysis on it/them (this
-    # will exit after)
+    # Check if input file(s) are specified, and if so run the analysis on
+    # it/them (this will exit afterwards)
     if len(args.files_list) > 0:
         LOGGER.info('Running over files provided in command line argument...')
         directory, _ = os.path.split(args.output)
@@ -710,12 +401,6 @@
         run_local(config, args, analysis, args.files_list)
         sys.exit(0)
 
-    # Check if batch mode is available
-    run_batch = get_attribute(analysis, 'run_batch', False)
-    if run_batch and shutil.which('condor_q') is None:
-        LOGGER.error('HTCondor tools can\'t be found!\nAborting...')
-        sys.exit(3)
-
     # Check if the process list is specified
     process_list = get_attribute(analysis, 'process_list', [])
 
@@ -728,8 +413,6 @@
                      'analysis script!\nAborting...')
         sys.exit(3)
 
-
-
     for process_name in process_list:
         LOGGER.info('Started processing sample "%s" ...', process_name)
         file_list, event_list = get_process_info(process_name,
@@ -741,24 +424,37 @@
             sys.exit(3)
 
         # Determine the fraction of the input to be processed
-        fraction = 1
+        fraction = 1.
         if get_element_dict(process_list[process_name], 'fraction'):
             fraction = get_element_dict(process_list[process_name], 'fraction')
+
+        # Determine the number of chunks the output will be split into
+        chunks = 1
+        if get_element_dict(process_list[process_name], 'chunks'):
+            chunks = get_element_dict(process_list[process_name], 'chunks')
+
         # Put together output path
         output_stem = process_name
         if get_element_dict(process_list[process_name], 'output'):
             output_stem = get_element_dict(process_list[process_name], 'output')
 
-        # Determine the number of chunks the output will be split into
-        chunks = 1
-        if get_element_dict(process_list[process_name], 'chunks'):
-            chunks = get_element_dict(process_list[process_name], 'chunks')
+        output_dir = get_attribute(analysis, 'output_dir', '')
+
+        if chunks == 1:
+            output_filepath = os.path.join(output_dir, output_stem+'.root')
+            output_dir = None
+        else:
+            output_filepath = None
+            output_dir = os.path.join(output_dir, output_stem)
 
         info_msg = f'Adding process "{process_name}" with:'
         if fraction < 1:
             info_msg += f'\n\t- fraction: {fraction}'
         info_msg += f'\n\t- number of files: {len(file_list):,}'
-        info_msg += f'\n\t- output stem: {output_stem}'
+        if output_dir:
+            info_msg += f'\n\t- output directory: {output_dir}'
+        if output_filepath:
+            info_msg += f'\n\t- output file path: {output_filepath}'
 
         if chunks > 1:
             info_msg += f'\n\t- number of chunks: {chunks}'
@@ -778,27 +474,15 @@
         if not os.path.exists(output_directory):
             os.system(f'mkdir -p {output_directory}')
 
-        if run_batch:
-            # Sending to the batch system
-            LOGGER.info('Running on the batch...')
-            if len(chunk_list) == 1:
-                LOGGER.warning('\033[4m\033[1m\033[91mRunning on batch with '
-                               'only one chunk might not be optimal\033[0m')
-
-            anapath = os.path.abspath(args.anascript_path)
-
-            send_to_batch(args, analysis, chunk_list, process_name, anapath)
-
+        # Running locally
+        LOGGER.info('Running locally...')
+        if len(chunk_list) == 1:
+            args.output = f'{output_stem}.root'
+            run_local(config, args, analysis, chunk_list[0])
         else:
-            # Running locally
-            LOGGER.info('Running locally...')
-            if len(chunk_list) == 1:
-                args.output = f'{output_stem}.root'
-                run_local(config, args, analysis, chunk_list[0])
-            else:
-                for index, chunk in enumerate(chunk_list):
-                    args.output = f'{output_stem}/chunk{index}.root'
-                    run_local(config, args, analysis, chunk)
+            for index, chunk in enumerate(chunk_list):
+                args.output = f'{output_stem}/chunk{index}.root'
+                run_local(config, args, analysis, chunk)
 
     if len(process_list) == 0:
         LOGGER.warning('No files processed (process_list not found)!\n'
diff --git a/python/run_final_analysis.py b/python/run_final_analysis.py
index 8b8a90c40bf..ce6727df014 100644
--- a/python/run_final_analysis.py
+++ b/python/run_final_analysis.py
@@ -16,7 +16,7 @@ import cppyy
 
 from anascript import get_element, get_attribute
 from process import get_process_dict, get_entries_sow
-from frame import generate_graph
+from utils import generate_graph
 
 LOGGER = logging.getLogger('FCCAnalyses.run_final')
 
diff --git a/python/submit.py b/python/submit.py
new file mode 100644
index 00000000000..1b97ad7f350
--- /dev/null
+++ b/python/submit.py
@@ -0,0 +1,63 @@
+'''
+Submit analysis to be run on remote machine(s).
+'''
+
+import os
+import sys
+import logging
+import importlib
+import argparse
+import shutil
+from batch import send_to_batch
+
+
+LOGGER = logging.getLogger('FCCAnalyses.submit')
+
+
+# _____________________________________________________________________________
+def submit_analysis(parser: argparse.ArgumentParser) -> None:
+    '''
+    Sub-command entry point.
+    '''
+
+    args = parser.parse_args()
+
+    # Check to where the analysis will be submitted.
+    if args.where == 'ht-condor':
+        # Check if HTCondor is available
+        if shutil.which('condor_q') is None:
+            LOGGER.error('HTCondor tools can\'t be found!\nAborting...')
+            sys.exit(3)
+        LOGGER.info('Submitting analysis to HTCondor...')
+
+    elif args.where == 'slurm':
+        LOGGER.error('Submission to Slurm is not yet implemented!\n'
+                     'Aborting...')
+        sys.exit(3)
+    elif args.where == 'grid':
+        LOGGER.error('Submission to the GRID is not yet implemented!\n'
+                     'Aborting...')
+        sys.exit(3)
+
+    # Work with absolute path of the analysis script.
+    anapath = os.path.abspath(args.anascript_path)
+    LOGGER.info('Loading analysis script from:\n %s', anapath)
+
+    # Check that the analysis file exists.
+    if not os.path.isfile(anapath):
+        LOGGER.error('Analysis script not found!\nAborting...')
+        sys.exit(3)
+
+    # Load the analysis script as a module
+    try:
+        analysis_spec = importlib.util.spec_from_file_location('fccanalysis',
+                                                                anapath)
+        analysis_module = importlib.util.module_from_spec(analysis_spec)
+        analysis_spec.loader.exec_module(analysis_module)
+    except SyntaxError as err:
+        LOGGER.error('Syntax error encountered in the analysis script:\n%s',
+                     err)
+        sys.exit(3)
+
+    if args.where == 'ht-condor':
+        send_to_batch(args, analysis_module)
diff --git a/python/frame.py b/python/utils.py
similarity index 75%
rename from python/frame.py
rename to python/utils.py
index 78a58cfa8ab..08760905c47 100644
--- a/python/frame.py
+++ b/python/utils.py
@@ -1,5 +1,5 @@
 '''
-RDataFrame helpers.
+RDataFrame or other helpers.
 '''
 
 import os
@@ -11,7 +11,7 @@
 
 ROOT.gROOT.SetBatch(True)
 
-LOGGER: logging.Logger = logging.getLogger('FCCAnalyses.frame')
+LOGGER: logging.Logger = logging.getLogger('FCCAnalyses.utils')
 
 
 # _____________________________________________________________________________
@@ -57,3 +57,22 @@ def generate_graph(dframe, args, suffix: str | None = None) -> None:
     # Convert .dot file into .png
     os.system(f'dot -Tpng {graph_path.with_suffix(".dot")} '
               f'-o {graph_path.with_suffix(".png")}')
+
+
+# _____________________________________________________________________________
+def save_benchmark(outfile, benchmark):
+    '''
+    Save benchmark results to a JSON file.
+    '''
+    benchmarks = []
+    try:
+        with open(outfile, 'r', encoding='utf-8') as benchin:
+            benchmarks = json.load(benchin)
+    except OSError:
+        pass
+
+    benchmarks = [b for b in benchmarks if b['name'] != benchmark['name']]
+    benchmarks.append(benchmark)
+
+    with open(outfile, 'w', encoding='utf-8') as benchout:
+        json.dump(benchmarks, benchout, indent=2)
diff --git a/python/frame.pyi b/python/utils.pyi
similarity index 100%
rename from python/frame.pyi
rename to python/utils.pyi
diff --git a/setup.sh b/setup.sh
index 23262292292..d7384496b6c 100644
--- a/setup.sh
+++ b/setup.sh
@@ -2,13 +2,13 @@ if [ "${0}" != "${BASH_SOURCE}" ]; then
     # Determinig the location of this setup script
     export LOCAL_DIR=$(cd $(dirname "${BASH_SOURCE}") && pwd)
 
-    echo "----> Info: Setting up Key4hep stack..."
+    echo "----> INFO: Setting up Key4hep stack..."
     # Sourcing of the stack
     if [ -n "${KEY4HEP_STACK}" ]; then
-        echo "----> Info: Key4hep stack already set up. Skipping..."
+        echo "----> INFO: Key4hep stack already set up. Skipping..."
     elif [ -f "${LOCAL_DIR}/.fccana/stackpin" ]; then
         STACK_PATH=$(<${LOCAL_DIR}/.fccana/stackpin)
-        echo "----> Info: Sourcing pinned Key4hep stack..."
+        echo "----> INFO: Sourcing pinned Key4hep stack..."
         echo " ${STACK_PATH}"
         source ${STACK_PATH}
     else
@@ -16,11 +16,11 @@ if [ "${0}" != "${BASH_SOURCE}" ]; then
     fi
 
     if [ -z "${KEY4HEP_STACK}" ]; then
-        echo "----> Error: Key4hep stack not setup correctly! Aborting..."
+        echo "----> ERROR: Key4hep stack not set up correctly! Aborting..."
         return 1
     fi
 
-    echo "----> Info: Setting up environment variables..."
+    echo "----> INFO: Setting up environment variables..."
     export PYTHONPATH=${LOCAL_DIR}/python:${PYTHONPATH}
     export PYTHONPATH=${LOCAL_DIR}/install/python:${PYTHONPATH}
     export PYTHONPATH=${LOCAL_DIR}/install/share/examples:${PYTHONPATH}
@@ -35,7 +35,7 @@ if [ "${0}" != "${BASH_SOURCE}" ]; then
     export ONNXRUNTIME_ROOT_DIR=`python -c "import onnxruntime; print(onnxruntime.__path__[0]+'/../../../..')" 2> /dev/null`
 
     if [ -z "${ONNXRUNTIME_ROOT_DIR}" ]; then
-        echo "----> Warning: ONNX Runtime not found! Related analyzers won't be build..."
+        echo "----> WARNING: ONNX Runtime not found! Related analyzers won't be built..."
     else
         export LD_LIBRARY_PATH=${ONNXRUNTIME_ROOT_DIR}/lib:${LD_LIBRARY_PATH}
     fi
@@ -47,5 +47,5 @@ if [ "${0}" != "${BASH_SOURCE}" ]; then
     export FCCDICTSDIR=/cvmfs/fcc.cern.ch/FCCDicts:${FCCDICTSDIR}
 
 else
-    echo "----> Error: This script is meant to be sourced!"
+    echo "----> ERROR: This script is meant to be sourced!"
 fi