Skip to content

Commit

Permalink
Adds async version of local concurrency flow
Browse files Browse the repository at this point in the history
  • Loading branch information
EmilRex committed Dec 16, 2024
1 parent 1db630a commit 4d6ac44
Showing 1 changed file with 69 additions and 0 deletions.
69 changes: 69 additions & 0 deletions flows/local-concurrency-with-async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Implement extract flow using Python native async concurrency"""

import asyncio

import httpx
from prefect import flow, get_run_logger, tags, task

BASE_URL = "https://dev.to/api"
CONCURRENCY = 10


@task(retries=3, retry_delay_seconds=[10, 30, 60])
async def fetch_url(
    client: httpx.AsyncClient,
    semaphore: asyncio.BoundedSemaphore,
    url: str,
    params: dict | None = None,
) -> dict:
    """GET ``url`` through ``client`` and return the decoded JSON body.

    The whole request (log line, GET, status check and JSON decoding) runs
    while holding ``semaphore``, so the number of requests in flight is
    bounded by however many slots the caller gave the semaphore.

    Args:
        client: Shared async HTTP client used to issue the request.
        semaphore: Concurrency limiter acquired for the duration of the call.
        url: Address to fetch.
        params: Optional query parameters forwarded to ``client.get``.

    Returns:
        The value produced by ``response.json()``.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error response;
        the task is declared with ``retries=3``.
    """
    async with semaphore:
        logger = get_run_logger()
        logger.info(f"Fetching {url}")

        reply = await client.get(url, params=params)
        # Surface HTTP error statuses as exceptions instead of decoding them.
        reply.raise_for_status()
        payload = reply.json()
        return payload


@task
async def list_articles(
    client: httpx.AsyncClient,
    semaphore: asyncio.BoundedSemaphore,
    pages: int,
    per_page: int = 10,
) -> list[str]:
    """Collect article URLs from pages 1..``pages`` of the articles listing.

    One ``fetch_url`` call is created per page and all of them are awaited
    together; the ``id`` of every returned item is then turned into a
    ``{BASE_URL}/articles/<id>`` URL.

    Args:
        client: Shared async HTTP client passed through to ``fetch_url``.
        semaphore: Concurrency limiter passed through to ``fetch_url``.
        pages: Number of listing pages to request, starting at page 1.
        per_page: Page size sent as the ``per_page`` query parameter.

    Returns:
        Article URLs in page order, then in the order items appear per page.
    """
    endpoint = f"{BASE_URL}/articles"

    # Build every page request first so they can all be awaited concurrently.
    pending = []
    for page_number in range(1, pages + 1):
        query = {"page": page_number, "per_page": per_page}
        pending.append(fetch_url(client, semaphore, endpoint, query))

    listings = await asyncio.gather(*pending)

    article_urls: list[str] = []
    for listing in listings:
        for entry in listing:
            article_urls.append(f"{BASE_URL}/articles/{entry['id']}")
    return article_urls


@flow
async def extract(pages: int) -> None:
    """Fetch the article listing, then fetch every listed article.

    A single ``httpx.AsyncClient`` and a single ``asyncio.BoundedSemaphore``
    with ``CONCURRENCY`` slots are shared by all ``fetch_url`` calls made
    during the run. The fetched article bodies are not returned.

    Args:
        pages: Number of listing pages handed to ``list_articles``.
    """
    limiter = asyncio.BoundedSemaphore(CONCURRENCY)

    async with httpx.AsyncClient() as client:
        urls = await list_articles(client, limiter, pages)

        # Fan out one fetch per article; must finish before the client closes.
        await asyncio.gather(*(fetch_url(client, limiter, url) for url in urls))


if __name__ == "__main__":
    # Script entry point: run the flow for 10 listing pages under the
    # "local" tag context.
    with tags("local"):
        run = extract(pages=10)
        asyncio.run(run)

0 comments on commit 4d6ac44

Please sign in to comment.