Merge pull request #2 from dylanmcreynolds/waskly_wabbit
Ingest via rabbitMQ
dylanmcreynolds committed Feb 19, 2024
2 parents afeccd3 + 6218e9f commit 26b2f38
Showing 11 changed files with 179 additions and 99 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -7,4 +7,4 @@ RUN pip install --no-cache-dir --upgrade .



# CMD ["python", "main.py"]s
CMD ["python", "-m", "tiled_ingestor.rabbitmq.consumer"]
40 changes: 0 additions & 40 deletions src/rabbitmq/consumer.py

This file was deleted.

43 changes: 0 additions & 43 deletions src/rabbitmq/producer.py

This file was deleted.

1 change: 0 additions & 1 deletion src/rabbitmq/schemas.py

This file was deleted.

File renamed without changes.
19 changes: 19 additions & 0 deletions src/tiled_ingestor/config.py
@@ -0,0 +1,19 @@
import logging
import os

logger = logging.getLogger(__name__)

TILED_INGEST_PIKA_LOG_LEVEL = os.getenv(
    "TILED_INGEST_PIKA_LOG_LEVEL", logging.CRITICAL
)
TILED_INGEST_RMQ_HOST = os.getenv("TILED_INGEST_RMQ_HOST")
TILED_INGEST_RMQ_USER = os.getenv("TILED_INGEST_RMQ_USER")
TILED_INGEST_RMQ_PW = os.getenv("TILED_INGEST_RMQ_PW")

TILED_INGEST_TILED_CONFIG_PATH = os.getenv("TILED_INGEST_TILED_CONFIG_PATH")

logger.info(f"TILED_INGEST_PIKA_LOG_LEVEL {TILED_INGEST_PIKA_LOG_LEVEL}")
logger.info(f"TILED_INGEST_RMQ_HOST {TILED_INGEST_RMQ_HOST}")
logger.info(f"TILED_INGEST_RMQ_USER {TILED_INGEST_RMQ_USER}")
logger.info("TILED_INGEST_RMQ_PW not logged")
logger.info(f"TILED_INGEST_TILED_CONFIG_PATH {TILED_INGEST_TILED_CONFIG_PATH}")
45 changes: 31 additions & 14 deletions src/ingest.py → src/tiled_ingestor/ingest.py
@@ -11,16 +11,20 @@
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


def get_tiled_config(config_path: str):
return tiled.config.parse_configs(config_path)


async def process_file(
file_path: str,
tiled_config: dict,
tiled_config_tree_path: str = "/",
    path_prefix: str = "/",
):
"""
    Process a file that already exists and register it with tiled as a catalog.
    We look for a match in the tiled config file based on tiled_config_tree_path. This will be
    the tree that we import to. Should work with folders of TIFF sequences as well as single files like
    hdf5 or datasets like zarr. But honestly, only the tiff sequence case is tested.
Args:
@@ -36,12 +40,19 @@ async def process_file(
Returns:
None
"""

    # find the tree in tiled configuration that matches the provided tiled_tree_path
    matching_tree = next(
        (
            tree
            for tree in tiled_config["trees"]
            if tree["path"] == tiled_config_tree_path
        ),
        None,
    )
    assert (
        matching_tree
    ), f"No tiled tree configured for tree path {tiled_config_tree_path}"
assert (
matching_tree["tree"] == "catalog"
), f"Matching tiled tree {tiled_config_tree_path} is not a catalog"
@@ -50,7 +61,7 @@
catalog_adapter = from_uri(
matching_tree["args"]["uri"],
readable_storage=matching_tree["args"]["readable_storage"],
        adapters_by_mimetype=matching_tree["args"].get("adapters_by_mimetype"),
)

# Register with tiled. This writes entries into the database for all of the nodes down to the data node
@@ -59,31 +70,37 @@
key_from_filename=identity,
path=file_path,
prefix=path_prefix,
        overwrite=False,
    )


if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "outside_container":
# if we're debugging this outside of a container, we might want our
# own settings
import dotenv

dotenv.load_dotenv()
tiled_config = get_tiled_config("../mlex_tomo_framework/tiled/deploy/config")
asyncio.run(
process_file(
"../mlex_tomo_framework/data/tiled_storage/beamlines/8.3.2/recons/rec20240207_120829_test_no_xrays_n1313",
                tiled_config,
                path_prefix="/beamlines/8.3.2/recons/",
)
)
else:
from pprint import pprint
import os

pprint(os.environ)
        # config path inside the container (previously the config_path default)
        tiled_config = get_tiled_config("/deploy/config")
asyncio.run(
process_file(
# "/tiled_storage/beamlines/8.3.2/recons/rec20240207_120550_test_no_xrays_n257",
"/tiled_storage/beamlines/8.3.2/recons/rec20240207_120829_test_no_xrays_n1313",
                tiled_config,
                path_prefix="/beamlines/8.3.2/recons/",
)
)

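To make the tree-matching logic above concrete, here is the rough shape of the parsed tiled configuration that process_file() walks; the actual structure comes from tiled.config.parse_configs, and the URI, storage path, and mimetype entries below are hypothetical.

# Hypothetical shape of the dict returned by get_tiled_config()/parse_configs().
# process_file() scans config["trees"] for an entry whose "path" equals
# tiled_config_tree_path and asserts that its "tree" field is "catalog".
example_tiled_config = {
    "trees": [
        {
            "path": "/",  # compared against tiled_config_tree_path
            "tree": "catalog",  # must be a catalog tree for ingestion
            "args": {
                "uri": "postgresql+asyncpg://user:password@db/catalog",  # hypothetical
                "readable_storage": ["/tiled_storage"],  # hypothetical
                # "adapters_by_mimetype" is optional; .get() tolerates its absence
            },
        }
    ]
}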
File renamed without changes.
66 changes: 66 additions & 0 deletions src/tiled_ingestor/rabbitmq/consumer.py
@@ -0,0 +1,66 @@
import asyncio
import logging
import json
import os

import pika

# import config
from ..ingest import get_tiled_config, process_file

import tiled_ingestor.rabbitmq.schemas as schemas

TILED_INGEST_PIKA_LOG_LEVEL = os.getenv("TILED_INGEST_PIKA_LOG_LEVEL", logging.CRITICAL)
logging.basicConfig(level=logging.CRITICAL)
logging.getLogger("pika").setLevel(TILED_INGEST_PIKA_LOG_LEVEL)

TILED_INGEST_RMQ_HOST = os.getenv("TILED_INGEST_RMQ_HOST")
TILED_INGEST_RMQ_USER = os.getenv("TILED_INGEST_RMQ_USER")
TILED_INGEST_RMQ_PW = os.getenv("TILED_INGEST_RMQ_PW")

TILED_INGEST_TILED_CONFIG_PATH = os.getenv("TILED_INGEST_TILED_CONFIG_PATH")
tiled_config = get_tiled_config(TILED_INGEST_TILED_CONFIG_PATH)


def process_message(ch, method, properties, body):
# Decode the JSON message
message = json.loads(body)
    # Process the message
# TODO: Add your logic here

# Acknowledge the message
ch.basic_ack(delivery_tag=method.delivery_tag)
new_file_path = message.get(schemas.NEW_FILE_PATH_KEY)
assert (
new_file_path
), f"Message received from rabbitMQ does not contain {schemas.NEW_FILE_PATH_KEY}"
asyncio.run(process_file(new_file_path, tiled_config))


def start_consumer():
# Connect to RabbitMQ

credentials = pika.PlainCredentials(TILED_INGEST_RMQ_USER, TILED_INGEST_RMQ_PW)
parameters = pika.ConnectionParameters(
TILED_INGEST_RMQ_HOST, credentials=credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

    # Declare the queue
channel.queue_declare(queue=schemas.RABBITMQ_TOMO_QUEUE)

# Set the prefetch count to limit the number of unacknowledged messages
channel.basic_qos(prefetch_count=1)

# Start consuming messages
channel.basic_consume(
        queue=schemas.RABBITMQ_TOMO_QUEUE, on_message_callback=process_message
)

# Enter a loop to continuously consume messages
channel.start_consuming()


if __name__ == "__main__":
start_consumer()
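A small standalone sketch (not part of the commit) of the message contract the consumer relies on: a JSON body whose new_file_path entry is handed to process_file(); the reconstruction path here is made up.

# Hypothetical check of the consumer's message contract, with no broker involved.
import json

from tiled_ingestor.rabbitmq import schemas

body = json.dumps(
    {schemas.NEW_FILE_PATH_KEY: "/tiled_storage/beamlines/8.3.2/recons/example_scan"}
).encode()

message = json.loads(body)
new_file_path = message.get(schemas.NEW_FILE_PATH_KEY)
assert new_file_path, f"message does not contain {schemas.NEW_FILE_PATH_KEY}"
# process_message() would now call asyncio.run(process_file(new_file_path, tiled_config))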
59 changes: 59 additions & 0 deletions src/tiled_ingestor/rabbitmq/producer.py
@@ -0,0 +1,59 @@
import json
import logging
import os
import sys

import pika
from pika import DeliveryMode

# from pika.exchange_type import ExchangeType

# import config
import tiled_ingestor.rabbitmq.schemas as schemas

TILED_INGEST_PIKA_LOG_LEVEL = os.getenv("TILED_INGEST_PIKA_LOG_LEVEL", logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("pika").setLevel(TILED_INGEST_PIKA_LOG_LEVEL)


TILED_INGEST_RMQ_HOST = os.getenv("TILED_INGEST_RMQ_HOST")
TILED_INGEST_RMQ_USER = os.getenv("TILED_INGEST_RMQ_USER")
TILED_INGEST_RMQ_PW = os.getenv("TILED_INGEST_RMQ_PW")


def send_message(new_file: str):
logging.info(f"Received request for {new_file}")
json_message = json.dumps({schemas.NEW_FILE_PATH_KEY: new_file})

credentials = pika.PlainCredentials(TILED_INGEST_RMQ_USER, TILED_INGEST_RMQ_PW)
parameters = pika.ConnectionParameters(
TILED_INGEST_RMQ_HOST, credentials=credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# channel.exchange_declare(
# exchange=schemas.RABBITMQ_EXCHANGE,
# exchange_type=ExchangeType.direct,
# passive=False,
# durable=True,
# auto_delete=False,
# )

logging.info(f"Sending {json_message} to queue {schemas.RABBITMQ_TOMO_QUEUE}")
channel.basic_publish(
"",
schemas.RABBITMQ_TOMO_QUEUE,
json_message,
pika.BasicProperties(
content_type="application/json", delivery_mode=DeliveryMode.Transient
),
)
connection.close()


if __name__ == "__main__":
if len(sys.argv) == 1:
new_file = "../mlex_tomo_framework/data/tiled_storage/recons/rec20240207_120829_test_no_xrays_n1313"
else:
new_file = sys.argv[1]
send_message(new_file=new_file)
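A hypothetical caller of send_message(), assuming the package is installed and TILED_INGEST_RMQ_HOST/_USER/_PW are exported before the import (the producer reads them at module load); the reconstruction path is only an example.

# Illustrative usage; requires the RabbitMQ environment variables to be set.
from tiled_ingestor.rabbitmq.producer import send_message

send_message(
    "/tiled_storage/beamlines/8.3.2/recons/rec20240207_120829_test_no_xrays_n1313"
)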
3 changes: 3 additions & 0 deletions src/tiled_ingestor/rabbitmq/schemas.py
@@ -0,0 +1,3 @@
NEW_FILE_PATH_KEY = "new_file_path"
RABBITMQ_EXCHANGE = "mlexchange_exchange"
RABBITMQ_TOMO_QUEUE = "tomo_reconstruction"
