diff --git a/Dockerfile b/Dockerfile index 38b6a9e..f86a71e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ FROM python:3.10-slim-buster -ENV PYTHONUNBUFFERED True +ENV PYTHONUNBUFFERED=True -#ENV PYTHONASYINCIODEBUG 1 +#ENV PYTHONASYINCIODEBUG=1 WORKDIR /app diff --git a/src/detector.yaml b/src/detector.yaml deleted file mode 100644 index 16a9981..0000000 --- a/src/detector.yaml +++ /dev/null @@ -1,329 +0,0 @@ -LATISS: - detectors: - 0: True -LSSTComCam: - detectors: - 0: True - 1: True - 2: True - 3: True - 4: True - 5: True - 6: True - 7: True - 8: True -LSSTCam: - detectors: - 0: False - 1: False - 2: False - 3: False - 4: False - 5: False - 6: False - 7: False - 8: False - 9: False - 10: False - 11: False - 12: False - 13: False - 14: False - 15: False - 16: False - 17: False - 18: False - 19: False - 20: False - 21: False - 22: False - 23: False - 24: False - 25: False - 26: False - 27: False - 28: False - 29: False - 30: False - 31: False - 32: False - 33: False - 34: False - 35: False - 36: False - 37: False - 38: False - 39: False - 40: False - 41: False - 42: False - 43: False - 44: False - 45: False - 46: False - 47: False - 48: False - 49: False - 50: False - 51: False - 52: False - 53: False - 54: False - 55: False - 56: False - 57: False - 58: False - 59: False - 60: False - 61: False - 62: False - 63: False - 64: False - 65: False - 66: False - 67: False - 68: False - 69: False - 70: False - 71: False - 72: False - 73: False - 74: False - 75: False - 76: False - 77: False - 78: False - 79: False - 80: False - 81: False - 82: False - 83: False - 84: False - 85: False - 86: False - 87: False - 88: False - 89: False - 90: False - 91: False - 92: False - 93: False - 94: False - 95: False - 96: False - 97: False - 98: False - 99: False - 100: False - 101: False - 102: False - 103: False - 104: False - 105: False - 106: False - 107: False - 108: False - 109: False - 110: False - 111: False - 112: False - 113: False - 114: False - 115: False - 116: False - 117: False - 118: False - 119: False - 120: False - 121: False - 122: False - 123: False - 124: False - 125: False - 126: False - 127: False - 128: False - 129: False - 130: False - 131: False - 132: False - 133: False - 134: False - 135: False - 136: False - 137: False - 138: False - 139: False - 140: False - 141: False - 142: False - 143: False - 144: False - 145: False - 146: False - 147: False - 148: False - 149: False - 150: False - 151: False - 152: False - 153: False - 154: False - 155: False - 156: False - 157: False - 158: False - 159: False - 160: False - 161: False - 162: False - 163: False - 164: False - 165: False - 166: False - 167: False - 168: False - 169: False - 170: False - 171: False - 172: False - 173: False - 174: False - 175: False - 176: False - 177: False - 178: False - 179: False - 180: False - 181: False - 182: False - 183: False - 184: False - 185: False - 186: False - 187: False - 188: False -HSC: - detectors: - 0: True - 1: True - 2: True - 3: True - 4: True - 5: True - 6: True - 7: True - 8: True - 9: False - 10: True - 11: True - 12: True - 13: True - 14: True - 15: True - 16: True - 17: True - 18: True - 19: True - 20: True - 21: True - 22: True - 23: True - 24: True - 25: True - 26: True - 27: True - 28: True - 29: True - 30: True - 31: True - 32: True - 33: True - 34: True - 35: True - 36: True - 37: True - 38: True - 39: True - 40: True - 41: True - 42: True - 43: True - 44: True - 45: True - 46: True - 47: True - 48: True - 49: True - 50: True - 51: True - 52: True - 53: True - 54: True - 55: True - 56: True - 57: True - 58: True - 59: True - 60: True - 61: True - 62: True - 63: True - 64: True - 65: True - 66: True - 67: True - 68: True - 69: True - 70: True - 71: True - 72: True - 73: True - 74: True - 75: True - 76: True - 77: True - 78: True - 79: True - 80: True - 81: True - 82: True - 83: True - 84: True - 85: True - 86: True - 87: True - 88: True - 89: True - 90: True - 91: True - 92: True - 93: True - 94: True - 95: True - 96: True - 97: True - 98: True - 99: True - 100: True - 101: True - 102: True - 103: True -HSC-TEST-59134: - detectors: - 0: True - 4: True - 5: True -HSC-TEST-59142: - detectors: - 0: True - 5: True - 11: True -HSC-TEST-59150: - detectors: - 50: True - 58: True -HSC-TEST-59160: - detectors: - 43: True - 51: True diff --git a/src/main.py b/src/main.py index a6ef077..244b943 100644 --- a/src/main.py +++ b/src/main.py @@ -1,4 +1,5 @@ import asyncio +import collections.abc import dataclasses import datetime import json @@ -30,7 +31,7 @@ class NextVisitModel: instrument: str groupId: str coordinateSystem: int - position: typing.List[int] + position: list[float] startTime: float rotationSystem: int cameraAngle: float @@ -42,25 +43,58 @@ class NextVisitModel: totalCheckpoints: int private_sndStamp: float + @staticmethod + def from_raw_message(message: dict[str, typing.Any]): + """Factory creating a NextVisitModel from an unpacked message. + + Parameters + ---------- + message : `dict` [`str`] + A mapping containing message fields. + + Returns + ------- + model : `NextVisitModel` + An object containing the fields in the message. + """ + # Message may contain fields that aren't in NextVisitModel + return NextVisitModel( + salIndex=message["salIndex"], + scriptSalIndex=message["scriptSalIndex"], + instrument=message["instrument"], + groupId=message["groupId"], + coordinateSystem=message["coordinateSystem"], + position=message["position"], + startTime=message["startTime"], + rotationSystem=message["rotationSystem"], + cameraAngle=message["cameraAngle"], + filters=message["filters"], + dome=message["dome"], + duration=message["duration"], + nimages=message["nimages"], + survey=message["survey"], + totalCheckpoints=message["totalCheckpoints"], + private_sndStamp=message["private_sndStamp"], + ) + def add_detectors( self, - message: dict, active_detectors: list, - ) -> list[dict[str, str]]: - """Adds and duplicates next visit messages for fanout. + ) -> list[dict[str, typing.Any]]: + """Adds and duplicates this message for fanout. Parameters ---------- - message: `str` - The next visit message. active_detectors: `list` The active detectors for an instrument. - Yields - ------ - message_list : `list` + + Returns + ------- + message_list : `list` [`dict`] The message list for fan out. """ - message_list: list[dict[str, str]] = [] + message = dataclasses.asdict(self) + message_list: list[dict[str, typing.Any]] = [] for active_detector in active_detectors: temp_message = message.copy() temp_message["detector"] = active_detector @@ -73,28 +107,278 @@ def add_detectors( return message_list -def detector_load(conf: dict, instrument: str) -> list[int]: - """Load active instrument detectors from yaml configiration file of - true false values for each detector. +@dataclasses.dataclass(frozen=True) +class InstrumentConfig: + """The configuration used for sending messages to a specific instrument. Parameters ---------- conf : `dict` - The instrument configuration from the yaml file. - instrument: `str` - The instrument to load detectors for. - Yields + A hierarchical instrument configuration, whose keys are instruments. + instrument : `str` + The instrument to configure. + """ + + instrument: str + """The instrument whose metrics are held by this object (`str`).""" + url: str + """The address of the Knative Serving instance for this instrument (`str`).""" + detectors: collections.abc.Sequence[int] + """The active detectors for this instrument (sequence [`int`]).""" + + def __init__(self, conf, instrument): + super().__setattr__("instrument", instrument) + super().__setattr__("url", conf["knative-urls"][instrument]) + super().__setattr__("detectors", self.detector_load(conf, instrument)) + + @staticmethod + def detector_load(conf: dict, instrument: str) -> list[int]: + """Load active instrument detectors from yaml configiration file of + true false values for each detector. + + Parameters + ---------- + conf : `dict` + The instrument configuration from the yaml file. + instrument : `str` + The instrument to load detectors for. + + Returns + ------- + active_detectors : `list` [`int`] + The active detectors for the instrument. + """ + detectors = conf["detectors"][instrument] + active_detectors: list[int] = [] + for detector, active in detectors.items(): + if active: + active_detectors.append(int(detector)) + return active_detectors + + +@dataclasses.dataclass(frozen=True) +class Metrics: + """A container for all metrics associated with a specific instrument. + + Parameters + ---------- + instrument : `str` + The instrument whose metrics are held by this object. + """ + + instrument: str + """The instrument whose metrics are held by this object (`str`).""" + total_received: Gauge + """The number of incoming messages processed by this instance (`prometheus_client.Gauge`).""" + in_process: Gauge + """The number of fanned-out messages currently being processed (`prometheus_client.Gauge`).""" + + def __init__(self, instrument): + super().__setattr__("instrument", instrument) + word_instrument = instrument.lower().replace(" ", "_") + super().__setattr__("total_received", + Gauge(word_instrument + "_next_visit_messages", + f"next visit messages with {instrument} as instrument")) + super().__setattr__("in_process", + Gauge(word_instrument + "_prompt_processing_in_process_requests", + f"{instrument} in process requests for next visit")) + + +@dataclasses.dataclass(frozen=True) +class Submission: + """The batched requests to be submitted to a Knative instance. + """ + + url: str + """The address of the Knative Serving instance to send requests to (`str`).""" + fan_out_messages: collections.abc.Collection[dict[str, typing.Any]] + """The messages to send to ``url`` (collection [`dict`]).""" + + +class UnsupportedMessageError(RuntimeError): + """Exception raised if there is no Prompt Processing instance for a given + nextVisit message. + """ + pass + + +def is_handleable(message: dict[str, typing.Any], expire: float) -> bool: + """Test whether a nextVisit message has enough data to be handled by + fan-out. + + This function emits explanatory logs as a side effect. + + Parameters + ---------- + message : `dict` [`str`] + An unpacked mapping of message fields. + expire : `float` + The maximum age, in seconds, that a message can still be handled. + + Returns + ------- + handleable : `bool` + `True` is the message can be processed, `False` otherwise. + """ + if not message["instrument"]: + logging.info("Message does not have an instrument. Assuming it's not an observation.") + return False + + # efdStamp is visit publication, in seconds since 1970-01-01 UTC + if message["private_efdStamp"]: + published = message["private_efdStamp"] + age = round(time.time() - published) # Microsecond precision is distracting + if age > expire: + logging.warning("Message published on %s UTC is %s old, ignoring.", + time.ctime(published), + datetime.timedelta(seconds=age) + ) + return False + else: + logging.warning("Message does not have private_efdStamp, can't determine age.") + return True + + +def make_fanned_out_messages(message: NextVisitModel, + instruments: collections.abc.Mapping[str, InstrumentConfig], + gauges: collections.abc.Mapping[str, Metrics], + hsc_upload_detectors: collections.abc.Mapping[int, + collections.abc.Collection[int]] + ) -> Submission: + """Create appropriate fanned-out messages for an incoming message. + + Parameters + ---------- + message : `NextVisitModel` + The message to fan out. + instruments : mapping [`str`, `InstrumentConfig`] + A mapping from instrument name to configuration information. + gauges : mapping [`str`, `Metrics`] + A mapping from instrument name to metrics for that instrument. + hsc_upload_detectors : mapping [`int`, collection [`int`]] + A mapping from HSC-Cosmos visit to the supported detectors for that visit. + + Returns + ------- + send_info : `Submission` + The fanned out messages, along with where to send them. + + Raises ------ - active_detectors : `list` - The active detectors for the instrument. + UnsupportedMessageError + Raised if ``message`` cannot be fanned-out or sent. + """ + match message.instrument: + case "HSC": + # HSC has extra active detector configurations just for the + # upload.py test. + match message.salIndex: + case 999: # HSC datasets from using upload_from_repo.py + gauges["HSC"].total_received.inc() + return fan_out(message, instruments["HSC"]) + case visit if visit in hsc_upload_detectors: # upload.py test datasets + gauges["HSC"].total_received.inc() + return fan_out_hsc(message, instruments["HSC"], hsc_upload_detectors[visit]) + case _: + raise UnsupportedMessageError(f"No matching case for HSC salIndex {message.salIndex}") + case instrument if instrument in instruments: + gauges[instrument].total_received.inc() + return fan_out(message, instruments[instrument]) + case _: + raise UnsupportedMessageError(f"no matching case for instrument {message.instrument}.") + + +def fan_out(next_visit, inst_config): + """Prepare fanned-out messages for sending to the Prompt Processing service. + + Parameters + ---------- + next_visit : `NextVisitModel` + The nextVisit message to fan out. + inst_config : `InstrumentConfig` + The configuration information for the active instrument. + + Returns + ------- + fanned_out : `Submission` + The submission information for the fanned-out messages. """ + return Submission(inst_config.url, next_visit.add_detectors(inst_config.detectors)) + - detectors = conf[instrument]["detectors"] - active_detectors: list[int] = [] - for k, v in detectors.items(): - if v: - active_detectors.append(k) - return active_detectors +def fan_out_hsc(next_visit, inst_config, detectors): + """Prepare fanned-out messages for HSC upload.py. + + Parameters + ---------- + next_visit : `NextVisitModel` + The nextVisit message to fan out. + inst_config : `InstrumentConfig` + The configuration information for the active instrument. + detectors : collection [`int`] + The detectors to send. + + Returns + ------- + fanned_out : `Submission` + The submission information for the fanned-out messages. + """ + return Submission(inst_config.url, next_visit.add_detectors(detectors)) + + +def dispatch_fanned_out_messages(client: httpx.AsyncClient, + topic: str, + tasks: collections.abc.MutableSet[asyncio.Task], + send_info: Submission, + gauges: collections.abc.Mapping[str, Metrics], + ): + """Package and send the fanned-out messages to Prompt Processing. + + Parameters + ---------- + client : `httpx.AsyncClient` + The client to which to upload the messages. + topic : `str` + The topic to which to upload the messages. + tasks : set [`asyncio.Task`] + Collection for holding the requests. + send_info : `Submission` + The data and address to submit. + gauges : mapping [`str`, `Metrics`] + A mapping from instrument name to metrics for that instrument. + """ + try: + attributes = { + "type": "com.example.kafka", + "source": topic, + } + + for fan_out_message in send_info.fan_out_messages: + data = fan_out_message + data_json = json.dumps(data) + + logging.info(f"data after json dump {data_json}") + event = CloudEvent(attributes, data_json) + headers, body = to_structured(event) + info = { + key: data[key] for key in ["instrument", "groupId", "detector"] + } + + task = asyncio.create_task( + knative_request( + gauges[fan_out_message["instrument"]].in_process, + client, + send_info.url, + headers, + body, + str(info), + ) + ) + tasks.add(task) + task.add_done_callback(tasks.discard) + + except ValueError: + logging.exception("Error while sending fanned-out messages.") @REQUEST_TIME.time() @@ -122,53 +406,46 @@ async def knative_request( Information such as some fields of the next visit message to identify this request and to log with. """ - in_process_requests_gauge.inc() - - result = await client.post( - knative_serving_url, - headers=headers, - data=body, # type:ignore - timeout=None, - ) - - logging.info( - f"nextVisit {info} status code {result.status_code} for initial request {result.content}" - ) - - ''' - if result.status_code == 502 or result.status_code == 503: - logging.info( - f"retry after status code {result.status_code} for nextVisit {info}" - ) - retry_result = await client.post( + with in_process_requests_gauge.track_inprogress(): + result = await client.post( knative_serving_url, headers=headers, data=body, # type:ignore timeout=None, ) + logging.info( - f"nextVisit {info} retried request {retry_result.content}" + f"nextVisit {info} status code {result.status_code} for initial request {result.content}" ) - ''' - in_process_requests_gauge.dec() + ''' + if result.status_code == 502 or result.status_code == 503: + logging.info( + f"retry after status code {result.status_code} for nextVisit {info}" + ) + retry_result = await client.post( + knative_serving_url, + headers=headers, + data=body, # type:ignore + timeout=None, + ) + logging.info( + f"nextVisit {info} retried request {retry_result.content}" + ) + ''' async def main() -> None: - # Get environment variables - detector_config_file = os.environ["DETECTOR_CONFIG_FILE"] + supported_instruments = os.environ["SUPPORTED_INSTRUMENTS"].split() + instrument_config_file = os.environ["INSTRUMENT_CONFIG_FILE"] kafka_cluster = os.environ["KAFKA_CLUSTER"] group_id = os.environ["CONSUMER_GROUP"] topic = os.environ["NEXT_VISIT_TOPIC"] offset = os.environ["OFFSET"] expire = float(os.environ["MESSAGE_EXPIRATION"]) kafka_schema_registry_url = os.environ["KAFKA_SCHEMA_REGISTRY_URL"] - latiss_knative_serving_url = os.environ["LATISS_KNATIVE_SERVING_URL"] - lsstcomcam_knative_serving_url = os.environ["LSSTCOMCAM_KNATIVE_SERVING_URL"] - lsstcomcamsim_knative_serving_url = os.environ["LSSTCOMCAMSIM_KNATIVE_SERVING_URL"] - lsstcam_knative_serving_url = os.environ["LSSTCAM_KNATIVE_SERVING_URL"] - hsc_knative_serving_url = os.environ["HSC_KNATIVE_SERVING_URL"] + max_outgoing = int(os.environ["MAX_FAN_OUT_MESSAGES"]) # kafka auth sasl_username = os.environ["SASL_USERNAME"] @@ -177,21 +454,16 @@ async def main() -> None: security_protocol = os.environ["SECURITY_PROTOCOL"] # Logging config - logging.basicConfig(stream=sys.stdout, level=logging.INFO) - logging.basicConfig(stream=sys.stderr, level=logging.WARNING) - - conf = yaml.safe_load(Path(detector_config_file).read_text()) + if os.environ.get("DEBUG_LOGS") == "true": + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + else: + logging.basicConfig(stream=sys.stdout, level=logging.INFO) - # list based on keys in config. Data class - latiss_active_detectors = detector_load(conf, "LATISS") - lsstcomcam_active_detectors = detector_load(conf, "LSSTComCam") - lsstcam_active_detectors = detector_load(conf, "LSSTCam") - hsc_active_detectors = detector_load(conf, "HSC") + conf = yaml.safe_load(Path(instrument_config_file).read_text()) + instruments = {inst: InstrumentConfig(conf, inst) for inst in supported_instruments} # These four groups are for the small dataset used in the upload.py test - hsc_active_detectors_59134 = detector_load(conf, "HSC-TEST-59134") - hsc_active_detectors_59142 = detector_load(conf, "HSC-TEST-59142") - hsc_active_detectors_59150 = detector_load(conf, "HSC-TEST-59150") - hsc_active_detectors_59160 = detector_load(conf, "HSC-TEST-59160") + hsc_upload_detectors = {visit: InstrumentConfig.detector_load(conf, f"HSC-TEST-{visit}") + for visit in {59134, 59142, 59150, 59160}} # Start Prometheus endpoint start_http_server(8000) @@ -207,53 +479,14 @@ async def main() -> None: sasl_plain_password=sasl_password, ) - latiss_gauge = Gauge( - "latiss_next_visit_messages", "next visit nessages with latiss as instrument" - ) - lsstcam_gauge = Gauge( - "lsstcam_next_visit_messages", "next visit nessages with lsstcam as instrument" - ) - lsstcomcam_gauge = Gauge( - "lsstcomcam_next_visit_messages", - "next visit nessages with lsstcomcam as instrument", - ) - lsstcomcamsim_gauge = Gauge( - "lsstcomcamsim_next_visit_messages", - "next visit nessages with lsstcomcamsim as instrument", - ) - hsc_gauge = Gauge( - "hsc_next_visit_messages", "next visit nessages with hsc as instrument" - ) - hsc_in_process_requests_gauge = Gauge( - "hsc_prompt_processing_in_process_requests", - "hsc in process requests for next visit", - ) - - latiss_in_process_requests_gauge = Gauge( - "latiss_prompt_processing_in_process_requests", - "latiss in process requests for next visit", - ) - - lsstcam_in_process_requests_gauge = Gauge( - "lsstcam_prompt_processing_in_process_requests", - "lsstcam in process requests for next visit", - ) - - lsstcomcam_in_process_requests_gauge = Gauge( - "lsstcomcam_prompt_processing_in_process_requests", - "lsstcomcam in process requests for next visit", - ) - - lsstcomcamsim_in_process_requests_gauge = Gauge( - "lsstcomcamsim_prompt_processing_in_process_requests", - "lsstcomcamsim in process requests for next visit", - ) + gauges = {inst: Metrics(inst) for inst in supported_instruments} await consumer.start() tasks = set() - async with httpx.AsyncClient() as client: + limits = httpx.Limits(max_connections=max_outgoing) + async with httpx.AsyncClient(limits=limits) as client: try: # Setup kafka schema registry connection and deserialzer @@ -264,184 +497,25 @@ async def main() -> None: while True: # run continously async for msg in consumer: - - next_visit_message_initial = await deserializer.deserialize( - data=msg.value - ) - - logging.info(f"message deserialized {next_visit_message_initial}") - - if not next_visit_message_initial["message"]["instrument"]: - logging.info("Message does not have an instrument. Assuming " - "it's not an observation.") - continue - - # efdStamp is visit publication, in seconds since 1970-01-01 UTC - if next_visit_message_initial["message"]["private_efdStamp"]: - published = next_visit_message_initial["message"]["private_efdStamp"] - age = round(time.time() - published) # Microsecond precision is distracting - if age > expire: - logging.warning("Message published on %s UTC is %s old, ignoring.", - time.ctime(published), - datetime.timedelta(seconds=age) - ) - continue - else: - logging.warning("Message does not have private_efdStamp, can't determine age.") - - next_visit_message_updated = NextVisitModel( - salIndex=next_visit_message_initial["message"]["salIndex"], - scriptSalIndex=next_visit_message_initial["message"][ - "scriptSalIndex" - ], - instrument=next_visit_message_initial["message"]["instrument"], - groupId=next_visit_message_initial["message"]["groupId"], - coordinateSystem=next_visit_message_initial["message"][ - "coordinateSystem" - ], - position=next_visit_message_initial["message"]["position"], - startTime=next_visit_message_initial["message"]["startTime"], - rotationSystem=next_visit_message_initial["message"][ - "rotationSystem" - ], - cameraAngle=next_visit_message_initial["message"][ - "cameraAngle" - ], - filters=next_visit_message_initial["message"]["filters"], - dome=next_visit_message_initial["message"]["dome"], - duration=next_visit_message_initial["message"]["duration"], - nimages=next_visit_message_initial["message"]["nimages"], - survey=next_visit_message_initial["message"]["survey"], - totalCheckpoints=next_visit_message_initial["message"][ - "totalCheckpoints" - ], - private_sndStamp=next_visit_message_initial["message"][ - "private_sndStamp" - ], - ) - - match next_visit_message_updated.instrument: - case "LATISS": - latiss_gauge.inc() - fan_out_message_list = ( - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - latiss_active_detectors, - ) - ) - knative_serving_url = latiss_knative_serving_url - in_process_requests_gauge = latiss_in_process_requests_gauge - case "LSSTComCamSim": - lsstcomcamsim_gauge.inc() - fan_out_message_list = ( - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - # Just use ComCam active detector config. - lsstcomcam_active_detectors, - ) - ) - knative_serving_url = lsstcomcamsim_knative_serving_url - in_process_requests_gauge = lsstcomcamsim_in_process_requests_gauge - case "LSSTComCam": - logging.info(f"Ignore LSSTComCam message {next_visit_message_updated}" - " as the prompt service for this is not yet deployed.") - continue - case "LSSTCam": - logging.info(f"Ignore LSSTCam message {next_visit_message_updated}" - " as the prompt service for this is not yet deployed.") - continue - case "HSC": - # HSC has extra active detector configurations just for the - # upload.py test. - match next_visit_message_updated.salIndex: - case 999: # HSC datasets from using upload_from_repo.py - hsc_gauge.inc() - fan_out_message_list = ( - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - hsc_active_detectors, - ) - ) - knative_serving_url = hsc_knative_serving_url - in_process_requests_gauge = hsc_in_process_requests_gauge - case 59134: # HSC upload.py test dataset - hsc_gauge.inc() - fan_out_message_list = ( - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - hsc_active_detectors_59134, - ) - ) - knative_serving_url = hsc_knative_serving_url - in_process_requests_gauge = hsc_in_process_requests_gauge - case 59142: # HSC upload.py test dataset - hsc_gauge.inc() - fan_out_message_list = ( - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - hsc_active_detectors_59142, - ) - ) - knative_serving_url = hsc_knative_serving_url - in_process_requests_gauge = hsc_in_process_requests_gauge - case 59150: # HSC upload.py test dataset - hsc_gauge.inc() - fan_out_message_list = ( - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - hsc_active_detectors_59150, - ) - ) - knative_serving_url = hsc_knative_serving_url - in_process_requests_gauge = hsc_in_process_requests_gauge - case 59160: # HSC upload.py test dataset - hsc_gauge.inc() - fan_out_message_list = ( - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - hsc_active_detectors_59160, - ) - ) - knative_serving_url = hsc_knative_serving_url - in_process_requests_gauge = hsc_in_process_requests_gauge - case _: - raise Exception( - f"no matching case for instrument {next_visit_message_updated.instrument}." - ) - try: - attributes = { - "type": "com.example.kafka", - "source": topic, - } - - for fan_out_message in fan_out_message_list: - data = fan_out_message - data_json = json.dumps(data) - - logging.info(f"data after json dump {data_json}") - event = CloudEvent(attributes, data_json) - headers, body = to_structured(event) - info = { - key: data[key] for key in ["instrument", "groupId", "detector"] - } - - task = asyncio.create_task( - knative_request( - in_process_requests_gauge, - client, - knative_serving_url, - headers, - body, - str(info), - ) - ) - tasks.add(task) - task.add_done_callback(tasks.discard) - - except ValueError as e: - logging.info("Error ", e) + next_visit_message_initial = await deserializer.deserialize( + data=msg.value + ) + logging.info(f"message deserialized {next_visit_message_initial}") + if not is_handleable(next_visit_message_initial["message"], expire): + continue + next_visit_message_updated = NextVisitModel.from_raw_message( + next_visit_message_initial["message"] + ) + send_info = make_fanned_out_messages(next_visit_message_updated, + instruments, + gauges, + hsc_upload_detectors, + ) + dispatch_fanned_out_messages(client, topic, tasks, send_info, gauges) + except UnsupportedMessageError: + logging.exception("Could not process message, continuing.") finally: await consumer.stop()