Skip to content

Commit

Permalink
Evaluate routes in parallel.
Browse files Browse the repository at this point in the history
  • Loading branch information
dowlandaiello committed Jul 26, 2024
1 parent 11f27af commit 79848a8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 29 deletions.
67 changes: 42 additions & 25 deletions src/strategies/naive.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
of hops using all available providers.
"""

from asyncio import Task
from queue import Queue
import asyncio
import traceback
from typing import List, Optional, Self
from typing import List, Optional, Self, AsyncGenerator, Any
from dataclasses import dataclass
import logging
from src.contracts.route import Leg, Status, Route
Expand Down Expand Up @@ -101,22 +104,18 @@ async def strategy(
"Finding profitable routes",
)

async for r, route in listen_routes_with_depth_dfs(
ctx.cli_args["hops"],
ctx.cli_args["base_denom"],
ctx.cli_args["base_denom"],
set(ctx.cli_args["require_leg_types"]),
pools,
auctions,
async for r, route in eval_routes(
listen_routes_with_depth_dfs(
ctx.cli_args["hops"],
ctx.cli_args["base_denom"],
ctx.cli_args["base_denom"],
set(ctx.cli_args["require_leg_types"]),
pools,
auctions,
ctx,
),
ctx,
):
resp = await eval_route(r, route, ctx)

if not resp:
continue

r, route = resp

ctx.log_route(r, "info", "Route queued: %s", [fmt_route(route)])

if not state.balance:
Expand Down Expand Up @@ -334,14 +333,32 @@ async def eval_route(
return (r, route)


async def leg_liquidity(leg: Leg) -> tuple[int, int]:
if isinstance(leg.backend, AuctionProvider):
return (
await leg.backend.remaining_asset_b(),
await leg.backend.remaining_asset_b(),
)
async def eval_routes(
listened_routes: AsyncGenerator[tuple[Route, list[Leg]], None],
ctx: Ctx[Any],
) -> AsyncGenerator[tuple[Route, list[Leg]], None]:
""" "
Evaluates routes concurrently, yielding profitable routes.
"""

return (
await leg.backend.balance_asset_a(),
await leg.backend.balance_asset_b(),
)
tasks = set()
profitable_routes: Queue[tuple[Route, list[Leg]]] = Queue()

async for r, route in listened_routes:
while not profitable_routes.empty():
yield profitable_routes.get()

task = asyncio.create_task(eval_route(r, route, ctx))

def eval_end(t: Task[Optional[tuple[Route, list[Leg]]]]) -> None:
tasks.discard(task)

res = t.result()

if not res:
return

profitable_routes.put(res)

task.add_done_callback(eval_end)
tasks.add(task)
12 changes: 8 additions & 4 deletions src/strategies/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ async def rebalance_portfolio(
if coin.denom != ctx.cli_args["base_denom"]
)

async def eval_sell_denom(denom: str, balance: int) -> None:
async def eval_sell_denom(denom: str, sell_denom: str, balance: int) -> None:
"""
- Finds a route to sell the given denom
- Calculates the execution plan
Expand All @@ -503,7 +503,7 @@ async def eval_sell_denom(denom: str, balance: int) -> None:
listen_routes_with_depth_dfs(
ctx.cli_args["hops"],
denom,
ctx.cli_args["base_denom"],
sell_denom,
set(),
pools,
auctions,
Expand Down Expand Up @@ -571,7 +571,7 @@ async def eval_sell_denom(denom: str, balance: int) -> None:

await asyncio.gather(
*[
eval_sell_denom(denom, balance)
eval_sell_denom(denom, ctx.cli_args["base_denom"], balance)
for denom, balance in qualifying_denoms_balances
]
)
Expand Down Expand Up @@ -788,7 +788,11 @@ async def recover_funds(
r,
"info",
"Recovering funds in denom %s from current denom %s on chain %s",
[ctx.cli_args["base_denom"], curr_leg.in_asset(), curr_leg.backend.chain_id],
[
ctx.cli_args["base_denom"],
curr_leg.in_asset(),
curr_leg.backend.chain_id,
],
)

route = route[: route.index(curr_leg) - 1]
Expand Down

0 comments on commit 79848a8

Please sign in to comment.