Skip to content

Commit

Permalink
feat: reporter that publishes to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
maffettone committed Apr 3, 2024
1 parent 14285cd commit 10f9593
Showing 1 changed file with 40 additions and 1 deletion.
41 changes: 40 additions & 1 deletion pdf_agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def get_beamline_objects() -> dict:
)

kafka_producer = Publisher(
topic=f"{beamline_tla}.bluesky.adjudicators",
topic=f"{beamline_tla}.mmm.bluesky.adjudicators",
bootstrap_servers=",".join(kafka_config["bootstrap_servers"]),
key="{beamline_tla}.key",
producer_config=kafka_config["runengine_producer_config"],
Expand Down Expand Up @@ -290,3 +290,42 @@ def tell(self, x, y) -> Dict[str, ArrayLike]:
doc = super().tell(x, y)
doc["background"] = self.background
return doc


class PDFReporterMixin:
"""Mixin for sending reports to Kafka as well as Tiled.
Parameters
----------
report_producer : Publisher
Bluesky Kafka publisher to produce document stream of agent reports.
Examples
--------
>>> class PassiveKmeansAgentReporter(PDFReporterMixin, PassiveKmeansAgent)
>>> agent = PassiveKmeansAgentReporter(report_producer=Publisher(...), k_clusters=3)
"""

def __init__(self, *args, report_producer: Publisher, **kwargs):
self._report_producer = report_producer
super().__init__(*args, **kwargs)

def generate_report(self, **kwargs):
doc = self.report(**kwargs)
uid = self._write_event("report", doc)
self._report_producer("report", doc)
logger.info(f"Generated report. Tiled: {uid}\n Kafka: {doc.get('uid', 'No UID')}")

@classmethod
def get_beamline_objects(cls) -> dict:
ret = super().get_beamline_objects()
beamline_tla = "pdf"
kafka_config = nslsii.kafka_utils._read_bluesky_kafka_config_file(
config_file_path="/etc/bluesky/kafka.yml"
)
ret["report_producer"] = Publisher(
topic=f"{beamline_tla}.mmm.bluesky.agents",
bootstrap_servers=",".join(kafka_config["bootstrap_servers"]),
key="{beamline_tla}.key",
producer_config=kafka_config["runengine_producer_config"],
)

0 comments on commit 10f9593

Please sign in to comment.