Skip to content

Commit

Permalink
Add support for Comet-style server side filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
lpsinger committed Sep 4, 2024
1 parent e453216 commit 9c501a9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 9 deletions.
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,24 @@ def handler(payload, root):
gcn.listen(handler=handler)
```

## Filtering
## Server-Side Filtering

VOEvent brokers that are powered by [Comet](https://comet.transientskp.org/) support [server-side filtering of alerts](https://comet.transientskp.org/en/stable/filtering.html). You configure the server-side filtering when you connect by supplying an [XPath expression](https://www.w3schools.com/xml/xpath_syntax.asp) in the optional `filter` argument for `gcn.listen`:

```python
gcn.listen(handler=handler, filter='insert-filter-here')
```

Here is a cheat sheet for some common filter expressions.

| Filter expression | What it does |
| - | - |
| `//Param[@name="Packet_Type" and @value="115"]` | Pass only alerts of notice type 115 (`FERMI_GBM_FIN_POS`) |
| `//Param[@name="Packet_Type" and @value="115"] and //Error2Radius<=6` | Pass only alerts of notice type 115 (`FERMI_GBM_FIN_POS`) with error radius less than or equal to 6° |
| `//Param[@name="Packet_Type" and (@value="112" or @value="115")]` | Pass only alerts of notice type 112 (`FERMI_GBM_GND_POS`) or 115 (`FERMI_GBM_FIN_POS`) |
| `starts-with(@ivorn, "ivo://gwnet/") and @role!="test"` | Pass only LIGO-Virgo-KAGRA gravitational-wave alerts that are not test alerts |

## Client-Side Filtering

You can also filter events by notice type using
`gcn.include_notice_types` or `gcn.exclude_notice_types`.
Expand Down
7 changes: 6 additions & 1 deletion gcn/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ def listen_main(args=None):
'provided, then loop over hosts until a connection '
'with one of them is established. '
'(default: %(default)s)')
parser.add_argument(
'--filter', help='Optional XPath expression for server-side event '
'filtering. Only supported by Comet brokers. See '
'https://comet.transientskp.org/en/stable/filtering.html for '
'supported syntax')
parser.add_argument('--version', action='version',
version='pygcn ' + __version__)
args = parser.parse_args(args)
Expand All @@ -85,7 +90,7 @@ def listen_main(args=None):

# Listen for GCN notices (until interrupted or killed)
host, port = [list(_) for _ in zip(*args.addr)]
listen(host=host, port=port, handler=handlers.archive)
listen(host=host, port=port, handler=handlers.archive, filter=args.filter)

Check warning on line 93 in gcn/cmdline.py

View check run for this annotation

Codecov / codecov/patch

gcn/cmdline.py#L93

Added line #L93 was not covered by tests


def serve_main(args=None):
Expand Down
30 changes: 23 additions & 7 deletions gcn/voeventclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import struct
import time
import itertools
from xml.sax.saxutils import quoteattr

from lxml.etree import fromstring, XMLSyntaxError

Expand All @@ -42,7 +43,7 @@

def _get_now_iso8601():
"""Get current date-time in ISO 8601 format."""
return datetime.datetime.now().isoformat()
return datetime.datetime.now().isoformat() + "Z"


def _open_socket(hosts_ports, iamalive_timeout, max_reconnect_timeout, log):
Expand Down Expand Up @@ -121,7 +122,7 @@ def _send_packet(sock, payload):
sock.sendall(_size_struct.pack(len(payload)) + payload)


def _form_response(role, origin, response, timestamp):
def _form_response(role, origin, response, timestamp, meta=''):
"""Form a VOEvent Transport Protocol packet suitable for sending an `ack`
or `iamalive` response."""
return (
Expand All @@ -133,11 +134,12 @@ def _form_response(role, origin, response, timestamp):
'Transport/v1.1 '
'http://telescope-networks.org/schema/Transport-v1.1.xsd"><Origin>' +
origin + '</Origin><Response>' + response +
'</Response><TimeStamp>' + timestamp +
'</TimeStamp></trn:Transport>').encode('UTF-8')
'</Response>' + '<Meta>' + meta + '</Meta>'
+ '<TimeStamp>' + timestamp + '</TimeStamp></trn:Transport>'
).encode('UTF-8')


def _ingest_packet(sock, ivorn, handler, log):
def _ingest_packet(sock, ivorn, handler, log, filter):
"""Ingest one VOEvent Transport Protocol packet and act on it, first
sending the appropriate response and then calling the handler if the
payload is a VOEvent."""
Expand All @@ -163,6 +165,13 @@ def _ingest_packet(sock, ivorn, handler, log):
root.find("Origin").text, ivorn,
_get_now_iso8601()))
log.debug("sent iamalive response")
elif root.attrib["role"] == "authenticate" and filter is not None:
log.debug("received authenticate message")
_send_packet(sock, _form_response("authenticate",

Check warning on line 170 in gcn/voeventclient.py

View check run for this annotation

Codecov / codecov/patch

gcn/voeventclient.py#L168-L170

Added lines #L168 - L170 were not covered by tests
root.find("Origin").text, ivorn,
_get_now_iso8601(),
f'<Param name="xpath-filter" value={filter}/>'))
log.debug("sent authenticate response")

Check warning on line 174 in gcn/voeventclient.py

View check run for this annotation

Codecov / codecov/patch

gcn/voeventclient.py#L174

Added line #L174 was not covered by tests
else:
log.error(
'received transport message with unrecognized role: %s',
Expand Down Expand Up @@ -212,7 +221,7 @@ def _validate_host_port(host, port):

def listen(host=("45.58.43.186", "68.169.57.253"), port=8099,
ivorn="ivo://python_voeventclient/anonymous", iamalive_timeout=150,
max_reconnect_timeout=1024, handler=None, log=None):
max_reconnect_timeout=1024, handler=None, log=None, filter=None):
"""Connect to a VOEvent Transport Protocol server on the given `host` and
`port`, then listen for VOEvents until interrupted (i.e., by a keyboard
interrupt, `SIGINTR`, or `SIGTERM`).
Expand All @@ -233,10 +242,17 @@ def listen(host=("45.58.43.186", "68.169.57.253"), port=8099,
used for reporting the client's status. If `log` is not provided, a default
logger will be used.
If `filter` is provided, then it is passed to the server as an
`XPath filtering expression
<https://comet.transientskp.org/en/stable/filtering.html>`_.
Note that this function does not return."""
if log is None:
log = logging.getLogger('gcn.listen')

if filter is not None:
filter = quoteattr(filter)

Check warning on line 254 in gcn/voeventclient.py

View check run for this annotation

Codecov / codecov/patch

gcn/voeventclient.py#L254

Added line #L254 was not covered by tests

hosts_ports = itertools.cycle(zip(*_validate_host_port(host, port)))

while True:
Expand All @@ -246,7 +262,7 @@ def listen(host=("45.58.43.186", "68.169.57.253"), port=8099,

try:
while True:
_ingest_packet(sock, ivorn, handler, log)
_ingest_packet(sock, ivorn, handler, log, filter)
except socket.timeout:
log.warn("timed out")
except socket.error:
Expand Down

0 comments on commit 9c501a9

Please sign in to comment.