diff --git a/src/strategies/naive.py b/src/strategies/naive.py index d4985f8b..869b37f4 100644 --- a/src/strategies/naive.py +++ b/src/strategies/naive.py @@ -3,8 +3,11 @@ of hops using all available providers. """ +from asyncio import Task +from queue import Queue +import asyncio import traceback -from typing import List, Optional, Self +from typing import List, Optional, Self, AsyncGenerator, Any from dataclasses import dataclass import logging from src.contracts.route import Leg, Status, Route @@ -101,22 +104,18 @@ async def strategy( "Finding profitable routes", ) - async for r, route in listen_routes_with_depth_dfs( - ctx.cli_args["hops"], - ctx.cli_args["base_denom"], - ctx.cli_args["base_denom"], - set(ctx.cli_args["require_leg_types"]), - pools, - auctions, + async for r, route in eval_routes( + listen_routes_with_depth_dfs( + ctx.cli_args["hops"], + ctx.cli_args["base_denom"], + ctx.cli_args["base_denom"], + set(ctx.cli_args["require_leg_types"]), + pools, + auctions, + ctx, + ), ctx, ): - resp = await eval_route(r, route, ctx) - - if not resp: - continue - - r, route = resp - ctx.log_route(r, "info", "Route queued: %s", [fmt_route(route)]) if not state.balance: @@ -334,14 +333,32 @@ async def eval_route( return (r, route) -async def leg_liquidity(leg: Leg) -> tuple[int, int]: - if isinstance(leg.backend, AuctionProvider): - return ( - await leg.backend.remaining_asset_b(), - await leg.backend.remaining_asset_b(), - ) +async def eval_routes( + listened_routes: AsyncGenerator[tuple[Route, list[Leg]], None], + ctx: Ctx[Any], +) -> AsyncGenerator[tuple[Route, list[Leg]], None]: + """ " + Evaluates routes concurrently, yielding profitable routes. + """ - return ( - await leg.backend.balance_asset_a(), - await leg.backend.balance_asset_b(), - ) + tasks = set() + profitable_routes: Queue[tuple[Route, list[Leg]]] = Queue() + + async for r, route in listened_routes: + while not profitable_routes.empty(): + yield profitable_routes.get() + + task = asyncio.create_task(eval_route(r, route, ctx)) + + def eval_end(t: Task[Optional[tuple[Route, list[Leg]]]]) -> None: + tasks.discard(task) + + res = t.result() + + if not res: + return + + profitable_routes.put(res) + + task.add_done_callback(eval_end) + tasks.add(task) diff --git a/src/strategies/util.py b/src/strategies/util.py index 1bc187c3..7d3419c0 100644 --- a/src/strategies/util.py +++ b/src/strategies/util.py @@ -486,7 +486,7 @@ async def rebalance_portfolio( if coin.denom != ctx.cli_args["base_denom"] ) - async def eval_sell_denom(denom: str, balance: int) -> None: + async def eval_sell_denom(denom: str, sell_denom: str, balance: int) -> None: """ - Finds a route to sell the given denom - Calculates the execution plan @@ -503,7 +503,7 @@ async def eval_sell_denom(denom: str, balance: int) -> None: listen_routes_with_depth_dfs( ctx.cli_args["hops"], denom, - ctx.cli_args["base_denom"], + sell_denom, set(), pools, auctions, @@ -571,7 +571,7 @@ async def eval_sell_denom(denom: str, balance: int) -> None: await asyncio.gather( *[ - eval_sell_denom(denom, balance) + eval_sell_denom(denom, ctx.cli_args["base_denom"], balance) for denom, balance in qualifying_denoms_balances ] ) @@ -788,7 +788,11 @@ async def recover_funds( r, "info", "Recovering funds in denom %s from current denom %s on chain %s", - [ctx.cli_args["base_denom"], curr_leg.in_asset(), curr_leg.backend.chain_id], + [ + ctx.cli_args["base_denom"], + curr_leg.in_asset(), + curr_leg.backend.chain_id, + ], ) route = route[: route.index(curr_leg) - 1]