Skip to content

Commit

Permalink
Merge pull request #7 from dylanmcreynolds/activemq
Browse files Browse the repository at this point in the history
Add activemq implementation
  • Loading branch information
dylanmcreynolds committed Mar 4, 2024
2 parents 88ae305 + 23c243f commit dfdc344
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ classifiers = [

dependencies = [
"pika",
"stomp.py",
"tiled[server]==0.1.0a114",
"python-dotenv"
]
Expand All @@ -30,6 +31,7 @@ dynamic = ["version"]
[project.optional-dependencies]
dev = [
"pytest",
"pytest-asyncio",
"pre-commit",
"flake8"
]
Expand Down
26 changes: 26 additions & 0 deletions src/tiled_ingestor/_tests/test_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pytest
from unittest.mock import MagicMock
from collections import deque

from tiled_ingestor.activemq.consumer import ScanListener


@pytest.fixture
def listener():
return ScanListener()


def test_on_error(listener):
message = "Error message"
listener.on_error(message)
assert listener.messages == deque()


def test_on_message(listener, monkeypatch):
message = MagicMock()
message.body = '{"status": "COMPLETE", "filePath": "/path/to/file"}'


listener.on_message(message)
assert listener.messages == deque(['/path/to/file'])

48 changes: 48 additions & 0 deletions src/tiled_ingestor/_tests/test_ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# import asyncio
# from unittest.mock import patch

import pytest
# from tiled_ingestor.ingest import process_file, get_tiled_config


@pytest.fixture
def tiled_config(tmpdir):
return {
"trees": [
{
"path": "/",
"tree": "catalog",
"args": {
"uri": f"sqlite+aiosqlite:///{tmpdir}/catalog.db",
"writable_storage": str(tmpdir / "data"),
"readable_storage": [str(tmpdir / "data")],
"init_if_not_exists": True,
},
}
]
}


# this test doesn't quite work, but since we'll be converting to the
# client soon, I'll wait

# @pytest.mark.asyncio
# async def test_process_file(tiled_config):
# file_path = "/path/to/file"
# tiled_config_tree_path = "/"
# path_prefix = "/"

# with patch("tiled.catalog.from_uri") as mock_from_uri:
# # Mock the return value of 'from_uri' function
# mock_catalog_adapter = mock_from_uri.return_value

# # Call the process_file function
# await process_file(
# file_path,
# tiled_config,
# tiled_config_tree_path,
# path_prefix,
# )

# # Assert that the file was registered with the catalog
# mock_catalog_adapter.get_entry.assert_called_once_with(file_path, prefix=path_prefix)
Empty file.
59 changes: 59 additions & 0 deletions src/tiled_ingestor/activemq/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import asyncio
from collections import deque
import json
from time import sleep
import os

import stomp

from .schemas import DIAMOND_FILEPATH_KEY, DIAMOND_STATUS_KEY, STOMP_TOPIC_NAME
from ..ingest import get_tiled_config, process_file
import logging

TILED_INGEST_TILED_CONFIG_PATH = os.getenv("TILED_INGEST_TILED_CONFIG_PATH")
STOMP_SERVER = os.getenv("STOMP_SERVER")

STOMP_LOG_LEVEL = os.getenv("STOMP_LOG_LEVEL", "INFO")
logging.getLogger("stomp").setLevel(logging.getLevelName(STOMP_LOG_LEVEL))
logging.getLogger("asyncio").setLevel(logging.INFO)


class ScanListener(stomp.ConnectionListener):
def __init__(self):
self.messages = deque()

def on_error(self, message):
print("received an error %s" % message)

def on_message(self, message):
# this might be a difference in version of stomp from what
# diamond is using. In their example, messsage and headers are
# separate parameters. But in the version I'm using, the message
# is an object that contains body and headers
ob = json.loads(message.body)
# if (ob["status"] == "STARTED"):
if ob[DIAMOND_STATUS_KEY] == "COMPLETE":
self.messages.append(ob[DIAMOND_FILEPATH_KEY])


def start_consumer():
tiled_config = get_tiled_config(TILED_INGEST_TILED_CONFIG_PATH)
conn = stomp.Connection([(STOMP_SERVER, 61613)])
scan_listener = ScanListener()
conn.set_listener("", scan_listener)
conn.connect()
conn.subscribe(destination=STOMP_TOPIC_NAME, id=1, ack="auto")
while True:
if scan_listener.messages:
new_file_path = scan_listener.messages.popleft()
try:
asyncio.run(process_file(new_file_path, tiled_config))
except Exception as e:
print("Failed to process file " + new_file_path)
print(str(e))

sleep(1)


if __name__ == "__main__":
start_consumer()
38 changes: 38 additions & 0 deletions src/tiled_ingestor/activemq/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import json
import logging
import os
import sys

import stomp

from .schemas import DIAMOND_FILEPATH_KEY, DIAMOND_STATUS_KEY, STOMP_TOPIC_NAME

STOMP_LOG_LEVEL = os.getenv("STOMP_LOG_LEVEL", "INFO")
STOMP_SERVER = os.getenv("STOMP_SERVER")
logging.getLogger("stomp").setLevel(logging.getLevelName(STOMP_LOG_LEVEL))


def send_message(new_file: str):
logging.info(f"Received request for {new_file}")
json_message = json.dumps(
{DIAMOND_STATUS_KEY: "COMPLETE", DIAMOND_FILEPATH_KEY: new_file}
)

conn = stomp.Connection([(STOMP_SERVER, 61613)])
conn.connect(wait=True)

conn.send(
body=json_message,
destination=STOMP_TOPIC_NAME,
headers={"content-type": "application/json"},
)

conn.disconnect()


if __name__ == "__main__":
if len(sys.argv) == 1:
new_file = "../mlex_tomo_framework/data/tiled_storage/recons/rec20240207_120829_test_no_xrays_n1313"
else:
new_file = sys.argv[1]
send_message(new_file=new_file)
3 changes: 3 additions & 0 deletions src/tiled_ingestor/activemq/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DIAMOND_FILEPATH_KEY = "filePath"
DIAMOND_STATUS_KEY = "status"
STOMP_TOPIC_NAME = "/topic/org.eclipse.scanning.status.topic"

0 comments on commit dfdc344

Please sign in to comment.