From d4baa8d2217809975dac07e247c4ec439942298f Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Fri, 6 Sep 2024 15:57:21 +0200 Subject: [PATCH 01/19] Add ZMQ streaming capabilities for ASN. --- xdas/io/asn.py | 145 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/xdas/io/asn.py b/xdas/io/asn.py index dc6f8a2..e2e0e8b 100644 --- a/xdas/io/asn.py +++ b/xdas/io/asn.py @@ -1,5 +1,9 @@ +import json +import struct + import h5py import numpy as np +import zmq from ..core.dataarray import DataArray from ..virtual import VirtualSource @@ -16,3 +20,144 @@ def read(fname): time = {"tie_indices": [0, nt - 1], "tie_values": [t0, t0 + (nt - 1) * dt]} distance = {"tie_indices": [0, nx - 1], "tie_values": [0.0, (nx - 1) * dx]} return DataArray(data, {"time": time, "distance": distance}) + + +class ZMQStream: + """ + A class representing a ZeroMQ stream. + + Parameters + ---------- + address : str + The address to connect to. + + Attributes + ---------- + socket : zmq.Socket + The ZeroMQ socket used for communication. + packet_size : int + The size of each packet in bytes. + shape : tuple + The shape of the data array. + format : str + The format string used for unpacking the data. + distance : dict + The distance information. + dt : numpy.timedelta64 + The sampling time interval. + nt : int + The number of time samples per message. + + Methods + ------- + connect(address) + Connects to the specified address. + get_message() + Receives a message from the socket. + is_packet(message) + Checks if the message is a valid packet. + update_header(message) + Updates the header information based on the received message. + stream_packet(message) + Processes a packet and returns a DataArray object. + """ + + def __init__(self, address): + """ + Initializes a ZMQStream object. + + Parameters + ---------- + address : str + The address to connect to. 
+ """ + self.connect(address) + message = self.get_message() + self.update_header(message) + + def __iter__(self): + return self + + def __next__(self): + message = self.get_message() + if not self.is_packet(message): + self.update_header(message) + return self.__next__() + else: + return self.stream_packet(message) + + def connect(self, address): + context = zmq.Context() + socket = context.socket(zmq.SUB) + socket.connect(address) + socket.setsockopt_string(zmq.SUBSCRIBE, "") + self.socket = socket + + def get_message(self): + return self.socket.recv() + + def is_packet(self, message): + return len(message) == self.packet_size + + def update_header(self, message): + header = json.loads(message.decode("utf-8")) + + self.packet_size = 8 + header["bytesPerPackage"] * header["nPackagesPerMessage"] + self.shape = (header["nPackagesPerMessage"], header["nChannels"]) + + self.format = "%d%s" % ( + header["nChannels"] * header["nPackagesPerMessage"], + "f" if header["dataType"] == "float" else "h", + ) + + roiTable = header["roiTable"][0] + di = roiTable["roiStart"] * header["dx"] + de = roiTable["roiEnd"] * header["dx"] + self.distance = { + "tie_indices": [0, header["nChannels"] - 1], + "tie_values": [di, de], + } + + self.dt = float_to_timedelta(header["dt"], header["dtUnit"]) + self.nt = header["nPackagesPerMessage"] + + def stream_packet(self, message): + t0 = np.datetime64(struct.unpack(">> float_to_timedelta(1.5, 'ms') + numpy.timedelta64(1500000,'ns') + """ + conversion_factors = { + "ns": 1e0, + "us": 1e3, + "ms": 1e6, + "s": 1e9, + } + conversion_factor = conversion_factors[unit] + return np.timedelta64(round(value * conversion_factor), "ns") From 573fc9e292b3d262a48ce7d29502b9239c2ed271 Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Fri, 6 Sep 2024 16:14:53 +0200 Subject: [PATCH 02/19] Add example. 
--- xdas/io/asn.py | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/xdas/io/asn.py b/xdas/io/asn.py index e2e0e8b..f473fe8 100644 --- a/xdas/io/asn.py +++ b/xdas/io/asn.py @@ -60,6 +60,56 @@ class ZMQStream: Updates the header information based on the received message. stream_packet(message) Processes a packet and returns a DataArray object. + + Examples + -------- + >>> import numpy as np + >>> import xdas as xd + >>> from xdas.io.asn import ZMQStream + >>> import holoviews as hv + >>> from holoviews.streams import Pipe + >>> hv.extension("bokeh") + + >>> stream = ZMQStream("tcp://pisco.unice.fr:3333") + + >>> nbuffer = 100 + >>> buffer = np.zeros((nbuffer, stream.shape[1])) + >>> pipe = Pipe(data=buffer) + + >>> bounds = ( + ... stream.distance["tie_values"][0], + ... 0, + ... stream.distance["tie_values"][1], + ... (nbuffer * stream.dt) / np.timedelta64(1, "s"), + ... ) + + >>> def image(data): + ... return hv.Image(data, bounds=bounds) + + >>> dmap = hv.DynamicMap(image, streams=[pipe]) + >>> dmap.opts( + ... xlabel="distance", + ... ylabel="time", + ... invert_yaxis=True, + ... clim=(-1, 1), + ... cmap="viridis", + ... width=800, + ... height=400, + ... ) + >>> dmap + + >>> atom = xd.atoms.Sequential( + ... [ + ... xd.signal.integrate(..., dim="distance"), + ... xd.signal.sliding_mean_removal(..., wlen=1000.0, dim="distance"), + ... ] + ... ) + >>> for da in stream: + ... da = atom(da) / 100.0 + ... buffer = np.concatenate([buffer, da.values], axis=0) + ... buffer = buffer[-nbuffer:None] + ... pipe.send(buffer) + """ def __init__(self, address): From 2804cf64f9f492e00017b1dae28df13381d4d270 Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Fri, 6 Sep 2024 17:00:46 +0200 Subject: [PATCH 03/19] Ensure reproducibility. 
--- xdas/synthetics.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/xdas/synthetics.py b/xdas/synthetics.py index b5a4c65..1c1de2e 100644 --- a/xdas/synthetics.py +++ b/xdas/synthetics.py @@ -45,6 +45,9 @@ def wavelet_wavefronts( True """ + # ensure reproducibility + np.random.seed(42) + # sampling starttime = np.datetime64(starttime).astype("datetime64[ns]") span = (np.timedelta64(6, "s"), 10000.0) # (6 s, 10 km) @@ -68,7 +71,6 @@ def wavelet_wavefronts( data[:, k] += sp.gausspulse(t - ttp[k] - t0, fc) / 2 # P is twice weaker data[:, k] += sp.gausspulse(t - tts[k] - t0, fc) data /= np.max(np.abs(data), axis=0, keepdims=True) # normalize - np.random.seed(42) data += np.random.randn(*shape) / snr # add noise # strain rate like response @@ -96,6 +98,9 @@ def wavelet_wavefronts( def randn_wavefronts(): + # ensure reproducibility + np.random.seed(42) + # sampling resolution = (np.timedelta64(10, "ms"), 100.0) starttime = np.datetime64("2024-01-01T00:00:00").astype("datetime64[ns]") @@ -119,7 +124,6 @@ def randn_wavefronts(): data[:, k] += (t > (t0 + ttp[k])) * np.random.randn(shape[0]) / 2 data[:, k] += (t > (t0 + tts[k])) * np.random.randn(shape[0]) data /= np.max(np.abs(data), axis=0, keepdims=True) # normalize - np.random.seed(42) data += np.random.randn(*shape) / snr # add noise # pack data and coordinates as Database or DataCollection if chunking. From 1aaa4901174abfacd16b9ddba50941ab822d1b87 Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Fri, 6 Sep 2024 17:01:06 +0200 Subject: [PATCH 04/19] Add ZeroMQ pub/sub reader/loader. 
--- xdas/processing/__init__.py | 2 + xdas/processing/core.py | 91 +++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+) diff --git a/xdas/processing/__init__.py b/xdas/processing/__init__.py index f427119..50e16f3 100644 --- a/xdas/processing/__init__.py +++ b/xdas/processing/__init__.py @@ -4,5 +4,7 @@ DataArrayWriter, DataFrameWriter, RealTimeLoader, + ZMQPublisher, + ZMQSubscriber, process, ) diff --git a/xdas/processing/core.py b/xdas/processing/core.py index a8123db..aa682b1 100644 --- a/xdas/processing/core.py +++ b/xdas/processing/core.py @@ -1,9 +1,11 @@ import os from concurrent.futures import ThreadPoolExecutor from queue import Queue +from tempfile import TemporaryDirectory import numpy as np import pandas as pd +import zmq from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer @@ -297,3 +299,92 @@ def result(self): except pd.errors.EmptyDataError: out = pd.DataFrame() return out + + +class ZMQPublisher: + """ + A class for publishing DataArray chunks over ZeroMQ. + + Parameters + ---------- + address : str + The address to bind the publisher to. + + Examples + -------- + >>> from xdas.processing import ZMQPublisher + + >>> address = "tcp://*:5556" + >>> pub = ZMQPublisher(address) # doctest: +SKIP + + """ + + def __init__(self, address): + import zmq + + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUB) + self.socket.bind(address) + + def write(self, da): + """ + Send a DataArray over ZeroMQ. + + Parameters + ---------- + da : DataArray + The DataArray to be sent. + + """ + self.socket.send(tobytes(da)) + + def result(): + return None + + +class ZMQSubscriber: + """ + A class for subscribing to DataArray chunks over ZeroMQ. + + Parameters + ---------- + address : str + The address to connect the subscriber to. 
+ + Examples + -------- + >>> from xdas.processing import ZMQSubscriber + + >>> address = "tcp://localhost:5556" + >>> sub = ZMQSubscriber(address) # doctest: +SKIP + + """ + + def __init__(self, address): + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + self.socket.connect(address) + self.socket.setsockopt_string(zmq.SUBSCRIBE, "") + + def __iter__(self): + return self + + def __next__(self): + message = self.socket.recv() + return frombuffer(message) + + +def tobytes(da): + with TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, "tmp.nc") + da.to_netcdf(path, virtual=False) + with open(path, "rb") as file: + return file.read() + + +def frombuffer(da): + with TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, "tmp.nc") + with open(path, "wb") as file: + file.write(da) + return open_dataarray(path).load() From e296048210d002f2e0ed4663f2cdefb3483164bf Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Fri, 6 Sep 2024 17:03:25 +0200 Subject: [PATCH 05/19] Add examples. --- xdas/processing/core.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/xdas/processing/core.py b/xdas/processing/core.py index aa682b1..3f7035f 100644 --- a/xdas/processing/core.py +++ b/xdas/processing/core.py @@ -312,11 +312,15 @@ class ZMQPublisher: Examples -------- + >>> import xdas as xd >>> from xdas.processing import ZMQPublisher - >>> address = "tcp://*:5556" - >>> pub = ZMQPublisher(address) # doctest: +SKIP + >>> publisher = ZMQPublisher("tcp://*:5556") + >>> packets = xd.split(xd.synthetics.randn_wavefronts(), 10) + >>> for n, da in enumerate(packets, start=1): + ... print(f"Sending packet {n}") + ... 
publisher.write(da) """ def __init__(self, address): @@ -353,11 +357,21 @@ class ZMQSubscriber: Examples -------- + >>> import xdas as xd >>> from xdas.processing import ZMQSubscriber - >>> address = "tcp://localhost:5556" - >>> sub = ZMQSubscriber(address) # doctest: +SKIP + >>> subscriber = ZMQSubscriber("tcp://localhost:5556") + >>> packets = [] + >>> for n, da in enumerate(subscriber, start=1): + ... print(f"Received packet {n}") + ... packets.append(da) + ... if n == 10: + ... break + + >>> da = xd.concatenate(packets) + + >>> assert da.equals(xd.synthetics.randn_wavefronts()) """ def __init__(self, address): From 359bf91cd3a535dc3015fe62ea211a5ae1dca9ab Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 08:35:25 +0200 Subject: [PATCH 06/19] Add zmq dependency. --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 1130f72..c794588 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "watchdog", "xarray", "xinterp", + "pyzmq", ] [project.optional-dependencies] From 69e80e52e5a4dda7626f04ba1527e4a9ce2f6479 Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 08:38:02 +0200 Subject: [PATCH 07/19] Rename ZMQSubscriber --- xdas/io/asn.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xdas/io/asn.py b/xdas/io/asn.py index f473fe8..49be5b6 100644 --- a/xdas/io/asn.py +++ b/xdas/io/asn.py @@ -22,9 +22,9 @@ def read(fname): return DataArray(data, {"time": time, "distance": distance}) -class ZMQStream: +class ZMQSubscriber: """ - A class representing a ZeroMQ stream. + A class used to subscribe to a ZeroMQ stream. Parameters ---------- From db4aae3c2a31105477a7f8569bbe0576db5c28ba Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 13:32:44 +0200 Subject: [PATCH 08/19] Add ZMQPublisher with tests. 
--- tests/io/test_asn.py | 175 ++++++++++++++++++++++++++++++++++++++++++- xdas/io/asn.py | 80 +++++++++++++++++--- 2 files changed, 245 insertions(+), 10 deletions(-) diff --git a/tests/io/test_asn.py b/tests/io/test_asn.py index 4640904..9d90ee5 100644 --- a/tests/io/test_asn.py +++ b/tests/io/test_asn.py @@ -1 +1,174 @@ -# TODO +import time +import numpy as np + +import xdas as xd +from xdas.io.asn import ZMQPublisher, ZMQSubscriber +import socket +import json +import zmq + +import threading + +from concurrent.futures import ThreadPoolExecutor + +executor = ThreadPoolExecutor() + + +def get_free_address(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + port = s.getsockname()[1] + address = f"tcp://localhost:{port}" + return address + + +coords = { + "time": { + "tie_indices": [0, 99], + "tie_values": [ + np.datetime64("2020-01-01T00:00:00.000"), + np.datetime64("2020-01-01T00:00:09.900"), + ], + }, + "distance": {"tie_indices": [0, 9], "tie_values": [0.0, 90.0]}, +} + +da_float32 = xd.DataArray( + np.random.randn(100, 10).astype("float32"), + coords, +) + +da_int16 = xd.DataArray( + np.random.randn(100, 10).astype("int16"), + coords, +) + + +class TestZMQPublisher: + def test_get_header(self): + header = ZMQPublisher.get_header(da_float32) + assert header["bytesPerPackage"] == 40 + assert header["nPackagesPerMessage"] == 100 + assert header["nChannels"] == 10 + assert header["dataType"] == "float" + assert header["dx"] == 10.0 + assert header["dt"] == 0.1 + assert header["dtUnit"] == "s" + assert header["dxUnit"] == "m" + assert header["roiTable"] == [{"roiStart": 0, "roiEnd": 9, "roiDec": 1}] + header = ZMQPublisher.get_header(da_int16) + assert header["dataType"] == "short" + + def test_init_conect_set_header(self): + address = get_free_address() + pub = ZMQPublisher(address) + pub.submit(da_float32) + assert pub.header == ZMQPublisher.get_header(da_float32) + + def test_send_header(self): + address = get_free_address() + 
pub = ZMQPublisher(address) + pub.submit(da_float32) + socket = self.get_socket(address) + pub.submit(da_float32) # a packet must be sent once subscriber is connected + assert socket.recv() == json.dumps(pub.header).encode("utf-8") + + def test_send_data(self): + address = get_free_address() + pub = ZMQPublisher(address) + pub.submit(da_float32) + socket = self.get_socket(address) + pub.submit(da_float32) # a packet must be sent once subscriber is connected + socket.recv() + message = socket.recv() + assert message[:8] == da_float32["time"][0].values.astype("M8[ns]").tobytes() + assert message[8:] == da_float32.data.tobytes() + + def test_send_chunks(self): + address = get_free_address() + pub = ZMQPublisher(address) + chunks = xd.split(da_float32, 10) + pub.submit(chunks[0]) + time.sleep(0.001) + socket = self.get_socket(address) + for chunk in chunks[1:]: + pub.submit(chunk) + time.sleep(0.001) + assert socket.recv() == json.dumps(pub.header).encode("utf-8") + for chunk in chunks[1:]: # first was sent before subscriber connected + message = socket.recv() + assert message[:8] == chunk["time"][0].values.astype("M8[ns]").tobytes() + assert message[8:] == chunk.data.tobytes() + + def test_several_subscribers(self): + address = get_free_address() + pub = ZMQPublisher(address) + chunks = xd.split(da_float32, 10) + pub.submit(chunks[0]) + time.sleep(0.001) + socket1 = self.get_socket(address) + for chunk in chunks[1:5]: + pub.submit(chunk) + time.sleep(0.001) + socket2 = self.get_socket(address) + for chunk in chunks[5:]: + pub.submit(chunk) + time.sleep(0.001) + assert socket1.recv() == json.dumps(pub.header).encode("utf-8") + for chunk in chunks[1:]: # first was sent before subscriber connected + message = socket1.recv() + assert message[:8] == chunk["time"][0].values.astype("M8[ns]").tobytes() + assert message[8:] == chunk.data.tobytes() + assert socket2.recv() == json.dumps(pub.header).encode("utf-8") + for chunk in chunks[5:]: # first was sent before subscriber 
connected + message = socket2.recv() + assert message[:8] == chunk["time"][0].values.astype("M8[ns]").tobytes() + assert message[8:] == chunk.data.tobytes() + + def test_change_header(self): + address = get_free_address() + pub = ZMQPublisher(address) + chunks = xd.split(da_float32, 10) + pub.submit(chunks[0]) + time.sleep(0.001) + socket = self.get_socket(address) + for chunk in chunks[1:5]: + pub.submit(chunk) + header1 = pub.header + time.sleep(0.001) + for chunk in chunks[5:]: + pub.submit(chunk.isel(distance=slice(0, 5))) + header2 = pub.header + time.sleep(0.001) + assert socket.recv() == json.dumps(header1).encode("utf-8") + for chunk in chunks[1:5]: # first was sent before subscriber connected + message = socket.recv() + assert message[:8] == chunk["time"][0].values.astype("M8[ns]").tobytes() + assert message[8:] == chunk.data.tobytes() + assert socket.recv() == json.dumps(header2).encode("utf-8") + for chunk in chunks[5:]: # first was sent before subscriber connected + message = socket.recv() + assert message[:8] == chunk["time"][0].values.astype("M8[ns]").tobytes() + assert message[8:] == chunk.isel(distance=slice(0, 5)).data.tobytes() + + def get_socket(self, address): + socket = zmq.Context().socket(zmq.SUB) + socket.connect(address) + socket.setsockopt(zmq.SUBSCRIBE, b"") + time.sleep(0.001) + return socket + + +# class TestZMQSubscriber: +# def test_init_connect_update_header(self): +# address = get_free_address() +# pub = ZMQPublisher(address) +# pub.submit(da_float32) +# sub = ZMQSubscriber(address) +# assert sub.packet_size == 40 +# assert sub.shape == (100, 10) +# assert sub.format == "1000f" +# assert sub.distance == {"tie_indices": [0, 9], "tie_values": [0.0, 90.0]} +# assert sub.dt == 0.1 +# assert sub.nt == 100 +# pub.socket.close() diff --git a/xdas/io/asn.py b/xdas/io/asn.py index 49be5b6..d47c1d2 100644 --- a/xdas/io/asn.py +++ b/xdas/io/asn.py @@ -5,6 +5,8 @@ import numpy as np import zmq +from xdas.core.coordinates import 
get_sampling_interval + from ..core.dataarray import DataArray from ..virtual import VirtualSource @@ -134,7 +136,7 @@ def __next__(self): self.update_header(message) return self.__next__() else: - return self.stream_packet(message) + return self.unpack(message) def connect(self, address): context = zmq.Context() @@ -154,11 +156,7 @@ def update_header(self, message): self.packet_size = 8 + header["bytesPerPackage"] * header["nPackagesPerMessage"] self.shape = (header["nPackagesPerMessage"], header["nChannels"]) - - self.format = "%d%s" % ( - header["nChannels"] * header["nPackagesPerMessage"], - "f" if header["dataType"] == "float" else "h", - ) + self.dtype = np.float32 if header["dataType"] == "float" else np.int16 roiTable = header["roiTable"][0] di = roiTable["roiStart"] * header["dx"] @@ -171,9 +169,9 @@ def update_header(self, message): self.dt = float_to_timedelta(header["dt"], header["dtUnit"]) self.nt = header["nPackagesPerMessage"] - def stream_packet(self, message): - t0 = np.datetime64(struct.unpack(" Date: Tue, 17 Sep 2024 13:35:46 +0200 Subject: [PATCH 09/19] Add transpose for safety. 
--- xdas/io/asn.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/xdas/io/asn.py b/xdas/io/asn.py index d47c1d2..954b862 100644 --- a/xdas/io/asn.py +++ b/xdas/io/asn.py @@ -207,6 +207,7 @@ def connect(self, address): @staticmethod def get_header(da): + da = da.transpose("time", "distance") header = { "bytesPerPackage": da.dtype.itemsize * da.shape[1], "nPackagesPerMessage": da.shape[0], @@ -221,6 +222,7 @@ def get_header(da): return header def send(self, da): + da = da.transpose("time", "distance") header = self.get_header(da) if self.header is None: self.header = header @@ -234,6 +236,7 @@ def send_header(self): self.send_message(message) def send_data(self, da): + da = da.transpose("time", "distance") t0 = da["time"][0].values.astype("datetime64[ns]") data = da.values message = t0.tobytes() + data.tobytes() From c5bbdd13e8880ede2af06e510ec1cdd040520dbd Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 13:39:47 +0200 Subject: [PATCH 10/19] test int16. --- tests/io/test_asn.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/io/test_asn.py b/tests/io/test_asn.py index 9d90ee5..1ccf3de 100644 --- a/tests/io/test_asn.py +++ b/tests/io/test_asn.py @@ -79,10 +79,15 @@ def test_send_data(self): pub.submit(da_float32) socket = self.get_socket(address) pub.submit(da_float32) # a packet must be sent once subscriber is connected - socket.recv() + socket.recv() # header message = socket.recv() assert message[:8] == da_float32["time"][0].values.astype("M8[ns]").tobytes() assert message[8:] == da_float32.data.tobytes() + pub.submit(da_int16) + socket.recv() # header + message = socket.recv() + assert message[:8] == da_int16["time"][0].values.astype("M8[ns]").tobytes() + assert message[8:] == da_int16.data.tobytes() def test_send_chunks(self): address = get_free_address() From a40ce246d588f13e2ee1ccbd44fbf53c472939b9 Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 14:23:03 +0200 Subject: [PATCH 
11/19] Refactor Publisher and add tests --- tests/io/test_asn.py | 85 +++++++++++++++++++++++++++++++++----------- xdas/io/asn.py | 19 ++++------ 2 files changed, 72 insertions(+), 32 deletions(-) diff --git a/tests/io/test_asn.py b/tests/io/test_asn.py index 1ccf3de..fa82953 100644 --- a/tests/io/test_asn.py +++ b/tests/io/test_asn.py @@ -1,15 +1,14 @@ +import json +import socket +import threading import time +from concurrent.futures import ThreadPoolExecutor + import numpy as np +import zmq import xdas as xd from xdas.io.asn import ZMQPublisher, ZMQSubscriber -import socket -import json -import zmq - -import threading - -from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor() @@ -164,16 +163,62 @@ def get_socket(self, address): return socket -# class TestZMQSubscriber: -# def test_init_connect_update_header(self): -# address = get_free_address() -# pub = ZMQPublisher(address) -# pub.submit(da_float32) -# sub = ZMQSubscriber(address) -# assert sub.packet_size == 40 -# assert sub.shape == (100, 10) -# assert sub.format == "1000f" -# assert sub.distance == {"tie_indices": [0, 9], "tie_values": [0.0, 90.0]} -# assert sub.dt == 0.1 -# assert sub.nt == 100 -# pub.socket.close() +class TestZMQSubscriber: + def test_one_chunk(self): + address = get_free_address() + pub = ZMQPublisher(address) + chunks = [da_float32] + threading.Thread(target=self.publish, args=(pub, chunks)).start() + sub = ZMQSubscriber(address) + assert sub.packet_size == 4008 + assert sub.shape == (100, 10) + assert sub.dtype == np.float32 + assert sub.distance == {"tie_indices": [0, 9], "tie_values": [0.0, 90.0]} + assert sub.delta == np.timedelta64(100, "ms") + result = next(sub) + assert result.equals(da_float32) + chunks = [da_int16] + threading.Thread(target=self.publish, args=(pub, chunks)).start() + result = next(sub) + assert sub.packet_size == 2008 + assert sub.dtype == np.int16 + assert result.equals(da_int16) + + def test_several_chunks(self): + address = 
get_free_address() + pub = ZMQPublisher(address) + chunks = xd.split(da_float32, 5) + threading.Thread(target=self.publish, args=(pub, chunks)).start() + sub = ZMQSubscriber(address) + assert sub.packet_size == 808 + assert sub.shape == (20, 10) + assert sub.dtype == np.float32 + assert sub.distance == {"tie_indices": [0, 9], "tie_values": [0.0, 90.0]} + assert sub.delta == np.timedelta64(100, "ms") + for chunk in chunks: + result = next(sub) + assert result.equals(chunk) + + def test_several_subscribers(self): + address = get_free_address() + pub = ZMQPublisher(address) + chunks = xd.split(da_float32, 5) + thread = threading.Thread(target=self.publish, args=(pub, chunks[:2])) + thread.start() + sub1 = ZMQSubscriber(address) + thread.join() + thread = threading.Thread(target=self.publish, args=(pub, chunks[2:])) + thread.start() + sub2 = ZMQSubscriber(address) + + for chunk in chunks: + result = next(sub1) + assert result.equals(chunk) + for chunk in chunks[2:]: + result = next(sub2) + assert result.equals(chunk) + + def publish(self, pub, chunks): + for chunk in chunks: + time.sleep(0.001) + pub.submit(chunk) diff --git a/xdas/io/asn.py b/xdas/io/asn.py index 954b862..fbc66a6 100644 --- a/xdas/io/asn.py +++ b/xdas/io/asn.py @@ -41,14 +41,12 @@ class ZMQSubscriber: The size of each packet in bytes. shape : tuple The shape of the data array. - format : str - The format string used for unpacking the data. + dtype : numpy.dtype + The data type of the array. distance : dict The distance information. - dt : numpy.timedelta64 + delta : numpy.timedelta64 The sampling time interval. - nt : int - The number of time samples per message. 
Methods ------- @@ -153,11 +151,9 @@ def is_packet(self, message): def update_header(self, message): header = json.loads(message.decode("utf-8")) - self.packet_size = 8 + header["bytesPerPackage"] * header["nPackagesPerMessage"] self.shape = (header["nPackagesPerMessage"], header["nChannels"]) self.dtype = np.float32 if header["dataType"] == "float" else np.int16 - roiTable = header["roiTable"][0] di = roiTable["roiStart"] * header["dx"] de = roiTable["roiEnd"] * header["dx"] @@ -165,16 +161,14 @@ def update_header(self, message): "tie_indices": [0, header["nChannels"] - 1], "tie_values": [di, de], } - - self.dt = float_to_timedelta(header["dt"], header["dtUnit"]) - self.nt = header["nPackagesPerMessage"] + self.delta = float_to_timedelta(header["dt"], header["dtUnit"]) def unpack(self, message): - t0 = np.frombuffer(message[:8], "datetime64[ns]") + t0 = np.frombuffer(message[:8], "datetime64[ns]").reshape(()) data = np.frombuffer(message[8:], self.dtype).reshape(self.shape) time = { "tie_indices": [0, self.shape[0] - 1], - "tie_values": [t0, t0 + (self.shape[0] - 1) * self.dt], + "tie_values": [t0, t0 + (self.shape[0] - 1) * self.delta], } return DataArray(data, {"time": time, "distance": self.distance}) @@ -202,6 +196,7 @@ def write(self, da): def connect(self, address): context = zmq.Context() socket = context.socket(zmq.XPUB) + socket.setsockopt(zmq.XPUB_VERBOSE, True) socket.bind(address) self.socket = socket From fb8b13572aa84c02effac6193924c170a036ac14 Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 14:28:12 +0200 Subject: [PATCH 12/19] Add test change header for ASN subscriber --- tests/io/test_asn.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/io/test_asn.py b/tests/io/test_asn.py index fa82953..564a2a3 100644 --- a/tests/io/test_asn.py +++ b/tests/io/test_asn.py @@ -218,6 +218,17 @@ def test_several_subscribers(self): result = next(sub2) assert result.equals(chunk) + def test_change_header(self): + address 
= get_free_address() + pub = ZMQPublisher(address) + chunks = xd.split(da_float32, 5) + chunks = [chunk.isel(distance=slice(0, 5)) for chunk in chunks[:2]] + chunks[2:] + threading.Thread(target=self.publish, args=(pub, chunks)).start() + sub = ZMQSubscriber(address) + for chunk in chunks: + result = next(sub) + assert result.equals(chunk) + def publish(self, pub, chunks): for chunk in chunks: time.sleep(0.001) From 2e2568019b5928f8741be2b02e944ae88f289bc8 Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 14:37:26 +0200 Subject: [PATCH 13/19] test iter for xdas.io.asn.ZMQPublisher. --- tests/io/test_asn.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/io/test_asn.py b/tests/io/test_asn.py index 564a2a3..e3c9966 100644 --- a/tests/io/test_asn.py +++ b/tests/io/test_asn.py @@ -229,6 +229,16 @@ def test_change_header(self): result = next(sub) assert result.equals(chunk) + def test_iter(self): + address = get_free_address() + pub = ZMQPublisher(address) + chunks = xd.split(da_float32, 5) + threading.Thread(target=self.publish, args=(pub, chunks)).start() + sub = ZMQSubscriber(address) + sub = (chunk for _, chunk in zip(range(5), sub)) + result = xd.concatenate([chunk for chunk in sub]) + assert result.equals(da_float32) + def publish(self, pub, chunks): for chunk in chunks: time.sleep(0.001) From ea6bb0c91465327ed418613dadc6909b7432873f Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 15:13:00 +0200 Subject: [PATCH 14/19] Add get_free_port and refactor/update docstring. 
--- tests/io/test_asn.py | 41 +++++----- xdas/io/__init__.py | 1 + xdas/io/asn.py | 180 +++++++++++++++---------------------------- xdas/io/core.py | 20 +++++ 4 files changed, 103 insertions(+), 139 deletions(-) create mode 100644 xdas/io/core.py diff --git a/tests/io/test_asn.py b/tests/io/test_asn.py index e3c9966..05f1ea5 100644 --- a/tests/io/test_asn.py +++ b/tests/io/test_asn.py @@ -2,7 +2,6 @@ import socket import threading import time -from concurrent.futures import ThreadPoolExecutor import numpy as np import zmq @@ -10,15 +9,10 @@ import xdas as xd from xdas.io.asn import ZMQPublisher, ZMQSubscriber -executor = ThreadPoolExecutor() - -def get_free_address(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("", 0)) - port = s.getsockname()[1] - address = f"tcp://localhost:{port}" - return address +def get_free_local_address(): + port = xd.io.get_free_port() + return f"tcp://localhost:{port}" coords = { @@ -45,7 +39,7 @@ def get_free_address(): class TestZMQPublisher: def test_get_header(self): - header = ZMQPublisher.get_header(da_float32) + header = ZMQPublisher._get_header(da_float32) assert header["bytesPerPackage"] == 40 assert header["nPackagesPerMessage"] == 100 assert header["nChannels"] == 10 @@ -55,17 +49,17 @@ def test_get_header(self): assert header["dtUnit"] == "s" assert header["dxUnit"] == "m" assert header["roiTable"] == [{"roiStart": 0, "roiEnd": 9, "roiDec": 1}] - header = ZMQPublisher.get_header(da_int16) + header = ZMQPublisher._get_header(da_int16) assert header["dataType"] == "short" def test_init_conect_set_header(self): - address = get_free_address() + address = get_free_local_address() pub = ZMQPublisher(address) pub.submit(da_float32) - assert pub.header == ZMQPublisher.get_header(da_float32) + assert pub.header == ZMQPublisher._get_header(da_float32) def test_send_header(self): - address = get_free_address() + address = get_free_local_address() pub = ZMQPublisher(address) pub.submit(da_float32) socket = 
self.get_socket(address) @@ -73,7 +67,7 @@ def test_send_header(self): assert socket.recv() == json.dumps(pub.header).encode("utf-8") def test_send_data(self): - address = get_free_address() + address = get_free_local_address() pub = ZMQPublisher(address) pub.submit(da_float32) socket = self.get_socket(address) @@ -89,7 +83,7 @@ def test_send_data(self): assert message[8:] == da_int16.data.tobytes() def test_send_chunks(self): - address = get_free_address() + address = get_free_local_address() pub = ZMQPublisher(address) chunks = xd.split(da_float32, 10) pub.submit(chunks[0]) @@ -105,7 +99,7 @@ def test_send_chunks(self): assert message[8:] == chunk.data.tobytes() def test_several_subscribers(self): - address = get_free_address() + address = get_free_local_address() pub = ZMQPublisher(address) chunks = xd.split(da_float32, 10) pub.submit(chunks[0]) @@ -130,7 +124,7 @@ def test_several_subscribers(self): assert message[8:] == chunk.data.tobytes() def test_change_header(self): - address = get_free_address() + address = get_free_local_address() pub = ZMQPublisher(address) chunks = xd.split(da_float32, 10) pub.submit(chunks[0]) @@ -165,11 +159,12 @@ def get_socket(self, address): class TestZMQSubscriber: def test_one_chunk(self): - address = get_free_address() + address = get_free_local_address() pub = ZMQPublisher(address) chunks = [da_float32] threading.Thread(target=self.publish, args=(pub, chunks)).start() sub = ZMQSubscriber(address) + assert sub.address == address assert sub.packet_size == 4008 assert sub.shape == (100, 10) assert sub.dtype == np.float32 @@ -185,7 +180,7 @@ def test_one_chunk(self): assert result.equals(da_int16) def test_several_chunks(self): - address = get_free_address() + address = get_free_local_address() pub = ZMQPublisher(address) chunks = xd.split(da_float32, 5) threading.Thread(target=self.publish, args=(pub, chunks)).start() @@ -200,7 +195,7 @@ def test_several_chunks(self): assert result.equals(chunk) def 
test_several_subscribers(self): - address = get_free_address() + address = get_free_local_address() pub = ZMQPublisher(address) chunks = xd.split(da_float32, 5) thread = threading.Thread(target=self.publish, args=(pub, chunks[:2])) @@ -219,7 +214,7 @@ def test_several_subscribers(self): assert result.equals(chunk) def test_change_header(self): - address = get_free_address() + address = get_free_local_address() pub = ZMQPublisher(address) chunks = xd.split(da_float32, 5) chunks = [chunk.isel(distance=slice(0, 5)) for chunk in chunks[:2]] + chunks[2:] @@ -230,7 +225,7 @@ def test_change_header(self): assert result.equals(chunk) def test_iter(self): - address = get_free_address() + address = get_free_local_address() pub = ZMQPublisher(address) chunks = xd.split(da_float32, 5) threading.Thread(target=self.publish, args=(pub, chunks)).start() diff --git a/xdas/io/__init__.py b/xdas/io/__init__.py index e630de2..84d91ec 100644 --- a/xdas/io/__init__.py +++ b/xdas/io/__init__.py @@ -1 +1,2 @@ from . import asn, febus, optasense, sintela +from .core import get_free_port diff --git a/xdas/io/asn.py b/xdas/io/asn.py index fbc66a6..ac4b0df 100644 --- a/xdas/io/asn.py +++ b/xdas/io/asn.py @@ -1,5 +1,4 @@ import json -import struct import h5py import numpy as np @@ -25,93 +24,6 @@ def read(fname): class ZMQSubscriber: - """ - A class used to subscribe to a ZeroMQ stream. - - Parameters - ---------- - address : str - The address to connect to. - - Attributes - ---------- - socket : zmq.Socket - The ZeroMQ socket used for communication. - packet_size : int - The size of each packet in bytes. - shape : tuple - The shape of the data array. - dtype : numpy.dtype - The data type of the array. - distance : dict - The distance information. - delta : numpy.timedelta64 - The sampling time interval. - - Methods - ------- - connect(address) - Connects to the specified address. - get_message() - Receives a message from the socket. 
- is_packet(message) - Checks if the message is a valid packet. - update_header(message) - Updates the header information based on the received message. - stream_packet(message) - Processes a packet and returns a DataArray object. - - Examples - -------- - >>> import numpy as np - >>> import xdas as xd - >>> from xdas.io.asn import ZMQStream - >>> import holoviews as hv - >>> from holoviews.streams import Pipe - >>> hv.extension("bokeh") - - >>> stream = ZMQStream("tcp://pisco.unice.fr:3333") - - >>> nbuffer = 100 - >>> buffer = np.zeros((nbuffer, stream.shape[1])) - >>> pipe = Pipe(data=buffer) - - >>> bounds = ( - ... stream.distance["tie_values"][0], - ... 0, - ... stream.distance["tie_values"][1], - ... (nbuffer * stream.dt) / np.timedelta64(1, "s"), - ... ) - - >>> def image(data): - ... return hv.Image(data, bounds=bounds) - - >>> dmap = hv.DynamicMap(image, streams=[pipe]) - >>> dmap.opts( - ... xlabel="distance", - ... ylabel="time", - ... invert_yaxis=True, - ... clim=(-1, 1), - ... cmap="viridis", - ... width=800, - ... height=400, - ... ) - >>> dmap - - >>> atom = xd.atoms.Sequential( - ... [ - ... xd.signal.integrate(..., dim="distance"), - ... xd.signal.sliding_mean_removal(..., wlen=1000.0, dim="distance"), - ... ] - ... ) - >>> for da in stream: - ... da = atom(da) / 100.0 - ... buffer = np.concatenate([buffer, da.values], axis=0) - ... buffer = buffer[-nbuffer:None] - ... pipe.send(buffer) - - """ - def __init__(self, address): """ Initializes a ZMQStream object. @@ -121,35 +33,36 @@ def __init__(self, address): address : str The address to connect to. 
""" - self.connect(address) - message = self.get_message() - self.update_header(message) + self.address = address + self._connect(self.address) + message = self._get_message() + self._update_header(message) def __iter__(self): return self def __next__(self): - message = self.get_message() - if not self.is_packet(message): - self.update_header(message) + message = self._get_message() + if not self._is_packet(message): + self._update_header(message) return self.__next__() else: - return self.unpack(message) + return self._unpack(message) - def connect(self, address): + def _connect(self, address): context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect(address) socket.setsockopt_string(zmq.SUBSCRIBE, "") - self.socket = socket + self._socket = socket - def get_message(self): - return self.socket.recv() + def _get_message(self): + return self._socket.recv() - def is_packet(self, message): + def _is_packet(self, message): return len(message) == self.packet_size - def update_header(self, message): + def _update_header(self, message): header = json.loads(message.decode("utf-8")) self.packet_size = 8 + header["bytesPerPackage"] * header["nPackagesPerMessage"] self.shape = (header["nPackagesPerMessage"], header["nChannels"]) @@ -163,7 +76,7 @@ def update_header(self, message): } self.delta = float_to_timedelta(header["dt"], header["dtUnit"]) - def unpack(self, message): + def _unpack(self, message): t0 = np.frombuffer(message[:8], "datetime64[ns]").reshape(()) data = np.frombuffer(message[8:], self.dtype).reshape(self.shape) time = { @@ -174,8 +87,43 @@ def unpack(self, message): class ZMQPublisher: + """ + A class to stream data using ZeroMQ. + + Parameters + ---------- + address : str + The address to bind the ZeroMQ socket. + + Attributes + ---------- + address : str + The address where the ZeroMQ is bound to. + + Methods + ------- + submit(da) + Submits the data array for publishing. 
+ + Examples + -------- + >>> import xdas as xd + >>> from xdas.io.asn import ZMQPublisher + + >>> da = xd.synthetics.randn_wavefronts() + + >>> port = xd.io.get_free_port() + >>> address = f"tcp://localhost:{port}" + >>> publisher = ZMQPublisher(address) + >>> chunks = xd.split(da, 10) + >>> for chunk in chunks: + ... publisher.submit(chunk) + + """ + def __init__(self, address): - self.connect(address) + self.address = address + self._connect(address) self._header = None @property @@ -188,12 +136,12 @@ def header(self, header): self.socket.setsockopt(zmq.XPUB_WELCOME_MSG, json.dumps(header).encode("utf-8")) def submit(self, da): - self.send(da) + self._send(da) def write(self, da): - self.send(da) + self._send(da) - def connect(self, address): + def _connect(self, address): context = zmq.Context() socket = context.socket(zmq.XPUB) socket.setsockopt(zmq.XPUB_VERBOSE, True) @@ -201,7 +149,7 @@ def connect(self, address): self.socket = socket @staticmethod - def get_header(da): + def _get_header(da): da = da.transpose("time", "distance") header = { "bytesPerPackage": da.dtype.itemsize * da.shape[1], @@ -216,28 +164,28 @@ def get_header(da): } return header - def send(self, da): + def _send(self, da): da = da.transpose("time", "distance") - header = self.get_header(da) + header = self._get_header(da) if self.header is None: self.header = header if not header == self.header: self.header = header - self.send_header() - self.send_data(da) + self._send_header() + self._send_data(da) - def send_header(self): + def _send_header(self): message = json.dumps(self.header).encode("utf-8") - self.send_message(message) + self._send_message(message) - def send_data(self, da): + def _send_data(self, da): da = da.transpose("time", "distance") t0 = da["time"][0].values.astype("datetime64[ns]") data = da.values message = t0.tobytes() + data.tobytes() - self.send_message(message) + self._send_message(message) - def send_message(self, message): + def _send_message(self, message): 
self.socket.send(message) diff --git a/xdas/io/core.py b/xdas/io/core.py new file mode 100644 index 0000000..a4171a7 --- /dev/null +++ b/xdas/io/core.py @@ -0,0 +1,20 @@ +import socket + + +def get_free_port(): + """ + Find and return a free port on the host machine. + + This function creates a temporary socket, binds it to an available port + provided by the host, retrieves the port number, and then closes the socket. + This is useful for finding an available port for network communication. + + Returns + ------- + int: + A free port number on the host machine. + + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] From 0445bb095c620510a680c4539aae40101c66624a Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 15:29:09 +0200 Subject: [PATCH 15/19] Add example to ASN ZMQSubscriber. --- xdas/io/asn.py | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/xdas/io/asn.py b/xdas/io/asn.py index ac4b0df..230b388 100644 --- a/xdas/io/asn.py +++ b/xdas/io/asn.py @@ -23,6 +23,15 @@ def read(fname): return DataArray(data, {"time": time, "distance": distance}) +type_map = { + "short": np.int16, + "int": np.int32, + "long": np.int64, + "float": np.float32, + "double": np.float64, +} + + class ZMQSubscriber: def __init__(self, address): """ @@ -32,6 +41,32 @@ def __init__(self, address): ---------- address : str The address to connect to. + + Examples + -------- + >>> import time + >>> import threading + + >>> import xdas as xd + >>> from xdas.io.asn import ZMQSubscriber + + >>> port = xd.io.get_free_port() + >>> address = f"tcp://localhost:{port}" + >>> publisher = ZMQPublisher(address) + >>> da = xd.synthetics.randn_wavefronts() + >>> chunks = xd.split(da, 10) + + >>> def publish(): + ... for chunk in chunks: + ... publisher.submit(chunk) + ... 
time.sleep(0.001) # so that the subscriber can connect in time + >>> threading.Thread(target=publish).start() + + >>> subscriber = ZMQSubscriber(address) + >>> for nchunk in range(10): + ... chunk = next(subscriber) + ... # do something with the chunk + """ self.address = address self._connect(self.address) @@ -66,7 +101,7 @@ def _update_header(self, message): header = json.loads(message.decode("utf-8")) self.packet_size = 8 + header["bytesPerPackage"] * header["nPackagesPerMessage"] self.shape = (header["nPackagesPerMessage"], header["nChannels"]) - self.dtype = np.float32 if header["dataType"] == "float" else np.int16 + self.dtype = type_map[header["dataType"]] roiTable = header["roiTable"][0] di = roiTable["roiStart"] * header["dx"] de = roiTable["roiEnd"] * header["dx"] @@ -155,7 +190,7 @@ def _get_header(da): "bytesPerPackage": da.dtype.itemsize * da.shape[1], "nPackagesPerMessage": da.shape[0], "nChannels": da.shape[1], - "dataType": "float" if da.dtype == np.float32 else "short", + "dataType": next((k for k, v in type_map.items() if v == da.dtype), None), "dx": get_sampling_interval(da, "distance"), "dt": get_sampling_interval(da, "time"), "dtUnit": "s", From 6bf2a1f73a48b7ba8be7d00bc13b95b958bd8fc2 Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 16:14:02 +0200 Subject: [PATCH 16/19] Add dummy fast sythetics and fix doc examples of zmq. --- xdas/io/asn.py | 7 ++++--- xdas/processing/core.py | 37 ++++++++++++++++++++++++++++--------- xdas/synthetics.py | 14 ++++++++++++++ 3 files changed, 46 insertions(+), 12 deletions(-) diff --git a/xdas/io/asn.py b/xdas/io/asn.py index 230b388..14efe06 100644 --- a/xdas/io/asn.py +++ b/xdas/io/asn.py @@ -53,13 +53,14 @@ def __init__(self, address): >>> port = xd.io.get_free_port() >>> address = f"tcp://localhost:{port}" >>> publisher = ZMQPublisher(address) - >>> da = xd.synthetics.randn_wavefronts() + + >>> da = xd.synthetics.dummy() >>> chunks = xd.split(da, 10) >>> def publish(): ... 
for chunk in chunks: - ... publisher.submit(chunk) ... time.sleep(0.001) # so that the subscriber can connect in time + ... publisher.submit(chunk) >>> threading.Thread(target=publish).start() >>> subscriber = ZMQSubscriber(address) @@ -145,7 +146,7 @@ class ZMQPublisher: >>> import xdas as xd >>> from xdas.io.asn import ZMQPublisher - >>> da = xd.synthetics.randn_wavefronts() + >>> da = xd.synthetics.dummy() >>> port = xd.io.get_free_port() >>> address = f"tcp://localhost:{port}" diff --git a/xdas/processing/core.py b/xdas/processing/core.py index 3f7035f..9ab2124 100644 --- a/xdas/processing/core.py +++ b/xdas/processing/core.py @@ -315,11 +315,11 @@ class ZMQPublisher: >>> import xdas as xd >>> from xdas.processing import ZMQPublisher - >>> publisher = ZMQPublisher("tcp://*:5556") - >>> packets = xd.split(xd.synthetics.randn_wavefronts(), 10) + >>> address = f"tcp://localhost:{xd.io.get_free_port()}" + >>> publisher = ZMQPublisher(address) + >>> packets = xd.split(xd.synthetics.dummy(), 10) >>> for n, da in enumerate(packets, start=1): - ... print(f"Sending packet {n}") ... publisher.write(da) """ @@ -330,7 +330,7 @@ def __init__(self, address): self.socket = self.context.socket(zmq.PUB) self.socket.bind(address) - def write(self, da): + def submit(self, da): """ Send a DataArray over ZeroMQ. @@ -342,6 +342,9 @@ def write(self, da): """ self.socket.send(tobytes(da)) + def write(self, da): + self.submit(da) + def result(): return None @@ -357,21 +360,37 @@ class ZMQSubscriber: Examples -------- + >>> import threading + >>> import xdas as xd >>> from xdas.processing import ZMQSubscriber - >>> subscriber = ZMQSubscriber("tcp://localhost:5556") + First we generate some data and split it into packets + + >>> da = xd.synthetics.dummy() + >>> packets = xd.split(da, 10) + + We then publish the packets asynchronously + >>> address = f"tcp://localhost:{xd.io.get_free_port()}" + >>> publisher = ZMQPublisher(address) + + >>> def publish(): + ... for packet in packets: + ... 
publisher.submit(packet) + + >>> threading.Thread(target=publish).start() + + Now let's receive the packets + + >>> subscriber = ZMQSubscriber(address) >>> packets = [] >>> for n, da in enumerate(subscriber, start=1): - ... print(f"Received packet {n}") ... packets.append(da) ... if n == 10: ... break - >>> da = xd.concatenate(packets) - - >>> assert da.equals(xd.synthetics.randn_wavefronts()) + >>> assert da.equals(da) """ def __init__(self, address): diff --git a/xdas/synthetics.py b/xdas/synthetics.py index 1c1de2e..884c4ca 100644 --- a/xdas/synthetics.py +++ b/xdas/synthetics.py @@ -141,3 +141,17 @@ def randn_wavefronts(): }, ) return da + + +def dummy(shape=(1000, 100)): + starttime = np.datetime64("2024-01-01T00:00:00") + endtime = starttime + shape[0] * np.timedelta64(100, "ms") + time = {"tie_indices": [0, shape[0] - 1], "tie_values": [starttime, endtime]} + distance = {"tie_indices": [0, shape[1] - 1], "tie_values": [0.0, 1000.0]} + return DataArray( + data=np.random.randn(*shape), + coords={ + "time": time, + "distance": distance, + }, + ) From a08b46cfd4d6348860b6de5bee68cb5424fd22e2 Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 16:51:32 +0200 Subject: [PATCH 17/19] Add tests for ZMQPublisher/Subscriber --- tests/test_processing.py | 45 ++++++++++++++++++++++++++++++++++++++++ xdas/processing/core.py | 43 ++++++++++++++++++++++++++++---------- xdas/synthetics.py | 2 +- 3 files changed, 78 insertions(+), 12 deletions(-) diff --git a/tests/test_processing.py b/tests/test_processing.py index 3434d5d..5256f24 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -1,6 +1,10 @@ import os import tempfile +import threading +import time +import hdf5plugin +import numpy as np import pandas as pd import scipy.signal as sp @@ -10,6 +14,8 @@ DataArrayLoader, DataArrayWriter, DataFrameWriter, + ZMQPublisher, + ZMQSubscriber, process, ) from xdas.signal import sosfilt @@ -160,3 +166,42 @@ def 
test_write_and_result_with_existing_file(self): # Check if the output file contains the correct data output_df = pd.read_csv(writer.path) assert output_df.equals(expected_result) + + +class TestZMQ: + def _publish_and_subscribe(self, packets, address, encoding=None): + publisher = ZMQPublisher(address) + + def publish(): + for packet in packets: + time.sleep(0.001) + publisher.submit(packet, encoding=encoding) + + threading.Thread(target=publish).start() + + subscriber = ZMQSubscriber(address) + result = [] + for n, packet in enumerate(subscriber, start=1): + result.append(packet) + if n == len(packets): + break + return xdas.concatenate(result) + + def test_publish_and_subscribe(self): + expected = xdas.synthetics.dummy() + packets = xdas.split(expected, 10) + address = f"tcp://localhost:{xdas.io.get_free_port()}" + + result = self._publish_and_subscribe(packets, address) + assert result.equals(expected) + + def test_encoding(self): + expected = xdas.synthetics.dummy() + packets = xdas.split(expected, 10) + address = f"tcp://localhost:{xdas.io.get_free_port()}" + encoding = {"chunks": (10, 10), **hdf5plugin.Zfp(accuracy=1e-6)} + + result = self._publish_and_subscribe(packets, address, encoding=encoding) + assert np.allclose(result.values, expected.values, atol=1e-6) + result.data = expected.data + assert result.equals(expected) diff --git a/xdas/processing/core.py b/xdas/processing/core.py index 9ab2124..754307f 100644 --- a/xdas/processing/core.py +++ b/xdas/processing/core.py @@ -315,12 +315,28 @@ class ZMQPublisher: >>> import xdas as xd >>> from xdas.processing import ZMQPublisher + First we generate some data and split it into packets + + >>> packets = xd.split(xd.synthetics.dummy(), 10) + + We initialize the publisher at a given address + >>> address = f"tcp://localhost:{xd.io.get_free_port()}" >>> publisher = ZMQPublisher(address) - >>> packets = xd.split(xd.synthetics.dummy(), 10) - >>> for n, da in enumerate(packets, start=1): - ... 
publisher.write(da) + We can then publish the packets + + >>> for da in packets: + ... publisher.submit(da) + + To reduce the size of the packets, we can also specify an encoding + + >>> import hdf5plugin + + >>> encoding = {"chunks": (10, 10), **hdf5plugin.Zfp(accuracy=1e-6)} + >>> for da in packets: + ... publisher.submit(da, encoding=encoding) + """ def __init__(self, address): @@ -330,7 +346,7 @@ def __init__(self, address): self.socket = self.context.socket(zmq.PUB) self.socket.bind(address) - def submit(self, da): + def submit(self, da, encoding=None): """ Send a DataArray over ZeroMQ. @@ -340,10 +356,10 @@ def submit(self, da): The DataArray to be sent. """ - self.socket.send(tobytes(da)) + self.socket.send(tobytes(da, encoding)) - def write(self, da): - self.submit(da) + def write(self, da, encoding=None): + self.submit(da, encoding) def result(): return None @@ -358,6 +374,11 @@ class ZMQSubscriber: address : str The address to connect the subscriber to. + Methods + ------- + submit(da, encoding=None) + Send a DataArray over ZeroMQ. 
+ Examples -------- >>> import threading @@ -366,7 +387,7 @@ class ZMQSubscriber: >>> from xdas.processing import ZMQSubscriber First we generate some data and split it into packets - + >>> da = xd.synthetics.dummy() >>> packets = xd.split(da, 10) @@ -382,7 +403,7 @@ class ZMQSubscriber: >>> threading.Thread(target=publish).start() Now let's receive the packets - + >>> subscriber = ZMQSubscriber(address) >>> packets = [] >>> for n, da in enumerate(subscriber, start=1): @@ -407,10 +428,10 @@ def __next__(self): return frombuffer(message) -def tobytes(da): +def tobytes(da, encoding=None): with TemporaryDirectory() as tmpdir: path = os.path.join(tmpdir, "tmp.nc") - da.to_netcdf(path, virtual=False) + da.to_netcdf(path, virtual=False, encoding=encoding) with open(path, "rb") as file: return file.read() diff --git a/xdas/synthetics.py b/xdas/synthetics.py index 884c4ca..aa12087 100644 --- a/xdas/synthetics.py +++ b/xdas/synthetics.py @@ -145,7 +145,7 @@ def randn_wavefronts(): def dummy(shape=(1000, 100)): starttime = np.datetime64("2024-01-01T00:00:00") - endtime = starttime + shape[0] * np.timedelta64(100, "ms") + endtime = starttime + (shape[0] - 1) * np.timedelta64(100, "ms") time = {"tie_indices": [0, shape[0] - 1], "tie_values": [starttime, endtime]} distance = {"tie_indices": [0, shape[1] - 1], "tie_values": [0.0, 1000.0]} return DataArray( From c7b3102f0d97b779ed674a55f3128903d1890849 Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 18:08:09 +0200 Subject: [PATCH 18/19] Add streaming documentation + encoding is given at init. 
--- docs/api/processing.md | 2 + docs/user-guide/index.md | 1 + docs/user-guide/streaming.md | 86 ++++++++++++++++++++++++++++++++++++ tests/test_processing.py | 4 +- xdas/processing/core.py | 43 ++++++++++-------- 5 files changed, 116 insertions(+), 20 deletions(-) create mode 100644 docs/user-guide/streaming.md diff --git a/docs/api/processing.md b/docs/api/processing.md index fc6d244..11ed302 100644 --- a/docs/api/processing.md +++ b/docs/api/processing.md @@ -12,4 +12,6 @@ DataArrayLoader RealTimeLoader DataArrayWriter + ZMQPublisher + ZMQSubscriber ``` \ No newline at end of file diff --git a/docs/user-guide/index.md b/docs/user-guide/index.md index a5bdbd7..c8aef43 100644 --- a/docs/user-guide/index.md +++ b/docs/user-guide/index.md @@ -10,4 +10,5 @@ interpolated-coordinates convert-displacement atoms processing +streaming ``` \ No newline at end of file diff --git a/docs/user-guide/streaming.md b/docs/user-guide/streaming.md new file mode 100644 index 0000000..cba8412 --- /dev/null +++ b/docs/user-guide/streaming.md @@ -0,0 +1,86 @@ +--- +file_format: mystnb +kernelspec: + name: python3 +--- + +# Streaming data + +Xdas allows streaming data over any network using [ZeroMQ](https://zeromq.org). Xdas uses the Publisher and Subscriber patterns, meaning that on one node the data is published and that any number of subscribers can receive the data stream. + +Streaming data with Xdas is done by simply dumping each chunk to NetCDF binaries and sending those as packets. This ensures that each packet is self-described and that features such as compression are available (which can be very helpful to minimize the used bandwidth). + +Xdas implements the {py:class}`~xdas.processing.ZMQPublisher` and {py:class}`~xdas.processing.ZMQSubscriber`. Those objects can respectively be used as a Writer and a Loader as described in the [](processing) section. Both are initialized by giving a network address. 
The publisher uses the `submit` method to send packets while the subscriber is an infinite iterator that yields packets. + +In this section, we will mimic the use of several machines by using multithreading, where each thread is supposed to be a different machine. In real-life applications, the publisher and subscriber are generally called on different machines or in different software. + +## Simple use case + +```{code-cell} +import threading +import time + +import xdas as xd +from xdas.processing import ZMQPublisher, ZMQSubscriber +``` + +First we generate some data and split it into packets + +```{code-cell} +da = xd.synthetics.dummy() +packets = xd.split(da, 5) +``` + +We then publish the packets on machine 1. + +```{code-cell} +address = f"tcp://localhost:{xd.io.get_free_port()}" +publisher = ZMQPublisher(address) + +def publish(): + for packet in packets: + publisher.submit(packet) + # give a chance to the subscriber to connect in time and to get the last packet + time.sleep(0.1) + +machine1 = threading.Thread(target=publish) +machine1.start() +``` + +Let's receive the packets on machine 2. + +```{code-cell} +subscriber = ZMQSubscriber(address) + +packets = [] + +def subscribe(): + for packet in subscriber: + packets.append(packet) + +machine2 = threading.Thread(target=subscribe) +machine2.start() +``` + +Now we wait for machine 1 to finish sending its packets and see if everything went well. + +```{code-cell} +machine1.join() +print(f"We received {len(packets)} packets!") +assert xd.concatenate(packets).equals(da) +``` + +## Using encoding + +To reduce the volume of the transmitted data, compression is often useful. Xdas enables the use of the ZFP algorithm when storing data but also when streaming it. Encoding is declared the same way. 
+ +```{code-cell} +:tags: [remove-output] + +import hdf5plugin + +encoding = {"chunks": (10, 10), **hdf5plugin.Zfp(accuracy=1e-6)} +publisher = ZMQPublisher(address, encoding) # Add encoding here, the rest is the same +``` + + diff --git a/tests/test_processing.py b/tests/test_processing.py index 5256f24..820a908 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -170,12 +170,12 @@ def test_write_and_result_with_existing_file(self): class TestZMQ: def _publish_and_subscribe(self, packets, address, encoding=None): - publisher = ZMQPublisher(address) + publisher = ZMQPublisher(address, encoding) def publish(): for packet in packets: time.sleep(0.001) - publisher.submit(packet, encoding=encoding) + publisher.submit(packet) threading.Thread(target=publish).start() diff --git a/xdas/processing/core.py b/xdas/processing/core.py index 754307f..c3c42a5 100644 --- a/xdas/processing/core.py +++ b/xdas/processing/core.py @@ -170,6 +170,8 @@ class DataArrayWriter: dirpath : str or path The directory to store the output of a processing pipeline. The directory needs to exist and be empty. + encoding : dict + The encoding to use when dumping the DataArrays to bytes. Examples -------- @@ -309,11 +311,13 @@ class ZMQPublisher: ---------- address : str The address to bind the publisher to. + encoding : dict + The encoding to use when dumping the DataArrays to bytes. Examples -------- >>> import xdas as xd - >>> from xdas.processing import ZMQPublisher + >>> from xdas.processing import ZMQPublisher, ZMQSubscriber First we generate some data and split it into packets @@ -333,20 +337,22 @@ class ZMQPublisher: >>> import hdf5plugin + >>> address = f"tcp://localhost:{xd.io.get_free_port()}" >>> encoding = {"chunks": (10, 10), **hdf5plugin.Zfp(accuracy=1e-6)} + >>> publisher = ZMQPublisher(address, encoding) >>> for da in packets: - ... publisher.submit(da, encoding=encoding) + ... 
publisher.submit(da) """ - def __init__(self, address): - import zmq - - self.context = zmq.Context() - self.socket = self.context.socket(zmq.PUB) - self.socket.bind(address) + def __init__(self, address, encoding=None): + self.address = address + self.encoding = encoding + self._context = zmq.Context() + self._socket = self._context.socket(zmq.PUB) + self._socket.bind(self.address) - def submit(self, da, encoding=None): + def submit(self, da): """ Send a DataArray over ZeroMQ. @@ -356,10 +362,10 @@ def submit(self, da, encoding=None): The DataArray to be sent. """ - self.socket.send(tobytes(da, encoding)) + self._socket.send(tobytes(da, self.encoding)) - def write(self, da, encoding=None): - self.submit(da, encoding) + def write(self, da): + self.submit(da) def result(): return None @@ -376,7 +382,7 @@ class ZMQSubscriber: Methods ------- - submit(da, encoding=None) + submit(da) Send a DataArray over ZeroMQ. Examples @@ -415,16 +421,17 @@ class ZMQSubscriber: """ def __init__(self, address): - self.context = zmq.Context() - self.socket = self.context.socket(zmq.SUB) - self.socket.connect(address) - self.socket.setsockopt_string(zmq.SUBSCRIBE, "") + self.address = address + self._context = zmq.Context() + self._socket = self._context.socket(zmq.SUB) + self._socket.connect(address) + self._socket.setsockopt_string(zmq.SUBSCRIBE, "") def __iter__(self): return self def __next__(self): - message = self.socket.recv() + message = self._socket.recv() return frombuffer(message) From 3705ac46d2503d51eca3bbc5f8d8b5b7f62fe283 Mon Sep 17 00:00:00 2001 From: Alister Trabattoni Date: Tue, 17 Sep 2024 18:30:03 +0200 Subject: [PATCH 19/19] Add note about ASN protocol and fix references. 
--- docs/api/index.md | 7 ++++--- docs/api/io.md | 27 +++++++++++++++++++++++++++ docs/user-guide/streaming.md | 5 +++++ 3 files changed, 36 insertions(+), 3 deletions(-) create mode 100644 docs/api/io.md diff --git a/docs/api/index.md b/docs/api/index.md index 6a1c662..3f2d665 100644 --- a/docs/api/index.md +++ b/docs/api/index.md @@ -4,11 +4,12 @@ :maxdepth: 1 xdas +atoms +io fft -signal -processing parallel +processing +signal synthetics virtual -atoms ``` \ No newline at end of file diff --git a/docs/api/io.md b/docs/api/io.md new file mode 100644 index 0000000..4e2836c --- /dev/null +++ b/docs/api/io.md @@ -0,0 +1,27 @@ +```{eval-rst} +.. currentmodule:: xdas.io +``` + +# xdas.io + +```{eval-rst} +.. autosummary:: + :toctree: ../_autosummary + + get_free_port +``` + +```{eval-rst} +.. currentmodule:: xdas.io.asn +``` + + +## ASN + +```{eval-rst} +.. autosummary:: + :toctree: ../_autosummary + + ZMQPublisher + ZMQSubscriber +``` diff --git a/docs/user-guide/streaming.md b/docs/user-guide/streaming.md index cba8412..b582eb6 100644 --- a/docs/user-guide/streaming.md +++ b/docs/user-guide/streaming.md @@ -79,8 +79,13 @@ To reduce the volume of the transmitted data, compression is often useful. Xdas import hdf5plugin +address = f"tcp://localhost:{xd.io.get_free_port()}" encoding = {"chunks": (10, 10), **hdf5plugin.Zfp(accuracy=1e-6)} publisher = ZMQPublisher(address, encoding) # Add encoding here, the rest is the same ``` +{py:class}`~xdas.io.asn.ZMQSubscriber` +```{note} +Xdas also implements the ZeroMQ protocol used by the OptoDAS interrogators by ASN. Equivalent {py:class}`~xdas.io.asn.ZMQPublisher` and {py:class}`~xdas.io.asn.ZMQSubscriber` can be found in {py:mod}`xdas.io.asn`. This can be useful to get data in real-time from one instrument of that kind. Note that compression is not available with that protocol yet. +```