-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #110 from duncanmmacleod/no-glue
Remove all references to lscsoft-glue from hveto
- Loading branch information
Showing
12 changed files
with
152 additions
and
136 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,30 +29,24 @@ import argparse | |
import os | ||
import warnings | ||
import multiprocessing | ||
from pathlib import Path | ||
|
||
from lal.utils import CacheEntry | ||
import h5py | ||
|
||
from glue.lal import Cache | ||
from glue.ligolw.ligolw import (Document, LIGO_LW, LIGOLWContentHandler) | ||
from glue.ligolw.lsctables import ProcessTable | ||
from glue.ligolw.utils import (write_filename as write_ligolw, | ||
load_filename as load_ligolw) | ||
from glue.ligolw.utils.process import (register_to_xmldoc as | ||
append_process_table) | ||
from astropy.table import vstack | ||
|
||
from gwpy.io import ligolw as io_ligolw | ||
from gwpy.io.cache import (cache_segments, file_segment, read_cache) | ||
from gwpy.time import to_gps | ||
from gwpy.segments import (Segment, SegmentList, | ||
DataQualityFlag, DataQualityDict) | ||
|
||
from hveto import (__version__, log, config) | ||
from hveto.triggers import (get_triggers, find_auxiliary_channels, | ||
find_trigger_files) | ||
from hveto.utils import write_lal_cache | ||
|
||
__author__ = 'Duncan Macleod <[email protected]>' | ||
|
||
Cache.entry_class = CacheEntry # remove deprecationwarning | ||
|
||
IFO = os.getenv('IFO') | ||
|
||
logger = log.Logger('hveto-cache-events') | ||
|
@@ -61,7 +55,7 @@ logger = log.Logger('hveto-cache-events') | |
# -- parse command line ------------------------------------------------------- | ||
|
||
def abs_path(p): | ||
return os.path.abspath(os.path.expanduser(p)) | ||
return Path(p).expanduser().resolve() | ||
|
||
parser = argparse.ArgumentParser(description=__doc__) | ||
|
||
|
@@ -97,7 +91,7 @@ parser.add_argument('--append', action='store_true', default=False, | |
'start from scratch (default)') | ||
|
||
pout = parser.add_argument_group('Output options') | ||
pout.add_argument('-o', '--output-directory', default=os.curdir, | ||
pout.add_argument('-o', '--output-directory', default=os.curdir, type=abs_path, | ||
help='path of output directory, default: %(default)s') | ||
|
||
args = parser.parse_args() | ||
|
@@ -107,16 +101,6 @@ start = int(args.gpsstart) | |
end = int(args.gpsend) | ||
duration = end - start | ||
|
||
# format process params for LIGO_LW | ||
procparams = {k.replace('_', '-'): v for k, v in vars(args).items() if v} | ||
for gpskey in ('gpsstart', 'gpsend'): | ||
procparams[gpskey] = int(procparams[gpskey]) | ||
for listkey in ('config-file', 'primary-cache', 'auxiliary-cache'): | ||
try: | ||
procparams[listkey] = ','.join(procparams[listkey]) | ||
except KeyError: | ||
pass | ||
|
||
logger.info("-- Welcome to Hveto --") | ||
logger.info("GPS start time: %d" % start) | ||
logger.info("GPS end time: %d" % end) | ||
|
@@ -126,21 +110,15 @@ logger.info("Interferometer: %s" % ifo) | |
|
||
# read configuration | ||
cp = config.HvetoConfigParser(ifo=args.ifo) | ||
cp.read(args.config_file) | ||
cp.read(map(str, args.config_file)) | ||
logger.info("Parsed configuration file(s)") | ||
|
||
# format output directory | ||
outdir = abs_path(args.output_directory) | ||
if not os.path.isdir(outdir): | ||
os.makedirs(outdir) | ||
os.chdir(outdir) | ||
logger.info("Working directory: %s" % outdir) | ||
trigdir = 'triggers' | ||
if not os.path.isdir(trigdir): | ||
os.makedirs(trigdir) | ||
|
||
os.chdir(trigdir) | ||
trigdir = os.getcwd() | ||
outdir = args.output_directory | ||
outdir.mkdir(parents=True, exist_ok=True) | ||
logger.info("Working directory: {}".format(outdir)) | ||
trigdir = outdir / 'triggers' | ||
trigdir.mkdir(parents=True, exist_ok=True) | ||
|
||
# get segments | ||
aflag = cp.get('segments', 'analysis-flag') | ||
|
@@ -166,48 +144,52 @@ logger.info("Retrieved %d segments for %s with %ss (%.2f%%) livetime" | |
snrs = cp.getfloats('hveto', 'snr-thresholds') | ||
minsnr = min(snrs) | ||
|
||
# -- utility methods ---------------------------------------------------------- | ||
|
||
contenthandler = LIGOLWContentHandler | ||
|
||
# -- utility methods ---------------------------------------------------------- | ||
|
||
def create_filename(channel): | ||
def create_path(channel): | ||
ifo, name = channel.split(':', 1) | ||
name = name.replace('-', '_') | ||
return os.path.join(trigdir, | ||
'%s-%s-%d-%d.xml.gz' % (ifo, name, start, duration)) | ||
return trigdir / "{}-{}-{}-{}.h5".format(ifo, name, start, duration) | ||
|
||
|
||
def read_and_cache_events(channel, etg, cache=None, trigfind_kw={}, | ||
**read_kw): | ||
cfile = create_filename(channel) | ||
cfile = create_path(channel) | ||
# read existing cached triggers and work out new segments to query | ||
if args.append and os.path.isfile(cfile): | ||
previous = DataQualityFlag.read(cfile, format='ligolw').coalesce() | ||
if args.append and cfile.is_file(): | ||
previous = DataQualityFlag.read( | ||
str(cfile), | ||
path='segments', | ||
format='hdf5', | ||
).coalesce() | ||
new = analysis - previous | ||
else: | ||
new = analysis.copy() | ||
# get cache of files | ||
if cache is None: | ||
cache = find_trigger_files(channel, etg, new.active, **trigfind_kw) | ||
else: | ||
cache = cache.sieve(segmentlist=new.active) | ||
cache = list(filter( | ||
lambda e: new.active.intersects_segment(file_segment(e)), | ||
cache, | ||
)) | ||
# restrict 'active' segments to when we have data | ||
try: | ||
new.active &= cache.to_segmentlistdict().values()[0] | ||
new.active &= cache_segments(cache) | ||
except IndexError: | ||
new.active = type(new.active)() | ||
# find new triggers | ||
try: | ||
trigs = get_triggers(channel, auxetg, new.active, cache=cache, | ||
trigs = get_triggers(channel, etg, new.active, cache=cache, | ||
raw=True, **read_kw) | ||
# catch error and continue | ||
except ValueError as e: | ||
warnings.warn('%s: %s' % (type(e).__name__, str(e))) | ||
else: | ||
a = write_events(channel, trigs, new) | ||
path = write_events(channel, trigs, new) | ||
try: | ||
return CacheEntry.from_T050017(a), len(trigs) | ||
return path, len(trigs) | ||
except TypeError: # None | ||
return | ||
|
||
|
@@ -216,39 +198,32 @@ def write_events(channel, tab, segments): | |
"""Write events to file with a given filename | ||
""" | ||
# get filename | ||
filename = create_filename(channel) | ||
path = create_path(channel) | ||
h5f = h5py.File(str(path), 'a') | ||
|
||
# read existing document | ||
if args.append and os.path.isfile(filename): | ||
xmldoc = io_ligolw.read_ligolw(filename) | ||
# or, create document | ||
# read existing table from file | ||
try: | ||
old = tab.read(h5f["triggers"], format="hdf5") | ||
except KeyError: | ||
pass | ||
else: | ||
xmldoc = Document() | ||
xmldoc.appendChild(LIGO_LW()) | ||
tab = vstack(old, tab) | ||
|
||
# append process table | ||
with multiprocessing.Lock(): | ||
ProcessTable.next_id = type(ProcessTable.next_id)(0) | ||
process = append_process_table(xmldoc, os.path.basename(__file__), | ||
procparams) | ||
# append event table | ||
tab.write(h5f, path="triggers", append=True, overwrite=True) | ||
|
||
# append segment tables | ||
# write segments | ||
try: | ||
segments.write(xmldoc, format='ligolw', append=True, | ||
attrs={'process_id': process.process_id}) | ||
except TypeError as exc: | ||
if 'process_id' in str(exc): | ||
segments.write(xmldoc, format='ligolw', append=True) | ||
else: | ||
raise | ||
|
||
# append event table | ||
if len(tab): | ||
tab.write(xmldoc, append=True) | ||
oldsegs = DataQualityFlag.read(h5f, path="segments", format="hdf5") | ||
except KeyError: | ||
pass | ||
else: | ||
segments = oldsegs + segments | ||
segments.write(h5f, path="segments", append=True, overwrite=True) | ||
|
||
# write file to disk | ||
write_ligolw(xmldoc, filename, gz=True) | ||
return filename | ||
h5f.close() | ||
return path | ||
|
||
|
||
# -- load channels ------------------------------------------------------------ | ||
|
@@ -258,7 +233,7 @@ pchannel = cp.get('primary', 'channel') | |
|
||
# read auxiliary cache | ||
if args.auxiliary_cache: | ||
acache = Cache.fromfilenames(args.auxiliary_cache) | ||
acache = [e for c in args.auxiliary_cache for e in read_cache(str(c))] | ||
else: | ||
acache = None | ||
|
||
|
@@ -295,7 +270,7 @@ logger.debug("Read list of %d auxiliary channels" % len(auxchannels)) | |
|
||
# remove unsafe channels | ||
nunsafe = 0 | ||
for i in xrange(len(auxchannels) -1, -1, -1): | ||
for i in range(len(auxchannels) -1, -1, -1): | ||
if auxchannels[i] in unsafe: | ||
logger.warning("Auxiliary channel %r identified as unsafe and has " | ||
"been removed" % auxchannels[i]) | ||
|
@@ -311,7 +286,7 @@ logger.info("Reading events for primary channel...") | |
|
||
# read primary cache | ||
if args.primary_cache: | ||
pcache = Cache.fromfilenames(args.primary_cache) | ||
pcache = [e for c in args.primary_cache for e in read_cache(str(c))] | ||
else: | ||
pcache = None | ||
|
||
|
@@ -333,20 +308,19 @@ except TypeError: | |
n = 0 | ||
if n: | ||
logger.info("Cached %d new events for %s" % (n, pchannel)) | ||
elif args.append and os.path.isfile(e.path): | ||
elif args.append and e.is_file(): | ||
logger.info("Cached 0 new events for %s" % pchannel) | ||
else: | ||
message = "No events found for %r in %d seconds of livetime" % ( | ||
pchannel, livetime) | ||
logger.critical(message) | ||
|
||
# write primary to local cache | ||
pcache = Cache([e]) | ||
with open('%s-HVETO_PRIMARY_CACHE-%d-%d.lcf' | ||
% (ifo, start, duration), 'w') as f: | ||
pcache.tofile(f) | ||
pname = os.path.join(trigdir, f.name) | ||
logger.info('Primary cache written to %s' % pname) | ||
pname = trigdir / '{}-HVETO_PRIMARY_CACHE-{}-{}.lcf'.format( | ||
ifo, start, duration, | ||
) | ||
write_lal_cache(str(pname), [e]) | ||
logger.info('Primary cache written to {}'.format(pname)) | ||
|
||
# -- load auxiliary triggers -------------------------------------------------- | ||
|
||
|
@@ -362,8 +336,9 @@ def read_and_write_aux_triggers(channel): | |
auxcache = None | ||
else: | ||
ifo, name = channel.split(':') | ||
desc = name.replace('-', '_') | ||
auxcache = acache.sieve(ifos=ifo, description='%s*' % desc) | ||
match = "{}-{}".format(ifo, name.replace('-', '_')) | ||
auxcache = [e for e in acache if Path(e).name.startswith(match)] | ||
|
||
out = read_and_cache_events(channel, auxetg, cache=auxcache, snr=minsnr, | ||
frange=auxfreq, trigfind_kw=atrigfindkw, | ||
**areadkw) | ||
|
@@ -394,15 +369,15 @@ if args.nproc > 1: | |
else: | ||
results = map(read_and_write_aux_triggers, auxchannels) | ||
|
||
acache = Cache(x for x in results if x is not None) | ||
with open('%s-HVETO_AUXILIARY_CACHE-%d-%d.lcf' | ||
% (ifo, start, duration), 'w') as f: | ||
acache.tofile(f) | ||
aname = os.path.join(trigdir, f.name) | ||
logger.info('Auxiliary cache written to %s' % aname) | ||
acache = [x for x in results if x is not None] | ||
aname = trigdir / '{}-HVETO_AUXILIARY_CACHE-{}-{}.lcf'.format( | ||
ifo, start, duration, | ||
) | ||
write_lal_cache(str(aname), [e for e in results if e is not None]) | ||
logger.info('Auxiliary cache written to {}'.format(aname)) | ||
|
||
# -- finish ------------------------------------------------------------------- | ||
|
||
logger.info('Done, you can use these cache files in an hveto analysis by ' | ||
'passing the following arguments:\n --primary-cache %s ' | ||
'--auxiliary-cache %s' % (pname, aname)) | ||
'passing the following arguments:\n --primary-cache {} ' | ||
'--auxiliary-cache {}'.format(pname, aname)) |
Oops, something went wrong.