From 851a4cca7102f7222390edcfce7404792f6f34cc Mon Sep 17 00:00:00 2001 From: Tito Dal Canton Date: Mon, 5 Aug 2024 16:00:10 +0200 Subject: [PATCH 1/4] Start work on reading state vector first --- pycbc/frame/frame.py | 20 +++++++++------ pycbc/strain/strain.py | 55 +++++++++++++++++++++++++++--------------- 2 files changed, 48 insertions(+), 27 deletions(-) diff --git a/pycbc/frame/frame.py b/pycbc/frame/frame.py index bea7386418a..06cafa9dbec 100644 --- a/pycbc/frame/frame.py +++ b/pycbc/frame/frame.py @@ -589,8 +589,8 @@ def _read_frame(self, blocksize): return TimeSeries(data.data.data, delta_t=data.deltaT, epoch=self.read_pos, dtype=dtype) - except Exception: - raise RuntimeError('Cannot read {0} frame data'.format(self.channel_name)) + except: + raise RuntimeError(f'Cannot read {self.channel_name} frame data') def null_advance(self, blocksize): """Advance and insert zeros @@ -826,19 +826,22 @@ def indices_of_flag(self, start_time, duration, times, padding=0): return idx def advance(self, blocksize): - """ Add blocksize seconds more to the buffer, push blocksize seconds - from the beginning. + """Read `blocksize` seconds of new status data, append it to the end of + the internal status buffer, and drop `blocksize` seconds from the + beginning of the buffer. Check if the newly added status data indicates + usable or unusable strain data. Parameters ---------- blocksize: int - The number of seconds to attempt to read from the channel + The number of seconds to attempt to read from the channel. Returns ------- status: boolean - Returns True if all of the status information if valid, - False if any is not. + Returns True if all of the status information if valid, False if + any is not, None if there was a problem reading the status + information. """ try: if self.increment_update_cache: @@ -847,7 +850,8 @@ def advance(self, blocksize): return self.check_valid(ts) except RuntimeError: self.null_advance(blocksize) - return False + return None + class iDQBuffer(object): diff --git a/pycbc/strain/strain.py b/pycbc/strain/strain.py index 4ff3b01d061..b383cd5c9e3 100644 --- a/pycbc/strain/strain.py +++ b/pycbc/strain/strain.py @@ -1863,17 +1863,47 @@ def advance(self, blocksize, timeout=10): status: boolean Returns True if this block is analyzable. """ - ts = super(StrainBuffer, self).attempt_advance(blocksize, timeout=timeout) - self.blocksize = blocksize + if self.state: + # We are using information from the state vector, so check what is + # says about the new strain data. + state_advance_result = self.state.advance(blocksize) + if not state_advance_result: + # Something about the state vector indicates a problem. + if state_advance_result is None: + logger.warning( + "Failed to read %s state vector. Problem with the " + "frame files, or analysis configured incorrectly", + self.detector + ) + else: + logger.info( + "%s state vector indicates unusable data", self.detector + ) + # Either way, give up here; we will not analyze the new segment + # of strain data. + self.add_hard_count() + self.null_advance_strain(blocksize) + if self.dq: + self.dq.null_advance(blocksize) + if self.idq: + self.idq.null_advance(blocksize) + return False + # Either we are not using the state vector, or the state vector says + # the data is ok to analyze. So try to get the next segment of data. + ts = super(StrainBuffer, self).attempt_advance( + blocksize, timeout=timeout + ) + self.blocksize = blocksize self.gate_params = [] - # We have given up so there is no time series if ts is None: - logger.info("%s frame is late, giving up", self.detector) + logger.warning( + "Failed to read %s strain channel. Problem with the frame " + "files, or analysis configured incorrectly", + self.detector + ) self.null_advance_strain(blocksize) - if self.state: - self.state.null_advance(blocksize) if self.dq: self.dq.null_advance(blocksize) if self.idq: @@ -1883,19 +1913,6 @@ def advance(self, blocksize, timeout=10): # We collected some data so we are closer to being able to analyze data self.wait_duration -= blocksize - # If the data we got was invalid, reset the counter on how much to collect - # This behavior corresponds to how we handle CAT1 vetoes - if self.state and self.state.advance(blocksize) is False: - self.add_hard_count() - self.null_advance_strain(blocksize) - if self.dq: - self.dq.null_advance(blocksize) - if self.idq: - self.idq.null_advance(blocksize) - logger.info("%s time has invalid data, resetting buffer", - self.detector) - return False - # Also advance the dq vector and idq timeseries in lockstep if self.dq: self.dq.advance(blocksize) From d0f496b9a5a8fa43f63d28b52a36fef47575446a Mon Sep 17 00:00:00 2001 From: Tito Dal Canton Date: Tue, 6 Aug 2024 12:51:57 +0200 Subject: [PATCH 2/4] Code reformatting --- pycbc/strain/strain.py | 53 ++++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/pycbc/strain/strain.py b/pycbc/strain/strain.py index b383cd5c9e3..7ef407bc030 100644 --- a/pycbc/strain/strain.py +++ b/pycbc/strain/strain.py @@ -1930,13 +1930,17 @@ def advance(self, blocksize, timeout=10): start = len(self.raw_buffer) - csize * self.factor strain = self.raw_buffer[start:] - strain = pycbc.filter.highpass_fir(strain, self.highpass_frequency, - self.highpass_samples, - beta=self.beta) + strain = pycbc.filter.highpass_fir( + strain, + self.highpass_frequency, + self.highpass_samples, + beta=self.beta + ) strain = (strain * self.dyn_range_fac).astype(numpy.float32) - strain = pycbc.filter.resample_to_delta_t(strain, - 1.0/self.sample_rate, method='ldas') + strain = pycbc.filter.resample_to_delta_t( + strain, 1.0 / self.sample_rate, method='ldas' + ) # remove corruption at beginning strain = strain[self.corruption:] @@ -1945,7 +1949,8 @@ def advance(self, blocksize, timeout=10): if self.taper_immediate_strain: logger.info("Tapering start of %s strain block", self.detector) strain = gate_data( - strain, [(strain.start_time, 0., self.autogating_taper)]) + strain, [(strain.start_time, 0., self.autogating_taper)] + ) self.taper_immediate_strain = False # Stitch into continuous stream @@ -1956,20 +1961,28 @@ def advance(self, blocksize, timeout=10): # apply gating if needed if self.autogating_threshold is not None: autogating_duration_length = self.autogating_duration * self.sample_rate - autogating_start_sample = int(len(self.strain) - autogating_duration_length) + autogating_start_sample = int( + len(self.strain) - autogating_duration_length + ) glitch_times = detect_loud_glitches( - self.strain[autogating_start_sample:-self.corruption], - psd_duration=self.autogating_psd_segment_length, psd_stride=self.autogating_psd_stride, - threshold=self.autogating_threshold, - cluster_window=self.autogating_cluster, - low_freq_cutoff=self.highpass_frequency, - corrupt_time=self.autogating_pad) + self.strain[autogating_start_sample:-self.corruption], + psd_duration=self.autogating_psd_segment_length, + psd_stride=self.autogating_psd_stride, + threshold=self.autogating_threshold, + cluster_window=self.autogating_cluster, + low_freq_cutoff=self.highpass_frequency, + corrupt_time=self.autogating_pad + ) if len(glitch_times) > 0: - logger.info('Autogating %s at %s', self.detector, - ', '.join(['%.3f' % gt for gt in glitch_times])) - self.gate_params = \ - [(gt, self.autogating_width, self.autogating_taper) - for gt in glitch_times] + logger.info( + 'Autogating %s at %s', + self.detector, + ', '.join(['%.3f' % gt for gt in glitch_times]) + ) + self.gate_params = [ + (gt, self.autogating_width, self.autogating_taper) + for gt in glitch_times + ] self.strain = gate_data(self.strain, self.gate_params) if self.psd is None and self.wait_duration <=0: @@ -1985,13 +1998,13 @@ def from_cli(cls, ifo, args): state_channel = analyze_flags = None if args.state_channel and ifo in args.state_channel \ and args.analyze_flags and ifo in args.analyze_flags: - state_channel = ':'.join([ifo, args.state_channel[ifo]]) + state_channel = f'{ifo}:{args.state_channel[ifo]}' analyze_flags = args.analyze_flags[ifo].split(',') dq_channel = dq_flags = None if args.data_quality_channel and ifo in args.data_quality_channel \ and args.data_quality_flags and ifo in args.data_quality_flags: - dq_channel = ':'.join([ifo, args.data_quality_channel[ifo]]) + dq_channel = f'{ifo}:{args.data_quality_channel[ifo]}' dq_flags = args.data_quality_flags[ifo].split(',') idq_channel = None From 831501790a7a75945e3fd820b38b15bf52e7f181 Mon Sep 17 00:00:00 2001 From: Tito Dal Canton Date: Tue, 6 Aug 2024 13:31:42 +0200 Subject: [PATCH 3/4] Code Climate / reformatting --- pycbc/frame/frame.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/pycbc/frame/frame.py b/pycbc/frame/frame.py index 06cafa9dbec..924c810a745 100644 --- a/pycbc/frame/frame.py +++ b/pycbc/frame/frame.py @@ -519,7 +519,9 @@ def __init__(self, frame_src, self.detector = channel_name.split(':')[0] self.update_cache() - self.channel_type, self.raw_sample_rate = self._retrieve_metadata(self.stream, self.channel_name) + self.channel_type, self.raw_sample_rate = self._retrieve_metadata( + self.stream, self.channel_name + ) raw_size = self.raw_sample_rate * max_buffer self.raw_buffer = TimeSeries(zeros(raw_size, dtype=dtype), @@ -584,12 +586,20 @@ def _read_frame(self, blocksize): try: read_func = _fr_type_map[self.channel_type][0] dtype = _fr_type_map[self.channel_type][1] - data = read_func(self.stream, self.channel_name, - self.read_pos, int(blocksize), 0) - return TimeSeries(data.data.data, delta_t=data.deltaT, - epoch=self.read_pos, - dtype=dtype) - except: + data = read_func( + self.stream, + self.channel_name, + self.read_pos, + int(blocksize), + 0 + ) + return TimeSeries( + data.data.data, + delta_t=data.deltaT, + epoch=self.read_pos, + dtype=dtype + ) + except Exception: raise RuntimeError(f'Cannot read {self.channel_name} frame data') def null_advance(self, blocksize): @@ -664,8 +674,9 @@ def update_cache_by_increment(self, blocksize): cache = locations_to_cache(keys) stream = lalframe.FrStreamCacheOpen(cache) self.stream = stream - self.channel_type, self.raw_sample_rate = \ - self._retrieve_metadata(self.stream, self.channel_name) + self.channel_type, self.raw_sample_rate = self._retrieve_metadata( + self.stream, self.channel_name + ) def attempt_advance(self, blocksize, timeout=10): """ Attempt to advance the frame buffer. Retry upon failure, except From 64e090e93a0ef7b9a69ede670d6d9d5e34b35d67 Mon Sep 17 00:00:00 2001 From: Tito Dal Canton Date: Tue, 6 Aug 2024 13:57:50 +0200 Subject: [PATCH 4/4] =?UTF-8?q?This=20is=20a=20bit=20more=20complicated?= =?UTF-8?q?=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pycbc/frame/frame.py | 6 ++++++ pycbc/strain/strain.py | 11 ++++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pycbc/frame/frame.py b/pycbc/frame/frame.py index 924c810a745..f0d0570c047 100644 --- a/pycbc/frame/frame.py +++ b/pycbc/frame/frame.py @@ -863,6 +863,12 @@ def advance(self, blocksize): self.null_advance(blocksize) return None + def attempt_advance(self, blocksize, timeout=10): + result = super().attempt_advance(blocksize, timeout) + if result is None: + return None + return self.check_valid(result) + class iDQBuffer(object): diff --git a/pycbc/strain/strain.py b/pycbc/strain/strain.py index 7ef407bc030..08cfa15f3f6 100644 --- a/pycbc/strain/strain.py +++ b/pycbc/strain/strain.py @@ -1848,7 +1848,7 @@ def null_advance_strain(self, blocksize): self.taper_immediate_strain = True def advance(self, blocksize, timeout=10): - """Advanced buffer blocksize seconds. + """Advance buffer blocksize seconds. Add blocksize seconds more to the buffer, push blocksize seconds from the beginning. @@ -1856,7 +1856,10 @@ def advance(self, blocksize, timeout=10): Parameters ---------- blocksize: int - The number of seconds to attempt to read from the channel + The number of seconds to attempt to read from the channel. + timeout: {int, 10} + Maximum time (in seconds) to wait before declaring frame files are + too late and reporting unusable data. Returns ------- @@ -1866,7 +1869,9 @@ def advance(self, blocksize, timeout=10): if self.state: # We are using information from the state vector, so check what is # says about the new strain data. - state_advance_result = self.state.advance(blocksize) + state_advance_result = self.state.attempt_advance( + blocksize, timeout=timeout + ) if not state_advance_result: # Something about the state vector indicates a problem. if state_advance_result is None: