From acb2921d47de47826253fe8c81bd2a63b34b4628 Mon Sep 17 00:00:00 2001
From: Joseph Areeda
Date: Sun, 16 Oct 2022 13:14:52 -0700
Subject: [PATCH] Check point merge (#147)

* Do not cross metric day boundaries.
* add log file arg
  delete empty directories when done
* Tweak remove empty dir removal
* Tweak remove empty dir removal again
* Merge day boundary (#146)
* Address issue #126. Allow pyomicron to run from a frame cache without accessing dqsegdb. Add documentation for this.
* Do not merge files if they overlap "metric days"
* Do not cross metric day boundaries.

Co-authored-by: Joseph Areeda

* rebase agaist last approved PR
* rebase against last approved PR
* rebase against last approved PR again, fix flake8
* Fix a bug in remove empty directories.

Co-authored-by: Joseph Areeda
---
 omicron/cli/process.py | 66 ++++++++++++++++++++++++++++++++++++++++--
 omicron/log.py         | 14 +++++++++
 2 files changed, 77 insertions(+), 3 deletions(-)

diff --git a/omicron/cli/process.py b/omicron/cli/process.py
index 60d95e1..3268d98 100644
--- a/omicron/cli/process.py
+++ b/omicron/cli/process.py
@@ -59,10 +59,10 @@
 """
 
 import time
+prog_start = time.time()
 
 from gwpy.segments import SegmentList, Segment
-prog_start = time.time()
 
 import argparse
 import configparser
 import os
@@ -114,6 +114,53 @@ def gps2str(gps):
     return ret
 
 
+def clean_dirs(dir_list):
+    """Remove any empty directories we created
+    NB: each of those directories may contain subdirectories, which we will also delete if empty
+    """
+    for adir in dir_list:
+        pdir = Path(adir)
+        flist = list(pdir.glob('*'))
+        if len(flist) == 0:
+            pdir.rmdir()
+        else:
+            can_delete = True
+            for file in flist:
+                if file.is_dir():
+                    can_delete &= remove_empty_dir(file)
+                else:
+                    # we found a file. Do not delete this directory
+                    can_delete = False
+
+            if can_delete:
+                pdir.rmdir()
+
+
+def remove_empty_dir(dir_path):
+    """
+    Remove a directory if empty or if all it contains is empty directories.
+    This is a recursive function: it calls itself when it finds a subdirectory.
+    :param Path dir_path: directory to check
+    :return bool: True if directory was deleted
+    """
+    ret = True
+    if not dir_path.is_dir():
+        ret = False
+    else:
+        flist = list(dir_path.glob('*'))
+        if len(flist) == 0:
+            dir_path.rmdir()
+        else:
+            for file in flist:
+                if file.is_dir():
+                    ret = remove_empty_dir(file)
+                    if not ret:
+                        break
+            if ret:
+                dir_path.rmdir()
+    return ret
+
+
 def clean_tempfiles(tempfiles):
     for f in map(Path, tempfiles):
         if f.is_dir():
@@ -209,6 +256,7 @@ def create_parser():
         help='additional file tag to be appended to final '
              'file descriptions',
     )
+    outg.add_argument('-l', '--log-file', type=Path, help="save a copy of all logger messages to this file")
 
     # data processing/chunking options
     procg = parser.add_argument_group('Processing options')
@@ -416,6 +464,11 @@ def main(args=None):
     # apply verbosity to logger
     args.verbose = max(5 - args.verbose, 0)
     logger.setLevel(args.verbose * 10)
+    if args.log_file:
+        logger.add_file_handler(args.log_file)
+    logger.debug("Command line args:")
+    for arg in vars(args):
+        logger.debug(f'{arg} = {str(getattr(args, arg))}')
 
     # validate command line arguments
     if args.ifo is None:
@@ -625,7 +678,8 @@ def main(args=None):
     pardir = rundir / "parameters"
     trigdir = rundir / "triggers"
     mergedir = rundir / "merge"
-    for d in [cachedir, condir, logdir, pardir, trigdir, mergedir]:
+    run_dir_list = [cachedir, condir, logdir, pardir, trigdir, mergedir]
+    for d in run_dir_list:
         d.mkdir(exist_ok=True)
 
     oconfig.set('OUTPUT', 'DIRECTORY', str(trigdir))
@@ -713,6 +767,7 @@ def main(args=None):
     if dataduration < minduration and online:
         logger.info("Segment is too short (%d < %d), please try again later"
                     % (duration, minduration))
+        clean_dirs(run_dir_list)
         clean_exit(0, tempfiles)
     elif dataduration < minduration:
         raise ValueError(
@@ -728,6 +783,7 @@ def main(args=None):
     logger.info(f'Finding segments for relevant state... from:{datastart} length: {dataduration}s')
     seg_qry_strt = time.time()
     if statebits == "guardian":  # use guardian
+        logger.debug(f'Using guardian for {statechannel}: {datastart}-{dataend}')
         segs = segments.get_guardian_segments(
             statechannel,
             stateft,
@@ -736,6 +792,7 @@ def main(args=None):
             pad=statepad,
         )
     else:
+        logger.debug(f'Using segdb for {statechannel}: {datastart}-{dataend}')
         segs = segments.get_state_segments(
             statechannel,
             stateft,
@@ -890,10 +947,13 @@ def main(args=None):
         )
         segments.write_segments(cachesegs, segfile)
         logger.info("Segments written to\n%s" % segfile)
+        clean_dirs(run_dir_list)
         clean_exit(0, tempfiles)
 
+    # otherwise not all data are available, so exit and try again later
     elif len(segs) == 0 and online:
         logger.info("No analysable segments found, please try again later")
+        clean_dirs(run_dir_list)
         clean_exit(0, tempfiles)
     elif len(segs) == 0:
         raise RuntimeError("No analysable segments found")
@@ -961,7 +1021,7 @@ def main(args=None):
 
     ojob.add_condor_cmd('+OmicronProcess', f'"{group}"')
 
-    # create post-processing job
+    # create post-processing jobs
     ppjob = condor.OmicronProcessJob(args.universe, find_executable('bash'),
                                      subdir=condir, logdir=logdir,
                                      tag='post-processing', **condorcmds)
diff --git a/omicron/log.py b/omicron/log.py
index b94f5ea..d25913c 100644
--- a/omicron/log.py
+++ b/omicron/log.py
@@ -21,6 +21,8 @@
 import logging
 import sys
+from logging.handlers import RotatingFileHandler
+from pathlib import Path
 
 from gwpy.time import tconvert as gps_time_now
 
 
@@ -97,6 +99,18 @@ def __init__(self, name, level=logging.DEBUG):
         self.addHandler(stdouthandler)
         self.addHandler(stderrhandler)
 
+    def add_file_handler(self, path):
+        """Save a copy of all log messages from this point on to the given file
+        :param Path-like path: file to hold log
+        """
+        colorformatter = ColoredFormatter(self.FORMAT)
+        parent_dir = Path(path).parent
+        if not parent_dir.exists():
+            parent_dir.mkdir(mode=0o775, parents=True)
+        log_file_handler = RotatingFileHandler(path, maxBytes=10 ** 7, backupCount=5)
+        log_file_handler.setFormatter(colorformatter)
+        self.addHandler(log_file_handler)
+
 
 def color_text(text, color):
     if not isinstance(color, int):
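
Note on the new cleanup helper (illustrative, not part of the patch): the sketch below exercises the same recursive logic that remove_empty_dir() adds in the process.py hunk above, against a throwaway directory tree. The function body is transcribed from the diff; the tempfile scaffolding around it is assumed for the demo only, and the demo sticks to empty trees and a missing path because the helper expects the directories it visits to contain no regular files.

# Standalone sketch: remove_empty_dir() body copied from the hunk above;
# the tempfile scaffolding is illustrative only and is not part of pyomicron.
import tempfile
from pathlib import Path


def remove_empty_dir(dir_path):
    """Remove dir_path if it is empty or contains only empty directories."""
    ret = True
    if not dir_path.is_dir():
        ret = False
    else:
        flist = list(dir_path.glob('*'))
        if len(flist) == 0:
            dir_path.rmdir()
        else:
            for file in flist:
                if file.is_dir():
                    ret = remove_empty_dir(file)
                    if not ret:
                        break
            if ret:
                dir_path.rmdir()
    return ret


with tempfile.TemporaryDirectory() as tmp:
    run_dir = Path(tmp) / 'run'
    (run_dir / 'logs' / 'old').mkdir(parents=True)   # nested directories, all empty

    print(remove_empty_dir(run_dir))                 # True: the whole empty tree is removed
    print(remove_empty_dir(Path(tmp) / 'missing'))   # False: not a directory
    print(run_dir.exists())                          # False

In the patch itself, clean_dirs() applies this helper to each entry of run_dir_list (cache, condor, logs, parameters, triggers, merge) just before the early clean_exit() calls, so that a run which stops early does not leave empty scaffolding directories behind.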