Skip to content

Commit

Permalink
Merge branch 'tickets/DM-47656'
Browse files Browse the repository at this point in the history
  • Loading branch information
kfindeisen committed Nov 20, 2024
2 parents 41d9508 + b72e1c6 commit 3ef17f9
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ def dispatch_fanned_out_messages(client: httpx.AsyncClient,
tasks: collections.abc.MutableSet[asyncio.Task],
send_info: Submission,
gauges: collections.abc.Mapping[str, Metrics],
*,
retry_knative: bool,
):
"""Package and send the fanned-out messages to Prompt Processing.
Expand All @@ -346,6 +348,8 @@ def dispatch_fanned_out_messages(client: httpx.AsyncClient,
The data and address to submit.
gauges : mapping [`str`, `Metrics`]
A mapping from instrument name to metrics for that instrument.
retry_knative : `bool`
Whether or not Knative requests can be retried.
"""
try:
attributes = {
Expand All @@ -372,6 +376,7 @@ def dispatch_fanned_out_messages(client: httpx.AsyncClient,
headers,
body,
str(info),
retry=retry_knative,
)
)
tasks.add(task)
Expand All @@ -389,22 +394,28 @@ async def knative_request(
headers: dict[str, str],
body: bytes,
info: str,
*,
retry: bool,
) -> None:
"""Makes knative http request.
Parameters
----------
client: `httpx.AsyncClient`
in_process_requests_gauge : `prometheus_client.Gauge`
A gauge to be updated with the start and end of the request.
client : `httpx.AsyncClient`
The async httpx client.
knative_serving_url : `string`
The url for the knative instance.
headers: dict[`str,'str']
headers : dict[`str,'str']
The headers to pass to knative.
body: `bytes`
body : `bytes`
The next visit message body.
info: `str`
info : `str`
Information such as some fields of the next visit message to identify
this request and to log with.
retry : `bool`
Whether or not requests can be retried.
"""
with in_process_requests_gauge.track_inprogress():
result = await client.post(
Expand All @@ -418,8 +429,12 @@ async def knative_request(
f"nextVisit {info} status code {result.status_code} for initial request {result.content}"
)

'''
if result.status_code == 502 or result.status_code == 503:
if retry and result.status_code == 503:
if 'Retry-After' in result.headers:
delay = int(result.headers['Retry-After'])
logging.info("Waiting %d seconds before retrying nextVisit %s...", delay, info)
await asyncio.sleep(delay)

logging.info(
f"retry after status code {result.status_code} for nextVisit {info}"
)
Expand All @@ -430,9 +445,9 @@ async def knative_request(
timeout=None,
)
logging.info(
f"nextVisit {info} retried request {retry_result.content}"
f"nextVisit {info} status code {retry_result.status_code} for "
f"retried request {retry_result.content}"
)
'''


async def main() -> None:
Expand All @@ -446,6 +461,7 @@ async def main() -> None:
expire = float(os.environ["MESSAGE_EXPIRATION"])
kafka_schema_registry_url = os.environ["KAFKA_SCHEMA_REGISTRY_URL"]
max_outgoing = int(os.environ["MAX_FAN_OUT_MESSAGES"])
retry_knative = os.environ["RETRY_KNATIVE_REQUESTS"].lower() == "true"

# kafka auth
sasl_username = os.environ["SASL_USERNAME"]
Expand Down Expand Up @@ -513,7 +529,8 @@ async def main() -> None:
gauges,
hsc_upload_detectors,
)
dispatch_fanned_out_messages(client, topic, tasks, send_info, gauges)
dispatch_fanned_out_messages(client, topic, tasks, send_info, gauges,
retry_knative=retry_knative)
except UnsupportedMessageError:
logging.exception("Could not process message, continuing.")
finally:
Expand Down

0 comments on commit 3ef17f9

Please sign in to comment.