Condor update2 (#176)

* Do not cross metric day boundaries.

* Merge day boundary (#146)

* Address issue #126. Allow pyomicron to run from a frame cache without accessing dqsegdb. Add documentation for this

* Do not merge files if they overlap "metric days"


* Add log file arg; delete empty directories when done

* Tweak empty-directory removal

* Tweak empty-directory removal again


* rebase against last approved PR

* rebase against last approved PR

* rebase against last approved PR again, fix flake8

* Fix a bug in removing empty directories.

Co-authored-by: Joseph Areeda <[email protected]>

* minor doc changes

* Fix a bug where an xml.gz file could get compressed again in merge-with-gaps

* Fix a double gzip of ligolw files (#151)

* Check point merge (#147)

* Implement a periodic vacate to address a permanent D-state (uninterruptible wait) causing jobs to fail to complete (see the sketch after this commit list)

* Always create a log file. If not specified, put one in the output directory

* Fix a problem with periodic vacate.

* Up the periodic vacate time to 3 hrs

* Found a job killing typo

* Add time limits to post processing also

* Don't save the segments.txt file if no segments are found, because we don't know whether it's a failure to find them or a valid, not-analyzable state.

* Disable periodic vacate to demo the problem.

* Fix reported version in some utilities. Only update segments.txt if omicron is actually run.

* Clarify relative imports and add details to a few log messages

* Resolve flake8 issues

---------

Co-authored-by: Joseph Areeda <[email protected]>

* Resolve flake8 issues

* Update log format to use human-readable date/time instead of GPS; tweak logging to better understand guardian channel usage

* remove old setup.cfg

* Work on pytest failures. The remaining errors are the result of omicron segfaults if an environment variable is not set

* Add missing blank line flagged by flake8

* Update condor defaults in argparse. Use local time in logs, not GPS

* Uncomment a test previously ignored because of omicron segfaults

* remove extra blank lines

* Make sure we have a maximum segment length for online processing; default = 1200 seconds.

* flake8 on GitHub behaves differently than on my workstation

---------

Co-authored-by: Joseph Areeda <[email protected]>
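
For reference, the periodic vacate described in the commits above maps onto an HTCondor periodic policy expression. The sketch below is illustrative only: it assumes an HTCondor version whose submit language supports a periodic_vacate command, takes the 3-hour threshold from the commit message, and the submit-file name and exact expression are assumptions rather than pyomicron's actual generated output.

    # Illustrative sketch only; not pyomicron's actual submit-generation code.
    # Assumes an HTCondor version that supports the `periodic_vacate` submit
    # command. JobStatus == 2 means "running"; EnteredCurrentStatus is the
    # Unix time at which the job entered that state.
    VACATE_AFTER = 3 * 3600  # seconds; "Up the periodic vacate time to 3 hrs"

    vacate_expr = (f'(JobStatus == 2) && '
                   f'((time() - EnteredCurrentStatus) > {VACATE_AFTER})')

    with open('omicron.sub', 'a') as sub:  # hypothetical submit file name
        # Vacate jobs stuck in an uninterruptible (D-state) wait so HTCondor
        # can reschedule them instead of letting them hang indefinitely.
        sub.write(f'periodic_vacate = {vacate_expr}\n')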
areeda and Joseph Areeda authored Dec 4, 2023
1 parent 360a252 commit d2411f9
Showing 7 changed files with 41 additions and 27 deletions.
4 changes: 2 additions & 2 deletions omicron/cli/merge_with_gaps.py
@@ -181,7 +181,7 @@ def valid_file(path, uint_bug):
os.remove(path)
else:
ret = True
-logger.debug(f'valid_file: {ret} {path.name} ({ntrig}), took {time.time()-vf_strt:.2f}')
+logger.debug(f'valid_file: {ret} {path.name} ({ntrig}), took {time.time() - vf_strt:.2f}')
return ret


@@ -203,7 +203,7 @@ def main():
parser.add_argument('--no-gzip', action='store_true', default=False,
help='Do not compress the ligolw xml files')
parser.add_argument('--uint-bug', default=False, action='store_true',
-help='Fix problem XML files created by old version of Omicron beforew merging.')
+help='Fix problem XML files created by old version of Omicron before merging.')
parser.add_argument('--file-list', help='File with list of input file paths, one per line')
parser.add_argument('infiles', nargs='*', help='List of paths to files to merge or copy')

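
For context on the double-gzip fix noted in the commit messages above, the guard amounts to checking for an existing .gz suffix before compressing. A minimal sketch of that idea; the helper name maybe_gzip is hypothetical, not pyomicron's API:

    import gzip
    import shutil
    from pathlib import Path

    def maybe_gzip(path: Path) -> Path:
        """Gzip path in place unless it is already compressed."""
        if path.suffix == '.gz':
            return path  # already a .xml.gz file; do not compress it again
        gz_path = path.with_name(path.name + '.gz')
        with open(path, 'rb') as src, gzip.open(gz_path, 'wb') as dst:
            shutil.copyfileobj(src, dst)  # stream-copy into the gzip file
        path.unlink()  # drop the uncompressed original
        return gz_path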
51 changes: 32 additions & 19 deletions omicron/cli/process.py
@@ -271,10 +271,13 @@ def create_parser():
'-N',
'--max-channels-per-job',
type=int,
-default=10,
+default=20,
help='maximum number of channels to process in a single '
'condor job (default: %(default)s)',
)
+procg.add_argument('--max-online-lookback', type=int, default=1200,
+                   help='With no immediately previous run, or one that was long ago this is the max time of an '
+                        'online job. Default: %(default)d')
# max concurrent omicron jobs
procg.add_argument('--max-concurrent', default=10, type=int,
help='Max omicron jobs at one time [%(default)s]')
@@ -331,7 +334,7 @@ def create_parser():
)
condorg.add_argument(
'--condor-accounting-group',
-default='ligo.prod.o3.detchar.transient.omicron',
+default='ligo.prod.o4.detchar.transient.omicron',
help='accounting_group for condor submission on the LIGO '
'Data Grid (default: %(default)s)',
)
@@ -345,7 +348,7 @@
)
condorg.add_argument(
'--condor-request-disk',
-default='1G',
+default='50G',
help='Required LIGO argument: local disk use (default: %(default)s)',
)
condorg.add_argument(
@@ -370,7 +373,7 @@
'--dagman-option',
action='append',
type=str,
-default=['force'],
+default=['force', '-import_env'],
metavar="\"opt | opt=value\"",
help="Extra options to pass to condor_submit_dag as "
"\"-{opt} [{value}]\". "
@@ -472,7 +475,7 @@ def main(args=None):

logger.debug("Command line args:")
for arg in vars(args):
-logger.debug(f'{arg} = {str(getattr(args, arg))}')
+logger.debug(f' {arg} = {str(getattr(args, arg))}')

# validate command line arguments
if args.ifo is None:
@@ -644,9 +647,12 @@ def main(args=None):
pass

if statechannel:
logger.debug("State channel = %s" % statechannel)
logger.debug("State bits = %s" % ', '.join(map(str, statebits)))
logger.debug("State frametype = %s" % stateft)
logger.debug(f"State channel {statechannel}")
if statebits == 'guardian':
logger.debug(f"State bits {statebits}")
else:
logger.debug("State bits = %s" % ', '.join(map(str, statebits)))
logger.debug(f"State frametype {stateft}")

# parse padding for state segments
if statechannel or stateflag:
@@ -674,7 +680,7 @@ def main(args=None):
# -- set directories ------------------------------------------------------

rundir.mkdir(exist_ok=True, parents=True)
logger.info(f"Using run directory\n{rundir}")
logger.info(f"Using run directory: {rundir}")

cachedir = rundir / "cache"
condir = rundir / "condor"
@@ -723,26 +729,31 @@

segfile = str(rundir / "segments.txt")
keepfiles.append(segfile)
+max_lookback = args.max_online_lookback

if newdag and online:
# get limit of available data (allowing for padding)
end = data.get_latest_data_gps(ifo, frametype) - padding

+now = tconvert()
+earliest_online = now - max_lookback
try: # start from where we got to last time
-start = segments.get_last_run_segment(segfile)[1]
+last_run_segment = segments.get_last_run_segment(segfile)
+start = last_run_segment[1]
except IOError: # otherwise start with a sensible amount of data
if args.use_dev_shm: # process one chunk
logger.debug("No online segment record, starting with "
"%s seconds" % chunkdur)
start = end - chunkdur + padding
-else: # process the last 4000 seconds (arbitrarily)
-    logger.debug("No online segment record, starting with "
-                 "4000 seconds")
-    start = end - 4000
+else: # process the last requested seconds (arbitrarily)
+    logger.debug(f"No online segment record, starting with {max_lookback} seconds ago, {earliest_online}")
+    start = end - max_lookback
else:
logger.debug("Online segment record recovered")
logger.debug(f"Online segment record recovered: {last_run_segment[0]} - {last_run_segment[1]}")
elif online:
start, end = segments.get_last_run_segment(segfile)
logger.debug(f"Online segment record recovered: {start} - {end}")
if end - start > max_lookback:
start = end - max_lookback
else:
start, end = args.gps
start = int(start)
@@ -785,9 +796,11 @@ def main(args=None):
if (online and statechannel) or (statechannel and not stateflag) or (
statechannel and args.no_segdb):
logger.info(f'Finding segments for relevant state... from:{datastart} length: {dataduration}s')
+logger.debug(f'For segment finding: online: {online}, statechannel: {statechannel}, '
+             f'stateflag: {stateflag} args.no_segdb: {args.no_segdb}')
seg_qry_strt = time.time()
if statebits == "guardian": # use guardian
-logger.debug(f'Using guardian for {statechannel}: {datastart}-{dataend}')
+logger.debug(f'Using guardian for {statechannel}: {datastart}-{dataend} ')
segs = segments.get_guardian_segments(
statechannel,
stateft,
@@ -854,7 +867,7 @@ def main(args=None):
# segment, or long enough to process safely)
if truncate and abs(lastseg) < chunkdur * 2:
logger.info(
"The final segment is too short, " f'Minimum length is {int(chunkdur*2)} '
"The final segment is too short, " f'Minimum length is {int(chunkdur * 2)} '
"but ends at the limit of "
"available data, presumably this is an active segment. It "
"will be removed so that it can be processed properly later",
@@ -1447,7 +1460,7 @@ def main(args=None):
clean_tempfiles(tempfiles)

# and exit
logger.info(f"--- Processing complete. Elapsed: {time.time()-prog_start} seconds ----------------")
logger.info(f"--- Processing complete. Elapsed: {time.time() - prog_start} seconds ----------------")


if __name__ == "__main__":
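
The --max-online-lookback handling added above reduces to clamping how far back an online run may start. A simplified standalone sketch of that logic (names echo the diff; the real code also reads and updates the segments.txt record):

    def choose_online_start(end, last_end=None, max_lookback=1200):
        """Pick the start GPS time for an online run.

        Resume from the previous run's recorded end when available, but
        never look back more than max_lookback seconds (default 1200,
        matching the new argparse default).
        """
        if last_end is None:
            # No previous-run record: start max_lookback seconds before
            # the end of available data.
            return end - max_lookback
        return max(last_end, end - max_lookback)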
2 changes: 1 addition & 1 deletion omicron/cli/status.py
@@ -862,4 +862,4 @@ def print_nagios_json(code, message, outfile, tag='status', **extras):
if __name__ == "__main__":
main()
if logger:
-logger.info(f'Run time: {(time.time()-prog_start):.1f} seconds')
+logger.info(f'Run time: {(time.time() - prog_start):.1f} seconds')
2 changes: 1 addition & 1 deletion omicron/data.py
@@ -190,7 +190,7 @@ def find_frames(obs, frametype, start, end, on_gaps='warn', **kwargs):
if on_gaps != 'ignore':
seglist = SegmentList(map(file_segment, cache)).coalesce()
missing = (SegmentList([Segment(start, end)]) - seglist).coalesce()
msg = "Missing frames:\n{}".format('\n'.join(map(lambda s: f'[{s[0]}, {s[1]}) -> {s[1]-s[0]}s', missing)))
msg = "Missing frames:\n{}".format('\n'.join(map(lambda s: f'[{s[0]}, {s[1]}) -> {s[1] - s[0]}s', missing)))
if missing and on_gaps == 'warn':
warnings.warn(msg)
elif missing:
5 changes: 3 additions & 2 deletions omicron/log.py
@@ -77,8 +77,9 @@ def filter(self, record):
class Logger(logging.Logger):
"""`~logging.Logger` with a nice format
"""
-FORMAT = ('[{bold}%(name)s{reset} %(gpstime)d] %(levelname)+19s %(filename)s:%(lineno)d: '
+FORMAT = ('[{bold}%(name)s{reset} %(asctime)s] %(levelname)+19s %(filename)s:%(lineno)d: '
'%(message)s'.format(bold=BOLD_SEQ, reset=RESET_SEQ))
+log_file_date_format = '%m-%d %H:%M:%S'

def __init__(self, name, level=logging.DEBUG):
try:
@@ -87,7 +88,7 @@ def __init__(self, name, level=logging.DEBUG):
logging.Logger.__init__(self, name, level=level)

# set up handlers for WARNING and above to go to stderr
-colorformatter = ColoredFormatter(self.FORMAT)
+colorformatter = ColoredFormatter(self.FORMAT, datefmt=self.log_file_date_format)
stdouthandler = logging.StreamHandler(sys.stdout)
stdouthandler.setFormatter(colorformatter)
stderrhandler = logging.StreamHandler(sys.stderr)
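
The change above swaps the GPS timestamp (%(gpstime)d) for %(asctime)s rendered with log_file_date_format. A minimal plain-logging demonstration of the same datefmt, without pyomicron's color escape sequences:

    import logging

    formatter = logging.Formatter(
        '[%(name)s %(asctime)s] %(levelname)s %(filename)s:%(lineno)d: %(message)s',
        datefmt='%m-%d %H:%M:%S')  # mirrors Logger.log_file_date_format above

    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    log = logging.getLogger('omicron-demo')
    log.addHandler(handler)
    log.setLevel(logging.DEBUG)
    log.debug('test message')  # e.g. [omicron-demo 12-04 08:15:42] DEBUG ...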
2 changes: 1 addition & 1 deletion omicron/parameters.py
@@ -65,7 +65,7 @@ def __init__(self, version=None, defaults=dict(), **kwargs):
if version is None:
try:
version = utils.get_omicron_version()
-except KeyError:
+except (KeyError, RuntimeError):
version = utils.OmicronVersion(const.OMICRON_VERSION)
self.version = version
self._set_defaults()
2 changes: 1 addition & 1 deletion omicron/tests/test_log.py
@@ -44,4 +44,4 @@ def test_logger():

# test that the formatter prints the correct thing
outhandler = logger.handlers[0]
-assert re.match('.*TEST.*\\d{10}.*DEBUG.*FILE.*test message', outhandler.format(record))
+assert re.match('.*TEST.*\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}.*DEBUG.*FILE.*test message', outhandler.format(record))
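
The updated assertion expects a month-day wall-clock timestamp (two two-digit groups, then HH:MM) where the old pattern required a ten-digit GPS time. For example, the new pattern accepts a line like this (sample text, not actual test output):

    import re

    sample = '[TEST 12-04 08:15:42] DEBUG FILE:1: test message'
    assert re.match(
        '.*TEST.*\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}.*DEBUG.*FILE.*test message',
        sample)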
