From 92e702d569a5a8ce4883f3287ea8782f89437cd1 Mon Sep 17 00:00:00 2001 From: Haris Angelidakis <64154020+harisang@users.noreply.github.com> Date: Mon, 11 Nov 2024 18:50:00 +0200 Subject: [PATCH] Sync price feed (#125) This PR adds a simple job that pushes all prices collected in the `prices` table of the analytics db to Dune. It mimics PR #103, and is meant to run as a cronjob. I followed the "legacy method" as there are some concerns about the current way Prefect has been integrated in this repo. I also prepared this PR in the infra repo. https://github.com/cowprotocol/infrastructure/pull/2217 Note: there are currently not any tests. I saw there are some tests for the app-data sync, so i could try to basically copy-paste those, as i am not sure what's the best way to test this --- src/fetch/orderbook.py | 9 +++++++++ src/main.py | 19 +++++++++++++++++-- src/models/tables.py | 1 + src/sql/prices.sql | 8 ++++++++ src/sync/__init__.py | 1 + src/sync/config.py | 12 ++++++++++++ src/sync/price_feed.py | 27 +++++++++++++++++++++++++++ 7 files changed, 75 insertions(+), 2 deletions(-) create mode 100644 src/sql/prices.sql create mode 100644 src/sync/price_feed.py diff --git a/src/fetch/orderbook.py b/src/fetch/orderbook.py index 42ff494f..01ce3f93 100644 --- a/src/fetch/orderbook.py +++ b/src/fetch/orderbook.py @@ -28,6 +28,7 @@ class OrderbookEnv(Enum): BARN = "BARN" PROD = "PROD" + ANALYTICS = "ANALYTICS" def __str__(self) -> str: return str(self.value) @@ -175,3 +176,11 @@ def get_app_hashes(cls) -> DataFrame: # We are only interested in unique app data return pd.concat([prod, barn]).drop_duplicates().reset_index(drop=True) + + @classmethod + def get_price_feed(cls) -> DataFrame: + """ + Fetches prices from multiple price feeds from the analytics db + """ + prices_query = open_query("prices.sql") + return cls._read_query_for_env(prices_query, OrderbookEnv.ANALYTICS) diff --git a/src/main.py b/src/main.py index 7dacef92..5ae888e1 100644 --- a/src/main.py +++ b/src/main.py @@ -13,7 +13,8 @@ from src.models.tables import SyncTable from src.post.aws import AWSClient from src.sync import sync_app_data -from src.sync.config import SyncConfig, AppDataSyncConfig +from src.sync import sync_price_feed +from src.sync.config import SyncConfig, AppDataSyncConfig, PriceFeedSyncConfig from src.sync.order_rewards import sync_order_rewards, sync_batch_rewards log = set_log(__name__) @@ -48,7 +49,10 @@ def __init__(self) -> None: if __name__ == "__main__": load_dotenv() args = ScriptArgs() - dune = DuneClient(os.environ["DUNE_API_KEY"]) + dune = DuneClient( + api_key=os.environ["DUNE_API_KEY"], + request_timeout=float(os.environ.get("DUNE_API_REQUEST_TIMEOUT", 10)), + ) orderbook = OrderbookFetcher() if args.sync_table == SyncTable.APP_DATA: @@ -62,6 +66,17 @@ def __init__(self) -> None: dry_run=args.dry_run, ) ) + elif args.sync_table == SyncTable.PRICE_FEED: + table = os.environ["PRICE_FEED_TARGET_TABLE"] + assert table, "PRICE FEED sync needs a PRICE_FEED_TARGET_TABLE env" + asyncio.run( + sync_price_feed( + orderbook, + dune=dune, + config=PriceFeedSyncConfig(table), + dry_run=args.dry_run, + ) + ) elif args.sync_table == SyncTable.ORDER_REWARDS: aws = AWSClient.new_from_environment() volume_path = Path(os.environ["VOLUME_PATH"]) diff --git a/src/models/tables.py b/src/models/tables.py index 147f217a..94dc69be 100644 --- a/src/models/tables.py +++ b/src/models/tables.py @@ -9,6 +9,7 @@ class SyncTable(Enum): ORDER_REWARDS = "order_rewards" BATCH_REWARDS = "batch_rewards" INTERNAL_IMBALANCE = "internal_imbalance" + PRICE_FEED = "price_feed" def __str__(self) -> str: return str(self.value) diff --git a/src/sql/prices.sql b/src/sql/prices.sql new file mode 100644 index 00000000..919a6045 --- /dev/null +++ b/src/sql/prices.sql @@ -0,0 +1,8 @@ +-- Selects all prices collected in the analytics db +SELECT + concat('0x', encode(p.token_address, 'hex')) as token_address, + p.time, + p.price, + td.decimals, + p.source +FROM prices p INNER JOIN token_decimals td ON p.token_address = td.token_address diff --git a/src/sync/__init__.py b/src/sync/__init__.py index 37b723b3..8136f50c 100644 --- a/src/sync/__init__.py +++ b/src/sync/__init__.py @@ -1,2 +1,3 @@ """Re-exported sync methods.""" from .app_data import sync_app_data +from .price_feed import sync_price_feed diff --git a/src/sync/config.py b/src/sync/config.py index 829b5313..3be39788 100644 --- a/src/sync/config.py +++ b/src/sync/config.py @@ -28,3 +28,15 @@ class AppDataSyncConfig: description: str = ( "Table containing known CoW Protocol appData hashes and their pre-images" ) + + +@dataclass +class PriceFeedSyncConfig: + """Configuration for price feed sync.""" + + # The name of the table to upload to + table: str = "price_feed_test" + # Description of the table (for creation) + description: str = ( + "Table containing prices and timestamps from multiple price feeds" + ) diff --git a/src/sync/price_feed.py b/src/sync/price_feed.py new file mode 100644 index 00000000..a6d055d3 --- /dev/null +++ b/src/sync/price_feed.py @@ -0,0 +1,27 @@ +"""Main Entry point for price feed sync""" + +from dune_client.client import DuneClient + +from src.fetch.orderbook import OrderbookFetcher +from src.logger import set_log +from src.sync.config import PriceFeedSyncConfig + +log = set_log(__name__) + + +async def sync_price_feed( + orderbook: OrderbookFetcher, + dune: DuneClient, + config: PriceFeedSyncConfig, + dry_run: bool, +) -> None: + """Price Feed Sync Logic""" + prices = orderbook.get_price_feed() + if not dry_run: + dune.upload_csv( + data=prices.to_csv(index=False), + table_name=config.table, + description=config.description, + is_private=False, + ) + log.info("price feed sync run completed successfully")