From b4ba0cd56ad506d6da79b77f208e11f4b6f31f02 Mon Sep 17 00:00:00 2001 From: Calum Chamberlain Date: Mon, 8 Jan 2024 16:34:49 +1300 Subject: [PATCH] Remove detection handling from real-time. TODO: handle detections in another process --- rt_eqcorrscan/event_trigger/triggers.py | 14 ++- rt_eqcorrscan/rt_match_filter.py | 158 +++++++++++++----------- 2 files changed, 97 insertions(+), 75 deletions(-) diff --git a/rt_eqcorrscan/event_trigger/triggers.py b/rt_eqcorrscan/event_trigger/triggers.py index 1858486..4fc6b0e 100644 --- a/rt_eqcorrscan/event_trigger/triggers.py +++ b/rt_eqcorrscan/event_trigger/triggers.py @@ -1,6 +1,7 @@ """ Standardised trigger functions for use with a Reactor. """ +import datetime import logging import numpy as np from typing import Union, List, Optional @@ -121,7 +122,7 @@ def inter_event_distance( def average_rate( - catalog: Union[List[Detection], Catalog], + catalog: Union[List[Detection], List[UTCDateTime], List[datetime.datetime], Catalog], starttime: Optional[UTCDateTime] = None, endtime: Optional[UTCDateTime] = None ) -> float: @@ -149,8 +150,15 @@ def average_rate( if isinstance(catalog, Catalog): event_times = sorted([event_time(e) for e in catalog]) elif isinstance(catalog, list): - assert all([isinstance(d, Detection) for d in catalog]) - event_times = sorted([d.detect_time for d in catalog]) + if isinstance(catalog[0], Detection): + assert all([isinstance(d, Detection) for d in catalog]) + event_times = sorted([d.detect_time for d in catalog]) + elif isinstance(catalog[0], UTCDateTime): + assert all([isinstance(d, UTCDateTime) for d in catalog]) + event_times = sorted(catalog) + else: + assert all([isinstance(d, datetime.datetime) for d in catalog]) + event_times = sorted([UTCDateTime(d) for d in catalog]) starttime = starttime or event_times[0] endtime = endtime or event_times[-1] duration = (endtime - starttime) / 86400. diff --git a/rt_eqcorrscan/rt_match_filter.py b/rt_eqcorrscan/rt_match_filter.py index a8fbb72..c0342bf 100644 --- a/rt_eqcorrscan/rt_match_filter.py +++ b/rt_eqcorrscan/rt_match_filter.py @@ -133,7 +133,7 @@ def __init__( self.plot_options.update({ key: value for key, value in plot_options.items() if key != "plot_length"}) - self.detections = [] + self.detections = set() # Wavebank status to avoid accessing the underlying, lockable, wavebank if isinstance(wavebank, str): @@ -227,10 +227,10 @@ def _ensure_templates_have_enough_stations(self, min_stations): def _remove_old_detections(self, endtime: UTCDateTime) -> None: """ Remove detections older than keep duration. Works in-place. """ - # Use a copy to avoid changing list while iterating + # Use a copy to avoid changing while iterating for d in copy.copy(self.detections): - if d.detect_time <= endtime: - self.detections.remove(d) + if d <= endtime: + self.detections.discard(d) def _remove_unused_backfillers( self, @@ -436,77 +436,91 @@ def _handle_detections( Logger.debug(f"New detection at {d.detect_time}") # Cope with no picks and hence no origins - these events have to be removed family.detections = [d for d in family if len(d.event.origins)] - if family.template.name not in _detected_templates: - self.party.families.append(family) - else: - self.party.select(family.template.name).detections.extend( - family.detections) - - Logger.info("Removing duplicate detections") - Logger.info(f"Party contained {len(self.party)} before decluster") - if len(self.party) > 0: - # TODO: Need to remove detections from disk that are removed in decluster - self.party.decluster( - trig_int=trig_int, timing="origin", metric="cor_sum", - hypocentral_separation=hypocentral_separation) - Logger.info("Completed decluster") - Logger.info(f"Party contains {len(self.party)} after decluster") - Logger.info("Writing detections to disk") - - # Cope with not being given a stream - read_st = False - if st is None and backfill_dir is None: - read_st = True - - # TODO: Need a better way to keep track of written detections - unique keys for detections? - # TODO: This is slow, and for Kaikoura, this is what stops it from running in real time - for family in self.party: for detection in family: - # TODO: this check doesn't necassarily work well - detections may be the same physical detection, but different Detection objects - if detection in self.detections: - continue detect_file_base = _detection_filename( detection=detection, detect_directory=detect_directory) - _filename = f"{detect_file_base}.xml" - if os.path.isfile(f"{detect_file_base}.xml") and skip_existing: - Logger.info(f"{_filename} exists, skipping") - continue - Logger.debug(f"Writing detection: {detection.detect_time}") - # TODO: Do not do this, let some other process work on making the waveforms. - if read_st: - max_shift = ( - max(tr.stats.endtime for tr in family.template.st) - - min(tr.stats.starttime for tr in family.template.st)) - bulk = [ - (tr.stats.network, - tr.stats.station, - tr.stats.location, - tr.stats.channel, - (detection.detect_time - 5), - (detection.detect_time + max_shift + 5)) - for tr in family.template.st] - st = self.wavebank.get_waveforms_bulk(bulk) - st_read = True - self._fig = _write_detection( - detection=detection, - detect_file_base=detect_file_base, - save_waveform=save_waveforms, - plot_detection=plot_detections, stream=st, - fig=self._fig, backfill_dir=backfill_dir, - detect_dir=detect_directory) - Logger.info("Expiring old detections") - # Empty self.detections - self.detections.clear() - for family in self.party: - Logger.debug(f"Checking for {family.template.name}") - family.detections = [ - d for d in family.detections if d.detect_time >= earliest_detection_time] - Logger.debug(f"Appending {len(family)} detections") - for detection in family: - # Need to append rather than create a new object - self.detections.append(detection) + _filename = f"{detect_file_base}.pkl" + if not os.path.exists(_filename) or not skip_existing: + Logger.debug(f"Writing detection: {detection.detect_time}") + with open(_filename, "wb") as f: + pickle.dump(detection, f) + self.detections.add(detection.detect_time.datetime) return + # TODO: Move all of this old handling of detections to a seperate post-process process. + + # + # if family.template.name not in _detected_templates: + # self.party.families.append(family) + # else: + # self.party.select(family.template.name).detections.extend( + # family.detections) + # + # Logger.info("Removing duplicate detections") + # Logger.info(f"Party contained {len(self.party)} before decluster") + # if len(self.party) > 0: + # # TODO: Need to remove detections from disk that are removed in decluster + # self.party.decluster( + # trig_int=trig_int, timing="origin", metric="cor_sum", + # hypocentral_separation=hypocentral_separation) + # Logger.info("Completed decluster") + # Logger.info(f"Party contains {len(self.party)} after decluster") + # Logger.info("Writing detections to disk") + + # # Cope with not being given a stream + # read_st = False + # if st is None and backfill_dir is None: + # read_st = True + # + # # TODO: Need a better way to keep track of written detections - unique keys for detections? + # # TODO: This is slow, and for Kaikoura, this is what stops it from running in real time + # for family in self.party: + # for detection in family: + # # TODO: this check doesn't necassarily work well - detections may be the same physical detection, but different Detection objects + # if detection in self.detections: + # continue + # detect_file_base = _detection_filename( + # detection=detection, detect_directory=detect_directory) + # _filename = f"{detect_file_base}.xml" + # if os.path.isfile(f"{detect_file_base}.xml") and skip_existing: + # Logger.info(f"{_filename} exists, skipping") + # continue + # Logger.debug(f"Writing detection: {detection.detect_time}") + # # TODO: Do not do this, let some other process work on making the waveforms. + # if read_st: + # max_shift = ( + # max(tr.stats.endtime for tr in family.template.st) - + # min(tr.stats.starttime for tr in family.template.st)) + # bulk = [ + # (tr.stats.network, + # tr.stats.station, + # tr.stats.location, + # tr.stats.channel, + # (detection.detect_time - 5), + # (detection.detect_time + max_shift + 5)) + # for tr in family.template.st] + # st = self.wavebank.get_waveforms_bulk(bulk) + # st_read = True + # self._fig = _write_detection( + # detection=detection, + # detect_file_base=detect_file_base, + # save_waveform=save_waveforms, + # plot_detection=plot_detections, stream=st, + # fig=self._fig, backfill_dir=backfill_dir, + # detect_dir=detect_directory) + # Logger.info("Expiring old detections") + # # Empty self.detections + # self.detections.clear() + # for family in self.party: + # Logger.debug(f"Checking for {family.template.name}") + # family.detections = [ + # d for d in family.detections if d.detect_time >= earliest_detection_time] + # Logger.debug(f"Appending {len(family)} detections") + # for detection in family: + # # Need to append rather than create a new object + # self.detections.append(detection) + # return + def _plot(self) -> None: # pragma: no cover """ Plot the data as it comes in. """ from rt_eqcorrscan.plotting.plot_buffer import EQcorrscanPlot @@ -520,7 +534,7 @@ def _plot(self) -> None: # pragma: no cover self.plotter = EQcorrscanPlot( rt_client=self.rt_client, plot_length=self.plot_length, tribe=self, inventory=self.inventory, - detections=self.detections, + detections=[], exclude_channels=self.plotting_exclude_channels, update_interval=update_interval, plot_height=plot_height, plot_width=plot_width, offline=offline, @@ -947,7 +961,7 @@ def run( break if minimum_rate and UTCDateTime.now() > run_start + self._min_run_length: _rate = average_rate( - self.detections, + list(self.detections), starttime=max( self._stream_end - keep_detections, first_data), endtime=self._stream_end)