Skip to content

Commit

Permalink
Add custom rate limit for aiohtpp
Browse files Browse the repository at this point in the history
  • Loading branch information
moisses89 committed Jan 10, 2025
1 parent 7093b3d commit 13b521e
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 7 deletions.
10 changes: 3 additions & 7 deletions safe_eth/eth/clients/etherscan_client_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urljoin

import aiohttp
import requests

from safe_eth.eth import EthereumNetwork
Expand All @@ -12,6 +11,7 @@
EtherscanClient,
EtherscanRateLimitError,
)
from safe_eth.eth.clients.rate_limiter import get_client_rate_limited


class EtherscanClientV2(EtherscanClient):
Expand Down Expand Up @@ -97,19 +97,15 @@ def __init__(
max_requests: int = int(os.environ.get("ETHERSCAN_CLIENT_MAX_REQUESTS", 100)),
):
super().__init__(network, api_key, request_timeout)
self.async_session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit_per_host=max_requests)
)
self.client = get_client_rate_limited(self.base_api_url, 5) # 5 per second

async def _async_do_request(
self, url: str
) -> Optional[Union[Dict[str, Any], List[Any], str]]:
"""
Async version of _do_request
"""
async with self.async_session.get(
url, timeout=self.request_timeout
) as response:
async with await self.client.get(url, timeout=self.request_timeout) as response:
if response.ok:
response_json = await response.json()
result = response_json["result"]
Expand Down
55 changes: 55 additions & 0 deletions safe_eth/eth/clients/rate_limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import asyncio
import time
from functools import cache
from logging import getLogger

import aiohttp

logger = getLogger(__name__)


class RateLimiter:
"""
Class to limit the number of requests per second
"""

def __init__(self, client, rate):
self.client = client
self.rate = rate
self.available_conns = rate # Initialize available conns
self.updated_at = time.monotonic()

async def get(self, *args, **kwargs):
await self._wait_for_available_conn()
return self.client.get(*args, **kwargs)

async def post(self, *args, **kwargs):
await self._wait_for_available_conn()
return self.client.post(*args, **kwargs)

async def _wait_for_available_conn(self):
while self.available_conns < 1:
self._release_available_conns()
await asyncio.sleep(0.1)
self.available_conns -= 1

def _release_available_conns(self):
now = time.monotonic()
time_since_update = now - self.updated_at
if time_since_update >= 1:
self.available_conns = self.rate
self.updated_at = now


@cache
def get_client_rate_limited(host: str, rate: int) -> "RateLimiter":
"""
Get a rate limited client by host
Host parameter is just being used to store in cache different instance by host
:param host:
:param rate: number of requests allowed per second
"""
logger.info(f"Initializing rate limiter for {host} by {rate}/s")
async_session = aiohttp.ClientSession()
return RateLimiter(async_session, rate)
17 changes: 17 additions & 0 deletions safe_eth/eth/tests/clients/test_etherscan_client_v2.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
import unittest

Expand Down Expand Up @@ -106,3 +107,19 @@ async def test_async_etherscan_get_abi(self):
)
except EtherscanRateLimitError:
self.skipTest("Etherscan rate limit reached")

async def test_async_etherscan_get_abi_rate_limiter(self):
etherscan_api = self.get_etherscan_api(EthereumNetwork.MAINNET)
safe_master_copy_abi = sourcify_safe_metadata["output"]["abi"]
safe_master_copy_address = "0x6851D6fDFAfD08c0295C392436245E5bc78B0185"
tasks = [
asyncio.ensure_future(
etherscan_api.async_get_contract_metadata(safe_master_copy_address)
)
for i in range(20)
]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
self.assertIsNotNone(result)
self.assertEqual(safe_master_copy_abi, result.abi)

0 comments on commit 13b521e

Please sign in to comment.