Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
Sync price feed (#125)
Browse files Browse the repository at this point in the history
This PR adds a simple job that pushes all prices collected in the
`prices` table of the analytics db to Dune. It mimics PR #103, and is
meant to run as a cronjob. I followed the "legacy method" as there are
some concerns about the current way Prefect has been integrated in this
repo.

I also prepared this PR in the infra repo.
cowprotocol/infrastructure#2217

Note: there are currently not any tests. I saw there are some tests for
the app-data sync, so i could try to basically copy-paste those, as i am
not sure what's the best way to test this
  • Loading branch information
harisang authored Nov 11, 2024
1 parent 8d0e449 commit 92e702d
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 2 deletions.
9 changes: 9 additions & 0 deletions src/fetch/orderbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class OrderbookEnv(Enum):

BARN = "BARN"
PROD = "PROD"
ANALYTICS = "ANALYTICS"

def __str__(self) -> str:
return str(self.value)
Expand Down Expand Up @@ -175,3 +176,11 @@ def get_app_hashes(cls) -> DataFrame:

# We are only interested in unique app data
return pd.concat([prod, barn]).drop_duplicates().reset_index(drop=True)

@classmethod
def get_price_feed(cls) -> DataFrame:
"""
Fetches prices from multiple price feeds from the analytics db
"""
prices_query = open_query("prices.sql")
return cls._read_query_for_env(prices_query, OrderbookEnv.ANALYTICS)
19 changes: 17 additions & 2 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from src.models.tables import SyncTable
from src.post.aws import AWSClient
from src.sync import sync_app_data
from src.sync.config import SyncConfig, AppDataSyncConfig
from src.sync import sync_price_feed
from src.sync.config import SyncConfig, AppDataSyncConfig, PriceFeedSyncConfig
from src.sync.order_rewards import sync_order_rewards, sync_batch_rewards

log = set_log(__name__)
Expand Down Expand Up @@ -48,7 +49,10 @@ def __init__(self) -> None:
if __name__ == "__main__":
load_dotenv()
args = ScriptArgs()
dune = DuneClient(os.environ["DUNE_API_KEY"])
dune = DuneClient(
api_key=os.environ["DUNE_API_KEY"],
request_timeout=float(os.environ.get("DUNE_API_REQUEST_TIMEOUT", 10)),
)
orderbook = OrderbookFetcher()

if args.sync_table == SyncTable.APP_DATA:
Expand All @@ -62,6 +66,17 @@ def __init__(self) -> None:
dry_run=args.dry_run,
)
)
elif args.sync_table == SyncTable.PRICE_FEED:
table = os.environ["PRICE_FEED_TARGET_TABLE"]
assert table, "PRICE FEED sync needs a PRICE_FEED_TARGET_TABLE env"
asyncio.run(
sync_price_feed(
orderbook,
dune=dune,
config=PriceFeedSyncConfig(table),
dry_run=args.dry_run,
)
)
elif args.sync_table == SyncTable.ORDER_REWARDS:
aws = AWSClient.new_from_environment()
volume_path = Path(os.environ["VOLUME_PATH"])
Expand Down
1 change: 1 addition & 0 deletions src/models/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class SyncTable(Enum):
ORDER_REWARDS = "order_rewards"
BATCH_REWARDS = "batch_rewards"
INTERNAL_IMBALANCE = "internal_imbalance"
PRICE_FEED = "price_feed"

def __str__(self) -> str:
return str(self.value)
Expand Down
8 changes: 8 additions & 0 deletions src/sql/prices.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Selects all prices collected in the analytics db
SELECT
concat('0x', encode(p.token_address, 'hex')) as token_address,
p.time,
p.price,
td.decimals,
p.source
FROM prices p INNER JOIN token_decimals td ON p.token_address = td.token_address
1 change: 1 addition & 0 deletions src/sync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""Re-exported sync methods."""
from .app_data import sync_app_data
from .price_feed import sync_price_feed
12 changes: 12 additions & 0 deletions src/sync/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,15 @@ class AppDataSyncConfig:
description: str = (
"Table containing known CoW Protocol appData hashes and their pre-images"
)


@dataclass
class PriceFeedSyncConfig:
"""Configuration for price feed sync."""

# The name of the table to upload to
table: str = "price_feed_test"
# Description of the table (for creation)
description: str = (
"Table containing prices and timestamps from multiple price feeds"
)
27 changes: 27 additions & 0 deletions src/sync/price_feed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Main Entry point for price feed sync"""

from dune_client.client import DuneClient

from src.fetch.orderbook import OrderbookFetcher
from src.logger import set_log
from src.sync.config import PriceFeedSyncConfig

log = set_log(__name__)


async def sync_price_feed(
orderbook: OrderbookFetcher,
dune: DuneClient,
config: PriceFeedSyncConfig,
dry_run: bool,
) -> None:
"""Price Feed Sync Logic"""
prices = orderbook.get_price_feed()
if not dry_run:
dune.upload_csv(
data=prices.to_csv(index=False),
table_name=config.table,
description=config.description,
is_private=False,
)
log.info("price feed sync run completed successfully")

0 comments on commit 92e702d

Please sign in to comment.