diff --git a/.env.sample b/.env.sample index c913a932..2360f6d5 100644 --- a/.env.sample +++ b/.env.sample @@ -1,3 +1,10 @@ +VOLUME_PATH=data +APP_DATA_MAX_RETRIES=3 +APP_DATA_GIVE_UP_THRESHOLD=100 + +# Dune credentials +DUNE_API_KEY= + # AWS Credentials AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= @@ -6,3 +13,10 @@ AWS_INTERNAL_ROLE= AWS_EXTERNAL_ROLE= AWS_EXTERNAL_ID= AWS_BUCKET= + +#Orderbook DB Credentials +BARN_DB_URL={user}:{password}@{host}:{port}/{database} +PROD_DB_URL={user}:{password}@{host}:{port}/{database} + +# IPFS Gateway +IPFS_ACCESS_TOKEN= diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml new file mode 100644 index 00000000..ddf5d275 --- /dev/null +++ b/.github/workflows/deploy.yaml @@ -0,0 +1,34 @@ +name: deploy + +on: + push: + branches: [main] + tags: [v*] + +jobs: + deploy: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - uses: actions/checkout@v2 + + - uses: docker/login-action@v1 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - id: meta + uses: docker/metadata-action@v3 + with: + images: ghcr.io/${{ github.repository }} + - uses: docker/build-push-action@v2 + with: + context: . + file: Dockerfile + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} \ No newline at end of file diff --git a/.github/workflows/pull-request.yaml b/.github/workflows/pull-request.yaml index 8c03d1f2..1d796116 100644 --- a/.github/workflows/pull-request.yaml +++ b/.github/workflows/pull-request.yaml @@ -24,3 +24,29 @@ jobs: run: black --check ./ - name: Type Check (mypy) run: mypy src --strict + + tests: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.10", "3.11"] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Install Requirements + run: pip install -r requirements/dev.txt + - name: Unit Tests + run: python -m pytest tests/unit + env: + IPFS_ACCESS_KEY: ${{ secrets.IPFS_ACCESS_KEY }} + - name: Integration Tests + run: python -m pytest tests/integration + env: + PROD_DB_URL: ${{ secrets.PROD_DB_URL }} + BARN_DB_URL: ${{ secrets.BARN_DB_URL }} + WAREHOUSE_URL: ${{ secrets.WAREHOUSE_URL }} diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..335b2081 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.10 + +WORKDIR /app + +COPY requirements/* requirements/ +RUN pip install -r requirements/prod.txt +COPY ./src ./src +COPY logging.conf . 
+ +ENTRYPOINT [ "python3", "-m" , "src.main"] diff --git a/requirements/dev.txt b/requirements/dev.txt index ee2d2d1f..1c31125c 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -5,4 +5,5 @@ black>=22.6.0 mypy==1.3.0 mypy-extensions==1.0.0 pylint>=2.14.4 -pytest>=7.1.2 \ No newline at end of file +pytest>=7.1.2 +sqlalchemy-stubs>=0.4 \ No newline at end of file diff --git a/requirements/prod.txt b/requirements/prod.txt index 495afc14..16b34982 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -1,5 +1,9 @@ dune-client>=0.3.0 +psycopg2-binary>=2.9.3 python-dotenv>=0.20.0 requests>=2.28.1 pandas>=1.5.0 -boto3>=1.26.12 \ No newline at end of file +ndjson>=0.3.1 +py-multiformats-cid>=0.4.4 +boto3>=1.26.12 +SQLAlchemy<2.0 \ No newline at end of file diff --git a/seed_data.zip b/seed_data.zip new file mode 100644 index 00000000..33d7e1f3 Binary files /dev/null and b/seed_data.zip differ diff --git a/src/dune_queries.py b/src/dune_queries.py new file mode 100644 index 00000000..a484beff --- /dev/null +++ b/src/dune_queries.py @@ -0,0 +1,44 @@ +""" +Localized account of all Queries related to this project's main functionality +""" +from __future__ import annotations + +from copy import copy +from dataclasses import dataclass + +from dune_client.query import Query +from dune_client.types import QueryParameter + + +@dataclass +class QueryData: + """Stores name and a version of the query for each query.""" + + name: str + query: Query + + def __init__(self, name: str, query_id: int, filename: str) -> None: + self.name = name + self.filepath = filename + self.query = Query(query_id, name) + + def with_params(self, params: list[QueryParameter]) -> Query: + """ + Copies the query and adds parameters to it, returning the copy. + """ + # We currently default to the V1 Queries, soon to switch them out. + query_copy = copy(self.query) + query_copy.params = params + return query_copy + + +QUERIES = { + "APP_HASHES": QueryData( + query_id=1610025, name="Unique App Hashes", filename="app_hashes.sql" + ), + "LATEST_APP_HASH_BLOCK": QueryData( + query_id=1615490, + name="Latest Possible App Hash Block", + filename="app_hash_latest_block.sql", + ), +} diff --git a/src/environment.py b/src/environment.py index 3c87b3c1..f78290eb 100644 --- a/src/environment.py +++ b/src/environment.py @@ -2,4 +2,5 @@ from pathlib import Path PROJECT_ROOT = Path(__file__).parent.parent +QUERY_PATH = PROJECT_ROOT / Path("src/sql") LOG_CONFIG_FILE = PROJECT_ROOT / Path("logging.conf") diff --git a/src/fetch/__init__.py b/src/fetch/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/fetch/dune.py b/src/fetch/dune.py new file mode 100644 index 00000000..7565399a --- /dev/null +++ b/src/fetch/dune.py @@ -0,0 +1,85 @@ +""" +All Dune Query executions should be routed through this file. +TODO - Move reusable components into dune-client: + https://github.com/cowprotocol/dune-bridge/issues/40 +""" +import asyncio +import sys + +from dune_client.client import DuneClient +from dune_client.query import Query +from dune_client.types import DuneRecord +from requests import HTTPError + +from src.dune_queries import QUERIES +from src.logger import set_log +from src.models.block_range import BlockRange + +log = set_log(__name__) + + +class DuneFetcher: + """ + Class containing, DuneClient, FileIO and a logger for convenient Dune Fetching. + """ + + def __init__( + self, + api_key: str, + ) -> None: + """ + Class constructor. + Builds DuneClient from `api_key` along with a logger and FileIO object. 
+ """ + self.dune = DuneClient(api_key) + + async def fetch(self, query: Query) -> list[DuneRecord]: + """Async Dune Fetcher with some exception handling.""" + log.debug(f"Executing {query}") + + try: + # Tried to use the AsyncDuneClient, without success: + # https://github.com/cowprotocol/dune-client/pull/31#issuecomment-1316045313 + response = await asyncio.to_thread( + self.dune.refresh, query, ping_frequency=10 + ) + if response.state.is_complete(): + response_rows = response.get_rows() + log.debug( + f"Got {len(response_rows)} results for execution {response.execution_id}" + ) + return response_rows + + message = ( + f"query execution {response.execution_id} incomplete {response.state}" + ) + log.error(message) + raise RuntimeError(f"no results for {message}") + except HTTPError as err: + log.error(f"Got {err} - Exiting") + sys.exit() + + async def latest_app_hash_block(self) -> int: + """ + Block Range is used to app hash fetcher where to find the new records. + block_from: read from file `fname` as a loaded singleton. + - uses genesis block is no file exists (should only ever happen once) + - raises RuntimeError if column specified does not exist. + block_to: fetched from Dune as the last indexed block for "GPv2Settlement_call_settle" + """ + return int( + # KeyError here means the query has been modified and column no longer exists + # IndexError means no results were returned from query (which is unlikely). + (await self.fetch(QUERIES["LATEST_APP_HASH_BLOCK"].query))[0][ + "latest_block" + ] + ) + + async def get_app_hashes(self, block_range: BlockRange) -> list[DuneRecord]: + """ + Executes APP_HASHES query for the given `block_range` and returns the results + """ + app_hash_query = QUERIES["APP_HASHES"].with_params( + block_range.as_query_params() + ) + return await self.fetch(app_hash_query) diff --git a/src/fetch/ipfs.py b/src/fetch/ipfs.py new file mode 100644 index 00000000..2f7566e8 --- /dev/null +++ b/src/fetch/ipfs.py @@ -0,0 +1,154 @@ +"""IPFS CID (de)serialization""" +from __future__ import annotations + +import asyncio +from typing import Any, Optional + +import aiohttp +import requests +from aiohttp import ClientSession +from multiformats_cid.cid import from_bytes + +from src.logger import set_log +from src.models.app_data_content import FoundContent, NotFoundContent + +log = set_log(__name__) + +OLD_PREFIX = bytearray([1, 112, 18, 32]) +# https://github.com/cowprotocol/services/blob/2db800aa38824e32fb542c9e2387d77ca6349676/crates/app-data-hash/src/lib.rs#L41-L44 +NEW_PREFIX = bytearray([1, 0x55, 0x1B, 32]) + + +class Cid: + """Holds logic for constructing and converting various representations of a Delegation ID""" + + def __init__(self, hex_str: str, prefix: bytearray = NEW_PREFIX) -> None: + """Builds Object (bytes as base representation) from hex string.""" + stripped_hex = hex_str.replace("0x", "") + # Anatomy of a CID: https://proto.school/anatomy-of-a-cid/04 + self.bytes = bytes(prefix + bytes.fromhex(stripped_hex)) + + @classmethod + def old_schema(cls, hex_str: str) -> Cid: + """Constructor of old CID format (with different prefix)""" + return cls(hex_str, OLD_PREFIX) + + @property + def hex(self) -> str: + """Returns hex representation""" + without_prefix = self.bytes[4:] + return "0x" + without_prefix.hex() + + def __str__(self) -> str: + """Returns string representation""" + return str(from_bytes(self.bytes)) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Cid): + return False + return self.bytes == other.bytes + + def url(self) -> 
str: + """IPFS URL where content can be recovered""" + return f"https://ipfs.cow.fi/ipfs/{self}" + + def get_content(self, access_token: str, max_retries: int = 3) -> Optional[Any]: + """ + Attempts to fetch content at cid with a timeout of 1 second. + Trys `max_retries` times and otherwise returns None` + """ + attempts = 0 + while attempts < max_retries: + try: + response = requests.get( + self.url(), + timeout=1, + headers={"x-pinata-gateway-token": access_token}, + ) + return response.json() + except requests.exceptions.ReadTimeout: + attempts += 1 + except requests.exceptions.JSONDecodeError as err: + attempts += 1 + log.warning(f"unexpected error {err} retrying...") + return None + + @classmethod + async def fetch_many( # pylint: disable=too-many-locals + cls, missing_rows: list[dict[str, str]], access_token: str, max_retries: int = 3 + ) -> tuple[list[FoundContent], list[NotFoundContent]]: + """Async AppData Fetching""" + found, not_found = [], [] + async with aiohttp.ClientSession( + headers={"x-pinata-gateway-token": access_token} + ) as session: + while missing_rows: + row = missing_rows.pop() + app_hash = row["app_hash"] + + previous_attempts = int(row.get("attempts", 0)) + cid = cls(app_hash) + + first_seen_block = int(row["first_seen_block"]) + result = await cid.fetch_content( + max_retries, previous_attempts, session, first_seen_block + ) + if isinstance(result, NotFoundContent): + # Try Fetching the old format + result = await cls.old_schema(app_hash).fetch_content( + max_retries, previous_attempts, session, first_seen_block + ) + + if isinstance(result, FoundContent): + found.append(result) + else: + assert isinstance(result, NotFoundContent) + not_found.append(result) + + return found, not_found + + async def fetch_content( + self, + max_retries: int, + previous_attempts: int, + session: ClientSession, + first_seen_block: int, + ) -> FoundContent | NotFoundContent: + """Asynchronous content fetching""" + attempts = 0 + while attempts < max_retries: + try: + async with session.get(self.url(), timeout=1) as response: + content = await response.json() + if previous_attempts: + log.debug( + f"Found previously missing content hash {self.hex} at CID {self}" + ) + else: + log.debug( + f"Found content for {self.hex} at CID {self} ({attempts + 1} trys)" + ) + return FoundContent( + app_hash=self.hex, + first_seen_block=first_seen_block, + content=content, + ) + except asyncio.TimeoutError: + attempts += 1 + except aiohttp.ContentTypeError as err: + log.warning(f"failed to parse response {response} with {err}") + attempts += 1 + + # Content Not Found. 
+ total_attempts = previous_attempts + max_retries + base_message = f"no content found for {self.hex} at CID {self} after" + if previous_attempts: + log.debug(f"still {base_message} {total_attempts} attempts") + else: + log.debug(f"{base_message} {max_retries} retries") + + return NotFoundContent( + app_hash=self.hex, + first_seen_block=first_seen_block, + attempts=total_attempts, + ) diff --git a/src/fetch/orderbook.py b/src/fetch/orderbook.py new file mode 100644 index 00000000..06f65603 --- /dev/null +++ b/src/fetch/orderbook.py @@ -0,0 +1,111 @@ +"""Basic client for connecting to postgres database with login credentials""" +from __future__ import annotations + +import os +from dataclasses import dataclass +from enum import Enum +from typing import Optional + +import pandas as pd +from dotenv import load_dotenv +from pandas import DataFrame +from sqlalchemy import create_engine +from sqlalchemy.engine import Engine + +from src.models.block_range import BlockRange +from src.utils import open_query + +REORG_THRESHOLD = 65 + + +class OrderbookEnv(Enum): + """ + Enum for distinguishing between CoW Protocol's staging and production environment + """ + + BARN = "BARN" + PROD = "PROD" + + def __str__(self) -> str: + return str(self.value) + + +@dataclass +class OrderbookFetcher: + """ + A pair of Dataframes primarily intended to store query results + from production and staging orderbook databases + """ + + @staticmethod + def _pg_engine(db_env: OrderbookEnv) -> Engine: + """Returns a connection to postgres database""" + load_dotenv() + db_url = os.environ[f"{db_env}_DB_URL"] + db_string = f"postgresql+psycopg2://{db_url}" + return create_engine(db_string) + + @classmethod + def _read_query_for_env( + cls, query: str, env: OrderbookEnv, data_types: Optional[dict[str, str]] = None + ) -> DataFrame: + return pd.read_sql_query(query, con=cls._pg_engine(env), dtype=data_types) + + @classmethod + def _query_both_dbs( + cls, query: str, data_types: Optional[dict[str, str]] = None + ) -> tuple[DataFrame, DataFrame]: + barn = cls._read_query_for_env(query, OrderbookEnv.BARN, data_types) + prod = cls._read_query_for_env(query, OrderbookEnv.PROD, data_types) + return barn, prod + + @classmethod + def get_latest_block(cls) -> int: + """ + Fetches the latest mutually synced block from orderbook databases (with REORG protection) + """ + data_types = {"latest": "int64"} + barn, prod = cls._query_both_dbs( + open_query("orderbook/latest_block.sql"), data_types + ) + assert len(barn) == 1 == len(prod), "Expecting single record" + return min(int(barn["latest"][0]), int(prod["latest"][0])) - REORG_THRESHOLD + + @classmethod + def get_order_rewards(cls, block_range: BlockRange) -> DataFrame: + """ + Fetches and validates Order Reward DataFrame as concatenation from Prod and Staging DB + """ + cow_reward_query = ( + open_query("orderbook/order_rewards.sql") + .replace("{{start_block}}", str(block_range.block_from)) + .replace("{{end_block}}", str(block_range.block_to)) + ) + data_types = {"block_number": "int64", "amount": "float64"} + barn, prod = cls._query_both_dbs(cow_reward_query, data_types) + + # Solvers do not appear in both environments! + assert set(prod.solver).isdisjoint(set(barn.solver)), "solver overlap!" 
+ return pd.concat([prod, barn]) + + @classmethod + def get_batch_rewards(cls, block_range: BlockRange) -> DataFrame: + """ + Fetches and validates Batch Rewards DataFrame as concatenation from Prod and Staging DB + """ + cow_reward_query = ( + open_query("orderbook/batch_rewards.sql") + .replace("{{start_block}}", str(block_range.block_from)) + .replace("{{end_block}}", str(block_range.block_to)) + ) + data_types = { + # According to this: https://stackoverflow.com/a/11548224 + # capitalized int64 means `Optional` and it appears to work. + "block_number": "Int64", + "block_deadline": "int64", + } + barn, prod = cls._query_both_dbs(cow_reward_query, data_types) + + # Solvers do not appear in both environments! + assert set(prod.solver).isdisjoint(set(barn.solver)), "solver overlap!" + return pd.concat([prod, barn]) diff --git a/src/fetch/postgres.py b/src/fetch/postgres.py new file mode 100644 index 00000000..3364c519 --- /dev/null +++ b/src/fetch/postgres.py @@ -0,0 +1,54 @@ +"""Generic Postgres Adapter for executing queries on a postgres DB.""" +from dataclasses import dataclass +from typing import Optional + +import pandas as pd +from pandas import DataFrame +from sqlalchemy import create_engine +from sqlalchemy.engine import Engine + +from src.fetch.orderbook import REORG_THRESHOLD +from src.models.block_range import BlockRange +from src.utils import open_query + + +@dataclass +class PostgresFetcher: + """ + Basic Postgres interface + """ + + engine: Engine + + def __init__(self, db_url: str): + db_string = f"postgresql+psycopg2://{db_url}" + + self.engine = create_engine(db_string) + + def _read_query( + self, query: str, data_types: Optional[dict[str, str]] = None + ) -> DataFrame: + return pd.read_sql_query(query, con=self.engine, dtype=data_types) + + def get_latest_block(self) -> int: + """ + Fetches the latest mutually synced block from orderbook databases (with REORG protection) + """ + data_types = {"latest": "int64"} + res = self._read_query(open_query("warehouse/latest_block.sql"), data_types) + assert len(res) == 1, "Expecting single record" + return int(res["latest"][0]) - REORG_THRESHOLD + + def get_internal_imbalances(self, block_range: BlockRange) -> DataFrame: + """ + Fetches and validates Internal Token Imbalances + """ + cow_reward_query = ( + open_query("warehouse/token_imbalances.sql") + .replace("{{start_block}}", str(block_range.block_from)) + .replace("{{end_block}}", str(block_range.block_to)) + ) + data_types = { + "block_number": "int64", + } + return self._read_query(cow_reward_query, data_types) diff --git a/src/main.py b/src/main.py new file mode 100644 index 00000000..834ca3c1 --- /dev/null +++ b/src/main.py @@ -0,0 +1,95 @@ +"""Main Entry point for app_hash sync""" +import argparse +import asyncio +import os +from dataclasses import dataclass +from pathlib import Path + +from dotenv import load_dotenv + +from src.fetch.dune import DuneFetcher +from src.fetch.orderbook import OrderbookFetcher +from src.fetch.postgres import PostgresFetcher +from src.logger import set_log +from src.models.tables import SyncTable +from src.post.aws import AWSClient +from src.sync import sync_app_data +from src.sync.config import SyncConfig, AppDataSyncConfig +from src.sync.order_rewards import sync_order_rewards, sync_batch_rewards +from src.sync.token_imbalance import sync_internal_imbalance + +log = set_log(__name__) + + +@dataclass +class ScriptArgs: + """Runtime arguments' parser/initializer""" + + dry_run: bool + sync_table: SyncTable + + def __init__(self) -> None: + 
parser = argparse.ArgumentParser("Dune Community Sources Sync") + parser.add_argument( + "--sync-table", + type=SyncTable, + required=True, + choices=list(SyncTable), + ) + parser.add_argument( + "--dry-run", + type=bool, + help="Flag indicating whether script should not post files to AWS or not", + default=False, + ) + arguments, _ = parser.parse_known_args() + self.sync_table: SyncTable = arguments.sync_table + self.dry_run: bool = arguments.dry_run + + +if __name__ == "__main__": + load_dotenv() + volume_path = Path(os.environ["VOLUME_PATH"]) + args = ScriptArgs() + aws = AWSClient.new_from_environment() + + if args.sync_table == SyncTable.APP_DATA: + asyncio.run( + sync_app_data( + aws, + dune=DuneFetcher(os.environ["DUNE_API_KEY"]), + config=AppDataSyncConfig( + volume_path=volume_path, + missing_files_name="missing_app_hashes.json", + max_retries=int(os.environ.get("APP_DATA_MAX_RETRIES", 3)), + give_up_threshold=int( + os.environ.get("APP_DATA_GIVE_UP_THRESHOLD", 100) + ), + ), + ipfs_access_key=os.environ["IPFS_ACCESS_KEY"], + dry_run=args.dry_run, + ) + ) + elif args.sync_table == SyncTable.ORDER_REWARDS: + sync_order_rewards( + aws, + config=SyncConfig(volume_path), + fetcher=OrderbookFetcher(), + dry_run=args.dry_run, + ) + elif args.sync_table == SyncTable.BATCH_REWARDS: + sync_batch_rewards( + aws, + config=SyncConfig(volume_path), + fetcher=OrderbookFetcher(), + dry_run=args.dry_run, + ) + elif args.sync_table == SyncTable.INTERNAL_IMBALANCE: + sync_internal_imbalance( + aws, + config=SyncConfig(volume_path), + fetcher=PostgresFetcher(os.environ["WAREHOUSE_URL"]), + dry_run=args.dry_run, + ) + else: + log.error(f"unsupported sync_table '{args.sync_table}'") diff --git a/src/models/__init__.py b/src/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/models/app_data_content.py b/src/models/app_data_content.py new file mode 100644 index 00000000..20c3055a --- /dev/null +++ b/src/models/app_data_content.py @@ -0,0 +1,59 @@ +"""Models for Found and Not Found App Data Content. Also, responsible for type conversion""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + + +@dataclass +class FoundContent: + """Representation of AppData with Content""" + + app_hash: str + first_seen_block: int + content: dict[str, Any] + + @classmethod + def from_dict(cls, row: dict[str, Any]) -> FoundContent: + """Constructor from dictionary""" + return cls( + app_hash=row["app_hash"], + first_seen_block=int(row["first_seen_block"]), + content=row["content"], + ) + + def as_dune_record(self) -> dict[str, Any]: + """Converts to DuneRecord type""" + return { + "app_hash": self.app_hash, + "first_seen_block": self.first_seen_block, + "content": self.content, + } + + +@dataclass +class NotFoundContent: + """ + Representation of AppData with unknown content. 
+ Records also number of attempts made to recover the content""" + + app_hash: str + first_seen_block: int + attempts: int + + @classmethod + def from_dict(cls, row: dict[str, Any]) -> NotFoundContent: + """Constructor from dictionary""" + return cls( + app_hash=row["app_hash"], + first_seen_block=int(row["first_seen_block"]), + attempts=int(row["attempts"]), + ) + + def as_dune_record(self) -> dict[str, Any]: + """Converts to DuneRecord type""" + return { + "app_hash": self.app_hash, + "first_seen_block": self.first_seen_block, + "attempts": self.attempts, + } diff --git a/src/models/batch_rewards_schema.py b/src/models/batch_rewards_schema.py new file mode 100644 index 00000000..a3bc5bab --- /dev/null +++ b/src/models/batch_rewards_schema.py @@ -0,0 +1,41 @@ +"""Model for Batch Rewards Data""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +import pandas +from pandas import DataFrame + + +@dataclass +class BatchRewards: + """ + This class provides a transformation interface for the Dataframe we fetch from the orderbook + """ + + @classmethod + def from_pdf_to_dune_records(cls, rewards_df: DataFrame) -> list[dict[str, Any]]: + """Converts Pandas DataFrame into the expected stream type for Dune""" + return [ + { + "block_number": int(row["block_number"]) + if not pandas.isna(row["block_number"]) + else None, + "tx_hash": row["tx_hash"], + "solver": row["solver"], + "block_deadline": int(row["block_deadline"]), + "data": { + # All the following values are in WEI. + "uncapped_payment_eth": int(row["uncapped_payment_eth"]), + "capped_payment": int(row["capped_payment"]), + "execution_cost": int(row["execution_cost"]), + "surplus": int(row["surplus"]), + "fee": int(row["fee"]), + "winning_score": int(row["winning_score"]), + "reference_score": int(row["reference_score"]), + "participating_solvers": row["participating_solvers"], + }, + } + for row in rewards_df.to_dict(orient="records") + ] diff --git a/src/models/block_range.py b/src/models/block_range.py new file mode 100644 index 00000000..25f7a89b --- /dev/null +++ b/src/models/block_range.py @@ -0,0 +1,31 @@ +""" +BlockRange Model is just a data class for left and right bounds +""" +from dataclasses import dataclass + +from dune_client.types import QueryParameter + + +@dataclass +class BlockRange: + """ + Basic dataclass for an Ethereum block range with some Dune compatibility methods. + TODO (easy) - this data class could probably live in dune-client. 
+ https://github.com/cowprotocol/dune-bridge/issues/40 + """ + + block_from: int + block_to: int + + def __str__(self) -> str: + return f"BlockRange(from={self.block_from}, to={self.block_to})" + + def __repr__(self) -> str: + return str(self) + + def as_query_params(self) -> list[QueryParameter]: + """Returns self as Dune QueryParameters""" + return [ + QueryParameter.number_type("BlockFrom", self.block_from), + QueryParameter.number_type("BlockTo", self.block_to), + ] diff --git a/src/models/order_rewards_schema.py b/src/models/order_rewards_schema.py new file mode 100644 index 00000000..2b7734a2 --- /dev/null +++ b/src/models/order_rewards_schema.py @@ -0,0 +1,31 @@ +"""Model for Order Rewards Data""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from pandas import DataFrame + + +@dataclass +class OrderRewards: + """ + This class provides a transformation interface for the Dataframe we fetch from the orderbook + """ + + @classmethod + def from_pdf_to_dune_records(cls, rewards_df: DataFrame) -> list[dict[str, Any]]: + """Converts Pandas DataFrame into the expected stream type for Dune""" + return [ + { + "block_number": int(row["block_number"]), + "order_uid": row["order_uid"], + "tx_hash": row["tx_hash"], + "solver": row["solver"], + "data": { + "surplus_fee": str(row["surplus_fee"]), + "amount": float(row["amount"]), + }, + } + for row in rewards_df.to_dict(orient="records") + ] diff --git a/src/models/tables.py b/src/models/tables.py new file mode 100644 index 00000000..147f217a --- /dev/null +++ b/src/models/tables.py @@ -0,0 +1,19 @@ +"""Data structure containing the supported sync tables""" +from enum import Enum + + +class SyncTable(Enum): + """Enum for Deployment Supported Table Sync""" + + APP_DATA = "app_data" + ORDER_REWARDS = "order_rewards" + BATCH_REWARDS = "batch_rewards" + INTERNAL_IMBALANCE = "internal_imbalance" + + def __str__(self) -> str: + return str(self.value) + + @staticmethod + def supported_tables() -> list[str]: + """Returns a list of supported tables (i.e. 
valid object contructors).""" + return [str(t) for t in list(SyncTable)] diff --git a/src/models/token_imbalance_schema.py b/src/models/token_imbalance_schema.py new file mode 100644 index 00000000..e7053cd4 --- /dev/null +++ b/src/models/token_imbalance_schema.py @@ -0,0 +1,27 @@ +"""Model for Batch Rewards Data""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from pandas import DataFrame + + +@dataclass +class TokenImbalance: + """ + This class provides a transformation interface (to JSON) for Dataframe + """ + + @classmethod + def from_pdf_to_dune_records(cls, frame: DataFrame) -> list[dict[str, Any]]: + """Converts Pandas DataFrame into the expected stream type for Dune""" + return [ + { + "block_number": int(row["block_number"]), + "tx_hash": row["tx_hash"], + "token": row["token"], + "amount": str(row["amount"]), + } + for row in frame.to_dict(orient="records") + ] diff --git a/src/post/__init__.py b/src/post/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/aws.py b/src/post/aws.py similarity index 82% rename from src/aws.py rename to src/post/aws.py index f9cb08e5..3e8c455e 100644 --- a/src/aws.py +++ b/src/post/aws.py @@ -1,7 +1,6 @@ """Aws S3 Bucket functionality (namely upload_file)""" from __future__ import annotations -import json import os from collections import defaultdict from dataclasses import dataclass @@ -14,7 +13,7 @@ from dotenv import load_dotenv from src.logger import set_log -from src.text_io import BytesIteratorIO +from src.models.tables import SyncTable log = set_log(__name__) @@ -79,17 +78,21 @@ def from_bucket_collection(cls, bucket_objects: Any) -> BucketStructure: object_key = bucket_obj.key path, _ = object_key.split("/") grouped_files[path].append(BucketFileObject.from_key(object_key)) + if path not in SyncTable.supported_tables(): + # Catches any unrecognized filepath. + log.warning(f"Found unexpected file {object_key}") log.debug(f"loaded bucket filesystem: {grouped_files.keys()}") return cls(files=grouped_files) - def get(self, table: str) -> list[BucketFileObject]: + def get(self, table: SyncTable | str) -> list[BucketFileObject]: """ Returns the list of files under `table` - returns empty list if none available. """ - return self.files.get(table, []) + table_str = str(table) if isinstance(table, SyncTable) else table + return self.files.get(table_str, []) class AWSClient: @@ -128,22 +131,16 @@ def _assume_role(self) -> ServiceResource: """ sts_client = boto3.client("sts") - # TODO - assume that the internal role is already assumed. 
and use get session_token - # sts_client.get_session_token() internal_assumed_role_object = sts_client.assume_role( RoleArn=self.internal_role, RoleSessionName="InternalSession", ) credentials = internal_assumed_role_object["Credentials"] - # sts_client.get_session_token() - sts_client = boto3.client( "sts", - aws_access_key_id=credentials["AccessKeyId"], # AWS_ACCESS_KEY_ID - aws_secret_access_key=credentials[ - "SecretAccessKey" - ], # AWS_SECRET_ACCESS_KEY - aws_session_token=credentials["SessionToken"], # AWS_SESSION_TOKEN + aws_access_key_id=credentials["AccessKeyId"], + aws_secret_access_key=credentials["SecretAccessKey"], + aws_session_token=credentials["SessionToken"], ) external_assumed_role_object = sts_client.assume_role( @@ -179,30 +176,6 @@ def upload_file(self, filename: str, object_key: str) -> bool: log.debug(f"uploaded {filename} to {self.bucket}") return True - def put_object(self, data_set: list[dict[str, Any]], object_key: str) -> bool: - """Upload a file to an S3 bucket - - :param data_list: Data to upload. Should be a full path to file. - :param object_key: S3 object key. For our purposes, this would - be f"{table_name}/cow_{latest_block_number}.json" - :return: True if file was uploaded, else raises - """ - - file_object = BytesIteratorIO( - f"{json.dumps(row)}\n".encode("utf-8") for row in data_set - ) - - s3_client = self._get_s3_client(self._assume_role()) - - s3_client.upload_fileobj( - file_object, - bucket=self.bucket, - key=object_key, - extra_args={"ACL": "bucket-owner-full-control"}, - ) - log.debug(f"uploaded {object_key} to {self.bucket}") - return True - def delete_file(self, object_key: str) -> bool: """Delete a file from an S3 bucket @@ -210,6 +183,7 @@ def delete_file(self, object_key: str) -> bool: be f"{table_name}/cow_{latest_block_number}.json" :return: True if file was deleted, else raises """ + # TODO - types! error: "BaseClient" has no attribute "delete_object" s3_client = self._get_s3_client(self._assume_role()) s3_client.delete_object( # type: ignore Bucket=self.bucket, @@ -246,13 +220,14 @@ def existing_files(self) -> BucketStructure: bucket_objects = bucket.objects.all() return BucketStructure.from_bucket_collection(bucket_objects) - def last_sync_block(self, table: str) -> int: + def last_sync_block(self, table: SyncTable | str) -> int: """ Based on the existing bucket files, the last sync block is uniquely determined from the file names. """ + table_str = str(table) if isinstance(table, SyncTable) else table try: - table_files = self.existing_files().get(table) + table_files = self.existing_files().get(table_str) return max(file_obj.block for file_obj in table_files if file_obj.block) except ValueError as err: # Raised when table_files = [] @@ -260,7 +235,7 @@ def last_sync_block(self, table: str) -> int: f"Could not determine last sync block for {table} files. No files." 
) from err - def delete_all(self, table: str) -> None: + def delete_all(self, table: SyncTable | str) -> None: """Deletes all files within the supported tables directory""" log.info(f"Emptying Bucket {table}") try: @@ -270,4 +245,6 @@ def delete_all(self, table: str) -> None: log.info(f"Deleting file {file_data.object_key}") self.delete_file(file_data.object_key) except KeyError as err: - raise ValueError(f"invalid table name {table}") from err + raise ValueError( + f"Invalid table_name {table}, please chose from {SyncTable.supported_tables()}" + ) from err diff --git a/src/record_handler.py b/src/record_handler.py deleted file mode 100644 index 718e6252..00000000 --- a/src/record_handler.py +++ /dev/null @@ -1,107 +0,0 @@ -""" -Abstraction for New Content Handling -provides a framework for writing new content to disk and posting to AWS -""" -import sys -from typing import Any - -from s3transfer import S3UploadFailedError -from src.aws import AWSClient - -from src.logger import set_log - -log = set_log(__name__) - - -def last_sync_block(aws: AWSClient, table: str, genesis_block: int = 0) -> int: - """Attempts to get last sync block from AWS Bucket files, otherwise uses genesis""" - try: - block_from = aws.last_sync_block(table) - except FileNotFoundError: - log.warning( - f"last sync could not be evaluated from AWS, using genesis block {genesis_block}" - ) - block_from = genesis_block - - return block_from - - -class RecordHandler: - - """ - This class is responsible for consuming new dune records and missing values from previous runs - it attempts to fetch content for them and filters them into "found" and "not found" as necessary - """ - - def __init__( - self, - file_index: int, - file_prefix: str, - table: str, - data_set: list[dict[str, Any]], - ): - self.file_index = file_index - self.file_prefix = file_prefix - self.table = table - self.data_set = data_set - - def num_records(self) -> int: - """Returns number of records to handle""" - return len(self.data_set) - - @property - def content_filename(self) -> str: - """returns filename""" - return f"{self.file_prefix}_{self.file_index}.json" - - @property - def object_key(self) -> str: - """returns object key""" - return f"{self.table}/{self.content_filename}" - - def _aws_login_and_upload( - self, aws: AWSClient, data_set: list[dict[str, Any]] - ) -> bool: - """Creates AWS client session and attempts to upload file""" - try: - return aws.put_object( - data_set, - object_key=self.object_key, - ) - except S3UploadFailedError as err: - log.error(err) - sys.exit(1) - - def write_and_upload_content(self, aws: AWSClient, dry_run: bool) -> None: - """ - - Writes record handlers content to persistent volume, - - attempts to upload to AWS and - - records last sync block on volume. - When dryrun flag is enabled, does not upload to IPFS. 
- """ - count = self.num_records() - if count > 0: - log.info( - f"posting {count} new {self.table} records for file index {self.file_index}" - ) - if dry_run: - log.info("DRY-RUN-ENABLED: new records not posted to AWS.") - else: - try: - aws.put_object( - data_set=self.data_set, - object_key=self.object_key, - ) - log.info( - f"{self.table} sync for file index {self.file_index} complete: " - f"synced {count} records" - ) - return - except S3UploadFailedError as err: - log.error(err) - sys.exit(1) - - else: - log.info( - f"No new {self.table} for file index {self.file_index}: no sync necessary" - ) diff --git a/src/scripts/__init__.py b/src/scripts/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/scripts/download_file.py b/src/scripts/download_file.py new file mode 100644 index 00000000..66100521 --- /dev/null +++ b/src/scripts/download_file.py @@ -0,0 +1,26 @@ +"""Downloads file from AWS to PWD""" +import argparse + +from dotenv import load_dotenv + +from src.post.aws import AWSClient + + +def download_file(aws: AWSClient, object_key: str) -> None: + """Download file (`object_key`) from AWS""" + aws.download_file( + filename=object_key.split("/")[1], + object_key=object_key, + ) + + +if __name__ == "__main__": + load_dotenv() + parser = argparse.ArgumentParser("Download File from Bucket") + parser.add_argument( + "filename", + type=str, + required=True, + ) + args, _ = parser.parse_known_args() + download_file(aws=AWSClient.new_from_environment(), object_key=args.object_key) diff --git a/src/scripts/empty_bucket.py b/src/scripts/empty_bucket.py new file mode 100644 index 00000000..b36a4b56 --- /dev/null +++ b/src/scripts/empty_bucket.py @@ -0,0 +1,33 @@ +""" +Script to empty AWS bucket. +Used for re-deployments involving schema change. 
+""" +import os +import shutil +from pathlib import Path + +from dotenv import load_dotenv + +from src.main import ScriptArgs +from src.models.tables import SyncTable +from src.post.aws import AWSClient + + +def empty_bucket(table: SyncTable, aws: AWSClient, volume_path: Path) -> None: + """ + Empties the bucket for `table` + and deletes backup from mounted volume + """ + # Drop Data from AWS bucket + aws.delete_all(table) + # drop backup data from volume path + shutil.rmtree(volume_path / str(table)) + + +if __name__ == "__main__": + load_dotenv() + empty_bucket( + table=ScriptArgs().sync_table, + aws=AWSClient.new_from_environment(), + volume_path=Path(os.environ["VOLUME_PATH"]), + ) diff --git a/src/sql/app_hash_latest_block.sql b/src/sql/app_hash_latest_block.sql new file mode 100644 index 00000000..571fce55 --- /dev/null +++ b/src/sql/app_hash_latest_block.sql @@ -0,0 +1,4 @@ +-- https://dune.com/queries/1615490 +select + max(call_block_number) as latest_block +from gnosis_protocol_v2_ethereum.GPv2Settlement_call_settle \ No newline at end of file diff --git a/src/sql/app_hashes.sql b/src/sql/app_hashes.sql new file mode 100644 index 00000000..daef7452 --- /dev/null +++ b/src/sql/app_hashes.sql @@ -0,0 +1,21 @@ +-- App Hashes: https://dune.com/queries/1610025 +-- MIN(first_block_seen) = 12153263 +-- Nov 16, 2022: Query takes 4 seconds to run for on full block range +with +app_hashes as ( + select + min(call_block_number) first_seen_block, + get_json_object(trade, '$.appData') as app_hash + from gnosis_protocol_v2_ethereum.GPv2Settlement_call_settle + lateral view explode(trades) as trade + group by app_hash +) +select + app_hash, + first_seen_block +from app_hashes +where first_seen_block > '{{BlockFrom}}' +and first_seen_block <= '{{BlockTo}}' + +-- For some additional stats, +-- on this data see https://dune.com/queries/1608286 \ No newline at end of file diff --git a/src/sql/orderbook/batch_rewards.sql b/src/sql/orderbook/batch_rewards.sql new file mode 100644 index 00000000..affa000b --- /dev/null +++ b/src/sql/orderbook/batch_rewards.sql @@ -0,0 +1,97 @@ +WITH observed_settlements AS (SELECT + -- settlement + tx_hash, + solver, + s.block_number, + -- settlement_observations + effective_gas_price * gas_used AS execution_cost, + surplus, + fee, + -- auction_transaction + at.auction_id + FROM settlement_observations so + JOIN settlements s + ON s.block_number = so.block_number + AND s.log_index = so.log_index + JOIN auction_transaction at + ON s.tx_from = at.tx_from + AND s.tx_nonce = at.tx_nonce + JOIN settlement_scores ss + ON at.auction_id = ss.auction_id + WHERE ss.block_deadline > {{start_block}} + AND ss.block_deadline <= {{end_block}}), + + auction_participation as (SELECT ss.auction_id, + array_agg( + concat('0x', encode(participant, 'hex')) ORDER BY participant + ) as participating_solvers + FROM auction_participants + JOIN settlement_scores ss + ON auction_participants.auction_id = ss.auction_id + WHERE block_deadline > {{start_block}} + AND block_deadline <= {{end_block}} + GROUP BY ss.auction_id), + reward_data AS (SELECT + -- observations + tx_hash, + ss.auction_id, + -- TODO - Assuming that `solver == winner` when both not null + -- We will need to monitor that `solver == winner`! 
+ coalesce(solver, winner) as solver, + block_number as settlement_block, + block_deadline, + case + when block_number is not null and block_number > block_deadline then 0 + else coalesce(execution_cost, 0) end as execution_cost, + case + when block_number is not null and block_number > block_deadline then 0 + else coalesce(surplus, 0) end as surplus, + case + when block_number is not null and block_number > block_deadline then 0 + else coalesce(fee, 0) end as fee, + -- scores + winning_score, + reference_score, + -- auction_participation + participating_solvers + FROM settlement_scores ss + -- If there are reported scores, + -- there will always be a record of auction participants + JOIN auction_participation ap + ON ss.auction_id = ap.auction_id + -- outer joins made in order to capture non-existent settlements. + LEFT OUTER JOIN observed_settlements os + ON os.auction_id = ss.auction_id), + reward_per_auction as (SELECT tx_hash, + settlement_block, + block_deadline, + solver, + execution_cost, + surplus, + fee, + surplus + fee - reference_score as uncapped_payment_eth, + -- Uncapped Reward = CLAMP_[-E, E + exec_cost](uncapped_payment_eth) + LEAST(GREATEST(-10000000000000000, surplus + fee - reference_score), + 10000000000000000 + execution_cost) as capped_payment, + winning_score, + reference_score, + participating_solvers + FROM reward_data) + + +SELECT settlement_block as block_number, + block_deadline, + case + when tx_hash is NULL then NULL + else concat('0x', encode(tx_hash, 'hex')) + end as tx_hash, + concat('0x', encode(solver, 'hex')) as solver, + execution_cost::text as execution_cost, + surplus::text as surplus, + fee::text as fee, + uncapped_payment_eth::text as uncapped_payment_eth, + capped_payment::text as capped_payment, + winning_score::text as winning_score, + reference_score::text as reference_score, + participating_solvers +FROM reward_per_auction diff --git a/src/sql/orderbook/latest_block.sql b/src/sql/orderbook/latest_block.sql new file mode 100644 index 00000000..acc3a5d4 --- /dev/null +++ b/src/sql/orderbook/latest_block.sql @@ -0,0 +1,3 @@ +select min(block_number) latest +from settlements +where tx_from is null; \ No newline at end of file diff --git a/src/sql/orderbook/order_rewards.sql b/src/sql/orderbook/order_rewards.sql new file mode 100644 index 00000000..d80666f5 --- /dev/null +++ b/src/sql/orderbook/order_rewards.sql @@ -0,0 +1,33 @@ +with trade_hashes as (SELECT solver, + block_number, + order_uid, + fee_amount, + settlement.tx_hash, + auction_id + FROM trades t + LEFT OUTER JOIN LATERAL ( + SELECT tx_hash, solver, tx_nonce, tx_from + FROM settlements s + WHERE s.block_number = t.block_number + AND s.log_index > t.log_index + ORDER BY s.log_index ASC + LIMIT 1 + ) AS settlement ON true + join auction_transaction + -- This join also eliminates overlapping + -- trades & settlements between barn and prod DB + on settlement.tx_from = auction_transaction.tx_from + and settlement.tx_nonce = auction_transaction.tx_nonce + where block_number > {{start_block}} and block_number <= {{end_block}}) + +-- Most efficient column order for sorting would be having tx_hash or order_uid first +select block_number, + concat('0x', encode(trade_hashes.order_uid, 'hex')) as order_uid, + concat('0x', encode(solver, 'hex')) as solver, + concat('0x', encode(tx_hash, 'hex')) as tx_hash, + coalesce(surplus_fee, 0)::text as surplus_fee, + coalesce(reward, 0.0) as amount +from trade_hashes + left outer join order_execution o + on trade_hashes.order_uid = o.order_uid + and 
trade_hashes.auction_id = o.auction_id; diff --git a/src/sql/warehouse/latest_block.sql b/src/sql/warehouse/latest_block.sql new file mode 100644 index 00000000..609534b6 --- /dev/null +++ b/src/sql/warehouse/latest_block.sql @@ -0,0 +1,2 @@ +select max(block_number) as latest +from settlements; diff --git a/src/sql/warehouse/token_imbalances.sql b/src/sql/warehouse/token_imbalances.sql new file mode 100644 index 00000000..0b81f9ba --- /dev/null +++ b/src/sql/warehouse/token_imbalances.sql @@ -0,0 +1,8 @@ +select block_number, + concat('0x', encode(s.tx_hash, 'hex')) as tx_hash, + concat('0x', encode(token, 'hex')) as token, + amount::text as amount +from internalized_imbalances + join settlements s + on internalized_imbalances.tx_hash = s.tx_hash +where block_number > {{start_block}} and block_number <= {{end_block}}; diff --git a/src/sync/__init__.py b/src/sync/__init__.py new file mode 100644 index 00000000..37b723b3 --- /dev/null +++ b/src/sync/__init__.py @@ -0,0 +1,2 @@ +"""Re-exported sync methods.""" +from .app_data import sync_app_data diff --git a/src/sync/app_data.py b/src/sync/app_data.py new file mode 100644 index 00000000..8f5f143d --- /dev/null +++ b/src/sync/app_data.py @@ -0,0 +1,154 @@ +"""Main Entry point for app_hash sync""" + +from dune_client.file.interface import FileIO +from dune_client.types import DuneRecord + +from src.fetch.dune import DuneFetcher +from src.fetch.ipfs import Cid +from src.logger import set_log +from src.models.app_data_content import FoundContent, NotFoundContent +from src.models.block_range import BlockRange +from src.models.tables import SyncTable +from src.post.aws import AWSClient +from src.sync.common import last_sync_block +from src.sync.config import SyncConfig, AppDataSyncConfig +from src.sync.record_handler import RecordHandler +from src.sync.upload_handler import UploadHandler + +log = set_log(__name__) + + +SYNC_TABLE = SyncTable.APP_DATA + + +class AppDataHandler(RecordHandler): # pylint:disable=too-many-instance-attributes + """ + This class is responsible for consuming new dune records and missing values from previous runs + it attempts to fetch content for them and filters them into "found" and "not found" as necessary + """ + + def __init__( # pylint:disable=too-many-arguments + self, + file_manager: FileIO, + new_rows: list[DuneRecord], + block_range: BlockRange, + config: SyncConfig, + ipfs_access_key: str, + missing_file_name: str, + ): + super().__init__(block_range, SYNC_TABLE, config) + self.file_manager = file_manager + self.ipfs_access_key = ipfs_access_key + + self._found: list[FoundContent] = [] + self._not_found: list[NotFoundContent] = [] + + self.new_rows = new_rows + self.missing_file_name = missing_file_name + try: + self.missing_values = self.file_manager.load_ndjson(missing_file_name) + except FileNotFoundError: + self.missing_values = [] + + def num_records(self) -> int: + assert len(self.new_rows) == 0, ( + "this function call is not allowed until self.new_rows have been processed! 
" + "call fetch_content_and_filter first" + ) + return len(self._found) + + async def _handle_new_records(self, max_retries: int) -> None: + # Drain the dune_results into "found" and "not found" categories + self._found, self._not_found = await Cid.fetch_many( + self.new_rows, self.ipfs_access_key, max_retries + ) + + async def _handle_missing_records( + self, max_retries: int, give_up_threshold: int + ) -> None: + found, not_found = await Cid.fetch_many( + self.missing_values, self.ipfs_access_key, max_retries + ) + while found: + self._found.append(found.pop()) + while not_found: + row = not_found.pop() + app_hash, attempts = row.app_hash, row.attempts + if attempts > give_up_threshold: + log.debug( + f"No content found after {attempts} attempts for {app_hash} assuming NULL." + ) + self._found.append( + FoundContent( + app_hash=app_hash, + first_seen_block=row.first_seen_block, + content={}, + ) + ) + else: + self._not_found.append(row) + + def write_found_content(self) -> None: + assert len(self.new_rows) == 0, "Must call _handle_new_records first!" + self.file_manager.write_ndjson( + data=[x.as_dune_record() for x in self._found], name=self.content_filename + ) + # When not_found is empty, we want to overwrite the file (hence skip_empty=False) + # This happens when number of attempts exceeds GIVE_UP_THRESHOLD + self.file_manager.write_ndjson( + data=[x.as_dune_record() for x in self._not_found], + name=self.missing_file_name, + skip_empty=False, + ) + + def write_sync_data(self) -> None: + # Only write these if upload was successful. + self.file_manager.write_csv( + data=[{self.config.sync_column: str(self.block_range.block_to)}], + name=self.config.sync_file, + ) + + async def fetch_content_and_filter( + self, max_retries: int, give_up_threshold: int + ) -> None: + """ + Run loop fetching app_data for hashes, + separates into (found and not found), returning the pair. 
+ """ + await self._handle_new_records(max_retries) + log.info( + f"Attempting to recover missing {len(self.missing_values)} records from previous run" + ) + await self._handle_missing_records(max_retries, give_up_threshold) + + +async def sync_app_data( + aws: AWSClient, + dune: DuneFetcher, + config: AppDataSyncConfig, + ipfs_access_key: str, + dry_run: bool, +) -> None: + """App Data Sync Logic""" + block_range = BlockRange( + block_from=last_sync_block( + aws, + table=SYNC_TABLE, + genesis_block=12153262, # First App Hash Block + ), + block_to=await dune.latest_app_hash_block(), + ) + + data_handler = AppDataHandler( + file_manager=FileIO(config.volume_path / str(SYNC_TABLE)), + new_rows=await dune.get_app_hashes(block_range), + block_range=block_range, + config=config, + ipfs_access_key=ipfs_access_key, + missing_file_name=config.missing_files_name, + ) + await data_handler.fetch_content_and_filter( + max_retries=config.max_retries, give_up_threshold=config.give_up_threshold + ) + UploadHandler(aws, data_handler, table=SYNC_TABLE).write_and_upload_content(dry_run) + log.info("app_data sync run completed successfully") diff --git a/src/sync/common.py b/src/sync/common.py new file mode 100644 index 00000000..c3a11195 --- /dev/null +++ b/src/sync/common.py @@ -0,0 +1,20 @@ +"""Shared methods between both sync scripts.""" + +from src.logger import set_log +from src.models.tables import SyncTable +from src.post.aws import AWSClient + +log = set_log(__name__) + + +def last_sync_block(aws: AWSClient, table: SyncTable, genesis_block: int = 0) -> int: + """Attempts to get last sync block from AWS Bucket files, otherwise uses genesis""" + try: + block_from = aws.last_sync_block(table) + except FileNotFoundError: + log.warning( + f"last sync could not be evaluated from AWS, using genesis block {genesis_block}" + ) + block_from = genesis_block + + return block_from diff --git a/src/sync/config.py b/src/sync/config.py new file mode 100644 index 00000000..bbd663c6 --- /dev/null +++ b/src/sync/config.py @@ -0,0 +1,30 @@ +"""Configuration details for sync jobs""" +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass +class SyncConfig: + """ + This data class contains all the credentials and volume paths + required to sync with both a persistent volume and Dune's S3 Buckets. + """ + + volume_path: Path + # File System + sync_file: str = "sync_block.csv" + sync_column: str = "last_synced_block" + + +@dataclass +class AppDataSyncConfig(SyncConfig): + """Additional data field for app data sync.""" + + # Maximum number of retries on a single run + max_retries: int = 3 + # Total number of accumulated attempts before we assume no content + give_up_threshold: int = 100 + # Persisted file where we store the missing results and number of attempts. 
+ missing_files_name: str = "missing_app_hashes.json" diff --git a/src/sync/order_rewards.py b/src/sync/order_rewards.py new file mode 100644 index 00000000..5c2e2596 --- /dev/null +++ b/src/sync/order_rewards.py @@ -0,0 +1,125 @@ +"""Main Entry point for app_hash sync""" +from typing import Any + +from dune_client.file.interface import FileIO + +from src.fetch.orderbook import OrderbookFetcher +from src.logger import set_log +from src.models.batch_rewards_schema import BatchRewards +from src.models.block_range import BlockRange +from src.models.order_rewards_schema import OrderRewards +from src.models.tables import SyncTable +from src.post.aws import AWSClient +from src.sync.common import last_sync_block +from src.sync.config import SyncConfig +from src.sync.record_handler import RecordHandler +from src.sync.upload_handler import UploadHandler + +log = set_log(__name__) + + +class OrderbookDataHandler( + RecordHandler +): # pylint:disable=too-few-public-methods,too-many-arguments + """ + This class is responsible for consuming new dune records and missing values from previous runs + it attempts to fetch content for them and filters them into "found" and "not found" as necessary + """ + + def __init__( + self, + file_manager: FileIO, + block_range: BlockRange, + sync_table: SyncTable, + config: SyncConfig, + data_list: list[dict[str, Any]], + ): + super().__init__(block_range, sync_table, config) + log.info(f"Handling {len(data_list)} new records") + self.file_manager = file_manager + self.data_list = data_list + + def num_records(self) -> int: + return len(self.data_list) + + def write_found_content(self) -> None: + self.file_manager.write_ndjson(data=self.data_list, name=self.content_filename) + + def write_sync_data(self) -> None: + # Only write these if upload was successful. 
+ self.file_manager.write_csv( + data=[{self.config.sync_column: str(self.block_range.block_to)}], + name=self.config.sync_file, + ) + + +def sync_order_rewards( + aws: AWSClient, fetcher: OrderbookFetcher, config: SyncConfig, dry_run: bool +) -> None: + """Order Rewards Data Sync Logic""" + sync_table = SyncTable.ORDER_REWARDS + block_range = BlockRange( + block_from=last_sync_block( + aws, + table=sync_table, + genesis_block=15719994, # First Recorded Order Reward block + ), + block_to=fetcher.get_latest_block(), + ) + sync_orderbook_data( + aws, + block_range, + config, + dry_run, + sync_table=sync_table, + data_list=OrderRewards.from_pdf_to_dune_records( + fetcher.get_order_rewards(block_range) + ), + ) + + +def sync_batch_rewards( + aws: AWSClient, fetcher: OrderbookFetcher, config: SyncConfig, dry_run: bool +) -> None: + """Batch Reward Sync Logic""" + sync_table = SyncTable.BATCH_REWARDS + block_range = BlockRange( + block_from=last_sync_block( + aws, + table=sync_table, + genesis_block=16862919, # First Recorded Batch Reward block + ), + block_to=fetcher.get_latest_block(), + ) + sync_orderbook_data( + aws, + block_range, + config, + dry_run, + sync_table, + data_list=BatchRewards.from_pdf_to_dune_records( + fetcher.get_batch_rewards(block_range) + ), + ) + + +def sync_orderbook_data( # pylint:disable=too-many-arguments + aws: AWSClient, + block_range: BlockRange, + config: SyncConfig, + dry_run: bool, + sync_table: SyncTable, + data_list: list[dict[str, Any]], +) -> None: + """Generic Orderbook Sync Logic""" + record_handler = OrderbookDataHandler( + file_manager=FileIO(config.volume_path / str(sync_table)), + block_range=block_range, + config=config, + data_list=data_list, + sync_table=sync_table, + ) + UploadHandler(aws, record_handler, table=sync_table).write_and_upload_content( + dry_run + ) + log.info(f"{sync_table} sync run completed successfully") diff --git a/src/sync/record_handler.py b/src/sync/record_handler.py new file mode 100644 index 00000000..cca9e573 --- /dev/null +++ b/src/sync/record_handler.py @@ -0,0 +1,45 @@ +""" +Abstraction for New Content Handling +provides a framework for writing new content to disk and posting to AWS +""" +from abc import ABC, abstractmethod + +from src.logger import set_log +from src.models.block_range import BlockRange +from src.models.tables import SyncTable +from src.sync.config import SyncConfig + +log = set_log(__name__) + + +class RecordHandler(ABC): # pylint: disable=too-few-public-methods + + """ + This class is responsible for consuming new dune records and missing values from previous runs + it attempts to fetch content for them and filters them into "found" and "not found" as necessary + """ + + def __init__( + self, + block_range: BlockRange, + table: SyncTable, + config: SyncConfig, + ): + self.config = config + self.block_range = block_range + + self.name = str(table) + self.file_path = config.volume_path / self.name + self.content_filename = f"cow_{block_range.block_to}.json" + + @abstractmethod + def num_records(self) -> int: + """Returns number of records to handle""" + + @abstractmethod + def write_found_content(self) -> None: + """Writes content to disk""" + + @abstractmethod + def write_sync_data(self) -> None: + """Records last synced content file""" diff --git a/src/sync/token_imbalance.py b/src/sync/token_imbalance.py new file mode 100644 index 00000000..fe0ca4f3 --- /dev/null +++ b/src/sync/token_imbalance.py @@ -0,0 +1,47 @@ +"""Main Entry point for token_imbalance sync""" +from dune_client.file.interface import 
FileIO
+
+from src.fetch.postgres import PostgresFetcher
+from src.logger import set_log
+from src.models.block_range import BlockRange
+from src.models.tables import SyncTable
+from src.models.token_imbalance_schema import TokenImbalance
+from src.post.aws import AWSClient
+from src.sync.common import last_sync_block
+from src.sync.config import SyncConfig
+from src.sync.order_rewards import OrderbookDataHandler
+from src.sync.upload_handler import UploadHandler
+
+log = set_log(__name__)
+
+
+def sync_internal_imbalance(
+    aws: AWSClient, fetcher: PostgresFetcher, config: SyncConfig, dry_run: bool
+) -> None:
+    """Token Imbalance Sync Logic"""
+    sync_table = SyncTable.INTERNAL_IMBALANCE
+    block_range = BlockRange(
+        block_from=last_sync_block(
+            aws,
+            table=sync_table,
+            # The first block for which solver competitions
+            # are available in production orderbook:
+            # select * from solver_competitions where id = 1;
+            genesis_block=15173540,
+        ),
+        block_to=fetcher.get_latest_block(),
+    )
+    # TODO - Gap Detection (find missing txHashes and ensure they are accounted for!)
+    record_handler = OrderbookDataHandler(
+        file_manager=FileIO(config.volume_path / str(sync_table)),
+        block_range=block_range,
+        config=config,
+        data_list=TokenImbalance.from_pdf_to_dune_records(
+            fetcher.get_internal_imbalances(block_range)
+        ),
+        sync_table=sync_table,
+    )
+    UploadHandler(aws, record_handler, table=sync_table).write_and_upload_content(
+        dry_run
+    )
+    log.info(f"{sync_table} sync run completed successfully")
diff --git a/src/sync/upload_handler.py b/src/sync/upload_handler.py
new file mode 100644
index 00000000..498d042d
--- /dev/null
+++ b/src/sync/upload_handler.py
@@ -0,0 +1,76 @@
+"""Upload handler responsible for local file updates and aws uploads"""
+import os
+import sys
+
+from s3transfer import S3UploadFailedError
+
+from src.logger import set_log
+from src.models.tables import SyncTable
+from src.post.aws import AWSClient
+from src.sync.record_handler import RecordHandler
+
+log = set_log(__name__)
+
+
+class UploadHandler: # pylint: disable=too-few-public-methods
+    """
+    Given an instance of RecordHandler, ensures that
+    - files are written locally,
+    - uploaded to AWS and
+    - sync block is recorded
+    in the appropriate order.
+    """
+
+    def __init__(self, aws: AWSClient, record_handler: RecordHandler, table: SyncTable):
+        self.aws = aws
+        self.record_handler = record_handler
+        self.table = str(table)
+
+    def _aws_login_and_upload(self) -> bool:
+        """Creates AWS client session and attempts to upload file"""
+        path, filename = (
+            self.record_handler.file_path,
+            self.record_handler.content_filename,
+        )
+        try:
+            return self.aws.upload_file(
+                filename=os.path.join(path, filename),
+                object_key=f"{self.table}/{filename}",
+            )
+        except S3UploadFailedError as err:
+            log.error(err)
+            sys.exit(1)
+
+    def write_and_upload_content(self, dry_run: bool) -> None:
+        """
+        - Writes the record handler's content to the persistent volume,
+        - attempts to upload to AWS and
+        - records the last sync block on the volume.
+        When the dry-run flag is enabled, files are not uploaded to AWS.
+        """
+        record_handler = self.record_handler
+        num_records, block_range, name = (
+            record_handler.num_records(),
+            record_handler.block_range,
+            record_handler.name,
+        )
+
+        record_handler.write_found_content()
+        if num_records > 0:
+            log.info(
+                f"attempting to post {num_records} new {name} records for block range {block_range}"
+            )
+            if dry_run:
+                log.info(
+                    "DRY-RUN-ENABLED: New records written to volume, but not posted to AWS."
+ ) + else: + self._aws_login_and_upload() + log.info( + f"{name} sync for block range {block_range} complete: " + f"synced {num_records} records" + ) + else: + log.info(f"No new {name} for block range {block_range}: no sync necessary") + + record_handler.write_sync_data() diff --git a/src/text_io.py b/src/text_io.py deleted file mode 100644 index 0f602e63..00000000 --- a/src/text_io.py +++ /dev/null @@ -1,76 +0,0 @@ -"""This is taken from https://hakibenita.com/fast-load-data-python-postgresql""" -# pylint: skip-file -import io -from typing import Iterator, Optional - - -class StringIteratorIO(io.TextIOBase): - def __init__(self, iter: Iterator[str]): - self._iter = iter - self._buff = "" - - def readable(self) -> bool: - return True - - def _read1(self, n: Optional[int] = None) -> str: - while not self._buff: - try: - self._buff = next(self._iter) - except StopIteration: - break - ret = self._buff[:n] - self._buff = self._buff[len(ret) :] - return ret - - def read(self, n: Optional[int] = None) -> str: - line = [] - if n is None or n < 0: - while True: - m = self._read1() - if not m: - break - line.append(m) - else: - while n > 0: - m = self._read1(n) - if not m: - break - n -= len(m) - line.append(m) - return "".join(line) - - -class BytesIteratorIO(io.BufferedIOBase): - def __init__(self, iter: Iterator[bytes]): - self._iter = iter - self._buff = b"" - - def readable(self) -> bool: - return True - - def _read1(self, n: Optional[int] = None) -> bytes: - while not self._buff: - try: - self._buff = next(self._iter) - except StopIteration: - break - ret = self._buff[:n] - self._buff = self._buff[len(ret) :] - return ret - - def read(self, n: Optional[int] = None) -> bytes: - line = [] - if n is None or n < 0: - while True: - m = self._read1() - if not m: - break - line.append(m) - else: - while n > 0: - m = self._read1(n) - if not m: - break - n -= len(m) - line.append(m) - return b"".join(line) diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 00000000..a601e8c9 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,15 @@ +"""Basic reusable utility functions""" +import os + +from src.environment import QUERY_PATH + + +def open_query(filename: str) -> str: + """Opens `filename` and returns as string""" + with open(query_file(filename), "r", encoding="utf-8") as file: + return file.read() + + +def query_file(filename: str) -> str: + """Returns proper path for filename in QUERY_PATH""" + return os.path.join(QUERY_PATH, filename) diff --git a/tests/e2e/test_sync_app_data.py b/tests/e2e/test_sync_app_data.py new file mode 100644 index 00000000..e6aab234 --- /dev/null +++ b/tests/e2e/test_sync_app_data.py @@ -0,0 +1,80 @@ +import os +import shutil +import unittest +from pathlib import Path +from unittest import IsolatedAsyncioTestCase + +from dotenv import load_dotenv +from dune_client.file.interface import FileIO + +from src.fetch.dune import DuneFetcher +from src.models.block_range import BlockRange +from src.sync.app_data import AppDataHandler, SYNC_TABLE +from src.sync.config import AppDataSyncConfig + + +class TestSyncAppData(IsolatedAsyncioTestCase): + def setUp(self) -> None: + load_dotenv() + self.dune = DuneFetcher(os.environ["DUNE_API_KEY"]) + self.config = AppDataSyncConfig( + volume_path=Path(os.environ["VOLUME_PATH"]), + missing_files_name="missing_app_hashes.json", + max_retries=2, + give_up_threshold=3, + ) + self.file_manager = FileIO(self.config.volume_path / str(SYNC_TABLE)) + + def tearDown(self) -> None: + shutil.rmtree(self.config.volume_path) + + async def 
test_fetch_content_and_filter(self): + retries = self.config.max_retries + give_up = self.config.give_up_threshold + missing_files = self.config.missing_files_name + # block numbers + a, b, c = 15582187, 16082187, 16100000 + self.assertTrue(a < b < c) + + block_range_1 = BlockRange( + block_from=a, + block_to=b, + ) + data_handler = AppDataHandler( + file_manager=self.file_manager, + new_rows=await self.dune.get_app_hashes(block_range_1), + block_range=block_range_1, + config=self.config, + missing_file_name=missing_files, + ipfs_access_key=os.environ["IPFS_ACCESS_KEY"], + ) + + print(f"Beginning Content Fetching on {len(data_handler.new_rows)} records") + await data_handler.fetch_content_and_filter(retries, give_up) + data_handler.write_found_content() + self.assertEqual(0, len(data_handler.new_rows)) + + block_range_2 = BlockRange( + block_from=b, + block_to=c, + ) + data_handler = AppDataHandler( + file_manager=self.file_manager, + new_rows=await self.dune.get_app_hashes(block_range_2), + block_range=block_range_2, + config=self.config, + missing_file_name=missing_files, + ) + print( + f"Beginning Second Run Content Fetching on {len(data_handler.new_rows)} records" + ) + await data_handler.fetch_content_and_filter(retries, give_up) + data_handler.write_found_content() + + self.assertEqual(0, len(data_handler.new_rows)) + # Two runs with retries = 2 and give_up = 3 implies no more missing records. + self.assertEqual(0, len(data_handler._not_found)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/integration/test_aws.py b/tests/integration/test_aws.py index 222d09f8..c4598902 100644 --- a/tests/integration/test_aws.py +++ b/tests/integration/test_aws.py @@ -6,7 +6,7 @@ import pytest from dotenv import load_dotenv -from src.aws import AWSClient, BucketFileObject +from src.post.aws import AWSClient, BucketFileObject class TestAWSConnection(unittest.TestCase): diff --git a/tests/integration/test_fetch_orderbook.py b/tests/integration/test_fetch_orderbook.py new file mode 100644 index 00000000..e4be7fd6 --- /dev/null +++ b/tests/integration/test_fetch_orderbook.py @@ -0,0 +1,130 @@ +import unittest + +import pandas as pd + +from src.fetch.orderbook import OrderbookFetcher +from src.models.block_range import BlockRange + + +class TestFetchOrderbook(unittest.TestCase): + def test_latest_block_reasonable(self): + self.assertGreater(OrderbookFetcher.get_latest_block(), 16020300) + + def test_latest_block_increasing(self): + latest_block = OrderbookFetcher.get_latest_block() + self.assertGreaterEqual(OrderbookFetcher.get_latest_block(), latest_block) + + def test_get_order_rewards(self): + block_number = 16000000 + block_range = BlockRange(block_number, block_number + 50) + rewards_df = OrderbookFetcher.get_order_rewards(block_range) + expected = pd.DataFrame( + { + "block_number": [16000018, 16000050], + "order_uid": [ + "0xb52fecfe3df73f0e93f1f9b27c92e3def50322960f9c62d0b97bc2ceee36c07a0639dda84198dc06f5bc91bddbb62cd2e38c2f9a6378140f", + "0xf61cba0b42ed3e956f9db049c0523e123967723c5bcf76ccac0b179a66305b2a7fee439ed7a6bb1b8e7ca1ffdb0a5ca8d993c030637815ad", + ], + "solver": [ + "0x3cee8c7d9b5c8f225a8c36e7d3514e1860309651", + "0xc9ec550bea1c64d779124b23a26292cc223327b6", + ], + "tx_hash": [ + "0xb6f7df8a1114129f7b61f2863b3f81b3620e95f73e5b769a62bb7a87ab6983f4", + "0x2ce77009e78c291cdf39eb6f8ddf7e2c3401b4f962ef1240bdac47e632f8eb7f", + ], + "surplus_fee": ["0", "0"], + "amount": [40.70410, 39.00522], + } + ) + + self.assertIsNone(pd.testing.assert_frame_equal(expected, rewards_df)) 
+ + def test_get_batch_rewards(self): + block_number = 16846500 + block_range = BlockRange(block_number, block_number + 25) + rewards_df = OrderbookFetcher.get_batch_rewards(block_range) + expected = pd.DataFrame( + { + "block_number": pd.Series([16846495, 16846502, pd.NA], dtype="Int64"), + "block_deadline": [16846509, 16846516, 16846524], + "tx_hash": [ + "0x2189c2994dcffcd40cc92245e216b0fda42e0f30573ce4b131341e8ac776ed75", + "0x8328fa642f47adb61f751363cf718d707dafcdc258898fa953945afd42aa020f", + None, + ], + "solver": [ + "0xb20b86c4e6deeb432a22d773a221898bbbd03036", + "0x55a37a2e5e5973510ac9d9c723aec213fa161919", + "0x55a37a2e5e5973510ac9d9c723aec213fa161919", + ], + "execution_cost": [ + "5417013431615490", + "14681404168612460", + "0", + ], + "surplus": [ + "5867838023808109", + "104011002982952097", + "0", + ], + "fee": [ + "7751978767036064", + "10350680045815651", + "0", + ], + "uncapped_payment_eth": [ + "7232682540629268", + "82825156151734420", + "-3527106002507021", + ], + "capped_payment": [ + "7232682540629268", + "24681404168612460", + "-3527106002507021", + ], + "winning_score": [ + "6537976145828389", + "95640781782532198", + "3527282436747751", + ], + "reference_score": [ + "6387134250214905", + "31536526877033328", + "3527106002507021", + ], + "participating_solvers": [ + [ + "0x398890be7c4fac5d766e1aeffde44b2ee99f38ef", + "0xb20b86c4e6deeb432a22d773a221898bbbd03036", + ], + [ + "0x55a37a2e5e5973510ac9d9c723aec213fa161919", + "0x97ec0a17432d71a3234ef7173c6b48a2c0940896", + "0xa21740833858985e4d801533a808786d3647fb83", + "0xb20b86c4e6deeb432a22d773a221898bbbd03036", + "0xbff9a1b539516f9e20c7b621163e676949959a66", + "0xc9ec550bea1c64d779124b23a26292cc223327b6", + "0xda869be4adea17ad39e1dfece1bc92c02491504f", + ], + [ + "0x149d0f9282333681ee41d30589824b2798e9fb47", + "0x3cee8c7d9b5c8f225a8c36e7d3514e1860309651", + "0x55a37a2e5e5973510ac9d9c723aec213fa161919", + "0x7a0a8890d71a4834285efdc1d18bb3828e765c6a", + "0x97ec0a17432d71a3234ef7173c6b48a2c0940896", + "0xa21740833858985e4d801533a808786d3647fb83", + "0xb20b86c4e6deeb432a22d773a221898bbbd03036", + "0xbff9a1b539516f9e20c7b621163e676949959a66", + "0xc9ec550bea1c64d779124b23a26292cc223327b6", + "0xda869be4adea17ad39e1dfece1bc92c02491504f", + "0xe9ae2d792f981c53ea7f6493a17abf5b2a45a86b", + ], + ], + }, + ) + self.assertIsNone(pd.testing.assert_frame_equal(expected, rewards_df)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/integration/test_warehouse_fetcher.py b/tests/integration/test_warehouse_fetcher.py new file mode 100644 index 00000000..a832e2d5 --- /dev/null +++ b/tests/integration/test_warehouse_fetcher.py @@ -0,0 +1,42 @@ +import os +import unittest + +import pandas as pd +from dotenv import load_dotenv + +from src.fetch.postgres import PostgresFetcher +from src.models.block_range import BlockRange + + +class TestPostgresWarehouseFetching(unittest.TestCase): + def setUp(self) -> None: + load_dotenv() + # TODO - deploy test DB and populate with some records... 
+ self.fetcher = PostgresFetcher(db_url=os.environ["WAREHOUSE_URL"]) + + def test_latest_block_reasonable(self): + self.assertGreater(self.fetcher.get_latest_block(), 17273090) + + def test_get_imbalances(self): + imbalance_df = self.fetcher.get_internal_imbalances( + BlockRange(17236982, 17236983) + ) + expected = pd.DataFrame( + { + "block_number": pd.Series([17236983, 17236983], dtype="int64"), + "tx_hash": [ + "0x9dc611149c7d6a936554b4be0e4fde455c015a9d5c81650af1433df5e904c791", + "0x9dc611149c7d6a936554b4be0e4fde455c015a9d5c81650af1433df5e904c791", + ], + "token": [ + "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", + "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", + ], + "amount": ["-1438513324", "789652004205719637"], + }, + ) + self.assertIsNone(pd.testing.assert_frame_equal(expected, imbalance_df)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/test_batch_rewards_schema.py b/tests/unit/test_batch_rewards_schema.py new file mode 100644 index 00000000..cfdaaca1 --- /dev/null +++ b/tests/unit/test_batch_rewards_schema.py @@ -0,0 +1,101 @@ +import unittest + +import pandas +import pandas as pd + +from src.models.batch_rewards_schema import BatchRewards + +ONE_ETH = 1000000000000000000 + + +class TestModelBatchRewards(unittest.TestCase): + def test_order_rewards_transformation(self): + max_uint = 115792089237316195423570985008687907853269984665640564039457584007913129639936 + sample_df = pd.DataFrame( + { + "block_number": pd.Series([123, pandas.NA], dtype="Int64"), + "block_deadline": [789, 1011], + "tx_hash": [ + "0x71", + None, + ], + "solver": [ + "0x51", + "0x52", + ], + "execution_cost": [9999 * ONE_ETH, 1], + "surplus": [2 * ONE_ETH, 3 * ONE_ETH], + "fee": [ + 1000000000000000, + max_uint, + ], + "uncapped_payment_eth": [0, -10 * ONE_ETH], + "capped_payment": [-1000000000000000, -1000000000000000], + "winning_score": [123456 * ONE_ETH, 6789 * ONE_ETH], + "reference_score": [ONE_ETH, 2 * ONE_ETH], + "participating_solvers": [ + [ + "0x51", + "0x52", + "0x53", + ], + [ + "0x51", + "0x52", + "0x53", + "0x54", + "0x55", + "0x56", + ], + ], + } + ) + + self.assertEqual( + [ + { + "block_deadline": 789, + "block_number": 123, + "data": { + "capped_payment": -1000000000000000, + "execution_cost": 9999000000000000000000, + "fee": 1000000000000000, + "participating_solvers": ["0x51", "0x52", "0x53"], + "reference_score": 1000000000000000000, + "surplus": 2000000000000000000, + "uncapped_payment_eth": 0, + "winning_score": 123456000000000000000000, + }, + "solver": "0x51", + "tx_hash": "0x71", + }, + { + "block_deadline": 1011, + "block_number": None, + "data": { + "capped_payment": -1000000000000000, + "execution_cost": 1, + "fee": max_uint, + "participating_solvers": [ + "0x51", + "0x52", + "0x53", + "0x54", + "0x55", + "0x56", + ], + "reference_score": 2000000000000000000, + "surplus": 3000000000000000000, + "uncapped_payment_eth": -10000000000000000000, + "winning_score": 6789000000000000000000, + }, + "solver": "0x52", + "tx_hash": None, + }, + ], + BatchRewards.from_pdf_to_dune_records(sample_df), + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_ipfs.py b/tests/unit/test_ipfs.py new file mode 100644 index 00000000..e26e2279 --- /dev/null +++ b/tests/unit/test_ipfs.py @@ -0,0 +1,215 @@ +import os +import unittest +from unittest import IsolatedAsyncioTestCase + +from dotenv import load_dotenv + +from src.fetch.ipfs import 
Cid + +load_dotenv() +ACCESS_KEY = os.environ["IPFS_ACCESS_KEY"] + + +class TestIPFS(unittest.TestCase): + def test_cid_parsing(self): + self.assertEqual( + "bafybeib5q5w6r7gxbfutjhes24y65mcif7ugm7hmub2vsk4hqueb2yylti", + str( + Cid.old_schema( + "0x3d876de8fcd70969349c92d731eeb0482fe8667ceca075592b8785081d630b9a" + ) + ), + ) + self.assertEqual( + "bafybeia747cvkwz7tqkp67da3ehrl4nfwena3jnr5cvainmcugzocbmnbq", + str( + Cid.old_schema( + "0x1FE7C5555B3F9C14FF7C60D90F15F1A5B11A0DA5B1E8AA043582A1B2E1058D0C" + ) + ), + ) + + def test_new_hash_to_cid(self): + self.assertEqual( + "bafkrwiek6tumtfzvo6yivqq5c7jtdkw6q3ar5pgfcjdujvrbzkbwl3eueq", + str( + Cid( + "0x8af4e8c9973577b08ac21d17d331aade86c11ebcc5124744d621ca8365ec9424" + ) + ), + ) + + def test_cid_constructor(self): + # works with or without 0x prefix: + hex_str = "0x3d876de8fcd70969349c92d731eeb0482fe8667ceca075592b8785081d630b9a" + self.assertEqual(Cid(hex_str), Cid(hex_str[2:])) + self.assertEqual(hex_str, Cid(hex_str).hex) + + def test_no_content(self): + null_cid = Cid( + "0000000000000000000000000000000000000000000000000000000000000000" + ) + + self.assertEqual(None, null_cid.get_content(ACCESS_KEY, max_retries=10)) + + def test_get_content(self): + self.assertEqual( + { + "version": "0.1.0", + "appCode": "CowSwap", + "metadata": { + "referrer": { + "version": "0.1.0", + "address": "0x424a46612794dbb8000194937834250Dc723fFa5", + } + }, + }, + Cid.old_schema( + "3d876de8fcd70969349c92d731eeb0482fe8667ceca075592b8785081d630b9a" + ).get_content(ACCESS_KEY, max_retries=10), + ) + + self.assertEqual( + { + "version": "1.0.0", + "appCode": "CowSwap", + "metadata": { + "referrer": { + "kind": "referrer", + "referrer": "0x8c35B7eE520277D14af5F6098835A584C337311b", + "version": "1.0.0", + } + }, + }, + Cid.old_schema( + "1FE7C5555B3F9C14FF7C60D90F15F1A5B11A0DA5B1E8AA043582A1B2E1058D0C" + ).get_content(ACCESS_KEY), + ) + + +class TestAsyncIPFS(IsolatedAsyncioTestCase): + async def test_fetch_many(self): + x = [ + { + "app_hash": "0xa0029a1376a317ea4af7d64c1a15cb02c5b1f33c72645cc8612a8d302a6e2ac8", + "first_seen_block": 13897827, + }, + { + "app_hash": "0xd8e0e541ebb486bfb4fbfeeaf097e56487b5b3ea7190bd1b286693ac02954ecd", + "first_seen_block": 15428678, + }, + { + "app_hash": "0xd93cc4feb701b8b7c8013c33be82f05d28f04c127116eb3764d36881e5cd7d07", + "first_seen_block": 13602894, + }, + { + "app_hash": "0x2c699c8c8b379a0ab6e6b1bc252dd2539b59b50056da1c62b2bcaf4f706d1e81", + "first_seen_block": 13643476, + }, + { + "app_hash": "0xe4096788536b002e3d0af9e3a4ac44cbd5456cbd3a88d96f02c6b23be319fc6b", + "first_seen_block": 15430776, + }, + { + "app_hash": "0xd7d476c0682b033fc4025f69dfb5967afe5ea96be872b1f6a740bbdc7dd97b25", + "first_seen_block": 13671622, + }, + { + "app_hash": "0x37a52cce8b0b5f4b2047b8d34992d27755c9af4341290d38e4cf9278e3b8fcc9", + "first_seen_block": 15104812, + }, + { + "app_hash": "0x6faca2b31493acf30239268b08ef6fa9a54ff093018c5c53a0847eb13ad8181f", + "first_seen_block": 15557705, + }, + { + "app_hash": "0x476e33b8975dd54ada4a99d52c9ea4b3d41bd50763377f6e078c60631d8bc02a", + "first_seen_block": 13783477, + }, + { + "app_hash": "0xd4f33cd977ef095b38aabb1ee7a2f6f69fabb4022601fdc29c9fc78c451f4e12", + "first_seen_block": 13824523, + }, + { + "app_hash": "0x8b98eaf7082a14b3201011c23a39b23706d880c1269e76c154675230daf6af8d", + "first_seen_block": 13709795, + }, + { + "app_hash": "0x5f3b188668cc36ab896f348913668550749c7b7f38304f45b9449cb5bea034b6", + "first_seen_block": 13608189, + }, + { + "app_hash": 
"0xec3143ebecec4ac0f5471ce611f8220bd0abe5509c29216c6837be29fe573830", + "first_seen_block": 13906769, + }, + { + "app_hash": "0x4c4d75807c15451613cead7de87e465fe9368708644fbad1e04bc442ee015466", + "first_seen_block": 13916779, + }, + { + "app_hash": "0x8ee92d0defae33a7471eb0e4abac92c7be3d6811f256ecd5d439609086a9e353", + "first_seen_block": 15684132, + }, + { + "app_hash": "0x5cf63a36847e6f6c98c8eb74d03cfbad8e234d7388637ae5ca53be9dd90eccca", + "first_seen_block": 15296113, + }, + { + "app_hash": "0x517af09fe972bd26beb08cd951783b64a494ed22b87e088754156f9fbc9b993f", + "first_seen_block": 13745623, + }, + { + "app_hash": "0x385802ecdb00e564d36aed7410454a64ecb3cfcb84792bad1ead79d1447ab090", + "first_seen_block": 13967575, + }, + { + "app_hash": "0x809877e2e366166a0bcbce17b759b90d2d74af0d830a0441a44ea246876ccec0", + "first_seen_block": 13938253, + }, + { + "app_hash": "0xe81711d4f01afa98128ec73441fbda767db1cf84b8c4c1bfb4243a5dddca1155", + "first_seen_block": 13944768, + }, + { + "app_hash": "0x1ec8fdb53a2697ab15bc4228c9e4d3125c955470b618fcb1659175d0f6e6c991", + "first_seen_block": 14624083, + }, + { + "app_hash": "0xa15b953113490d318baa1e35f1c9c33db3a6f400ce58539db287456b17bf3e58", + "first_seen_block": 13886533, + }, + { + "app_hash": "0x039ffb0546992a50e4eeb70b2b6bb04c0a9a82dcad586b1c4f971d6c66109894", + "first_seen_block": 13607684, + }, + { + "app_hash": "0xc56523f1422d97dcd77ab7471f3de724ac223d5a51c3af832c98a25e8ad30ab3", + "first_seen_block": 13664176, + }, + { + "app_hash": "0x690483cfb64d0a175e4cb85c313e8da3e1826966acefc6697026176adacc9b19", + "first_seen_block": 14699411, + }, + { + "app_hash": "0xe86fbb850981b11ab388e077c9a5a6da6cfe386c55f78709bc6549e5b806fc08", + "first_seen_block": 13905438, + }, + { + "app_hash": "0xba00f0857b72309aba6a395694d999e573d6b183cf5a605341b0f3ddfba333de", + "first_seen_block": 13779206, + }, + { + "app_hash": "0xdf93796ffbd982e40c53ea10517cc90eb1355eb3f843f0dafa441165a337557f", + "first_seen_block": 13616326, + }, + { + "app_hash": "0x12f79d7c232ee2ef20b7aaefaf677a69d00d179d003cf5257d557d9154427f0f", + "first_seen_block": 13971658, + }, + ] + results = await Cid.fetch_many(x, ACCESS_KEY) + print(results) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_order_rewards_schema.py b/tests/unit/test_order_rewards_schema.py new file mode 100644 index 00000000..e8e479b7 --- /dev/null +++ b/tests/unit/test_order_rewards_schema.py @@ -0,0 +1,59 @@ +import unittest + +import pandas as pd + +from src.models.order_rewards_schema import OrderRewards + + +class TestModelOrderRewards(unittest.TestCase): + def test_order_rewards_transformation(self): + rewards_df = pd.DataFrame( + { + "block_number": [1, 2, 3], + "order_uid": ["0x01", "0x02", "0x03"], + "solver": ["0x51", "0x52", "0x53"], + "tx_hash": ["0x71", "0x72", "0x73"], + "surplus_fee": [12345678910111213, 0, 0], + "amount": [40.70410, 39.00522, 0], + } + ) + + self.assertEqual( + [ + { + "block_number": 1, + "order_uid": "0x01", + "solver": "0x51", + "tx_hash": "0x71", + "data": { + "surplus_fee": "12345678910111213", + "amount": 40.70410, + }, + }, + { + "block_number": 2, + "order_uid": "0x02", + "solver": "0x52", + "tx_hash": "0x72", + "data": { + "surplus_fee": "0", + "amount": 39.00522, + }, + }, + { + "block_number": 3, + "order_uid": "0x03", + "solver": "0x53", + "tx_hash": "0x73", + "data": { + "surplus_fee": "0", + "amount": 0.0, + }, + }, + ], + OrderRewards.from_pdf_to_dune_records(rewards_df), + ) + + +if __name__ == "__main__": + unittest.main() diff --git 
a/tests/unit/test_token_imbalance_schema.py b/tests/unit/test_token_imbalance_schema.py new file mode 100644 index 00000000..136e3c37 --- /dev/null +++ b/tests/unit/test_token_imbalance_schema.py @@ -0,0 +1,46 @@ +import unittest + +import pandas as pd + +from src.models.token_imbalance_schema import TokenImbalance + + +class TestModelTokenImbalance(unittest.TestCase): + def test_token_imbalance_schema(self): + max_uint = 115792089237316195423570985008687907853269984665640564039457584007913129639936 + sample_df = pd.DataFrame( + { + "block_number": pd.Series([123, 456], dtype="int64"), + "tx_hash": [ + "0x71", + "0x72", + ], + "token": [ + "0xa0", + "0xa1", + ], + "amount": [-9999, max_uint], + } + ) + + self.assertEqual( + [ + { + "amount": "-9999", + "block_number": 123, + "token": "0xa0", + "tx_hash": "0x71", + }, + { + "amount": "115792089237316195423570985008687907853269984665640564039457584007913129639936", + "block_number": 456, + "token": "0xa1", + "tx_hash": "0x72", + }, + ], + TokenImbalance.from_pdf_to_dune_records(sample_df), + ) + + +if __name__ == "__main__": + unittest.main()
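
The RecordHandler / UploadHandler split introduced in src/sync/record_handler.py and src/sync/upload_handler.py is easiest to see with a concrete subclass. The sketch below is illustrative only: JsonRecordHandler is a hypothetical stand-in for the real implementations (OrderbookDataHandler, AppDataHandler), and it reuses only interfaces visible in this diff, including the SyncConfig fields sync_column and sync_file that the write_csv call at the top of this section relies on.

import json
import os
from typing import Any

from dune_client.file.interface import FileIO

from src.models.block_range import BlockRange
from src.models.tables import SyncTable
from src.sync.config import SyncConfig
from src.sync.record_handler import RecordHandler


class JsonRecordHandler(RecordHandler):
    """Hypothetical handler that dumps a list of records to one JSON file."""

    def __init__(
        self,
        records: list[dict[str, Any]],
        block_range: BlockRange,
        table: SyncTable,
        config: SyncConfig,
    ):
        super().__init__(block_range, table, config)
        self.records = records

    def num_records(self) -> int:
        return len(self.records)

    def write_found_content(self) -> None:
        # RecordHandler.__init__ already derives file_path and the
        # cow_<block_to>.json content filename from the table and block range.
        os.makedirs(self.file_path, exist_ok=True)
        path = os.path.join(self.file_path, self.content_filename)
        with open(path, "w", encoding="utf-8") as file:
            json.dump(self.records, file)

    def write_sync_data(self) -> None:
        # Mirrors the sync-block bookkeeping used by OrderbookDataHandler above.
        FileIO(self.file_path).write_csv(
            data=[{self.config.sync_column: str(self.block_range.block_to)}],
            name=self.config.sync_file,
        )

Wrapping such a handler in UploadHandler(aws, handler, table=...).write_and_upload_content(dry_run=True) then writes the file to the volume, skips the AWS upload because of the dry-run flag, and records the sync block, in that order.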
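
For orientation, the three new entry points might be dispatched roughly as below. This is a sketch under stated assumptions: src/main.py is not part of this diff, sync_order_rewards and sync_batch_rewards are assumed to live in src/sync/order_rewards.py alongside OrderbookDataHandler, a single shared SyncConfig is used for brevity, and the no-argument OrderbookFetcher() construction is an assumption (the tests above only exercise its classmethods).

import os

from dotenv import load_dotenv

from src.fetch.orderbook import OrderbookFetcher
from src.fetch.postgres import PostgresFetcher
from src.post.aws import AWSClient
from src.sync.config import SyncConfig
from src.sync.order_rewards import sync_batch_rewards, sync_order_rewards
from src.sync.token_imbalance import sync_internal_imbalance


def run_all_syncs(aws: AWSClient, config: SyncConfig, dry_run: bool = True) -> None:
    """Run the three orderbook-related syncs back to back (illustrative wiring)."""
    load_dotenv()
    orderbook = OrderbookFetcher()  # assumption: no-arg construction
    warehouse = PostgresFetcher(db_url=os.environ["WAREHOUSE_URL"])

    # Each call computes its own block range, from the last synced block
    # (via last_sync_block) up to the fetcher's latest block.
    sync_order_rewards(aws, orderbook, config, dry_run)
    sync_batch_rewards(aws, orderbook, config, dry_run)
    sync_internal_imbalance(aws, warehouse, config, dry_run)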