diff --git a/src/main.py b/src/main.py index b156727..717334d 100644 --- a/src/main.py +++ b/src/main.py @@ -4,6 +4,7 @@ import datetime import json import logging +import math import os from pathlib import Path import sys @@ -121,15 +122,21 @@ class InstrumentConfig: instrument: str """The instrument whose metrics are held by this object (`str`).""" - url: str - """The address of the Knative Serving instance for this instrument (`str`).""" + urls: collections.abc.Sequence[str] + """The addresses of the Knative Serving instance for this instrument (sequence [`str`]).""" detectors: collections.abc.Sequence[int] """The active detectors for this instrument (sequence [`int`]).""" def __init__(self, conf, instrument): super().__setattr__("instrument", instrument) - super().__setattr__("url", conf["knative-urls"][instrument]) + super().__setattr__("urls", conf["knative-urls"][instrument]) + if not self.urls: + raise ValueError(f"Must provide at least one URL for the {instrument} service.") super().__setattr__("detectors", self.detector_load(conf, instrument)) + if not self.detectors: + raise ValueError(f"Must provide at least one detector for the {instrument} service. " + "If the service is not yet ready, remove it from the supported " + "instruments list.") @staticmethod def detector_load(conf: dict, instrument: str) -> list[int]: @@ -300,10 +307,10 @@ def fan_out(next_visit, inst_config): Returns ------- - fanned_out : `Submission` + fanned_out : collection [`Submission`] The submission information for the fanned-out messages. """ - return Submission(inst_config.url, next_visit.add_detectors(inst_config.detectors)) + return fan_out_hsc(next_visit, inst_config, inst_config.detectors) def fan_out_hsc(next_visit, inst_config, detectors): @@ -320,16 +327,48 @@ def fan_out_hsc(next_visit, inst_config, detectors): Returns ------- - fanned_out : `Submission` + fanned_out : collection [`Submission`] The submission information for the fanned-out messages. 
""" - return Submission(inst_config.url, next_visit.add_detectors(detectors)) + n_services = len(inst_config.urls) + n_detectors = len(detectors) + # Block-based assignment is more intuitive for humans monitoring the services. + # The last block may be underfilled, and trailing services may get no block at all. + block_size = int(math.ceil(n_detectors / n_services)) + + blocks = _batched(detectors, block_size) + # For n_detectors > 0, len(blocks) <= min(n_services, n_detectors); zip() drops the unused URLs. + return [Submission(url, next_visit.add_detectors(block)) + for url, block in zip(inst_config.urls, blocks)] + + +def _batched(seq, size): + """Batch a sequence into smaller sequences. + + Parameters + ---------- + seq : sequence + The sequence to batch. + size : `int` + The desired batch size. + + Returns + ------- + batched : sequence [sequence] + A sequence of consecutive subsequences, in the same order as the + original. Each subsequence has length ``size`` except possibly the + last, which may be shorter. + """ + batches = [] + for i in range(0, len(seq), size): + batches.append(seq[i:i + size]) + return batches def dispatch_fanned_out_messages(client: httpx.AsyncClient, topic: str, tasks: collections.abc.MutableSet[asyncio.Task], - send_info: Submission, + send_infos: collections.abc.Collection[Submission], gauges: collections.abc.Mapping[str, Metrics], ): """Package and send the fanned-out messages to Prompt Processing. @@ -342,8 +381,8 @@ def dispatch_fanned_out_messages(client: httpx.AsyncClient, The topic to which to upload the messages. tasks : set [`asyncio.Task`] Collection for holding the requests. - send_info : `Submission` - The data and address to submit. + send_infos : collection [`Submission`] + The data and addresses to submit. gauges : mapping [`str`, `Metrics`] A mapping from instrument name to metrics for that instrument. 
""" @@ -353,29 +392,30 @@ def dispatch_fanned_out_messages(client: httpx.AsyncClient, "source": topic, } - for fan_out_message in send_info.fan_out_messages: - data = fan_out_message - data_json = json.dumps(data) - - logging.info(f"data after json dump {data_json}") - event = CloudEvent(attributes, data_json) - headers, body = to_structured(event) - info = { - key: data[key] for key in ["instrument", "groupId", "detector"] - } - - task = asyncio.create_task( - knative_request( - gauges[fan_out_message["instrument"]].in_process, - client, - send_info.url, - headers, - body, - str(info), + for send_info in send_infos: + for fan_out_message in send_info.fan_out_messages: + data = fan_out_message + data_json = json.dumps(data) + + logging.info(f"data after json dump {data_json}") + event = CloudEvent(attributes, data_json) + headers, body = to_structured(event) + info = { + key: data[key] for key in ["instrument", "groupId", "detector"] + } + + task = asyncio.create_task( + knative_request( + gauges[fan_out_message["instrument"]].in_process, + client, + send_info.url, + headers, + body, + str(info), + ) ) - ) - tasks.add(task) - task.add_done_callback(tasks.discard) + tasks.add(task) + task.add_done_callback(tasks.discard) except ValueError: logging.exception("Error while sending fanned-out messages.")