Skip to content

Commit

Permalink
updates from real time testing
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanmcreynolds committed Mar 12, 2024
1 parent dfdc344 commit cf58c5d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 38 deletions.
17 changes: 15 additions & 2 deletions src/tiled_ingestor/activemq/consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
from collections import deque
import json
from pathlib import Path
from time import sleep
import os

Expand All @@ -12,10 +13,16 @@

TILED_INGEST_TILED_CONFIG_PATH = os.getenv("TILED_INGEST_TILED_CONFIG_PATH")
STOMP_SERVER = os.getenv("STOMP_SERVER")

STOMP_LOG_LEVEL = os.getenv("STOMP_LOG_LEVEL", "INFO")

logging.getLogger("stomp").setLevel(logging.getLevelName(STOMP_LOG_LEVEL))
logging.getLogger("asyncio").setLevel(logging.INFO)
logging.getLogger("aiosqlite").setLevel(logging.INFO)

logger = logging.getLogger("activemq_consumer")
logger.info(f"TILED_INGEST_TILED_CONFIG_PATH: {TILED_INGEST_TILED_CONFIG_PATH} ")
logger.info(f"STOMPSERVER: {STOMP_SERVER} ")
logger.info(f"STOMP_TOPIC_NAME: {STOMP_TOPIC_NAME} ")


class ScanListener(stomp.ConnectionListener):
Expand All @@ -31,8 +38,10 @@ def on_message(self, message):
# separate parameters. But in the version I'm using, the message
# is an object that contains body and headers
ob = json.loads(message.body)
logger.info(f"Received message: {ob['status']}")
# if (ob["status"] == "STARTED"):
if ob[DIAMOND_STATUS_KEY] == "COMPLETE":
print(ob)
self.messages.append(ob[DIAMOND_FILEPATH_KEY])


Expand All @@ -47,7 +56,11 @@ def start_consumer():
if scan_listener.messages:
new_file_path = scan_listener.messages.popleft()
try:
asyncio.run(process_file(new_file_path, tiled_config))
logger.info(f"Ingesting file: {new_file_path}")
# we get a path to the nexus file, but we want the TiffSaver_3 file next to it
nxs_path = Path(new_file_path)
tiff_path = nxs_path.parent / Path("TiffSaver_3")
asyncio.run(process_file(str(tiff_path), tiled_config, path_prefix="reconstructions"))
except Exception as e:
print("Failed to process file " + new_file_path)
print(str(e))
Expand Down
45 changes: 12 additions & 33 deletions src/tiled_ingestor/activemq/producer.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,17 @@
import stomp
import json
import logging
import os
import sys

import stomp

from .schemas import DIAMOND_FILEPATH_KEY, DIAMOND_STATUS_KEY, STOMP_TOPIC_NAME

STOMP_LOG_LEVEL = os.getenv("STOMP_LOG_LEVEL", "INFO")
STOMP_SERVER = os.getenv("STOMP_SERVER")
logging.getLogger("stomp").setLevel(logging.getLevelName(STOMP_LOG_LEVEL))


def send_message(new_file: str):
logging.info(f"Received request for {new_file}")
json_message = json.dumps(
{DIAMOND_STATUS_KEY: "COMPLETE", DIAMOND_FILEPATH_KEY: new_file}
)

conn = stomp.Connection([(STOMP_SERVER, 61613)])
conn.connect(wait=True)

conn.send(
body=json_message,
destination=STOMP_TOPIC_NAME,
headers={"content-type": "application/json"},
)

conn.disconnect()
conn = stomp.Connection([("k11-control", 61613)],auto_content_length=False)
# conn.start()
conn.connect()
print('Connected!')

# replace thhe string below with the desired filepatha
filepath = "//dls/k11/data/2024/mg37376-1/processing/20240311120527_37086/k11-37086_processed.nxs"
message = json.dumps({'filePath': filepath})
destination = '/topic/org.dawnsci.file.topic'
conn.send(destination, message, ack='auto')

if __name__ == "__main__":
if len(sys.argv) == 1:
new_file = "../mlex_tomo_framework/data/tiled_storage/recons/rec20240207_120829_test_no_xrays_n1313"
else:
new_file = sys.argv[1]
send_message(new_file=new_file)
print('Exiting...')
sys.exit(1)
9 changes: 6 additions & 3 deletions src/tiled_ingestor/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,14 @@ async def process_file(
import os

pprint(os.environ)
tiled_config = get_tiled_config("../mlex_tomo_framework/tiled/deploy/config")
config_path = os.getenv("TILED_INGEST_TILED_CONFIG_PATH")
tiled_config = get_tiled_config("../mlex_tomo_framework/tiled_conda/deploy/config")
asyncio.run(
process_file(
"/tiled_storage/recons/rec20240207_120550_test_no_xrays_n257",
# "/dls/k11/data/2024/mg37376-2/processing/mg32801-1/processed/Savu_k11-37074_full_fd_Fresnel_rmrings_vo_AST_tiff/TiffSaver_5",
# "/dls/tmp/mlex/mlex_tomo_framework/data/tiled_storage/recons/rec20240207_120550_test_no_xrays_n257",
"/dls/k11/data/2024/mg37376-1/processed/Savu_k11-38639_3x_fd_vo_AST_tiff/TiffSaver_3",
tiled_config,
path_prefix="/recons",
path_prefix="reconstructions",
)
)

0 comments on commit cf58c5d

Please sign in to comment.