Skip to content

Commit

Permalink
fix: write all docs to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
maffettone committed Apr 11, 2024
1 parent ee298e6 commit 0e33411
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions pdf_agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,25 +396,41 @@ def __init__(self, *args, report_producer: Publisher, **kwargs):
self._report_producer = report_producer
super().__init__(*args, **kwargs)

def _stream_event(self, doc, uid):
stream = "report"
def start(self, *args, **kwargs):
super().start(*args, **kwargs)
self._report_producer("start", self._compose_run_bundle.start_doc)

def stop(self, *args, **kwargs):
super().stop(*args, **kwargs)
self._report_producer("stop", self._compose_run_bundle.stop_doc)

def _write_event(self, stream, doc, uid=None):
"""Add event to builder as event page, and publish to catalog
Taken from bluesky adaptive and modified to write to kafka as well as tiled.
TODO: OVERRIDE FROM ADAPTIVE. MAKE PR TO FIX UPSTREAM.
"""
if not doc:
logger.info(f"No doc presented to write_event for stream {stream}")
return
if stream not in self._compose_descriptor_bundles:
data_keys = infer_data_keys(doc)
self._compose_descriptor_bundles[stream] = self._compose_run_bundle.compose_descriptor(
name=stream, data_keys=data_keys
)
self.agent_catalog.v1.insert("descriptor", self._compose_descriptor_bundles[stream].descriptor_doc)
self._report_producer("descriptor", self._compose_descriptor_bundles[stream].descriptor_doc)

t = ttime.time()
event_doc = self._compose_descriptor_bundles[stream].compose_event(
data=doc, timestamps={k: t for k in doc}, uid=uid
)
self.agent_catalog.v1.insert("event", event_doc)
self._report_producer("event", event_doc)

return event_doc["uid"]

def generate_report(self, **kwargs):
doc = self.report(**kwargs)
uid = self._write_event("report", doc)
self._stream_event(doc, uid)
logger.info(f"Generated report. Tiled: {uid}\n Kafka: {doc.get('uid', 'No UID')}")
super().generate_report(**kwargs)
self.close_and_restart(clear_tell_cache=False, retell_all=False, reason="Per-Run Subscribers")

@classmethod
Expand Down

0 comments on commit 0e33411

Please sign in to comment.