diff --git a/omicron/cli/merge_with_gaps.py b/omicron/cli/merge_with_gaps.py index 3861d74..8ca72a3 100644 --- a/omicron/cli/merge_with_gaps.py +++ b/omicron/cli/merge_with_gaps.py @@ -181,7 +181,7 @@ def valid_file(path, uint_bug): os.remove(path) else: ret = True - logger.debug(f'valid_file: {ret} {path.name} ({ntrig}), took {time.time()-vf_strt:.2f}') + logger.debug(f'valid_file: {ret} {path.name} ({ntrig}), took {time.time() - vf_strt:.2f}') return ret @@ -203,7 +203,7 @@ def main(): parser.add_argument('--no-gzip', action='store_true', default=False, help='Do not compress the ligolw xml files') parser.add_argument('--uint-bug', default=False, action='store_true', - help='Fix problem XML files created by old version of Omicron beforew merging.') + help='Fix problem XML files created by old version of Omicron before merging.') parser.add_argument('--file-list', help='File with list of input file paths, one per line') parser.add_argument('infiles', nargs='*', help='List of paths to files to merge or copy') diff --git a/omicron/cli/process.py b/omicron/cli/process.py index 57f39d0..94efaa8 100644 --- a/omicron/cli/process.py +++ b/omicron/cli/process.py @@ -271,10 +271,13 @@ def create_parser(): '-N', '--max-channels-per-job', type=int, - default=10, + default=20, help='maximum number of channels to process in a single ' 'condor job (default: %(default)s)', ) + procg.add_argument('--max-online-lookback', type=int, default=1200, + help='With no immediately previous run, or one that was long ago this is the max time of an ' + 'online job. Default: %(default)d') # max concurrent omicron jobs procg.add_argument('--max-concurrent', default=10, type=int, help='Max omicron jobs at one time [%(default)s]') @@ -331,7 +334,7 @@ def create_parser(): ) condorg.add_argument( '--condor-accounting-group', - default='ligo.prod.o3.detchar.transient.omicron', + default='ligo.prod.o4.detchar.transient.omicron', help='accounting_group for condor submission on the LIGO ' 'Data Grid (default: %(default)s)', ) @@ -345,7 +348,7 @@ def create_parser(): ) condorg.add_argument( '--condor-request-disk', - default='1G', + default='50G', help='Required LIGO argument: local disk use (default: %(default)s)', ) condorg.add_argument( @@ -370,7 +373,7 @@ def create_parser(): '--dagman-option', action='append', type=str, - default=['force'], + default=['force', '-import_env'], metavar="\"opt | opt=value\"", help="Extra options to pass to condor_submit_dag as " "\"-{opt} [{value}]\". " @@ -472,7 +475,7 @@ def main(args=None): logger.debug("Command line args:") for arg in vars(args): - logger.debug(f'{arg} = {str(getattr(args, arg))}') + logger.debug(f' {arg} = {str(getattr(args, arg))}') # validate command line arguments if args.ifo is None: @@ -644,9 +647,12 @@ def main(args=None): pass if statechannel: - logger.debug("State channel = %s" % statechannel) - logger.debug("State bits = %s" % ', '.join(map(str, statebits))) - logger.debug("State frametype = %s" % stateft) + logger.debug(f"State channel {statechannel}") + if statebits == 'guardian': + logger.debug(f"State bits {statebits}") + else: + logger.debug("State bits = %s" % ', '.join(map(str, statebits))) + logger.debug(f"State frametype {stateft}") # parse padding for state segments if statechannel or stateflag: @@ -674,7 +680,7 @@ def main(args=None): # -- set directories ------------------------------------------------------ rundir.mkdir(exist_ok=True, parents=True) - logger.info(f"Using run directory\n{rundir}") + logger.info(f"Using run directory: {rundir}") cachedir = rundir / "cache" condir = rundir / "condor" @@ -723,26 +729,31 @@ def main(args=None): segfile = str(rundir / "segments.txt") keepfiles.append(segfile) + max_lookback = args.max_online_lookback if newdag and online: # get limit of available data (allowing for padding) end = data.get_latest_data_gps(ifo, frametype) - padding - + now = tconvert() + earliest_online = now - max_lookback try: # start from where we got to last time - start = segments.get_last_run_segment(segfile)[1] + last_run_segment = segments.get_last_run_segment(segfile) + start = last_run_segment[1] except IOError: # otherwise start with a sensible amount of data if args.use_dev_shm: # process one chunk logger.debug("No online segment record, starting with " "%s seconds" % chunkdur) start = end - chunkdur + padding - else: # process the last 4000 seconds (arbitrarily) - logger.debug("No online segment record, starting with " - "4000 seconds") - start = end - 4000 + else: # process the last requested seconds (arbitrarily) + logger.debug(f"No online segment record, starting with {max_lookback} seconds ago, {earliest_online}") + start = end - max_lookback else: - logger.debug("Online segment record recovered") + logger.debug(f"Online segment record recovered: {last_run_segment[0]} - {last_run_segment[1]}") elif online: start, end = segments.get_last_run_segment(segfile) + logger.debug(f"Online segment record recovered: {start} - {end}") + if end - start > max_lookback: + start = end - max_lookback else: start, end = args.gps start = int(start) @@ -785,9 +796,11 @@ def main(args=None): if (online and statechannel) or (statechannel and not stateflag) or ( statechannel and args.no_segdb): logger.info(f'Finding segments for relevant state... from:{datastart} length: {dataduration}s') + logger.debug(f'For segment finding: online: {online}, statechannel: {statechannel}, ' + f'stateflag: {stateflag} args.no_segdb: {args.no_segdb}') seg_qry_strt = time.time() if statebits == "guardian": # use guardian - logger.debug(f'Using guardian for {statechannel}: {datastart}-{dataend}') + logger.debug(f'Using guardian for {statechannel}: {datastart}-{dataend} ') segs = segments.get_guardian_segments( statechannel, stateft, @@ -854,7 +867,7 @@ def main(args=None): # segment, or long enough to process safely) if truncate and abs(lastseg) < chunkdur * 2: logger.info( - "The final segment is too short, " f'Minimum length is {int(chunkdur*2)} ' + "The final segment is too short, " f'Minimum length is {int(chunkdur * 2)} ' "but ends at the limit of " "available data, presumably this is an active segment. It " "will be removed so that it can be processed properly later", @@ -1447,7 +1460,7 @@ def main(args=None): clean_tempfiles(tempfiles) # and exit - logger.info(f"--- Processing complete. Elapsed: {time.time()-prog_start} seconds ----------------") + logger.info(f"--- Processing complete. Elapsed: {time.time() - prog_start} seconds ----------------") if __name__ == "__main__": diff --git a/omicron/cli/status.py b/omicron/cli/status.py index 4960a36..05dd0d8 100644 --- a/omicron/cli/status.py +++ b/omicron/cli/status.py @@ -862,4 +862,4 @@ def print_nagios_json(code, message, outfile, tag='status', **extras): if __name__ == "__main__": main() if logger: - logger.info(f'Run time: {(time.time()-prog_start):.1f} seconds') + logger.info(f'Run time: {(time.time() - prog_start):.1f} seconds') diff --git a/omicron/data.py b/omicron/data.py index 7552715..a21dbd7 100644 --- a/omicron/data.py +++ b/omicron/data.py @@ -190,7 +190,7 @@ def find_frames(obs, frametype, start, end, on_gaps='warn', **kwargs): if on_gaps != 'ignore': seglist = SegmentList(map(file_segment, cache)).coalesce() missing = (SegmentList([Segment(start, end)]) - seglist).coalesce() - msg = "Missing frames:\n{}".format('\n'.join(map(lambda s: f'[{s[0]}, {s[1]}) -> {s[1]-s[0]}s', missing))) + msg = "Missing frames:\n{}".format('\n'.join(map(lambda s: f'[{s[0]}, {s[1]}) -> {s[1] - s[0]}s', missing))) if missing and on_gaps == 'warn': warnings.warn(msg) elif missing: diff --git a/omicron/log.py b/omicron/log.py index d25913c..35421dc 100644 --- a/omicron/log.py +++ b/omicron/log.py @@ -77,8 +77,9 @@ def filter(self, record): class Logger(logging.Logger): """`~logging.Logger` with a nice format """ - FORMAT = ('[{bold}%(name)s{reset} %(gpstime)d] %(levelname)+19s %(filename)s:%(lineno)d: ' + FORMAT = ('[{bold}%(name)s{reset} %(asctime)s] %(levelname)+19s %(filename)s:%(lineno)d: ' '%(message)s'.format(bold=BOLD_SEQ, reset=RESET_SEQ)) + log_file_date_format = '%m-%d %H:%M:%S' def __init__(self, name, level=logging.DEBUG): try: @@ -87,7 +88,7 @@ def __init__(self, name, level=logging.DEBUG): logging.Logger.__init__(self, name, level=level) # set up handlers for WARNING and above to go to stderr - colorformatter = ColoredFormatter(self.FORMAT) + colorformatter = ColoredFormatter(self.FORMAT, datefmt=self.log_file_date_format) stdouthandler = logging.StreamHandler(sys.stdout) stdouthandler.setFormatter(colorformatter) stderrhandler = logging.StreamHandler(sys.stderr) diff --git a/omicron/parameters.py b/omicron/parameters.py index b36671f..3eb3310 100644 --- a/omicron/parameters.py +++ b/omicron/parameters.py @@ -65,7 +65,7 @@ def __init__(self, version=None, defaults=dict(), **kwargs): if version is None: try: version = utils.get_omicron_version() - except KeyError: + except (KeyError, RuntimeError): version = utils.OmicronVersion(const.OMICRON_VERSION) self.version = version self._set_defaults() diff --git a/omicron/tests/test_log.py b/omicron/tests/test_log.py index 3d5a0ee..ff8572a 100644 --- a/omicron/tests/test_log.py +++ b/omicron/tests/test_log.py @@ -44,4 +44,4 @@ def test_logger(): # test that the formatter prints the correct thing outhandler = logger.handlers[0] - assert re.match('.*TEST.*\\d{10}.*DEBUG.*FILE.*test message', outhandler.format(record)) + assert re.match('.*TEST.*\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}.*DEBUG.*FILE.*test message', outhandler.format(record))