Skip to content

Commit

Permalink
Check point merge (#147)
Browse files Browse the repository at this point in the history
* Do not cross metric day boundaries.

* add log file arg delete empty directories when done

* Tweak remove empty dir removal

* Tweak remove empty dir removal again

* Merge day boundary (#146)

* Address issue #126. Allow pyomicron to run from a frame cache without accessing dqsegdb. Add documentation for this

* Do not merge files if they overlap "metric days"

* Do not cross metric day boundaries.

Co-authored-by: Joseph Areeda <[email protected]>

* rebase agaist last approved PR

* rebase against last approved PR

* rebase against last approved PR again, fix flake8

* Fix a bug in remove empty directories.

Co-authored-by: Joseph Areeda <[email protected]>
  • Loading branch information
areeda and Joseph Areeda authored Oct 16, 2022
1 parent 702ce64 commit acb2921
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 3 deletions.
66 changes: 63 additions & 3 deletions omicron/cli/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@
"""
import time
prog_start = time.time()

from gwpy.segments import SegmentList, Segment

prog_start = time.time()
import argparse
import configparser
import os
Expand Down Expand Up @@ -114,6 +114,53 @@ def gps2str(gps):
return ret


def clean_dirs(dir_list):
"""Remove any empty directories we created
NB: each of those directories may contain subdirectories which we will also delete if empty
"""
for adir in dir_list:
pdir = Path(adir)
flist = list(pdir.glob('*'))
if len(flist) == 0:
pdir.rmdir()
else:
can_delete = True
for file in flist:
if file.is_dir():
can_delete &= remove_empty_dir(file)
else:
# we found a file. Do not delete this direcory
can_delete = False

if can_delete:
pdir.rmdir()


def remove_empty_dir(dir_path):
"""
Remove a directory if empty or if all it contains is empty directories.
This is a recursive function, it calls itself if it finds a directory.
:@param Path dir_path: directory to check
:@return boolean: True if directory was deleted
"""
ret = True
if not dir_path.is_dir():
ret = False
else:
flist = list(dir_path.glob('*'))
if len(flist) == 0:
dir_path.rmdir()
else:
for file in flist:
if file.is_dir():
ret = remove_empty_dir(file)
if not ret:
break
if ret:
dir_path.rmdir()
return ret


def clean_tempfiles(tempfiles):
for f in map(Path, tempfiles):
if f.is_dir():
Expand Down Expand Up @@ -209,6 +256,7 @@ def create_parser():
help='additional file tag to be appended to final '
'file descriptions',
)
outg.add_argument('-l', '--log-file', type=Path, help="save a copy of all logger messages to this file")

# data processing/chunking options
procg = parser.add_argument_group('Processing options')
Expand Down Expand Up @@ -416,6 +464,11 @@ def main(args=None):
# apply verbosity to logger
args.verbose = max(5 - args.verbose, 0)
logger.setLevel(args.verbose * 10)
if args.log_file:
logger.add_file_handler(args.log_file)
logger.debug("Command line args:")
for arg in vars(args):
logger.debug(f'{arg} = {str(getattr(args, arg))}')

# validate command line arguments
if args.ifo is None:
Expand Down Expand Up @@ -625,7 +678,8 @@ def main(args=None):
pardir = rundir / "parameters"
trigdir = rundir / "triggers"
mergedir = rundir / "merge"
for d in [cachedir, condir, logdir, pardir, trigdir, mergedir]:
run_dir_list = [cachedir, condir, logdir, pardir, trigdir, mergedir]
for d in run_dir_list:
d.mkdir(exist_ok=True)

oconfig.set('OUTPUT', 'DIRECTORY', str(trigdir))
Expand Down Expand Up @@ -713,6 +767,7 @@ def main(args=None):
if dataduration < minduration and online:
logger.info("Segment is too short (%d < %d), please try again later"
% (duration, minduration))
clean_dirs(run_dir_list)
clean_exit(0, tempfiles)
elif dataduration < minduration:
raise ValueError(
Expand All @@ -728,6 +783,7 @@ def main(args=None):
logger.info(f'Finding segments for relevant state... from:{datastart} length: {dataduration}s')
seg_qry_strt = time.time()
if statebits == "guardian": # use guardian
logger.debug(f'Using guardian for {statechannel}: {datastart}-{dataend}')
segs = segments.get_guardian_segments(
statechannel,
stateft,
Expand All @@ -736,6 +792,7 @@ def main(args=None):
pad=statepad,
)
else:
logger.debug(f'Using segdb for {statechannel}: {datastart}-{dataend}')
segs = segments.get_state_segments(
statechannel,
stateft,
Expand Down Expand Up @@ -890,10 +947,13 @@ def main(args=None):
)
segments.write_segments(cachesegs, segfile)
logger.info("Segments written to\n%s" % segfile)
clean_dirs(run_dir_list)
clean_exit(0, tempfiles)

# otherwise not all data are available, so
elif len(segs) == 0 and online:
logger.info("No analysable segments found, please try again later")
clean_dirs(run_dir_list)
clean_exit(0, tempfiles)
elif len(segs) == 0:
raise RuntimeError("No analysable segments found")
Expand Down Expand Up @@ -961,7 +1021,7 @@ def main(args=None):

ojob.add_condor_cmd('+OmicronProcess', f'"{group}"')

# create post-processing job
# create post-processing jobs
ppjob = condor.OmicronProcessJob(args.universe, find_executable('bash'),
subdir=condir, logdir=logdir,
tag='post-processing', **condorcmds)
Expand Down
14 changes: 14 additions & 0 deletions omicron/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import logging
import sys
from logging.handlers import RotatingFileHandler
from pathlib import Path

from gwpy.time import tconvert as gps_time_now

Expand Down Expand Up @@ -97,6 +99,18 @@ def __init__(self, name, level=logging.DEBUG):
self.addHandler(stdouthandler)
self.addHandler(stderrhandler)

def add_file_handler(self, path):
""" allow saving of all messages from here on to the file
:param Path-like path: file to hold log
"""
colorformatter = ColoredFormatter(self.FORMAT)
parent_dir = Path(path).parent
if not parent_dir.exists():
parent_dir.mkdir(mode=0o775, parents=True)
log_file_handler = RotatingFileHandler(path, maxBytes=10 ** 7, backupCount=5)
log_file_handler.setFormatter(colorformatter)
self.addHandler(log_file_handler)


def color_text(text, color):
if not isinstance(color, int):
Expand Down

0 comments on commit acb2921

Please sign in to comment.