From 9c501a9e6bab0d8dff4b35eab4d0f25dbbdc3708 Mon Sep 17 00:00:00 2001 From: Leo Singer Date: Tue, 3 Sep 2024 15:37:48 -0400 Subject: [PATCH] Add support for Comet-style server side filtering See https://comet.transientskp.org/en/stable/filtering.html. --- README.md | 19 ++++++++++++++++++- gcn/cmdline.py | 7 ++++++- gcn/voeventclient.py | 30 +++++++++++++++++++++++------- 3 files changed, 47 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index bcef271..9d717a2 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,24 @@ def handler(payload, root): gcn.listen(handler=handler) ``` -## Filtering +## Server-Side Filtering + +VOEvent brokers that are powered by [Comet](https://comet.transientskp.org/) support [server-side filtering of alerts](https://comet.transientskp.org/en/stable/filtering.html). You configure the server-side filtering when you connect by supplying an [XPath expression](https://www.w3schools.com/xml/xpath_syntax.asp) in the optional `filter` argument for `gcn.listen`: + +```python +gcn.listen(handler=handler, filter='insert-filter-here') +``` + +Here is a cheat sheet for some common filter expressions. + +| Filter expression | What it does | +| - | - | +| `//Param[@name="Packet_Type" and @value="115"]` | Pass only alerts of notice type 115 (`FERMI_GBM_FIN_POS`) | +| `//Param[@name="Packet_Type" and @value="115"] and //Error2Radius<=6` | Pass only alerts of notice type 115 (`FERMI_GBM_FIN_POS`) with error radius less than or equal to 6° | +| `//Param[@name="Packet_Type" and (@value="112" or @value="115")]` | Pass only alerts of notice type 112 (`FERMI_GBM_GND_POS`) or 115 (`FERMI_GBM_FIN_POS`) | +| `starts-with(@ivorn, "ivo://gwnet/") and @role!="test"` | Pass only LIGO-Virgo-KAGRA gravitational-wave alerts that are not test alerts | + +## Client-Side Filtering You can also filter events by notice type using `gcn.include_notice_types` or `gcn.exclude_notice_types`. diff --git a/gcn/cmdline.py b/gcn/cmdline.py index cad594b..aecc7db 100644 --- a/gcn/cmdline.py +++ b/gcn/cmdline.py @@ -76,6 +76,11 @@ def listen_main(args=None): 'provided, then loop over hosts until a connection ' 'with one of them is established. ' '(default: %(default)s)') + parser.add_argument( + '--filter', help='Optional XPath expression for server-side event ' + 'filtering. Only supported by Comet brokers. See ' + 'https://comet.transientskp.org/en/stable/filtering.html for ' + 'supported syntax') parser.add_argument('--version', action='version', version='pygcn ' + __version__) args = parser.parse_args(args) @@ -85,7 +90,7 @@ def listen_main(args=None): # Listen for GCN notices (until interrupted or killed) host, port = [list(_) for _ in zip(*args.addr)] - listen(host=host, port=port, handler=handlers.archive) + listen(host=host, port=port, handler=handlers.archive, filter=args.filter) def serve_main(args=None): diff --git a/gcn/voeventclient.py b/gcn/voeventclient.py index 33d2753..dc6c335 100644 --- a/gcn/voeventclient.py +++ b/gcn/voeventclient.py @@ -25,6 +25,7 @@ import struct import time import itertools +from xml.sax.saxutils import quoteattr from lxml.etree import fromstring, XMLSyntaxError @@ -42,7 +43,7 @@ def _get_now_iso8601(): """Get current date-time in ISO 8601 format.""" - return datetime.datetime.now().isoformat() + return datetime.datetime.now().isoformat() + "Z" def _open_socket(hosts_ports, iamalive_timeout, max_reconnect_timeout, log): @@ -121,7 +122,7 @@ def _send_packet(sock, payload): sock.sendall(_size_struct.pack(len(payload)) + payload) -def _form_response(role, origin, response, timestamp): +def _form_response(role, origin, response, timestamp, meta=''): """Form a VOEvent Transport Protocol packet suitable for sending an `ack` or `iamalive` response.""" return ( @@ -133,11 +134,12 @@ def _form_response(role, origin, response, timestamp): 'Transport/v1.1 ' 'http://telescope-networks.org/schema/Transport-v1.1.xsd">' + origin + '' + response + - '' + timestamp + - '').encode('UTF-8') + '' + '' + meta + '' + + '' + timestamp + '' + ).encode('UTF-8') -def _ingest_packet(sock, ivorn, handler, log): +def _ingest_packet(sock, ivorn, handler, log, filter): """Ingest one VOEvent Transport Protocol packet and act on it, first sending the appropriate response and then calling the handler if the payload is a VOEvent.""" @@ -163,6 +165,13 @@ def _ingest_packet(sock, ivorn, handler, log): root.find("Origin").text, ivorn, _get_now_iso8601())) log.debug("sent iamalive response") + elif root.attrib["role"] == "authenticate" and filter is not None: + log.debug("received authenticate message") + _send_packet(sock, _form_response("authenticate", + root.find("Origin").text, ivorn, + _get_now_iso8601(), + f'')) + log.debug("sent authenticate response") else: log.error( 'received transport message with unrecognized role: %s', @@ -212,7 +221,7 @@ def _validate_host_port(host, port): def listen(host=("45.58.43.186", "68.169.57.253"), port=8099, ivorn="ivo://python_voeventclient/anonymous", iamalive_timeout=150, - max_reconnect_timeout=1024, handler=None, log=None): + max_reconnect_timeout=1024, handler=None, log=None, filter=None): """Connect to a VOEvent Transport Protocol server on the given `host` and `port`, then listen for VOEvents until interrupted (i.e., by a keyboard interrupt, `SIGINTR`, or `SIGTERM`). @@ -233,10 +242,17 @@ def listen(host=("45.58.43.186", "68.169.57.253"), port=8099, used for reporting the client's status. If `log` is not provided, a default logger will be used. + If `filter` is provided, then it is passed to the server as an + `XPath filtering expression + `_. + Note that this function does not return.""" if log is None: log = logging.getLogger('gcn.listen') + if filter is not None: + filter = quoteattr(filter) + hosts_ports = itertools.cycle(zip(*_validate_host_port(host, port))) while True: @@ -246,7 +262,7 @@ def listen(host=("45.58.43.186", "68.169.57.253"), port=8099, try: while True: - _ingest_packet(sock, ivorn, handler, log) + _ingest_packet(sock, ivorn, handler, log, filter) except socket.timeout: log.warn("timed out") except socket.error: