Skip to content

Commit

Permalink
Support sending different detectors to different services.
Browse files Browse the repository at this point in the history
  • Loading branch information
kfindeisen committed Oct 16, 2024
1 parent 5f3afe6 commit 8dcb7ad
Showing 1 changed file with 72 additions and 32 deletions.
104 changes: 72 additions & 32 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import datetime
import json
import logging
import math
import os
from pathlib import Path
import sys
Expand Down Expand Up @@ -121,15 +122,21 @@ class InstrumentConfig:

instrument: str
"""The instrument whose metrics are held by this object (`str`)."""
url: str
"""The address of the Knative Serving instance for this instrument (`str`)."""
urls: collections.abc.Sequence[str]
"""The addresses of the Knative Serving instance for this instrument (sequence [`str`])."""
detectors: collections.abc.Sequence[int]
"""The active detectors for this instrument (sequence [`int`])."""

def __init__(self, conf, instrument):
super().__setattr__("instrument", instrument)
super().__setattr__("url", conf["knative-urls"][instrument])
super().__setattr__("urls", conf["knative-urls"][instrument])
if not self.urls:
raise ValueError(f"Must provide at least one URL for the {instrument} service.")
super().__setattr__("detectors", self.detector_load(conf, instrument))
if not self.detectors:
raise ValueError(f"Must provide at least one detector for the {instrument} service. "
"If the service is not yet ready, remove it from the supported "
"instruments list.")

@staticmethod
def detector_load(conf: dict, instrument: str) -> list[int]:
Expand Down Expand Up @@ -300,10 +307,10 @@ def fan_out(next_visit, inst_config):
Returns
-------
fanned_out : `Submission`
fanned_out : collection [`Submission`]
The submission information for the fanned-out messages.
"""
return Submission(inst_config.url, next_visit.add_detectors(inst_config.detectors))
return fan_out_hsc(next_visit, inst_config, inst_config.detectors)


def fan_out_hsc(next_visit, inst_config, detectors):
Expand All @@ -320,16 +327,48 @@ def fan_out_hsc(next_visit, inst_config, detectors):
Returns
-------
fanned_out : `Submission`
fanned_out : collection [`Submission`]
The submission information for the fanned-out messages.
"""
return Submission(inst_config.url, next_visit.add_detectors(detectors))
n_services = len(inst_config.urls)
n_detectors = len(detectors)
# Block-based assignment is more intuitive for humans monitoring the services.
# The last blocks may be underfilled or (if n_detectors < n_services) empty.
block_size = int(math.ceil(n_detectors / n_services))

blocks = _batched(detectors, block_size)
# len(blocks) = min(n_services, n_detectors)
return [Submission(url, next_visit.add_detectors(block))
for url, block in zip(inst_config.urls, blocks)]


def _batched(seq, size):
"""Batch a sequence data into smaller sequences.
Parameters
----------
seq : sequence
The sequence to batch.
size : `int`
The desired batch size.
Returns
-------
batched : sequence [sequence]
A sequence of consecutive subsequences, in the same order as the
original. Each sequence is of length ``size`` until there are
not enough elements.
"""
batches = []
for i in range(0, len(seq), size):
batches.append(seq[i:i + size])
return batches


def dispatch_fanned_out_messages(client: httpx.AsyncClient,
topic: str,
tasks: collections.abc.MutableSet[asyncio.Task],
send_info: Submission,
send_infos: collections.abc.Collection[Submission],
gauges: collections.abc.Mapping[str, Metrics],
):
"""Package and send the fanned-out messages to Prompt Processing.
Expand All @@ -342,8 +381,8 @@ def dispatch_fanned_out_messages(client: httpx.AsyncClient,
The topic to which to upload the messages.
tasks : set [`asyncio.Task`]
Collection for holding the requests.
send_info : `Submission`
The data and address to submit.
send_infos : collection [`Submission`]
The data and addresses to submit.
gauges : mapping [`str`, `Metrics`]
A mapping from instrument name to metrics for that instrument.
"""
Expand All @@ -353,29 +392,30 @@ def dispatch_fanned_out_messages(client: httpx.AsyncClient,
"source": topic,
}

for fan_out_message in send_info.fan_out_messages:
data = fan_out_message
data_json = json.dumps(data)

logging.info(f"data after json dump {data_json}")
event = CloudEvent(attributes, data_json)
headers, body = to_structured(event)
info = {
key: data[key] for key in ["instrument", "groupId", "detector"]
}

task = asyncio.create_task(
knative_request(
gauges[fan_out_message["instrument"]].in_process,
client,
send_info.url,
headers,
body,
str(info),
for send_info in send_infos:
for fan_out_message in send_info.fan_out_messages:
data = fan_out_message
data_json = json.dumps(data)

logging.info(f"data after json dump {data_json}")
event = CloudEvent(attributes, data_json)
headers, body = to_structured(event)
info = {
key: data[key] for key in ["instrument", "groupId", "detector"]
}

task = asyncio.create_task(
knative_request(
gauges[fan_out_message["instrument"]].in_process,
client,
send_info.url,
headers,
body,
str(info),
)
)
)
tasks.add(task)
task.add_done_callback(tasks.discard)
tasks.add(task)
task.add_done_callback(tasks.discard)

except ValueError:
logging.exception("Error while sending fanned-out messages.")
Expand Down

0 comments on commit 8dcb7ad

Please sign in to comment.