Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PyCBC Live: check state vector before trying to read h(t) #4845

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 36 additions & 15 deletions pycbc/frame/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,9 @@ def __init__(self, frame_src,
self.detector = channel_name.split(':')[0]

self.update_cache()
self.channel_type, self.raw_sample_rate = self._retrieve_metadata(self.stream, self.channel_name)
self.channel_type, self.raw_sample_rate = self._retrieve_metadata(
self.stream, self.channel_name
)

raw_size = self.raw_sample_rate * max_buffer
self.raw_buffer = TimeSeries(zeros(raw_size, dtype=dtype),
Expand Down Expand Up @@ -584,13 +586,21 @@ def _read_frame(self, blocksize):
try:
read_func = _fr_type_map[self.channel_type][0]
dtype = _fr_type_map[self.channel_type][1]
data = read_func(self.stream, self.channel_name,
self.read_pos, int(blocksize), 0)
return TimeSeries(data.data.data, delta_t=data.deltaT,
epoch=self.read_pos,
dtype=dtype)
data = read_func(
self.stream,
self.channel_name,
self.read_pos,
int(blocksize),
0
)
return TimeSeries(
data.data.data,
delta_t=data.deltaT,
epoch=self.read_pos,
dtype=dtype
)
except Exception:
raise RuntimeError('Cannot read {0} frame data'.format(self.channel_name))
raise RuntimeError(f'Cannot read {self.channel_name} frame data')

def null_advance(self, blocksize):
"""Advance and insert zeros
Expand Down Expand Up @@ -664,8 +674,9 @@ def update_cache_by_increment(self, blocksize):
cache = locations_to_cache(keys)
stream = lalframe.FrStreamCacheOpen(cache)
self.stream = stream
self.channel_type, self.raw_sample_rate = \
self._retrieve_metadata(self.stream, self.channel_name)
self.channel_type, self.raw_sample_rate = self._retrieve_metadata(
self.stream, self.channel_name
)

def attempt_advance(self, blocksize, timeout=10):
""" Attempt to advance the frame buffer. Retry upon failure, except
Expand Down Expand Up @@ -826,19 +837,22 @@ def indices_of_flag(self, start_time, duration, times, padding=0):
return idx

def advance(self, blocksize):
""" Add blocksize seconds more to the buffer, push blocksize seconds
from the beginning.
"""Read `blocksize` seconds of new status data, append it to the end of
the internal status buffer, and drop `blocksize` seconds from the
beginning of the buffer. Check if the newly added status data indicates
usable or unusable strain data.

Parameters
----------
blocksize: int
The number of seconds to attempt to read from the channel
The number of seconds to attempt to read from the channel.

Returns
-------
status: boolean
Returns True if all of the status information if valid,
False if any is not.
Returns True if all of the status information is valid, False if
any is not, None if there was a problem reading the status
information.
"""
try:
if self.increment_update_cache:
Expand All @@ -847,7 +861,14 @@ def advance(self, blocksize):
return self.check_valid(ts)
except RuntimeError:
self.null_advance(blocksize)
return False
return None

def attempt_advance(self, blocksize, timeout=10):
    """Try to read `blocksize` seconds of new status data and check it.

    Delegates the actual read to the parent class, then validates the
    newly acquired data.

    Parameters
    ----------
    blocksize: int
        The number of seconds to attempt to read from the channel.
    timeout: {int, 10}
        Maximum time (in seconds) to wait before giving up on the read.

    Returns
    -------
    status: boolean or None
        None if the parent class failed to read the data; otherwise the
        result of validating the data via `check_valid`.
    """
    ts = super().attempt_advance(blocksize, timeout)
    # A None result means the read itself failed; propagate that
    # distinctly from a validity check failure.
    return None if ts is None else self.check_valid(ts)


class iDQBuffer(object):

Expand Down
117 changes: 76 additions & 41 deletions pycbc/strain/strain.py
Original file line number Diff line number Diff line change
Expand Up @@ -1848,32 +1848,67 @@ def null_advance_strain(self, blocksize):
self.taper_immediate_strain = True

def advance(self, blocksize, timeout=10):
"""Advanced buffer blocksize seconds.
"""Advance buffer blocksize seconds.

Add blocksize seconds more to the buffer, push blocksize seconds
from the beginning.

Parameters
----------
blocksize: int
The number of seconds to attempt to read from the channel
The number of seconds to attempt to read from the channel.
timeout: {int, 10}
Maximum time (in seconds) to wait before declaring frame files are
too late and reporting unusable data.

Returns
-------
status: boolean
Returns True if this block is analyzable.
"""
ts = super(StrainBuffer, self).attempt_advance(blocksize, timeout=timeout)
self.blocksize = blocksize
if self.state:
# We are using information from the state vector, so check what is
# says about the new strain data.
state_advance_result = self.state.attempt_advance(
blocksize, timeout=timeout
)
if not state_advance_result:
# Something about the state vector indicates a problem.
if state_advance_result is None:
logger.warning(
"Failed to read %s state vector. Problem with the "
"frame files, or analysis configured incorrectly",
self.detector
)
else:
logger.info(
"%s state vector indicates unusable data", self.detector
)
# Either way, give up here; we will not analyze the new segment
# of strain data.
self.add_hard_count()
self.null_advance_strain(blocksize)
if self.dq:
self.dq.null_advance(blocksize)
if self.idq:
self.idq.null_advance(blocksize)
return False

# Either we are not using the state vector, or the state vector says
# the data is ok to analyze. So try to get the next segment of data.
ts = super(StrainBuffer, self).attempt_advance(
blocksize, timeout=timeout
)
self.blocksize = blocksize
self.gate_params = []

# We have given up so there is no time series
if ts is None:
logger.info("%s frame is late, giving up", self.detector)
logger.warning(
"Failed to read %s strain channel. Problem with the frame "
"files, or analysis configured incorrectly",
self.detector
)
self.null_advance_strain(blocksize)
if self.state:
self.state.null_advance(blocksize)
if self.dq:
self.dq.null_advance(blocksize)
if self.idq:
Expand All @@ -1883,19 +1918,6 @@ def advance(self, blocksize, timeout=10):
# We collected some data so we are closer to being able to analyze data
self.wait_duration -= blocksize

# If the data we got was invalid, reset the counter on how much to collect
# This behavior corresponds to how we handle CAT1 vetoes
if self.state and self.state.advance(blocksize) is False:
self.add_hard_count()
self.null_advance_strain(blocksize)
if self.dq:
self.dq.null_advance(blocksize)
if self.idq:
self.idq.null_advance(blocksize)
logger.info("%s time has invalid data, resetting buffer",
self.detector)
return False

# Also advance the dq vector and idq timeseries in lockstep
if self.dq:
self.dq.advance(blocksize)
Expand All @@ -1913,13 +1935,17 @@ def advance(self, blocksize, timeout=10):
start = len(self.raw_buffer) - csize * self.factor
strain = self.raw_buffer[start:]

strain = pycbc.filter.highpass_fir(strain, self.highpass_frequency,
self.highpass_samples,
beta=self.beta)
strain = pycbc.filter.highpass_fir(
strain,
self.highpass_frequency,
self.highpass_samples,
beta=self.beta
)
strain = (strain * self.dyn_range_fac).astype(numpy.float32)

strain = pycbc.filter.resample_to_delta_t(strain,
1.0/self.sample_rate, method='ldas')
strain = pycbc.filter.resample_to_delta_t(
strain, 1.0 / self.sample_rate, method='ldas'
)

# remove corruption at beginning
strain = strain[self.corruption:]
Expand All @@ -1928,7 +1954,8 @@ def advance(self, blocksize, timeout=10):
if self.taper_immediate_strain:
logger.info("Tapering start of %s strain block", self.detector)
strain = gate_data(
strain, [(strain.start_time, 0., self.autogating_taper)])
strain, [(strain.start_time, 0., self.autogating_taper)]
)
self.taper_immediate_strain = False

# Stitch into continuous stream
Expand All @@ -1939,20 +1966,28 @@ def advance(self, blocksize, timeout=10):
# apply gating if needed
if self.autogating_threshold is not None:
autogating_duration_length = self.autogating_duration * self.sample_rate
autogating_start_sample = int(len(self.strain) - autogating_duration_length)
autogating_start_sample = int(
len(self.strain) - autogating_duration_length
)
glitch_times = detect_loud_glitches(
self.strain[autogating_start_sample:-self.corruption],
psd_duration=self.autogating_psd_segment_length, psd_stride=self.autogating_psd_stride,
threshold=self.autogating_threshold,
cluster_window=self.autogating_cluster,
low_freq_cutoff=self.highpass_frequency,
corrupt_time=self.autogating_pad)
self.strain[autogating_start_sample:-self.corruption],
psd_duration=self.autogating_psd_segment_length,
psd_stride=self.autogating_psd_stride,
threshold=self.autogating_threshold,
cluster_window=self.autogating_cluster,
low_freq_cutoff=self.highpass_frequency,
corrupt_time=self.autogating_pad
)
if len(glitch_times) > 0:
logger.info('Autogating %s at %s', self.detector,
', '.join(['%.3f' % gt for gt in glitch_times]))
self.gate_params = \
[(gt, self.autogating_width, self.autogating_taper)
for gt in glitch_times]
logger.info(
'Autogating %s at %s',
self.detector,
', '.join(['%.3f' % gt for gt in glitch_times])
)
self.gate_params = [
(gt, self.autogating_width, self.autogating_taper)
for gt in glitch_times
]
self.strain = gate_data(self.strain, self.gate_params)

if self.psd is None and self.wait_duration <=0:
Expand All @@ -1968,13 +2003,13 @@ def from_cli(cls, ifo, args):
state_channel = analyze_flags = None
if args.state_channel and ifo in args.state_channel \
and args.analyze_flags and ifo in args.analyze_flags:
state_channel = ':'.join([ifo, args.state_channel[ifo]])
state_channel = f'{ifo}:{args.state_channel[ifo]}'
analyze_flags = args.analyze_flags[ifo].split(',')

dq_channel = dq_flags = None
if args.data_quality_channel and ifo in args.data_quality_channel \
and args.data_quality_flags and ifo in args.data_quality_flags:
dq_channel = ':'.join([ifo, args.data_quality_channel[ifo]])
dq_channel = f'{ifo}:{args.data_quality_channel[ifo]}'
dq_flags = args.data_quality_flags[ifo].split(',')

idq_channel = None
Expand Down
Loading