From a06c48066eb6ff93902e125d2c3a844bdfbd5a03 Mon Sep 17 00:00:00 2001
From: Dylan McReynolds
Date: Sat, 2 Mar 2024 12:46:25 -0800
Subject: [PATCH 1/3] Add activemq implementation

---
 pyproject.toml               | 2 ++
 src/tiled_ingestor/ingest.py | 3 ++-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index c8890e3..b2d6bc5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -20,6 +20,7 @@ classifiers = [
 
 dependencies = [
     "pika",
+    "stomp.py",
     "tiled[server]==0.1.0a114",
     "python-dotenv"
 ]
@@ -30,6 +31,7 @@ dynamic = ["version"]
 [project.optional-dependencies]
 dev = [
     "pytest",
+    "pytest-asyncio",
     "pre-commit",
     "flake8"
 ]
diff --git a/src/tiled_ingestor/ingest.py b/src/tiled_ingestor/ingest.py
index b3df05a..e0dcb13 100644
--- a/src/tiled_ingestor/ingest.py
+++ b/src/tiled_ingestor/ingest.py
@@ -84,7 +84,8 @@ async def process_file(
     tiled_config = get_tiled_config("../mlex_tomo_framework/tiled/deploy/config")
     asyncio.run(
         process_file(
-            "../mlex_tomo_framework/data/tiled_storage/beamlines/8.3.2/recons/rec20240207_120829_test_no_xrays_n1313",
+            "../mlex_tomo_framework/data/tiled_storage/beamlines/8.3.2/recons/"
+            "rec20240207_120829_test_no_xrays_n1313",
             tiled_config,
             path_prefix="/beamlines/8.3.2/recons/",
         )

From a14bd11f98e7809d25fb3a6afc3233e1988608eb Mon Sep 17 00:00:00 2001
From: Dylan McReynolds
Date: Sat, 2 Mar 2024 13:11:24 -0800
Subject: [PATCH 2/3] bulk of files

---
 src/tiled_ingestor/_tests/test_consumer.py | 30 +++++++++++
 src/tiled_ingestor/_tests/test_ingest.py   | 48 ++++++++++++++++++
 src/tiled_ingestor/activemq/__init__.py    |  0
 src/tiled_ingestor/activemq/consumer.py    | 59 ++++++++++++++++++++++
 src/tiled_ingestor/activemq/producer.py    | 38 ++++++++++++++
 src/tiled_ingestor/activemq/schemas.py     |  3 ++
 6 files changed, 178 insertions(+)
 create mode 100644 src/tiled_ingestor/_tests/test_consumer.py
 create mode 100644 src/tiled_ingestor/_tests/test_ingest.py
 create mode 100644 src/tiled_ingestor/activemq/__init__.py
 create mode 100644 src/tiled_ingestor/activemq/consumer.py
 create mode 100644 src/tiled_ingestor/activemq/producer.py
 create mode 100644 src/tiled_ingestor/activemq/schemas.py

diff --git a/src/tiled_ingestor/_tests/test_consumer.py b/src/tiled_ingestor/_tests/test_consumer.py
new file mode 100644
index 0000000..ce82548
--- /dev/null
+++ b/src/tiled_ingestor/_tests/test_consumer.py
@@ -0,0 +1,30 @@
+import pytest
+from unittest.mock import MagicMock
+from collections import deque
+
+from tiled_ingestor.activemq.consumer import ScanListener
+
+
+@pytest.fixture
+def listener():
+    return ScanListener()
+
+
+def test_on_error(listener):
+    message = "Error message"
+    listener.on_error(message)
+    assert listener.messages == deque()
+
+
+def test_on_message(listener, monkeypatch):
+    message = MagicMock()
+    message.body = '{"status": "COMPLETE", "filePath": "/path/to/file"}'
+
+    def mock_process_file(file_path):
+        assert file_path == "/path/to/file"
+
+    monkeypatch.setattr(process_file, "delay", mock_process_file)
+
+    listener.on_message(message)
+    assert listener.messages == deque(['/path/to/file'])
+
diff --git a/src/tiled_ingestor/_tests/test_ingest.py b/src/tiled_ingestor/_tests/test_ingest.py
new file mode 100644
index 0000000..bfdc9cc
--- /dev/null
+++ b/src/tiled_ingestor/_tests/test_ingest.py
@@ -0,0 +1,48 @@
+# import asyncio
+# from unittest.mock import patch
+
+import pytest
+# from tiled_ingestor.ingest import process_file, get_tiled_config
+
+
+@pytest.fixture
+def tiled_config(tmpdir):
+    return {
+        "trees": [
+            {
+                "path": "/",
+                "tree": "catalog",
+                "args": {
+                    "uri": f"sqlite+aiosqlite:///{tmpdir}/catalog.db",
+                    "writable_storage": str(tmpdir / "data"),
+                    "readable_storage": [str(tmpdir / "data")],
+                    "init_if_not_exists": True,
+                },
+            }
+        ]
+    }
+
+
+# this test doesn't quite work, but since we'll be converting to the
+# client soon, I'll wait
+
+# @pytest.mark.asyncio
+# async def test_process_file(tiled_config):
+#     file_path = "/path/to/file"
+#     tiled_config_tree_path = "/"
+#     path_prefix = "/"
+
+#     with patch("tiled.catalog.from_uri") as mock_from_uri:
+#         # Mock the return value of 'from_uri' function
+#         mock_catalog_adapter = mock_from_uri.return_value
+
+#         # Call the process_file function
+#         await process_file(
+#             file_path,
+#             tiled_config,
+#             tiled_config_tree_path,
+#             path_prefix,
+#         )
+
+#         # Assert that the file was registered with the catalog
+#         mock_catalog_adapter.get_entry.assert_called_once_with(file_path, prefix=path_prefix)
diff --git a/src/tiled_ingestor/activemq/__init__.py b/src/tiled_ingestor/activemq/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/tiled_ingestor/activemq/consumer.py b/src/tiled_ingestor/activemq/consumer.py
new file mode 100644
index 0000000..0b18073
--- /dev/null
+++ b/src/tiled_ingestor/activemq/consumer.py
@@ -0,0 +1,59 @@
+import asyncio
+from collections import deque
+import json
+from time import sleep
+import os
+
+import stomp
+
+from .schemas import DIAMOND_FILEPATH_KEY, DIAMOND_STATUS_KEY, STOMP_TOPIC_NAME
+from ..ingest import get_tiled_config, process_file
+import logging
+
+TILED_INGEST_TILED_CONFIG_PATH = os.getenv("TILED_INGEST_TILED_CONFIG_PATH")
+STOMP_SERVER = os.getenv("STOMP_SERVER")
+
+STOMP_LOG_LEVEL = os.getenv("STOMP_LOG_LEVEL", "INFO")
+logging.getLogger("stomp").setLevel(logging.getLevelName(STOMP_LOG_LEVEL))
+logging.getLogger("asyncio").setLevel(logging.INFO)
+
+
+class ScanListener(stomp.ConnectionListener):
+    def __init__(self):
+        self.messages = deque()
+
+    def on_error(self, message):
+        print("received an error %s" % message)
+
+    def on_message(self, message):
+        # this might be a difference in version of stomp from what
+        # diamond is using. In their example, message and headers are
+        # separate parameters. But in the version I'm using, the message
+        # is an object that contains body and headers
+        ob = json.loads(message.body)
+        # if (ob["status"] == "STARTED"):
+        if ob[DIAMOND_STATUS_KEY] == "COMPLETE":
+            self.messages.append(ob[DIAMOND_FILEPATH_KEY])
+
+
+def start_consumer():
+    tiled_config = get_tiled_config(TILED_INGEST_TILED_CONFIG_PATH)
+    conn = stomp.Connection([(STOMP_SERVER, 61613)])
+    scan_listener = ScanListener()
+    conn.set_listener("", scan_listener)
+    conn.connect()
+    conn.subscribe(destination=STOMP_TOPIC_NAME, id=1, ack="auto")
+    while True:
+        if scan_listener.messages:
+            new_file_path = scan_listener.messages.popleft()
+            try:
+                asyncio.run(process_file(new_file_path, tiled_config))
+            except Exception as e:
+                print("Failed to process file " + new_file_path)
+                print(str(e))
+
+        sleep(1)
+
+
+if __name__ == "__main__":
+    start_consumer()
diff --git a/src/tiled_ingestor/activemq/producer.py b/src/tiled_ingestor/activemq/producer.py
new file mode 100644
index 0000000..e7d5ca0
--- /dev/null
+++ b/src/tiled_ingestor/activemq/producer.py
@@ -0,0 +1,38 @@
+import json
+import logging
+import os
+import sys
+
+import stomp
+
+from .schemas import DIAMOND_FILEPATH_KEY, DIAMOND_STATUS_KEY, STOMP_TOPIC_NAME
+
+STOMP_LOG_LEVEL = os.getenv("STOMP_LOG_LEVEL", "INFO")
+STOMP_SERVER = os.getenv("STOMP_SERVER")
+logging.getLogger("stomp").setLevel(logging.getLevelName(STOMP_LOG_LEVEL))
+
+
+def send_message(new_file: str):
+    logging.info(f"Received request for {new_file}")
+    json_message = json.dumps(
+        {DIAMOND_STATUS_KEY: "COMPLETE", DIAMOND_FILEPATH_KEY: new_file}
+    )
+
+    conn = stomp.Connection([(STOMP_SERVER, 61613)])
+    conn.connect(wait=True)
+
+    conn.send(
+        body=json_message,
+        destination=STOMP_TOPIC_NAME,
+        headers={"content-type": "application/json"},
+    )
+
+    conn.disconnect()
+
+
+if __name__ == "__main__":
+    if len(sys.argv) == 1:
+        new_file = "../mlex_tomo_framework/data/tiled_storage/recons/rec20240207_120829_test_no_xrays_n1313"
+    else:
+        new_file = sys.argv[1]
+    send_message(new_file=new_file)
diff --git a/src/tiled_ingestor/activemq/schemas.py b/src/tiled_ingestor/activemq/schemas.py
new file mode 100644
index 0000000..3ebdade
--- /dev/null
+++ b/src/tiled_ingestor/activemq/schemas.py
@@ -0,0 +1,3 @@
+DIAMOND_FILEPATH_KEY = "filePath"
+DIAMOND_STATUS_KEY = "status"
+STOMP_TOPIC_NAME = "/topic/org.eclipse.scanning.status.topic"

From 23c243f42ea29142f45d046d8fdbb6ca6efa3a6b Mon Sep 17 00:00:00 2001
From: Dylan McReynolds
Date: Sat, 2 Mar 2024 13:14:26 -0800
Subject: [PATCH 3/3] fix unittest

---
 src/tiled_ingestor/_tests/test_consumer.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/src/tiled_ingestor/_tests/test_consumer.py b/src/tiled_ingestor/_tests/test_consumer.py
index ce82548..2b151c7 100644
--- a/src/tiled_ingestor/_tests/test_consumer.py
+++ b/src/tiled_ingestor/_tests/test_consumer.py
@@ -20,10 +20,6 @@ def test_on_message(listener, monkeypatch):
     message = MagicMock()
     message.body = '{"status": "COMPLETE", "filePath": "/path/to/file"}'
 
-    def mock_process_file(file_path):
-        assert file_path == "/path/to/file"
-
-    monkeypatch.setattr(process_file, "delay", mock_process_file)
 
     listener.on_message(message)
     assert listener.messages == deque(['/path/to/file'])