Auto Review and Downstream Polling #208

Merged
merged 13 commits into from
Feb 3, 2025

@mawelborn mawelborn commented Jan 14, 2025

This PR adds two classes that implement and encapsulate best-practices behavior for Auto Review and Downstream polling patterns. Combined with the results and etloutput modules, this factors out much of the scaffolding of an Auto Review or Downstream pod.

A project need only define one or both of these functions:

async def auto_review(result: Result, etl_outputs: dict[Document, EtlOutput]) -> AutoReviewed:
    """
    Apply auto review rules to predictions and determine straight through processing.
    Any IO performed (network requests, file reads/writes, etc) must be `await`ed to
    avoid blocking the asyncio loop that runs this coroutine.
    """
    predictions = result.pre_review

    # Auto review rules here.

    return AutoReviewed(
        changes=predictions.to_changes(result),
        stp=True,  # Defaults to `False` and may be omitted.
    )
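
async def downstream(submission: Submission) -> None:
    """
    Send a submission downstream.
    """
    # `httpx.post` is synchronous and cannot be awaited; use `httpx.AsyncClient` instead.
    async with httpx.AsyncClient() as client:
        await client.post(
            "https://downstream_endpoint",
            json={
                "id": submission.id,
                "status": submission.status,
                "output_url": submission.result_file,
            },
        )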

And instantiate one or both of the polling classes in a package's __main__.py, CLI, or other entrypoint:

# Auto Review only:
asyncio.run(AutoReviewPoller(IndicoConfig(), workflow_id, auto_review).poll_forever())

# Downstream only:
asyncio.run(DownstreamPoller(IndicoConfig(), workflow_id, downstream).poll_forever())

# Both. `asyncio.gather()` must be called from inside a running coroutine:
async def main() -> None:
    await asyncio.gather(
        AutoReviewPoller(IndicoConfig(), workflow_id, auto_review).poll_forever(),
        DownstreamPoller(IndicoConfig(), workflow_id, downstream).poll_forever(),
    )

asyncio.run(main())

AutoReviewPoller loads the result file and etl output as dataclasses, applies use-case-specific logic with the auto_review callback, and submits the changes with optional STP.

DownstreamPoller loads the submission metadata, sends it downstream with the downstream callback, and marks the submission retrieved.
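
To make the callback contract concrete, here is a toy sketch of the flow described for DownstreamPoller: find pending submissions, hand each one to the callback, then mark it retrieved. It does not use the Indico client or the toolkit. `FakeSubmission`, `poll_once`, and the in-memory `queue` are hypothetical stand-ins, and marking a submission retrieved only after the callback returns is an assumption of this sketch; the real class may be structured differently.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass


@dataclass
class FakeSubmission:
    """Hypothetical stand-in for a submission; not the Indico type."""

    id: int
    status: str = "COMPLETE"
    retrieved: bool = False


async def poll_once(
    queue: list[FakeSubmission],
    callback: Callable[[FakeSubmission], Awaitable[None]],
) -> int:
    """Send every unretrieved submission to `callback`, then mark it retrieved."""
    handled = 0
    for submission in queue:
        if submission.retrieved:
            continue
        await callback(submission)
        # Assumption of this sketch: mark retrieved only after the callback succeeds.
        submission.retrieved = True
        handled += 1
    return handled


sent: list[int] = []


async def downstream(submission: FakeSubmission) -> None:
    # A real callback would perform awaited network IO here.
    sent.append(submission.id)


queue = [FakeSubmission(1), FakeSubmission(2, retrieved=True), FakeSubmission(3)]
handled = asyncio.run(poll_once(queue, downstream))
```

Because the callback is the only project-specific piece, swapping the fake `downstream` for one that posts to a real endpoint would not change the surrounding loop.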

Implemented best-practices behavior includes:

  • Asyncio workers process submissions concurrently, up to a maximum number of workers configurable by the worker_count kwarg.
  • Continuous polling for new submissions keeps the worker queue saturated, configurable by the poll_delay kwarg.
  • Retry logic gives workers the best chance to recover from a network error, configurable by the retry_count kwarg and friends.
  • Robust error handling keeps the poller alive when workers fail, and every failure is logged.
  • Max workers, spawn rate, etl output loading, and retry behavior have sane defaults and are all configurable by kwargs.
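
The concurrency limit and retry behaviors listed above can be illustrated with plain asyncio. This is a minimal sketch, not the toolkit's implementation: `with_retries`, `process_all`, and `flaky` are hypothetical names, `retry_delay` and the exponential backoff are assumptions, and `worker_count` and `retry_count` only mirror the kwargs named above.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def with_retries(
    func: Callable[[], Awaitable[None]],
    retry_count: int = 3,
    retry_delay: float = 0.01,
) -> None:
    """Call `func`, retrying up to `retry_count` times with exponential backoff."""
    for attempt in range(retry_count + 1):
        try:
            await func()
            return
        except ConnectionError:
            if attempt == retry_count:
                raise  # Out of retries; let the caller record the failure.
            await asyncio.sleep(retry_delay * 2**attempt)


async def process_all(
    items: list[int],
    worker: Callable[[int], Awaitable[None]],
    worker_count: int = 2,
) -> list[int]:
    """Process items concurrently, at most `worker_count` at a time."""
    limit = asyncio.Semaphore(worker_count)
    failed: list[int] = []

    async def run(item: int) -> None:
        async with limit:
            try:
                await with_retries(lambda: worker(item))
            except Exception:
                # A failed worker is recorded and does not stop the others.
                failed.append(item)

    await asyncio.gather(*(run(item) for item in items))
    return failed


attempts: dict[int, int] = {}
done: list[int] = []


async def flaky(item: int) -> None:
    """Fail once per item with a simulated network error; item 3 always fails."""
    attempts[item] = attempts.get(item, 0) + 1
    if item == 3 or attempts[item] == 1:
        raise ConnectionError("simulated network error")
    done.append(item)


failed = asyncio.run(process_all([1, 2, 3], flaky))
```

Catching the exception inside each worker, rather than around `asyncio.gather`, is what lets one failing submission be reported without cancelling the rest.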

See examples/poll_auto_review.py and the class definitions for AutoReviewPoller and DownstreamPoller for more details.

@mawelborn mawelborn marked this pull request as ready for review January 15, 2025 21:27
@mawelborn mawelborn merged commit 09524e9 into main Feb 3, 2025
9 checks passed
@mawelborn mawelborn deleted the mawelborn/polling branch February 3, 2025 22:02