Skip to content

Commit

Permalink
fix: stream events according to the event model
Browse files Browse the repository at this point in the history
  • Loading branch information
maffettone committed Apr 11, 2024
1 parent 559abc4 commit ee298e6
Showing 1 changed file with 17 additions and 2 deletions.
19 changes: 17 additions & 2 deletions pdf_agents/agents.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import ast
import copy
import os
import time as ttime
import uuid
from abc import ABC
from logging import getLogger
Expand All @@ -10,7 +11,7 @@
import numpy as np
import redis
import tiled
from bluesky_adaptive.agents.base import Agent, AgentConsumer
from bluesky_adaptive.agents.base import Agent, AgentConsumer, infer_data_keys
from bluesky_adaptive.agents.simple import SequentialAgentBase
from bluesky_kafka import Publisher
from bluesky_queueserver_api.http import REManagerAPI
Expand Down Expand Up @@ -395,10 +396,24 @@ def __init__(self, *args, report_producer: Publisher, **kwargs):
self._report_producer = report_producer
super().__init__(*args, **kwargs)

def _stream_event(self, doc, uid):
stream = "report"
if stream not in self._compose_descriptor_bundles:
data_keys = infer_data_keys(doc)
self._compose_descriptor_bundles[stream] = self._compose_run_bundle.compose_descriptor(
name=stream, data_keys=data_keys
)
self._report_producer("descriptor", self._compose_descriptor_bundles[stream].descriptor_doc)
t = ttime.time()
event_doc = self._compose_descriptor_bundles[stream].compose_event(
data=doc, timestamps={k: t for k in doc}, uid=uid
)
self._report_producer("event", event_doc)

def generate_report(self, **kwargs):
doc = self.report(**kwargs)
uid = self._write_event("report", doc)
self._report_producer("report", doc)
self._stream_event(doc, uid)
logger.info(f"Generated report. Tiled: {uid}\n Kafka: {doc.get('uid', 'No UID')}")
self.close_and_restart(clear_tell_cache=False, retell_all=False, reason="Per-Run Subscribers")

Expand Down

0 comments on commit ee298e6

Please sign in to comment.