-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'dev' into feature/terra15
- Loading branch information
Showing
14 changed files
with
845 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,11 +4,12 @@ | |
:maxdepth: 1 | ||
xdas | ||
atoms | ||
io | ||
fft | ||
signal | ||
processing | ||
parallel | ||
processing | ||
signal | ||
synthetics | ||
virtual | ||
atoms | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
```{eval-rst} | ||
.. currentmodule:: xdas.io | ||
``` | ||
|
||
# xdas.io | ||
|
||
```{eval-rst} | ||
.. autosummary:: | ||
:toctree: ../_autosummary | ||
get_free_port | ||
``` | ||
|
||
```{eval-rst} | ||
.. currentmodule:: xdas.io.asn | ||
``` | ||
|
||
|
||
## ASN | ||
|
||
```{eval-rst} | ||
.. autosummary:: | ||
:toctree: ../_autosummary | ||
ZMQPublisher | ||
ZMQSubscriber | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,4 +12,6 @@ | |
DataArrayLoader | ||
RealTimeLoader | ||
DataArrayWriter | ||
ZMQPublisher | ||
ZMQSubscriber | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,4 +10,5 @@ interpolated-coordinates | |
convert-displacement | ||
atoms | ||
processing | ||
streaming | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
--- | ||
file_format: mystnb | ||
kernelspec: | ||
name: python3 | ||
--- | ||
|
||
# Streaming data | ||
|
||
Xdas allows to stream data over any network using [ZeroMQ](https://zeromq.org). Xdas use the Publisher and Subscriber patterns meaning that on one node the data is published and that any number of subscribers can receive the data stream. | ||
|
||
Streaming data with Xdas is done by simply dumping each chunk to NetCDF binaries and to send those as packets. This ensure that each packet is self described and that feature such as compression are available (which can be very helpful to minimize the used bandwidth). | ||
|
||
Xdas implements the {py:class}`~xdas.processing.ZMQPublisher` and {py:class}`~xdas.processing.ZMQSubscriber`.Those object can respectively be used as a Writer and a Loader as described in the [](processing) section. Both are initialized by giving an network address. The publisher use the `submit` method to send packets while the subscriber is an infinite iterator that yields packets. | ||
|
||
In this section, we will mimic the use of several machine by using multithreading, where each thread is supposed to be a different machine. In real-life application, the publisher and subscriber are generally called in different machine or software. | ||
|
||
## Simple use case | ||
|
||
```{code-cell} | ||
import threading | ||
import time | ||
import xdas as xd | ||
from xdas.processing import ZMQPublisher, ZMQSubscriber | ||
``` | ||
|
||
First we generate some data and split it into packets | ||
|
||
```{code-cell} | ||
da = xd.synthetics.dummy() | ||
packets = xd.split(da, 5) | ||
``` | ||
|
||
We then publish the packets on machine 1. | ||
|
||
```{code-cell} | ||
address = f"tcp://localhost:{xd.io.get_free_port()}" | ||
publisher = ZMQPublisher(address) | ||
def publish(): | ||
for packet in packets: | ||
publisher.submit(packet) | ||
# give a chance to the subscriber to connect in time and to get the last packet | ||
time.sleep(0.1) | ||
machine1 = threading.Thread(target=publish) | ||
machine1.start() | ||
``` | ||
|
||
Let's receive the packets on machine 2. | ||
|
||
```{code-cell} | ||
subscriber = ZMQSubscriber(address) | ||
packets = [] | ||
def subscribe(): | ||
for packet in subscriber: | ||
packets.append(packet) | ||
machine2 = threading.Thread(target=subscribe) | ||
machine2.start() | ||
``` | ||
|
||
Now we wait for machine 1 to finish sending its packet and see if everything went well. | ||
|
||
```{code-cell} | ||
machine1.join() | ||
print(f"We received {len(packets)} packets!") | ||
assert xd.concatenate(packets).equals(da) | ||
``` | ||
|
||
## Using encoding | ||
|
||
To reduce the volume of the transmitted data, compression is often useful. Xdas enable the use of the ZFP algorithm when storing data but also when streaming it. Encoding is declared the same way. | ||
|
||
```{code-cell} | ||
:tags: [remove-output] | ||
import hdf5plugin | ||
address = f"tcp://localhost:{xd.io.get_free_port()}" | ||
encoding = {"chunks": (10, 10), **hdf5plugin.Zfp(accuracy=1e-6)} | ||
publisher = ZMQPublisher(address, encoding) # Add encoding here, the rest is the same | ||
``` | ||
|
||
{py:class}`~xdas.io.asn.ZMQSubscriber` | ||
|
||
```{note} | ||
Xdas also implements the ZeroMQ protocol used by the OptoDAS interrogators by ASN. Equivalent {py:class}`~xdas.io.asn.ZMQPublisher` and {py:class}`~xdas.io.asn.ZMQSubscriber` can be found in {py:mod}`xdas.io.asn`. This can be useful get data in real-time from one instrument of that kind. Note that compression is not available with that protocol yet. | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ dependencies = [ | |
"watchdog", | ||
"xarray", | ||
"xinterp", | ||
"pyzmq", | ||
] | ||
|
||
[project.optional-dependencies] | ||
|
Oops, something went wrong.