Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat) Adds request rate limit throttling to Injective Perpetual #353

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hummingbot/connector/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
NaN = float("nan")
s_decimal_NaN = Decimal("nan")
s_decimal_0 = Decimal(0)
SECOND = 1
MINUTE = 60
TWELVE_HOURS = MINUTE * 60 * 12
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from hummingbot.connector.gateway.gateway_in_flight_order import GatewayInFlightOrder
from hummingbot.connector.trading_rule import TradingRule
from hummingbot.connector.utils import combine_to_hb_trading_pair, split_hb_trading_pair
from hummingbot.core.api_throttler.async_throttler import AsyncThrottler
from hummingbot.core.data_type.common import OrderType, PositionAction, PositionMode, PositionSide, TradeType
from hummingbot.core.data_type.funding_info import FundingInfo, FundingInfoUpdate
from hummingbot.core.data_type.in_flight_order import InFlightOrder, OrderState, OrderUpdate, TradeUpdate
Expand Down Expand Up @@ -89,6 +90,7 @@ def __init__(
self._network_obj = CONSTANTS.NETWORK_CONFIG[self._network]
self._client = AsyncClient(network=self._network_obj)
self._account_address: Optional[str] = None
self._throttler = AsyncThrottler(rate_limits=CONSTANTS.RATE_LIMITS)

self._composer = Composer(network=self._network_obj.string())
self._order_hash_manager: Optional[OrderHashManager] = None
Expand Down Expand Up @@ -171,7 +173,8 @@ async def stop(self):
async def check_network_status(self) -> NetworkStatus:
status = NetworkStatus.CONNECTED
try:
await self._client.ping()
async with self._throttler.execute_task(limit_id=CONSTANTS.PING_LIMIT_ID):
await self._client.ping()
await self._get_gateway_instance().ping_gateway()
except asyncio.CancelledError:
raise
Expand All @@ -188,9 +191,10 @@ async def set_trading_pair_leverage(self, trading_pair: str, leverage: int) -> T
async def get_order_book_snapshot(self, trading_pair: str) -> OrderBookMessage:
market_info = self._markets_info[trading_pair]
price_scaler: Decimal = Decimal(f"1e-{market_info.quote_token_meta.decimals}")
response: OrderbooksV2Response = await self._client.get_derivative_orderbooksV2(
market_ids=[market_info.market_id]
)
async with self._throttler.execute_task(limit_id=CONSTANTS.ORDER_BOOK_LIMIT_ID):
response: OrderbooksV2Response = await self._client.get_derivative_orderbooksV2(
market_ids=[market_info.market_id]
)

snapshot_ob: DerivativeLimitOrderbookV2 = response.orderbooks[0].orderbook
snapshot_timestamp_ms: float = max(
Expand Down Expand Up @@ -287,18 +291,19 @@ async def place_order(
order_hash: str = order_hashes.derivative[0]

try:
order_result: Dict[str, Any] = await self._get_gateway_instance().clob_perp_place_order(
connector=self._connector_name,
chain=self._chain,
network=self._network,
trading_pair=order.trading_pair,
address=self._account_id,
trade_type=order.trade_type,
order_type=order.order_type,
price=order.price,
size=order.amount,
leverage=order.leverage,
)
async with self._throttler.execute_task(limit_id=CONSTANTS.TRANSACTION_POST_LIMIT_ID):
order_result: Dict[str, Any] = await self._get_gateway_instance().clob_perp_place_order(
connector=self._connector_name,
chain=self._chain,
network=self._network,
trading_pair=order.trading_pair,
address=self._account_id,
trade_type=order.trade_type,
order_type=order.order_type,
price=order.price,
size=order.amount,
leverage=order.leverage,
)

transaction_hash: Optional[str] = order_result.get("txHash")
except Exception:
Expand Down Expand Up @@ -337,14 +342,15 @@ async def batch_order_create(self, orders_to_create: List[GatewayInFlightOrder])
spot_orders=[], derivative_orders=derivative_orders_to_create
)
try:
update_result = await self._get_gateway_instance().clob_perp_batch_order_modify(
connector=self._connector_name,
chain=self._chain,
network=self._network,
address=self._account_id,
orders_to_create=orders_to_create,
orders_to_cancel=[],
)
async with self._throttler.execute_task(limit_id=CONSTANTS.TRANSACTION_POST_LIMIT_ID):
update_result = await self._get_gateway_instance().clob_perp_batch_order_modify(
connector=self._connector_name,
chain=self._chain,
network=self._network,
address=self._account_id,
orders_to_create=orders_to_create,
orders_to_cancel=[],
)
except Exception:
await self._update_account_address_and_create_order_hash_manager()
raise
Expand Down Expand Up @@ -378,14 +384,15 @@ async def batch_order_create(self, orders_to_create: List[GatewayInFlightOrder])
async def cancel_order(self, order: GatewayInFlightOrder) -> Tuple[bool, Optional[Dict[str, Any]]]:
await order.get_exchange_order_id()

cancelation_result = await self._get_gateway_instance().clob_perp_cancel_order(
chain=self._chain,
network=self._network,
connector=self._connector_name,
address=self._account_id,
trading_pair=order.trading_pair,
exchange_order_id=order.exchange_order_id,
)
async with self._throttler.execute_task(limit_id=CONSTANTS.TRANSACTION_POST_LIMIT_ID):
cancelation_result = await self._get_gateway_instance().clob_perp_cancel_order(
chain=self._chain,
network=self._network,
connector=self._connector_name,
address=self._account_id,
trading_pair=order.trading_pair,
exchange_order_id=order.exchange_order_id,
)
transaction_hash: Optional[str] = cancelation_result.get("txHash")

if transaction_hash in [None, ""]:
Expand Down Expand Up @@ -422,14 +429,15 @@ async def batch_order_cancel(self, orders_to_cancel: List[InFlightOrder]) -> Lis
if not isinstance(result, asyncio.TimeoutError)
]

update_result = await self._get_gateway_instance().clob_perp_batch_order_modify(
connector=self._connector_name,
chain=self._chain,
network=self._network,
address=self._account_id,
orders_to_create=[],
orders_to_cancel=found_orders_to_cancel,
)
async with self._throttler.execute_task(limit_id=CONSTANTS.TRANSACTION_POST_LIMIT_ID):
update_result = await self._get_gateway_instance().clob_perp_batch_order_modify(
connector=self._connector_name,
chain=self._chain,
network=self._network,
address=self._account_id,
orders_to_create=[],
orders_to_cancel=found_orders_to_cancel,
)

transaction_hash: Optional[str] = update_result.get("txHash")
exception = None
Expand All @@ -455,9 +463,10 @@ async def batch_order_cancel(self, orders_to_cancel: List[InFlightOrder]) -> Lis

async def fetch_positions(self) -> List[Position]:
market_ids = self._get_market_ids()
backend_positions: PositionsResponse = await self._client.get_derivative_positions(
market_ids=market_ids, subaccount_id=self._account_id
)
async with self._throttler.execute_task(limit_id=CONSTANTS.POSITIONS_LIMIT_ID):
backend_positions: PositionsResponse = await self._client.get_derivative_positions(
market_ids=market_ids, subaccount_id=self._account_id
)

positions = [
self._parse_backed_position_to_position(backend_position=backed_position)
Expand All @@ -477,9 +486,10 @@ async def get_account_balances(self) -> Dict[str, Dict[str, Decimal]]:
await self._update_account_address_and_create_order_hash_manager()
self._check_markets_initialized() or await self._update_markets()

portfolio_response: AccountPortfolioResponse = await self._client.get_account_portfolio(
account_address=self._account_address
)
async with self._throttler.execute_task(limit_id=CONSTANTS.ACCOUNT_PORTFOLIO_LIMIT_ID):
portfolio_response: AccountPortfolioResponse = await self._client.get_account_portfolio(
account_address=self._account_address
)

portfolio: Portfolio = portfolio_response.portfolio
bank_balances: List[Coin] = portfolio.bank_balances
Expand Down Expand Up @@ -546,9 +556,10 @@ async def fetch_last_fee_payment(self, trading_pair: str) -> Tuple[float, Decima
if trading_pair not in self._markets_info:
return timestamp, funding_rate, payment

response: FundingPaymentsResponse = await self._client.get_funding_payments(
subaccount_id=self._account_id, market_id=self._markets_info[trading_pair].market_id, limit=1
)
async with self._throttler.execute_task(limit_id=CONSTANTS.FUNDING_PAYMENT_LIMIT_ID):
response: FundingPaymentsResponse = await self._client.get_funding_payments(
subaccount_id=self._account_id, market_id=self._markets_info[trading_pair].market_id, limit=1
)

if len(response.payments) != 0:
latest_funding_payment: FundingPayment = response.payments[0] # List of payments sorted by latest
Expand All @@ -565,13 +576,16 @@ async def fetch_last_fee_payment(self, trading_pair: str) -> Tuple[float, Decima
async def _update_account_address_and_create_order_hash_manager(self):
if not self._order_placement_lock.locked():
raise RuntimeError("The order-placement lock must be acquired before creating the order hash manager.")
response: Dict[str, Any] = await self._get_gateway_instance().clob_injective_balances(
chain=self._chain, network=self._network, address=self._account_id
)
async with self._throttler.execute_task(limit_id=CONSTANTS.BALANCES_LIMIT_ID):
response: Dict[str, Any] = await self._get_gateway_instance().clob_injective_balances(
chain=self._chain, network=self._network, address=self._account_id
)
self._account_address: str = response["injectiveAddress"]

await self._client.get_account(address=self._account_address)
await self._client.sync_timeout_height()
async with self._throttler.execute_task(limit_id=CONSTANTS.ACCOUNT_LIMIT_ID):
await self._client.get_account(address=self._account_address)
async with self._throttler.execute_task(limit_id=CONSTANTS.SYNC_TIMEOUT_HEIGHT_LIMIT_ID):
await self._client.sync_timeout_height()
tasks_to_await_submitted_orders_to_be_processed_by_chain = [
asyncio.wait_for(order.wait_until_processed_by_exchange(), timeout=CONSTANTS.ORDER_CHAIN_PROCESSING_TIMEOUT)
for order in self._gateway_order_tracker.active_orders.values()
Expand All @@ -581,7 +595,8 @@ async def _update_account_address_and_create_order_hash_manager(self):
*tasks_to_await_submitted_orders_to_be_processed_by_chain, return_exceptions=True # await their processing
)
self._order_hash_manager = OrderHashManager(network=self._network_obj, sub_account_id=self._account_id)
await self._order_hash_manager.start()
async with self._throttler.execute_task(limit_id=CONSTANTS.NONCE_LIMIT_ID):
await self._order_hash_manager.start()

def _check_markets_initialized(self) -> bool:
return (
Expand Down Expand Up @@ -612,11 +627,15 @@ async def _update_markets(self):

async def _fetch_derivative_markets(self) -> MarketsResponse:
market_status: str = "active"
return await self._client.get_derivative_markets(market_status=market_status)
async with self._throttler.execute_task(limit_id=CONSTANTS.DERIVATIVE_MARKETS_LIMIT_ID):
derivative_markets = await self._client.get_derivative_markets(market_status=market_status)
return derivative_markets

async def _fetch_spot_markets(self) -> MarketsResponse:
market_status: str = "active"
return await self._client.get_spot_markets(market_status=market_status)
async with self._throttler.execute_task(limit_id=CONSTANTS.SPOT_MARKETS_LIMIT_ID):
spot_markets = await self._client.get_spot_markets(market_status=market_status)
return spot_markets

def _update_market_map_attributes(self, markets: MarketsResponse):
"""Parses MarketsResponse and re-populate the market map attributes"""
Expand Down Expand Up @@ -684,15 +703,16 @@ async def _fetch_order_history(self, order: GatewayInFlightOrder) -> Optional[De
skip = 0
search_completed = False
while not search_completed:
response: OrdersHistoryResponse = await self._client.get_historical_derivative_orders(
market_id=market.market_id,
subaccount_id=self._account_id,
direction=direction,
start_time=int(order.creation_timestamp * 1e3),
limit=CONSTANTS.FETCH_ORDER_HISTORY_LIMIT,
skip=skip,
order_types=[CONSTANTS.CLIENT_TO_BACKEND_ORDER_TYPES_MAP[(trade_type, order_type)]],
)
async with self._throttler.execute_task(limit_id=CONSTANTS.HISTORICAL_DERIVATIVE_ORDERS_LIMIT_ID):
response: OrdersHistoryResponse = await self._client.get_historical_derivative_orders(
market_id=market.market_id,
subaccount_id=self._account_id,
direction=direction,
start_time=int(order.creation_timestamp * 1e3),
limit=CONSTANTS.FETCH_ORDER_HISTORY_LIMIT,
skip=skip,
order_types=[CONSTANTS.CLIENT_TO_BACKEND_ORDER_TYPES_MAP[(trade_type, order_type)]],
)
if len(response.orders) == 0:
search_completed = True
else:
Expand All @@ -717,13 +737,14 @@ async def _fetch_order_fills(self, order: InFlightOrder) -> List[DerivativeTrade
direction: str = "buy" if order.trade_type == TradeType.BUY else "sell"

while not search_completed:
trades = await self._client.get_derivative_trades(
market_id=market_id,
subaccount_id=self._account_id,
direction=direction,
skip=skip,
start_time=int(order.creation_timestamp * 1e3),
)
async with self._throttler.execute_task(limit_id=CONSTANTS.DERIVATIVE_TRADES_LIMIT_ID):
trades = await self._client.get_derivative_trades(
market_id=market_id,
subaccount_id=self._account_id,
direction=direction,
skip=skip,
start_time=int(order.creation_timestamp * 1e3),
)
if len(trades.trades) == 0:
search_completed = True
else:
Expand All @@ -733,7 +754,9 @@ async def _fetch_order_fills(self, order: InFlightOrder) -> List[DerivativeTrade
return all_trades

async def _fetch_transaction_by_hash(self, transaction_hash: str) -> GetTxByTxHashResponse:
return await self._client.get_tx_by_hash(tx_hash=transaction_hash)
async with self._throttler.execute_task(limit_id=CONSTANTS.TRANSACTION_BY_HASH_LIMIT_ID):
transaction = await self._client.get_tx_by_hash(tx_hash=transaction_hash)
return transaction

def _update_local_balances(self, balances: Dict[str, Dict[str, Decimal]]):
# We need to keep local copy of total and available balance so we can trigger BalanceUpdateEvent with correct
Expand All @@ -748,7 +771,10 @@ def _update_local_balances(self, balances: Dict[str, Dict[str, Decimal]]):
async def _request_last_funding_rate(self, trading_pair: str) -> Decimal:
# NOTE: Can be removed when GatewayHttpClient.clob_perp_funding_info is used.
market_info: DerivativeMarketInfo = self._markets_info[trading_pair]
response: FundingRatesResponse = await self._client.get_funding_rates(market_id=market_info.market_id, limit=1)
async with self._throttler.execute_task(limit_id=CONSTANTS.FUNDING_RATES_LIMIT_ID):
response: FundingRatesResponse = await self._client.get_funding_rates(
market_id=market_info.market_id, limit=1
)
funding_rate: FundingRate = response.funding_rates[0] # We only want the latest funding rate.
return Decimal(funding_rate.rate)

Expand All @@ -757,18 +783,20 @@ async def _request_oracle_price(self, market_info: DerivativeMarketInfo) -> Deci
"""
According to Injective, Oracle Price refers to mark price.
"""
response = await self._client.get_oracle_prices(
base_symbol=market_info.oracle_base,
quote_symbol=market_info.oracle_quote,
oracle_type=market_info.oracle_type,
oracle_scale_factor=0,
)
async with self._throttler.execute_task(limit_id=CONSTANTS.ORACLE_PRICES_LIMIT_ID):
response = await self._client.get_oracle_prices(
base_symbol=market_info.oracle_base,
quote_symbol=market_info.oracle_quote,
oracle_type=market_info.oracle_type,
oracle_scale_factor=0,
)
return Decimal(response.price)

async def _request_last_trade_price(self, trading_pair: str) -> Decimal:
# NOTE: Can be replaced by calling GatewayHTTPClient.clob_perp_last_trade_price
market_info: DerivativeMarketInfo = self._markets_info[trading_pair]
response: TradesResponse = await self._client.get_derivative_trades(market_id=market_info.market_id)
async with self._throttler.execute_task(limit_id=CONSTANTS.DERIVATIVE_TRADES_LIMIT_ID):
response: TradesResponse = await self._client.get_derivative_trades(market_id=market_info.market_id)
last_trade: DerivativeTrade = response.trades[0]
price_scaler: Decimal = Decimal(f"1e-{market_info.quote_token_meta.decimals}")
last_trade_price: Decimal = Decimal(last_trade.position_delta.execution_price) * price_scaler
Expand Down Expand Up @@ -1199,7 +1227,8 @@ async def _request_funding_info(self, trading_pair: str) -> FundingInfo:
last_funding_rate: Decimal = await self._request_last_funding_rate(trading_pair=trading_pair)
oracle_price: Decimal = await self._request_oracle_price(market_info=market_info)
last_trade_price: Decimal = await self._request_last_trade_price(trading_pair=trading_pair)
updated_market_info = await self._client.get_derivative_market(market_id=market_info.market_id)
async with self._throttler.execute_task(limit_id=CONSTANTS.SINGLE_DERIVATIVE_MARKET_LIMIT_ID):
updated_market_info = await self._client.get_derivative_market(market_id=market_info.market_id)
funding_info = FundingInfo(
trading_pair=trading_pair,
index_price=last_trade_price, # Default to using last trade price
Expand Down
Loading