Remove detection handling from real-time. TODO: handle detections in another process
calum-chamberlain committed Jan 8, 2024
1 parent ebfd232 commit b4ba0cd
Showing 2 changed files with 97 additions and 75 deletions.
14 changes: 11 additions & 3 deletions rt_eqcorrscan/event_trigger/triggers.py
@@ -1,6 +1,7 @@
"""
Standardised trigger functions for use with a Reactor.
"""
import datetime
import logging
import numpy as np
from typing import Union, List, Optional
@@ -121,7 +122,7 @@ def inter_event_distance(


def average_rate(
catalog: Union[List[Detection], Catalog],
catalog: Union[List[Detection], List[UTCDateTime], List[datetime.datetime], Catalog],
starttime: Optional[UTCDateTime] = None,
endtime: Optional[UTCDateTime] = None
) -> float:
@@ -149,8 +150,15 @@ def average_rate(
if isinstance(catalog, Catalog):
event_times = sorted([event_time(e) for e in catalog])
elif isinstance(catalog, list):
assert all([isinstance(d, Detection) for d in catalog])
event_times = sorted([d.detect_time for d in catalog])
if isinstance(catalog[0], Detection):
assert all([isinstance(d, Detection) for d in catalog])
event_times = sorted([d.detect_time for d in catalog])
elif isinstance(catalog[0], UTCDateTime):
assert all([isinstance(d, UTCDateTime) for d in catalog])
event_times = sorted(catalog)
else:
assert all([isinstance(d, datetime.datetime) for d in catalog])
event_times = sorted([UTCDateTime(d) for d in catalog])
starttime = starttime or event_times[0]
endtime = endtime or event_times[-1]
duration = (endtime - starttime) / 86400.
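
Not part of the diff: a minimal usage sketch of the broadened average_rate signature, which now also accepts lists of UTCDateTime or plain datetime.datetime objects (the contents of the new self.detections set). The values below are illustrative; the import path simply follows the file location shown above.

# Illustrative sketch: average_rate accepts a Catalog, a list of Detection
# objects, a list of UTCDateTime, or a list of plain datetime.datetime.
import datetime

from obspy import UTCDateTime
from rt_eqcorrscan.event_trigger.triggers import average_rate

detect_times = [
    datetime.datetime(2024, 1, 8, 0, 0, 0),
    datetime.datetime(2024, 1, 8, 6, 0, 0),
    datetime.datetime(2024, 1, 8, 12, 0, 0),
]

# Plain datetimes are converted to UTCDateTime internally before sorting.
rate = average_rate(
    detect_times,
    starttime=UTCDateTime(2024, 1, 8),
    endtime=UTCDateTime(2024, 1, 9),
)
print(f"Average rate: {rate:.1f}")  # 3 events over a 1-day window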
158 changes: 86 additions & 72 deletions rt_eqcorrscan/rt_match_filter.py
@@ -133,7 +133,7 @@ def __init__(
self.plot_options.update({
key: value for key, value in plot_options.items()
if key != "plot_length"})
self.detections = []
self.detections = set()

# Wavebank status to avoid accessing the underlying, lockable, wavebank
if isinstance(wavebank, str):
@@ -227,10 +227,10 @@ def _ensure_templates_have_enough_stations(self, min_stations):

def _remove_old_detections(self, endtime: UTCDateTime) -> None:
""" Remove detections older than keep duration. Works in-place. """
# Use a copy to avoid changing list while iterating
# Use a copy to avoid changing the set while iterating
for d in copy.copy(self.detections):
if d.detect_time <= endtime:
self.detections.remove(d)
if d <= endtime:
self.detections.discard(d)
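
As an aside (not part of the diff), a self-contained sketch of the new bookkeeping: self.detections now holds plain detect-time datetimes in a set, so duplicates collapse automatically and expiry is a simple comparison plus discard. The names below are local to the example.

# Self-contained illustration of the set-of-datetimes bookkeeping; the
# variables here stand in for self.detections and the expiry cut-off.
import copy
import datetime

detections = set()

# Re-adding the same detect time is a no-op, so repeat handling of the
# same physical detection cannot grow the collection.
t = datetime.datetime(2024, 1, 8, 3, 30, 0)
detections.add(t)
detections.add(t)
assert len(detections) == 1

# Expire anything at or before the cut-off, mirroring _remove_old_detections.
endtime = datetime.datetime(2024, 1, 8, 4, 0, 0)
for d in copy.copy(detections):
    if d <= endtime:
        detections.discard(d)
assert len(detections) == 0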

def _remove_unused_backfillers(
self,
@@ -436,77 +436,91 @@ def _handle_detections(
Logger.debug(f"New detection at {d.detect_time}")
# Cope with no picks and hence no origins - these events have to be removed
family.detections = [d for d in family if len(d.event.origins)]
if family.template.name not in _detected_templates:
self.party.families.append(family)
else:
self.party.select(family.template.name).detections.extend(
family.detections)

Logger.info("Removing duplicate detections")
Logger.info(f"Party contained {len(self.party)} before decluster")
if len(self.party) > 0:
# TODO: Need to remove detections from disk that are removed in decluster
self.party.decluster(
trig_int=trig_int, timing="origin", metric="cor_sum",
hypocentral_separation=hypocentral_separation)
Logger.info("Completed decluster")
Logger.info(f"Party contains {len(self.party)} after decluster")
Logger.info("Writing detections to disk")

# Cope with not being given a stream
read_st = False
if st is None and backfill_dir is None:
read_st = True

# TODO: Need a better way to keep track of written detections - unique keys for detections?
# TODO: This is slow, and for Kaikoura, this is what stops it from running in real time
for family in self.party:
for detection in family:
# TODO: this check doesn't necessarily work well - detections may be the same physical detection, but different Detection objects
if detection in self.detections:
continue
detect_file_base = _detection_filename(
detection=detection, detect_directory=detect_directory)
_filename = f"{detect_file_base}.xml"
if os.path.isfile(f"{detect_file_base}.xml") and skip_existing:
Logger.info(f"{_filename} exists, skipping")
continue
Logger.debug(f"Writing detection: {detection.detect_time}")
# TODO: Do not do this, let some other process work on making the waveforms.
if read_st:
max_shift = (
max(tr.stats.endtime for tr in family.template.st) -
min(tr.stats.starttime for tr in family.template.st))
bulk = [
(tr.stats.network,
tr.stats.station,
tr.stats.location,
tr.stats.channel,
(detection.detect_time - 5),
(detection.detect_time + max_shift + 5))
for tr in family.template.st]
st = self.wavebank.get_waveforms_bulk(bulk)
st_read = True
self._fig = _write_detection(
detection=detection,
detect_file_base=detect_file_base,
save_waveform=save_waveforms,
plot_detection=plot_detections, stream=st,
fig=self._fig, backfill_dir=backfill_dir,
detect_dir=detect_directory)
Logger.info("Expiring old detections")
# Empty self.detections
self.detections.clear()
for family in self.party:
Logger.debug(f"Checking for {family.template.name}")
family.detections = [
d for d in family.detections if d.detect_time >= earliest_detection_time]
Logger.debug(f"Appending {len(family)} detections")
for detection in family:
# Need to append rather than create a new object
self.detections.append(detection)
_filename = f"{detect_file_base}.pkl"
if not os.path.exists(_filename) or not skip_existing:
Logger.debug(f"Writing detection: {detection.detect_time}")
with open(_filename, "wb") as f:
pickle.dump(detection, f)
self.detections.add(detection.detect_time.datetime)
return

# TODO: Move all of this old handling of detections to a separate post-processing process.
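
Not part of the commit: a rough sketch of the separate post-processing consumer the TODO points at, assuming it only needs the <detect_file_base>.pkl files written above. The directory layout, polling interval, and the process_detection hook are illustrative assumptions.

# Illustrative only: a stand-alone consumer that picks up pickled detections
# written by _handle_detections and works on them outside the real-time loop.
import glob
import os
import pickle
import time


def watch_detections(detect_directory, poll_interval=10.0):
    """Yield detection objects from newly written pickle files."""
    seen = set()
    while True:
        for filename in sorted(glob.glob(
                os.path.join(detect_directory, "**", "*.pkl"),
                recursive=True)):
            if filename in seen:
                continue
            seen.add(filename)
            with open(filename, "rb") as f:
                yield pickle.load(f)
        time.sleep(poll_interval)


# Example use (hypothetical process_detection would do the declustering,
# waveform extraction and plotting removed from the real-time loop):
# for detection in watch_detections("detections"):
#     process_detection(detection)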

#
# if family.template.name not in _detected_templates:
# self.party.families.append(family)
# else:
# self.party.select(family.template.name).detections.extend(
# family.detections)
#
# Logger.info("Removing duplicate detections")
# Logger.info(f"Party contained {len(self.party)} before decluster")
# if len(self.party) > 0:
# # TODO: Need to remove detections from disk that are removed in decluster
# self.party.decluster(
# trig_int=trig_int, timing="origin", metric="cor_sum",
# hypocentral_separation=hypocentral_separation)
# Logger.info("Completed decluster")
# Logger.info(f"Party contains {len(self.party)} after decluster")
# Logger.info("Writing detections to disk")

# # Cope with not being given a stream
# read_st = False
# if st is None and backfill_dir is None:
# read_st = True
#
# # TODO: Need a better way to keep track of written detections - unique keys for detections?
# # TODO: This is slow, and for Kaikoura, this is what stops it from running in real time
# for family in self.party:
# for detection in family:
# # TODO: this check doesn't necessarily work well - detections may be the same physical detection, but different Detection objects
# if detection in self.detections:
# continue
# detect_file_base = _detection_filename(
# detection=detection, detect_directory=detect_directory)
# _filename = f"{detect_file_base}.xml"
# if os.path.isfile(f"{detect_file_base}.xml") and skip_existing:
# Logger.info(f"{_filename} exists, skipping")
# continue
# Logger.debug(f"Writing detection: {detection.detect_time}")
# # TODO: Do not do this, let some other process work on making the waveforms.
# if read_st:
# max_shift = (
# max(tr.stats.endtime for tr in family.template.st) -
# min(tr.stats.starttime for tr in family.template.st))
# bulk = [
# (tr.stats.network,
# tr.stats.station,
# tr.stats.location,
# tr.stats.channel,
# (detection.detect_time - 5),
# (detection.detect_time + max_shift + 5))
# for tr in family.template.st]
# st = self.wavebank.get_waveforms_bulk(bulk)
# st_read = True
# self._fig = _write_detection(
# detection=detection,
# detect_file_base=detect_file_base,
# save_waveform=save_waveforms,
# plot_detection=plot_detections, stream=st,
# fig=self._fig, backfill_dir=backfill_dir,
# detect_dir=detect_directory)
# Logger.info("Expiring old detections")
# # Empty self.detections
# self.detections.clear()
# for family in self.party:
# Logger.debug(f"Checking for {family.template.name}")
# family.detections = [
# d for d in family.detections if d.detect_time >= earliest_detection_time]
# Logger.debug(f"Appending {len(family)} detections")
# for detection in family:
# # Need to append rather than create a new object
# self.detections.append(detection)
# return

def _plot(self) -> None: # pragma: no cover
""" Plot the data as it comes in. """
from rt_eqcorrscan.plotting.plot_buffer import EQcorrscanPlot
@@ -520,7 +534,7 @@ def _plot(self) -> None:  # pragma: no cover
self.plotter = EQcorrscanPlot(
rt_client=self.rt_client, plot_length=self.plot_length,
tribe=self, inventory=self.inventory,
detections=self.detections,
detections=[],
exclude_channels=self.plotting_exclude_channels,
update_interval=update_interval, plot_height=plot_height,
plot_width=plot_width, offline=offline,
@@ -947,7 +961,7 @@ def run(
break
if minimum_rate and UTCDateTime.now() > run_start + self._min_run_length:
_rate = average_rate(
self.detections,
list(self.detections),
starttime=max(
self._stream_end - keep_detections, first_data),
endtime=self._stream_end)
