diff --git a/.env.sample b/.env.sample index 2360f6d5..c913a932 100644 --- a/.env.sample +++ b/.env.sample @@ -1,10 +1,3 @@ -VOLUME_PATH=data -APP_DATA_MAX_RETRIES=3 -APP_DATA_GIVE_UP_THRESHOLD=100 - -# Dune credentials -DUNE_API_KEY= - # AWS Credentials AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= @@ -13,10 +6,3 @@ AWS_INTERNAL_ROLE= AWS_EXTERNAL_ROLE= AWS_EXTERNAL_ID= AWS_BUCKET= - -#Orderbook DB Credentials -BARN_DB_URL={user}:{password}@{host}:{port}/{database} -PROD_DB_URL={user}:{password}@{host}:{port}/{database} - -# IPFS Gateway -IPFS_ACCESS_TOKEN= diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml deleted file mode 100644 index ddf5d275..00000000 --- a/.github/workflows/deploy.yaml +++ /dev/null @@ -1,34 +0,0 @@ -name: deploy - -on: - push: - branches: [main] - tags: [v*] - -jobs: - deploy: - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - - steps: - - uses: actions/checkout@v2 - - - uses: docker/login-action@v1 - with: - registry: ghcr.io - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - id: meta - uses: docker/metadata-action@v3 - with: - images: ghcr.io/${{ github.repository }} - - uses: docker/build-push-action@v2 - with: - context: . - file: Dockerfile - push: true - tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} \ No newline at end of file diff --git a/.github/workflows/pull-request.yaml b/.github/workflows/pull-request.yaml index 1d796116..8c03d1f2 100644 --- a/.github/workflows/pull-request.yaml +++ b/.github/workflows/pull-request.yaml @@ -24,29 +24,3 @@ jobs: run: black --check ./ - name: Type Check (mypy) run: mypy src --strict - - tests: - runs-on: ubuntu-latest - strategy: - matrix: - python-version: ["3.10", "3.11"] - - steps: - - uses: actions/checkout@v2 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 - with: - python-version: ${{ matrix.python-version }} - - - name: Install Requirements - run: pip install -r requirements/dev.txt - - name: Unit Tests - run: python -m pytest tests/unit - env: - IPFS_ACCESS_KEY: ${{ secrets.IPFS_ACCESS_KEY }} - - name: Integration Tests - run: python -m pytest tests/integration - env: - PROD_DB_URL: ${{ secrets.PROD_DB_URL }} - BARN_DB_URL: ${{ secrets.BARN_DB_URL }} - WAREHOUSE_URL: ${{ secrets.WAREHOUSE_URL }} diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 335b2081..00000000 --- a/Dockerfile +++ /dev/null @@ -1,10 +0,0 @@ -FROM python:3.10 - -WORKDIR /app - -COPY requirements/* requirements/ -RUN pip install -r requirements/prod.txt -COPY ./src ./src -COPY logging.conf . 
- -ENTRYPOINT [ "python3", "-m" , "src.main"] diff --git a/requirements/dev.txt b/requirements/dev.txt index 1c31125c..ee2d2d1f 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -5,5 +5,4 @@ black>=22.6.0 mypy==1.3.0 mypy-extensions==1.0.0 pylint>=2.14.4 -pytest>=7.1.2 -sqlalchemy-stubs>=0.4 \ No newline at end of file +pytest>=7.1.2 \ No newline at end of file diff --git a/requirements/prod.txt b/requirements/prod.txt index 16b34982..495afc14 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -1,9 +1,5 @@ dune-client>=0.3.0 -psycopg2-binary>=2.9.3 python-dotenv>=0.20.0 requests>=2.28.1 pandas>=1.5.0 -ndjson>=0.3.1 -py-multiformats-cid>=0.4.4 -boto3>=1.26.12 -SQLAlchemy<2.0 \ No newline at end of file +boto3>=1.26.12 \ No newline at end of file diff --git a/seed_data.zip b/seed_data.zip deleted file mode 100644 index 33d7e1f3..00000000 Binary files a/seed_data.zip and /dev/null differ diff --git a/src/post/aws.py b/src/aws.py similarity index 82% rename from src/post/aws.py rename to src/aws.py index 3e8c455e..f9cb08e5 100644 --- a/src/post/aws.py +++ b/src/aws.py @@ -1,6 +1,7 @@ """Aws S3 Bucket functionality (namely upload_file)""" from __future__ import annotations +import json import os from collections import defaultdict from dataclasses import dataclass @@ -13,7 +14,7 @@ from dotenv import load_dotenv from src.logger import set_log -from src.models.tables import SyncTable +from src.text_io import BytesIteratorIO log = set_log(__name__) @@ -78,21 +79,17 @@ def from_bucket_collection(cls, bucket_objects: Any) -> BucketStructure: object_key = bucket_obj.key path, _ = object_key.split("/") grouped_files[path].append(BucketFileObject.from_key(object_key)) - if path not in SyncTable.supported_tables(): - # Catches any unrecognized filepath. - log.warning(f"Found unexpected file {object_key}") log.debug(f"loaded bucket filesystem: {grouped_files.keys()}") return cls(files=grouped_files) - def get(self, table: SyncTable | str) -> list[BucketFileObject]: + def get(self, table: str) -> list[BucketFileObject]: """ Returns the list of files under `table` - returns empty list if none available. """ - table_str = str(table) if isinstance(table, SyncTable) else table - return self.files.get(table_str, []) + return self.files.get(table, []) class AWSClient: @@ -131,16 +128,22 @@ def _assume_role(self) -> ServiceResource: """ sts_client = boto3.client("sts") + # TODO - assume that the internal role is already assumed. and use get session_token + # sts_client.get_session_token() internal_assumed_role_object = sts_client.assume_role( RoleArn=self.internal_role, RoleSessionName="InternalSession", ) credentials = internal_assumed_role_object["Credentials"] + # sts_client.get_session_token() + sts_client = boto3.client( "sts", - aws_access_key_id=credentials["AccessKeyId"], - aws_secret_access_key=credentials["SecretAccessKey"], - aws_session_token=credentials["SessionToken"], + aws_access_key_id=credentials["AccessKeyId"], # AWS_ACCESS_KEY_ID + aws_secret_access_key=credentials[ + "SecretAccessKey" + ], # AWS_SECRET_ACCESS_KEY + aws_session_token=credentials["SessionToken"], # AWS_SESSION_TOKEN ) external_assumed_role_object = sts_client.assume_role( @@ -176,6 +179,30 @@ def upload_file(self, filename: str, object_key: str) -> bool: log.debug(f"uploaded {filename} to {self.bucket}") return True + def put_object(self, data_set: list[dict[str, Any]], object_key: str) -> bool: + """Upload a file to an S3 bucket + + :param data_list: Data to upload. Should be a full path to file. 
+ :param object_key: S3 object key. For our purposes, this would + be f"{table_name}/cow_{latest_block_number}.json" + :return: True if file was uploaded, else raises + """ + + file_object = BytesIteratorIO( + f"{json.dumps(row)}\n".encode("utf-8") for row in data_set + ) + + s3_client = self._get_s3_client(self._assume_role()) + + s3_client.upload_fileobj( + file_object, + bucket=self.bucket, + key=object_key, + extra_args={"ACL": "bucket-owner-full-control"}, + ) + log.debug(f"uploaded {object_key} to {self.bucket}") + return True + def delete_file(self, object_key: str) -> bool: """Delete a file from an S3 bucket @@ -183,7 +210,6 @@ def delete_file(self, object_key: str) -> bool: be f"{table_name}/cow_{latest_block_number}.json" :return: True if file was deleted, else raises """ - # TODO - types! error: "BaseClient" has no attribute "delete_object" s3_client = self._get_s3_client(self._assume_role()) s3_client.delete_object( # type: ignore Bucket=self.bucket, @@ -220,14 +246,13 @@ def existing_files(self) -> BucketStructure: bucket_objects = bucket.objects.all() return BucketStructure.from_bucket_collection(bucket_objects) - def last_sync_block(self, table: SyncTable | str) -> int: + def last_sync_block(self, table: str) -> int: """ Based on the existing bucket files, the last sync block is uniquely determined from the file names. """ - table_str = str(table) if isinstance(table, SyncTable) else table try: - table_files = self.existing_files().get(table_str) + table_files = self.existing_files().get(table) return max(file_obj.block for file_obj in table_files if file_obj.block) except ValueError as err: # Raised when table_files = [] @@ -235,7 +260,7 @@ def last_sync_block(self, table: SyncTable | str) -> int: f"Could not determine last sync block for {table} files. No files." ) from err - def delete_all(self, table: SyncTable | str) -> None: + def delete_all(self, table: str) -> None: """Deletes all files within the supported tables directory""" log.info(f"Emptying Bucket {table}") try: @@ -245,6 +270,4 @@ def delete_all(self, table: SyncTable | str) -> None: log.info(f"Deleting file {file_data.object_key}") self.delete_file(file_data.object_key) except KeyError as err: - raise ValueError( - f"Invalid table_name {table}, please chose from {SyncTable.supported_tables()}" - ) from err + raise ValueError(f"invalid table name {table}") from err diff --git a/src/dune_queries.py b/src/dune_queries.py deleted file mode 100644 index a484beff..00000000 --- a/src/dune_queries.py +++ /dev/null @@ -1,44 +0,0 @@ -""" -Localized account of all Queries related to this project's main functionality -""" -from __future__ import annotations - -from copy import copy -from dataclasses import dataclass - -from dune_client.query import Query -from dune_client.types import QueryParameter - - -@dataclass -class QueryData: - """Stores name and a version of the query for each query.""" - - name: str - query: Query - - def __init__(self, name: str, query_id: int, filename: str) -> None: - self.name = name - self.filepath = filename - self.query = Query(query_id, name) - - def with_params(self, params: list[QueryParameter]) -> Query: - """ - Copies the query and adds parameters to it, returning the copy. - """ - # We currently default to the V1 Queries, soon to switch them out. 
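[editor's note] A minimal, self-contained sketch of the streaming-upload pattern that the new `AWSClient.put_object` above introduces: serialize each record to one line of JSON, wrap the bytes in a file-like object, and hand it to boto3. The bucket name and records below are placeholders, and plain `io.BytesIO` stands in for `BytesIteratorIO` so the snippet runs on its own; note that boto3's `upload_fileobj` takes the capitalized `Fileobj`/`Bucket`/`Key`/`ExtraArgs` keywords, so the sketch uses those.

import io
import json

import boto3


def upload_ndjson(records: list[dict], bucket: str, key: str) -> None:
    """Stream `records` to s3://{bucket}/{key} as newline-delimited JSON."""
    body = io.BytesIO(
        b"".join(f"{json.dumps(row)}\n".encode("utf-8") for row in records)
    )
    s3 = boto3.client("s3")  # assumes credentials / role are already configured
    s3.upload_fileobj(
        Fileobj=body,
        Bucket=bucket,
        Key=key,
        ExtraArgs={"ACL": "bucket-owner-full-control"},
    )


if __name__ == "__main__":
    upload_ndjson(
        records=[{"block_number": 1, "amount": "0"}],
        bucket="my-placeholder-bucket",
        key="order_rewards/cow_1.json",
    )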
- query_copy = copy(self.query) - query_copy.params = params - return query_copy - - -QUERIES = { - "APP_HASHES": QueryData( - query_id=1610025, name="Unique App Hashes", filename="app_hashes.sql" - ), - "LATEST_APP_HASH_BLOCK": QueryData( - query_id=1615490, - name="Latest Possible App Hash Block", - filename="app_hash_latest_block.sql", - ), -} diff --git a/src/environment.py b/src/environment.py index f78290eb..3c87b3c1 100644 --- a/src/environment.py +++ b/src/environment.py @@ -2,5 +2,4 @@ from pathlib import Path PROJECT_ROOT = Path(__file__).parent.parent -QUERY_PATH = PROJECT_ROOT / Path("src/sql") LOG_CONFIG_FILE = PROJECT_ROOT / Path("logging.conf") diff --git a/src/fetch/__init__.py b/src/fetch/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/fetch/dune.py b/src/fetch/dune.py deleted file mode 100644 index 7565399a..00000000 --- a/src/fetch/dune.py +++ /dev/null @@ -1,85 +0,0 @@ -""" -All Dune Query executions should be routed through this file. -TODO - Move reusable components into dune-client: - https://github.com/cowprotocol/dune-bridge/issues/40 -""" -import asyncio -import sys - -from dune_client.client import DuneClient -from dune_client.query import Query -from dune_client.types import DuneRecord -from requests import HTTPError - -from src.dune_queries import QUERIES -from src.logger import set_log -from src.models.block_range import BlockRange - -log = set_log(__name__) - - -class DuneFetcher: - """ - Class containing, DuneClient, FileIO and a logger for convenient Dune Fetching. - """ - - def __init__( - self, - api_key: str, - ) -> None: - """ - Class constructor. - Builds DuneClient from `api_key` along with a logger and FileIO object. - """ - self.dune = DuneClient(api_key) - - async def fetch(self, query: Query) -> list[DuneRecord]: - """Async Dune Fetcher with some exception handling.""" - log.debug(f"Executing {query}") - - try: - # Tried to use the AsyncDuneClient, without success: - # https://github.com/cowprotocol/dune-client/pull/31#issuecomment-1316045313 - response = await asyncio.to_thread( - self.dune.refresh, query, ping_frequency=10 - ) - if response.state.is_complete(): - response_rows = response.get_rows() - log.debug( - f"Got {len(response_rows)} results for execution {response.execution_id}" - ) - return response_rows - - message = ( - f"query execution {response.execution_id} incomplete {response.state}" - ) - log.error(message) - raise RuntimeError(f"no results for {message}") - except HTTPError as err: - log.error(f"Got {err} - Exiting") - sys.exit() - - async def latest_app_hash_block(self) -> int: - """ - Block Range is used to app hash fetcher where to find the new records. - block_from: read from file `fname` as a loaded singleton. - - uses genesis block is no file exists (should only ever happen once) - - raises RuntimeError if column specified does not exist. - block_to: fetched from Dune as the last indexed block for "GPv2Settlement_call_settle" - """ - return int( - # KeyError here means the query has been modified and column no longer exists - # IndexError means no results were returned from query (which is unlikely). 
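[editor's note] The `DuneFetcher.fetch` method in the file being deleted above pushes the blocking `dune.refresh` call onto a worker thread with `asyncio.to_thread`, so several fetches can be awaited concurrently. A self-contained sketch of that pattern, with a stand-in blocking function instead of the Dune client (no API key assumed); the query ids are the ones referenced in `QUERIES`:

import asyncio
import time


def slow_query(query_id: int) -> list[dict]:
    """Stand-in for a blocking client call such as DuneClient.refresh."""
    time.sleep(1)  # simulate network latency
    return [{"query_id": query_id, "latest_block": 17_000_000}]


async def fetch(query_id: int) -> list[dict]:
    # Run the blocking call in a thread so the event loop stays free.
    return await asyncio.to_thread(slow_query, query_id)


async def main() -> None:
    # Both queries run concurrently; total wall time is ~1s, not ~2s.
    results = await asyncio.gather(fetch(1610025), fetch(1615490))
    print(results)


if __name__ == "__main__":
    asyncio.run(main())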
- (await self.fetch(QUERIES["LATEST_APP_HASH_BLOCK"].query))[0][ - "latest_block" - ] - ) - - async def get_app_hashes(self, block_range: BlockRange) -> list[DuneRecord]: - """ - Executes APP_HASHES query for the given `block_range` and returns the results - """ - app_hash_query = QUERIES["APP_HASHES"].with_params( - block_range.as_query_params() - ) - return await self.fetch(app_hash_query) diff --git a/src/fetch/ipfs.py b/src/fetch/ipfs.py deleted file mode 100644 index 2f7566e8..00000000 --- a/src/fetch/ipfs.py +++ /dev/null @@ -1,154 +0,0 @@ -"""IPFS CID (de)serialization""" -from __future__ import annotations - -import asyncio -from typing import Any, Optional - -import aiohttp -import requests -from aiohttp import ClientSession -from multiformats_cid.cid import from_bytes - -from src.logger import set_log -from src.models.app_data_content import FoundContent, NotFoundContent - -log = set_log(__name__) - -OLD_PREFIX = bytearray([1, 112, 18, 32]) -# https://github.com/cowprotocol/services/blob/2db800aa38824e32fb542c9e2387d77ca6349676/crates/app-data-hash/src/lib.rs#L41-L44 -NEW_PREFIX = bytearray([1, 0x55, 0x1B, 32]) - - -class Cid: - """Holds logic for constructing and converting various representations of a Delegation ID""" - - def __init__(self, hex_str: str, prefix: bytearray = NEW_PREFIX) -> None: - """Builds Object (bytes as base representation) from hex string.""" - stripped_hex = hex_str.replace("0x", "") - # Anatomy of a CID: https://proto.school/anatomy-of-a-cid/04 - self.bytes = bytes(prefix + bytes.fromhex(stripped_hex)) - - @classmethod - def old_schema(cls, hex_str: str) -> Cid: - """Constructor of old CID format (with different prefix)""" - return cls(hex_str, OLD_PREFIX) - - @property - def hex(self) -> str: - """Returns hex representation""" - without_prefix = self.bytes[4:] - return "0x" + without_prefix.hex() - - def __str__(self) -> str: - """Returns string representation""" - return str(from_bytes(self.bytes)) - - def __eq__(self, other: object) -> bool: - if not isinstance(other, Cid): - return False - return self.bytes == other.bytes - - def url(self) -> str: - """IPFS URL where content can be recovered""" - return f"https://ipfs.cow.fi/ipfs/{self}" - - def get_content(self, access_token: str, max_retries: int = 3) -> Optional[Any]: - """ - Attempts to fetch content at cid with a timeout of 1 second. 
- Trys `max_retries` times and otherwise returns None` - """ - attempts = 0 - while attempts < max_retries: - try: - response = requests.get( - self.url(), - timeout=1, - headers={"x-pinata-gateway-token": access_token}, - ) - return response.json() - except requests.exceptions.ReadTimeout: - attempts += 1 - except requests.exceptions.JSONDecodeError as err: - attempts += 1 - log.warning(f"unexpected error {err} retrying...") - return None - - @classmethod - async def fetch_many( # pylint: disable=too-many-locals - cls, missing_rows: list[dict[str, str]], access_token: str, max_retries: int = 3 - ) -> tuple[list[FoundContent], list[NotFoundContent]]: - """Async AppData Fetching""" - found, not_found = [], [] - async with aiohttp.ClientSession( - headers={"x-pinata-gateway-token": access_token} - ) as session: - while missing_rows: - row = missing_rows.pop() - app_hash = row["app_hash"] - - previous_attempts = int(row.get("attempts", 0)) - cid = cls(app_hash) - - first_seen_block = int(row["first_seen_block"]) - result = await cid.fetch_content( - max_retries, previous_attempts, session, first_seen_block - ) - if isinstance(result, NotFoundContent): - # Try Fetching the old format - result = await cls.old_schema(app_hash).fetch_content( - max_retries, previous_attempts, session, first_seen_block - ) - - if isinstance(result, FoundContent): - found.append(result) - else: - assert isinstance(result, NotFoundContent) - not_found.append(result) - - return found, not_found - - async def fetch_content( - self, - max_retries: int, - previous_attempts: int, - session: ClientSession, - first_seen_block: int, - ) -> FoundContent | NotFoundContent: - """Asynchronous content fetching""" - attempts = 0 - while attempts < max_retries: - try: - async with session.get(self.url(), timeout=1) as response: - content = await response.json() - if previous_attempts: - log.debug( - f"Found previously missing content hash {self.hex} at CID {self}" - ) - else: - log.debug( - f"Found content for {self.hex} at CID {self} ({attempts + 1} trys)" - ) - return FoundContent( - app_hash=self.hex, - first_seen_block=first_seen_block, - content=content, - ) - except asyncio.TimeoutError: - attempts += 1 - except aiohttp.ContentTypeError as err: - log.warning(f"failed to parse response {response} with {err}") - attempts += 1 - - # Content Not Found. 
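[editor's note] The `Cid` class in the deleted `src/fetch/ipfs.py` above builds a CID by prepending a fixed four-byte prefix to the 32-byte app-data digest; only the string rendering depends on `py-multiformats-cid`. A dependency-free sketch of the byte layout and the hex round-trip — the multicodec meanings in the comments are my reading of the public multicodec table, not something spelled out in the source:

OLD_PREFIX = bytes([1, 0x70, 0x12, 32])  # CIDv1, dag-pb codec, sha2-256, 32-byte digest
NEW_PREFIX = bytes([1, 0x55, 0x1B, 32])  # CIDv1, raw codec, keccak-256, 32-byte digest


def to_cid_bytes(app_hash_hex: str, prefix: bytes = NEW_PREFIX) -> bytes:
    """Prepend the CID prefix to the raw 32-byte digest."""
    return prefix + bytes.fromhex(app_hash_hex.replace("0x", ""))


def to_app_hash(cid_bytes: bytes) -> str:
    """Drop the 4-byte prefix to recover the original hex digest."""
    return "0x" + cid_bytes[4:].hex()


digest = "0x" + "ab" * 32  # placeholder app-data hash
assert to_app_hash(to_cid_bytes(digest)) == digest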
- total_attempts = previous_attempts + max_retries - base_message = f"no content found for {self.hex} at CID {self} after" - if previous_attempts: - log.debug(f"still {base_message} {total_attempts} attempts") - else: - log.debug(f"{base_message} {max_retries} retries") - - return NotFoundContent( - app_hash=self.hex, - first_seen_block=first_seen_block, - attempts=total_attempts, - ) diff --git a/src/fetch/orderbook.py b/src/fetch/orderbook.py deleted file mode 100644 index 06f65603..00000000 --- a/src/fetch/orderbook.py +++ /dev/null @@ -1,111 +0,0 @@ -"""Basic client for connecting to postgres database with login credentials""" -from __future__ import annotations - -import os -from dataclasses import dataclass -from enum import Enum -from typing import Optional - -import pandas as pd -from dotenv import load_dotenv -from pandas import DataFrame -from sqlalchemy import create_engine -from sqlalchemy.engine import Engine - -from src.models.block_range import BlockRange -from src.utils import open_query - -REORG_THRESHOLD = 65 - - -class OrderbookEnv(Enum): - """ - Enum for distinguishing between CoW Protocol's staging and production environment - """ - - BARN = "BARN" - PROD = "PROD" - - def __str__(self) -> str: - return str(self.value) - - -@dataclass -class OrderbookFetcher: - """ - A pair of Dataframes primarily intended to store query results - from production and staging orderbook databases - """ - - @staticmethod - def _pg_engine(db_env: OrderbookEnv) -> Engine: - """Returns a connection to postgres database""" - load_dotenv() - db_url = os.environ[f"{db_env}_DB_URL"] - db_string = f"postgresql+psycopg2://{db_url}" - return create_engine(db_string) - - @classmethod - def _read_query_for_env( - cls, query: str, env: OrderbookEnv, data_types: Optional[dict[str, str]] = None - ) -> DataFrame: - return pd.read_sql_query(query, con=cls._pg_engine(env), dtype=data_types) - - @classmethod - def _query_both_dbs( - cls, query: str, data_types: Optional[dict[str, str]] = None - ) -> tuple[DataFrame, DataFrame]: - barn = cls._read_query_for_env(query, OrderbookEnv.BARN, data_types) - prod = cls._read_query_for_env(query, OrderbookEnv.PROD, data_types) - return barn, prod - - @classmethod - def get_latest_block(cls) -> int: - """ - Fetches the latest mutually synced block from orderbook databases (with REORG protection) - """ - data_types = {"latest": "int64"} - barn, prod = cls._query_both_dbs( - open_query("orderbook/latest_block.sql"), data_types - ) - assert len(barn) == 1 == len(prod), "Expecting single record" - return min(int(barn["latest"][0]), int(prod["latest"][0])) - REORG_THRESHOLD - - @classmethod - def get_order_rewards(cls, block_range: BlockRange) -> DataFrame: - """ - Fetches and validates Order Reward DataFrame as concatenation from Prod and Staging DB - """ - cow_reward_query = ( - open_query("orderbook/order_rewards.sql") - .replace("{{start_block}}", str(block_range.block_from)) - .replace("{{end_block}}", str(block_range.block_to)) - ) - data_types = {"block_number": "int64", "amount": "float64"} - barn, prod = cls._query_both_dbs(cow_reward_query, data_types) - - # Solvers do not appear in both environments! - assert set(prod.solver).isdisjoint(set(barn.solver)), "solver overlap!" 
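[editor's note] The assertion above relies on a solver address never settling in both the staging and production orderbooks, so the two result frames can simply be stacked without double counting. A toy pandas illustration of that guard-then-concatenate step (made-up addresses and amounts):

import pandas as pd

prod = pd.DataFrame({"solver": ["0xaaaa"], "amount": [1.0]})
barn = pd.DataFrame({"solver": ["0xbbbb"], "amount": [2.0]})

# Solvers must not appear in both environments, otherwise rows would be counted twice.
assert set(prod.solver).isdisjoint(set(barn.solver)), "solver overlap!"

combined = pd.concat([prod, barn])
print(combined)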
- return pd.concat([prod, barn]) - - @classmethod - def get_batch_rewards(cls, block_range: BlockRange) -> DataFrame: - """ - Fetches and validates Batch Rewards DataFrame as concatenation from Prod and Staging DB - """ - cow_reward_query = ( - open_query("orderbook/batch_rewards.sql") - .replace("{{start_block}}", str(block_range.block_from)) - .replace("{{end_block}}", str(block_range.block_to)) - ) - data_types = { - # According to this: https://stackoverflow.com/a/11548224 - # capitalized int64 means `Optional` and it appears to work. - "block_number": "Int64", - "block_deadline": "int64", - } - barn, prod = cls._query_both_dbs(cow_reward_query, data_types) - - # Solvers do not appear in both environments! - assert set(prod.solver).isdisjoint(set(barn.solver)), "solver overlap!" - return pd.concat([prod, barn]) diff --git a/src/fetch/postgres.py b/src/fetch/postgres.py deleted file mode 100644 index 3364c519..00000000 --- a/src/fetch/postgres.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Generic Postgres Adapter for executing queries on a postgres DB.""" -from dataclasses import dataclass -from typing import Optional - -import pandas as pd -from pandas import DataFrame -from sqlalchemy import create_engine -from sqlalchemy.engine import Engine - -from src.fetch.orderbook import REORG_THRESHOLD -from src.models.block_range import BlockRange -from src.utils import open_query - - -@dataclass -class PostgresFetcher: - """ - Basic Postgres interface - """ - - engine: Engine - - def __init__(self, db_url: str): - db_string = f"postgresql+psycopg2://{db_url}" - - self.engine = create_engine(db_string) - - def _read_query( - self, query: str, data_types: Optional[dict[str, str]] = None - ) -> DataFrame: - return pd.read_sql_query(query, con=self.engine, dtype=data_types) - - def get_latest_block(self) -> int: - """ - Fetches the latest mutually synced block from orderbook databases (with REORG protection) - """ - data_types = {"latest": "int64"} - res = self._read_query(open_query("warehouse/latest_block.sql"), data_types) - assert len(res) == 1, "Expecting single record" - return int(res["latest"][0]) - REORG_THRESHOLD - - def get_internal_imbalances(self, block_range: BlockRange) -> DataFrame: - """ - Fetches and validates Internal Token Imbalances - """ - cow_reward_query = ( - open_query("warehouse/token_imbalances.sql") - .replace("{{start_block}}", str(block_range.block_from)) - .replace("{{end_block}}", str(block_range.block_to)) - ) - data_types = { - "block_number": "int64", - } - return self._read_query(cow_reward_query, data_types) diff --git a/src/main.py b/src/main.py deleted file mode 100644 index 834ca3c1..00000000 --- a/src/main.py +++ /dev/null @@ -1,95 +0,0 @@ -"""Main Entry point for app_hash sync""" -import argparse -import asyncio -import os -from dataclasses import dataclass -from pathlib import Path - -from dotenv import load_dotenv - -from src.fetch.dune import DuneFetcher -from src.fetch.orderbook import OrderbookFetcher -from src.fetch.postgres import PostgresFetcher -from src.logger import set_log -from src.models.tables import SyncTable -from src.post.aws import AWSClient -from src.sync import sync_app_data -from src.sync.config import SyncConfig, AppDataSyncConfig -from src.sync.order_rewards import sync_order_rewards, sync_batch_rewards -from src.sync.token_imbalance import sync_internal_imbalance - -log = set_log(__name__) - - -@dataclass -class ScriptArgs: - """Runtime arguments' parser/initializer""" - - dry_run: bool - sync_table: SyncTable - - def __init__(self) -> 
None: - parser = argparse.ArgumentParser("Dune Community Sources Sync") - parser.add_argument( - "--sync-table", - type=SyncTable, - required=True, - choices=list(SyncTable), - ) - parser.add_argument( - "--dry-run", - type=bool, - help="Flag indicating whether script should not post files to AWS or not", - default=False, - ) - arguments, _ = parser.parse_known_args() - self.sync_table: SyncTable = arguments.sync_table - self.dry_run: bool = arguments.dry_run - - -if __name__ == "__main__": - load_dotenv() - volume_path = Path(os.environ["VOLUME_PATH"]) - args = ScriptArgs() - aws = AWSClient.new_from_environment() - - if args.sync_table == SyncTable.APP_DATA: - asyncio.run( - sync_app_data( - aws, - dune=DuneFetcher(os.environ["DUNE_API_KEY"]), - config=AppDataSyncConfig( - volume_path=volume_path, - missing_files_name="missing_app_hashes.json", - max_retries=int(os.environ.get("APP_DATA_MAX_RETRIES", 3)), - give_up_threshold=int( - os.environ.get("APP_DATA_GIVE_UP_THRESHOLD", 100) - ), - ), - ipfs_access_key=os.environ["IPFS_ACCESS_KEY"], - dry_run=args.dry_run, - ) - ) - elif args.sync_table == SyncTable.ORDER_REWARDS: - sync_order_rewards( - aws, - config=SyncConfig(volume_path), - fetcher=OrderbookFetcher(), - dry_run=args.dry_run, - ) - elif args.sync_table == SyncTable.BATCH_REWARDS: - sync_batch_rewards( - aws, - config=SyncConfig(volume_path), - fetcher=OrderbookFetcher(), - dry_run=args.dry_run, - ) - elif args.sync_table == SyncTable.INTERNAL_IMBALANCE: - sync_internal_imbalance( - aws, - config=SyncConfig(volume_path), - fetcher=PostgresFetcher(os.environ["WAREHOUSE_URL"]), - dry_run=args.dry_run, - ) - else: - log.error(f"unsupported sync_table '{args.sync_table}'") diff --git a/src/models/__init__.py b/src/models/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/models/app_data_content.py b/src/models/app_data_content.py deleted file mode 100644 index 20c3055a..00000000 --- a/src/models/app_data_content.py +++ /dev/null @@ -1,59 +0,0 @@ -"""Models for Found and Not Found App Data Content. Also, responsible for type conversion""" -from __future__ import annotations - -from dataclasses import dataclass -from typing import Any - - -@dataclass -class FoundContent: - """Representation of AppData with Content""" - - app_hash: str - first_seen_block: int - content: dict[str, Any] - - @classmethod - def from_dict(cls, row: dict[str, Any]) -> FoundContent: - """Constructor from dictionary""" - return cls( - app_hash=row["app_hash"], - first_seen_block=int(row["first_seen_block"]), - content=row["content"], - ) - - def as_dune_record(self) -> dict[str, Any]: - """Converts to DuneRecord type""" - return { - "app_hash": self.app_hash, - "first_seen_block": self.first_seen_block, - "content": self.content, - } - - -@dataclass -class NotFoundContent: - """ - Representation of AppData with unknown content. 
- Records also number of attempts made to recover the content""" - - app_hash: str - first_seen_block: int - attempts: int - - @classmethod - def from_dict(cls, row: dict[str, Any]) -> NotFoundContent: - """Constructor from dictionary""" - return cls( - app_hash=row["app_hash"], - first_seen_block=int(row["first_seen_block"]), - attempts=int(row["attempts"]), - ) - - def as_dune_record(self) -> dict[str, Any]: - """Converts to DuneRecord type""" - return { - "app_hash": self.app_hash, - "first_seen_block": self.first_seen_block, - "attempts": self.attempts, - } diff --git a/src/models/batch_rewards_schema.py b/src/models/batch_rewards_schema.py deleted file mode 100644 index a3bc5bab..00000000 --- a/src/models/batch_rewards_schema.py +++ /dev/null @@ -1,41 +0,0 @@ -"""Model for Batch Rewards Data""" -from __future__ import annotations - -from dataclasses import dataclass -from typing import Any - -import pandas -from pandas import DataFrame - - -@dataclass -class BatchRewards: - """ - This class provides a transformation interface for the Dataframe we fetch from the orderbook - """ - - @classmethod - def from_pdf_to_dune_records(cls, rewards_df: DataFrame) -> list[dict[str, Any]]: - """Converts Pandas DataFrame into the expected stream type for Dune""" - return [ - { - "block_number": int(row["block_number"]) - if not pandas.isna(row["block_number"]) - else None, - "tx_hash": row["tx_hash"], - "solver": row["solver"], - "block_deadline": int(row["block_deadline"]), - "data": { - # All the following values are in WEI. - "uncapped_payment_eth": int(row["uncapped_payment_eth"]), - "capped_payment": int(row["capped_payment"]), - "execution_cost": int(row["execution_cost"]), - "surplus": int(row["surplus"]), - "fee": int(row["fee"]), - "winning_score": int(row["winning_score"]), - "reference_score": int(row["reference_score"]), - "participating_solvers": row["participating_solvers"], - }, - } - for row in rewards_df.to_dict(orient="records") - ] diff --git a/src/models/block_range.py b/src/models/block_range.py deleted file mode 100644 index 25f7a89b..00000000 --- a/src/models/block_range.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -BlockRange Model is just a data class for left and right bounds -""" -from dataclasses import dataclass - -from dune_client.types import QueryParameter - - -@dataclass -class BlockRange: - """ - Basic dataclass for an Ethereum block range with some Dune compatibility methods. - TODO (easy) - this data class could probably live in dune-client. 
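[editor's note] The `BatchRewards` transformation being removed above maps a nullable `block_number` (pandas' capitalized `Int64` dtype) to `None` for auctions without an observed settlement. A toy illustration of that conversion, using made-up values:

import pandas as pd

df = pd.DataFrame(
    {
        "block_number": pd.array([17_000_000, None], dtype="Int64"),
        "block_deadline": [17_000_005, 17_000_010],
    }
)

records = [
    {
        "block_number": int(row["block_number"])
        if not pd.isna(row["block_number"])
        else None,
        "block_deadline": int(row["block_deadline"]),
    }
    for row in df.to_dict(orient="records")
]
# [{'block_number': 17000000, ...}, {'block_number': None, ...}]
print(records)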
- https://github.com/cowprotocol/dune-bridge/issues/40 - """ - - block_from: int - block_to: int - - def __str__(self) -> str: - return f"BlockRange(from={self.block_from}, to={self.block_to})" - - def __repr__(self) -> str: - return str(self) - - def as_query_params(self) -> list[QueryParameter]: - """Returns self as Dune QueryParameters""" - return [ - QueryParameter.number_type("BlockFrom", self.block_from), - QueryParameter.number_type("BlockTo", self.block_to), - ] diff --git a/src/models/order_rewards_schema.py b/src/models/order_rewards_schema.py deleted file mode 100644 index 2b7734a2..00000000 --- a/src/models/order_rewards_schema.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Model for Order Rewards Data""" -from __future__ import annotations - -from dataclasses import dataclass -from typing import Any - -from pandas import DataFrame - - -@dataclass -class OrderRewards: - """ - This class provides a transformation interface for the Dataframe we fetch from the orderbook - """ - - @classmethod - def from_pdf_to_dune_records(cls, rewards_df: DataFrame) -> list[dict[str, Any]]: - """Converts Pandas DataFrame into the expected stream type for Dune""" - return [ - { - "block_number": int(row["block_number"]), - "order_uid": row["order_uid"], - "tx_hash": row["tx_hash"], - "solver": row["solver"], - "data": { - "surplus_fee": str(row["surplus_fee"]), - "amount": float(row["amount"]), - }, - } - for row in rewards_df.to_dict(orient="records") - ] diff --git a/src/models/tables.py b/src/models/tables.py deleted file mode 100644 index 147f217a..00000000 --- a/src/models/tables.py +++ /dev/null @@ -1,19 +0,0 @@ -"""Data structure containing the supported sync tables""" -from enum import Enum - - -class SyncTable(Enum): - """Enum for Deployment Supported Table Sync""" - - APP_DATA = "app_data" - ORDER_REWARDS = "order_rewards" - BATCH_REWARDS = "batch_rewards" - INTERNAL_IMBALANCE = "internal_imbalance" - - def __str__(self) -> str: - return str(self.value) - - @staticmethod - def supported_tables() -> list[str]: - """Returns a list of supported tables (i.e. 
valid object contructors).""" - return [str(t) for t in list(SyncTable)] diff --git a/src/models/token_imbalance_schema.py b/src/models/token_imbalance_schema.py deleted file mode 100644 index e7053cd4..00000000 --- a/src/models/token_imbalance_schema.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Model for Batch Rewards Data""" -from __future__ import annotations - -from dataclasses import dataclass -from typing import Any - -from pandas import DataFrame - - -@dataclass -class TokenImbalance: - """ - This class provides a transformation interface (to JSON) for Dataframe - """ - - @classmethod - def from_pdf_to_dune_records(cls, frame: DataFrame) -> list[dict[str, Any]]: - """Converts Pandas DataFrame into the expected stream type for Dune""" - return [ - { - "block_number": int(row["block_number"]), - "tx_hash": row["tx_hash"], - "token": row["token"], - "amount": str(row["amount"]), - } - for row in frame.to_dict(orient="records") - ] diff --git a/src/post/__init__.py b/src/post/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/record_handler.py b/src/record_handler.py new file mode 100644 index 00000000..718e6252 --- /dev/null +++ b/src/record_handler.py @@ -0,0 +1,107 @@ +""" +Abstraction for New Content Handling +provides a framework for writing new content to disk and posting to AWS +""" +import sys +from typing import Any + +from s3transfer import S3UploadFailedError +from src.aws import AWSClient + +from src.logger import set_log + +log = set_log(__name__) + + +def last_sync_block(aws: AWSClient, table: str, genesis_block: int = 0) -> int: + """Attempts to get last sync block from AWS Bucket files, otherwise uses genesis""" + try: + block_from = aws.last_sync_block(table) + except FileNotFoundError: + log.warning( + f"last sync could not be evaluated from AWS, using genesis block {genesis_block}" + ) + block_from = genesis_block + + return block_from + + +class RecordHandler: + + """ + This class is responsible for consuming new dune records and missing values from previous runs + it attempts to fetch content for them and filters them into "found" and "not found" as necessary + """ + + def __init__( + self, + file_index: int, + file_prefix: str, + table: str, + data_set: list[dict[str, Any]], + ): + self.file_index = file_index + self.file_prefix = file_prefix + self.table = table + self.data_set = data_set + + def num_records(self) -> int: + """Returns number of records to handle""" + return len(self.data_set) + + @property + def content_filename(self) -> str: + """returns filename""" + return f"{self.file_prefix}_{self.file_index}.json" + + @property + def object_key(self) -> str: + """returns object key""" + return f"{self.table}/{self.content_filename}" + + def _aws_login_and_upload( + self, aws: AWSClient, data_set: list[dict[str, Any]] + ) -> bool: + """Creates AWS client session and attempts to upload file""" + try: + return aws.put_object( + data_set, + object_key=self.object_key, + ) + except S3UploadFailedError as err: + log.error(err) + sys.exit(1) + + def write_and_upload_content(self, aws: AWSClient, dry_run: bool) -> None: + """ + - Writes record handlers content to persistent volume, + - attempts to upload to AWS and + - records last sync block on volume. + When dryrun flag is enabled, does not upload to IPFS. 
+ """ + count = self.num_records() + if count > 0: + log.info( + f"posting {count} new {self.table} records for file index {self.file_index}" + ) + if dry_run: + log.info("DRY-RUN-ENABLED: new records not posted to AWS.") + else: + try: + aws.put_object( + data_set=self.data_set, + object_key=self.object_key, + ) + log.info( + f"{self.table} sync for file index {self.file_index} complete: " + f"synced {count} records" + ) + return + except S3UploadFailedError as err: + log.error(err) + sys.exit(1) + + else: + log.info( + f"No new {self.table} for file index {self.file_index}: no sync necessary" + ) diff --git a/src/scripts/__init__.py b/src/scripts/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/scripts/download_file.py b/src/scripts/download_file.py deleted file mode 100644 index 66100521..00000000 --- a/src/scripts/download_file.py +++ /dev/null @@ -1,26 +0,0 @@ -"""Downloads file from AWS to PWD""" -import argparse - -from dotenv import load_dotenv - -from src.post.aws import AWSClient - - -def download_file(aws: AWSClient, object_key: str) -> None: - """Download file (`object_key`) from AWS""" - aws.download_file( - filename=object_key.split("/")[1], - object_key=object_key, - ) - - -if __name__ == "__main__": - load_dotenv() - parser = argparse.ArgumentParser("Download File from Bucket") - parser.add_argument( - "filename", - type=str, - required=True, - ) - args, _ = parser.parse_known_args() - download_file(aws=AWSClient.new_from_environment(), object_key=args.object_key) diff --git a/src/scripts/empty_bucket.py b/src/scripts/empty_bucket.py deleted file mode 100644 index b36a4b56..00000000 --- a/src/scripts/empty_bucket.py +++ /dev/null @@ -1,33 +0,0 @@ -""" -Script to empty AWS bucket. -Used for re-deployments involving schema change. 
-""" -import os -import shutil -from pathlib import Path - -from dotenv import load_dotenv - -from src.main import ScriptArgs -from src.models.tables import SyncTable -from src.post.aws import AWSClient - - -def empty_bucket(table: SyncTable, aws: AWSClient, volume_path: Path) -> None: - """ - Empties the bucket for `table` - and deletes backup from mounted volume - """ - # Drop Data from AWS bucket - aws.delete_all(table) - # drop backup data from volume path - shutil.rmtree(volume_path / str(table)) - - -if __name__ == "__main__": - load_dotenv() - empty_bucket( - table=ScriptArgs().sync_table, - aws=AWSClient.new_from_environment(), - volume_path=Path(os.environ["VOLUME_PATH"]), - ) diff --git a/src/sql/app_hash_latest_block.sql b/src/sql/app_hash_latest_block.sql deleted file mode 100644 index 571fce55..00000000 --- a/src/sql/app_hash_latest_block.sql +++ /dev/null @@ -1,4 +0,0 @@ --- https://dune.com/queries/1615490 -select - max(call_block_number) as latest_block -from gnosis_protocol_v2_ethereum.GPv2Settlement_call_settle \ No newline at end of file diff --git a/src/sql/app_hashes.sql b/src/sql/app_hashes.sql deleted file mode 100644 index daef7452..00000000 --- a/src/sql/app_hashes.sql +++ /dev/null @@ -1,21 +0,0 @@ --- App Hashes: https://dune.com/queries/1610025 --- MIN(first_block_seen) = 12153263 --- Nov 16, 2022: Query takes 4 seconds to run for on full block range -with -app_hashes as ( - select - min(call_block_number) first_seen_block, - get_json_object(trade, '$.appData') as app_hash - from gnosis_protocol_v2_ethereum.GPv2Settlement_call_settle - lateral view explode(trades) as trade - group by app_hash -) -select - app_hash, - first_seen_block -from app_hashes -where first_seen_block > '{{BlockFrom}}' -and first_seen_block <= '{{BlockTo}}' - --- For some additional stats, --- on this data see https://dune.com/queries/1608286 \ No newline at end of file diff --git a/src/sql/orderbook/batch_rewards.sql b/src/sql/orderbook/batch_rewards.sql deleted file mode 100644 index affa000b..00000000 --- a/src/sql/orderbook/batch_rewards.sql +++ /dev/null @@ -1,97 +0,0 @@ -WITH observed_settlements AS (SELECT - -- settlement - tx_hash, - solver, - s.block_number, - -- settlement_observations - effective_gas_price * gas_used AS execution_cost, - surplus, - fee, - -- auction_transaction - at.auction_id - FROM settlement_observations so - JOIN settlements s - ON s.block_number = so.block_number - AND s.log_index = so.log_index - JOIN auction_transaction at - ON s.tx_from = at.tx_from - AND s.tx_nonce = at.tx_nonce - JOIN settlement_scores ss - ON at.auction_id = ss.auction_id - WHERE ss.block_deadline > {{start_block}} - AND ss.block_deadline <= {{end_block}}), - - auction_participation as (SELECT ss.auction_id, - array_agg( - concat('0x', encode(participant, 'hex')) ORDER BY participant - ) as participating_solvers - FROM auction_participants - JOIN settlement_scores ss - ON auction_participants.auction_id = ss.auction_id - WHERE block_deadline > {{start_block}} - AND block_deadline <= {{end_block}} - GROUP BY ss.auction_id), - reward_data AS (SELECT - -- observations - tx_hash, - ss.auction_id, - -- TODO - Assuming that `solver == winner` when both not null - -- We will need to monitor that `solver == winner`! 
- coalesce(solver, winner) as solver, - block_number as settlement_block, - block_deadline, - case - when block_number is not null and block_number > block_deadline then 0 - else coalesce(execution_cost, 0) end as execution_cost, - case - when block_number is not null and block_number > block_deadline then 0 - else coalesce(surplus, 0) end as surplus, - case - when block_number is not null and block_number > block_deadline then 0 - else coalesce(fee, 0) end as fee, - -- scores - winning_score, - reference_score, - -- auction_participation - participating_solvers - FROM settlement_scores ss - -- If there are reported scores, - -- there will always be a record of auction participants - JOIN auction_participation ap - ON ss.auction_id = ap.auction_id - -- outer joins made in order to capture non-existent settlements. - LEFT OUTER JOIN observed_settlements os - ON os.auction_id = ss.auction_id), - reward_per_auction as (SELECT tx_hash, - settlement_block, - block_deadline, - solver, - execution_cost, - surplus, - fee, - surplus + fee - reference_score as uncapped_payment_eth, - -- Uncapped Reward = CLAMP_[-E, E + exec_cost](uncapped_payment_eth) - LEAST(GREATEST(-10000000000000000, surplus + fee - reference_score), - 10000000000000000 + execution_cost) as capped_payment, - winning_score, - reference_score, - participating_solvers - FROM reward_data) - - -SELECT settlement_block as block_number, - block_deadline, - case - when tx_hash is NULL then NULL - else concat('0x', encode(tx_hash, 'hex')) - end as tx_hash, - concat('0x', encode(solver, 'hex')) as solver, - execution_cost::text as execution_cost, - surplus::text as surplus, - fee::text as fee, - uncapped_payment_eth::text as uncapped_payment_eth, - capped_payment::text as capped_payment, - winning_score::text as winning_score, - reference_score::text as reference_score, - participating_solvers -FROM reward_per_auction diff --git a/src/sql/orderbook/latest_block.sql b/src/sql/orderbook/latest_block.sql deleted file mode 100644 index acc3a5d4..00000000 --- a/src/sql/orderbook/latest_block.sql +++ /dev/null @@ -1,3 +0,0 @@ -select min(block_number) latest -from settlements -where tx_from is null; \ No newline at end of file diff --git a/src/sql/orderbook/order_rewards.sql b/src/sql/orderbook/order_rewards.sql deleted file mode 100644 index d80666f5..00000000 --- a/src/sql/orderbook/order_rewards.sql +++ /dev/null @@ -1,33 +0,0 @@ -with trade_hashes as (SELECT solver, - block_number, - order_uid, - fee_amount, - settlement.tx_hash, - auction_id - FROM trades t - LEFT OUTER JOIN LATERAL ( - SELECT tx_hash, solver, tx_nonce, tx_from - FROM settlements s - WHERE s.block_number = t.block_number - AND s.log_index > t.log_index - ORDER BY s.log_index ASC - LIMIT 1 - ) AS settlement ON true - join auction_transaction - -- This join also eliminates overlapping - -- trades & settlements between barn and prod DB - on settlement.tx_from = auction_transaction.tx_from - and settlement.tx_nonce = auction_transaction.tx_nonce - where block_number > {{start_block}} and block_number <= {{end_block}}) - --- Most efficient column order for sorting would be having tx_hash or order_uid first -select block_number, - concat('0x', encode(trade_hashes.order_uid, 'hex')) as order_uid, - concat('0x', encode(solver, 'hex')) as solver, - concat('0x', encode(tx_hash, 'hex')) as tx_hash, - coalesce(surplus_fee, 0)::text as surplus_fee, - coalesce(reward, 0.0) as amount -from trade_hashes - left outer join order_execution o - on trade_hashes.order_uid = o.order_uid - and 
trade_hashes.auction_id = o.auction_id; diff --git a/src/sql/warehouse/latest_block.sql b/src/sql/warehouse/latest_block.sql deleted file mode 100644 index 609534b6..00000000 --- a/src/sql/warehouse/latest_block.sql +++ /dev/null @@ -1,2 +0,0 @@ -select max(block_number) as latest -from settlements; diff --git a/src/sql/warehouse/token_imbalances.sql b/src/sql/warehouse/token_imbalances.sql deleted file mode 100644 index 0b81f9ba..00000000 --- a/src/sql/warehouse/token_imbalances.sql +++ /dev/null @@ -1,8 +0,0 @@ -select block_number, - concat('0x', encode(s.tx_hash, 'hex')) as tx_hash, - concat('0x', encode(token, 'hex')) as token, - amount::text as amount -from internalized_imbalances - join settlements s - on internalized_imbalances.tx_hash = s.tx_hash -where block_number > {{start_block}} and block_number <= {{end_block}}; diff --git a/src/sync/__init__.py b/src/sync/__init__.py deleted file mode 100644 index 37b723b3..00000000 --- a/src/sync/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -"""Re-exported sync methods.""" -from .app_data import sync_app_data diff --git a/src/sync/app_data.py b/src/sync/app_data.py deleted file mode 100644 index 8f5f143d..00000000 --- a/src/sync/app_data.py +++ /dev/null @@ -1,154 +0,0 @@ -"""Main Entry point for app_hash sync""" - -from dune_client.file.interface import FileIO -from dune_client.types import DuneRecord - -from src.fetch.dune import DuneFetcher -from src.fetch.ipfs import Cid -from src.logger import set_log -from src.models.app_data_content import FoundContent, NotFoundContent -from src.models.block_range import BlockRange -from src.models.tables import SyncTable -from src.post.aws import AWSClient -from src.sync.common import last_sync_block -from src.sync.config import SyncConfig, AppDataSyncConfig -from src.sync.record_handler import RecordHandler -from src.sync.upload_handler import UploadHandler - -log = set_log(__name__) - - -SYNC_TABLE = SyncTable.APP_DATA - - -class AppDataHandler(RecordHandler): # pylint:disable=too-many-instance-attributes - """ - This class is responsible for consuming new dune records and missing values from previous runs - it attempts to fetch content for them and filters them into "found" and "not found" as necessary - """ - - def __init__( # pylint:disable=too-many-arguments - self, - file_manager: FileIO, - new_rows: list[DuneRecord], - block_range: BlockRange, - config: SyncConfig, - ipfs_access_key: str, - missing_file_name: str, - ): - super().__init__(block_range, SYNC_TABLE, config) - self.file_manager = file_manager - self.ipfs_access_key = ipfs_access_key - - self._found: list[FoundContent] = [] - self._not_found: list[NotFoundContent] = [] - - self.new_rows = new_rows - self.missing_file_name = missing_file_name - try: - self.missing_values = self.file_manager.load_ndjson(missing_file_name) - except FileNotFoundError: - self.missing_values = [] - - def num_records(self) -> int: - assert len(self.new_rows) == 0, ( - "this function call is not allowed until self.new_rows have been processed! 
" - "call fetch_content_and_filter first" - ) - return len(self._found) - - async def _handle_new_records(self, max_retries: int) -> None: - # Drain the dune_results into "found" and "not found" categories - self._found, self._not_found = await Cid.fetch_many( - self.new_rows, self.ipfs_access_key, max_retries - ) - - async def _handle_missing_records( - self, max_retries: int, give_up_threshold: int - ) -> None: - found, not_found = await Cid.fetch_many( - self.missing_values, self.ipfs_access_key, max_retries - ) - while found: - self._found.append(found.pop()) - while not_found: - row = not_found.pop() - app_hash, attempts = row.app_hash, row.attempts - if attempts > give_up_threshold: - log.debug( - f"No content found after {attempts} attempts for {app_hash} assuming NULL." - ) - self._found.append( - FoundContent( - app_hash=app_hash, - first_seen_block=row.first_seen_block, - content={}, - ) - ) - else: - self._not_found.append(row) - - def write_found_content(self) -> None: - assert len(self.new_rows) == 0, "Must call _handle_new_records first!" - self.file_manager.write_ndjson( - data=[x.as_dune_record() for x in self._found], name=self.content_filename - ) - # When not_found is empty, we want to overwrite the file (hence skip_empty=False) - # This happens when number of attempts exceeds GIVE_UP_THRESHOLD - self.file_manager.write_ndjson( - data=[x.as_dune_record() for x in self._not_found], - name=self.missing_file_name, - skip_empty=False, - ) - - def write_sync_data(self) -> None: - # Only write these if upload was successful. - self.file_manager.write_csv( - data=[{self.config.sync_column: str(self.block_range.block_to)}], - name=self.config.sync_file, - ) - - async def fetch_content_and_filter( - self, max_retries: int, give_up_threshold: int - ) -> None: - """ - Run loop fetching app_data for hashes, - separates into (found and not found), returning the pair. 
- """ - await self._handle_new_records(max_retries) - log.info( - f"Attempting to recover missing {len(self.missing_values)} records from previous run" - ) - await self._handle_missing_records(max_retries, give_up_threshold) - - -async def sync_app_data( - aws: AWSClient, - dune: DuneFetcher, - config: AppDataSyncConfig, - ipfs_access_key: str, - dry_run: bool, -) -> None: - """App Data Sync Logic""" - block_range = BlockRange( - block_from=last_sync_block( - aws, - table=SYNC_TABLE, - genesis_block=12153262, # First App Hash Block - ), - block_to=await dune.latest_app_hash_block(), - ) - - data_handler = AppDataHandler( - file_manager=FileIO(config.volume_path / str(SYNC_TABLE)), - new_rows=await dune.get_app_hashes(block_range), - block_range=block_range, - config=config, - ipfs_access_key=ipfs_access_key, - missing_file_name=config.missing_files_name, - ) - await data_handler.fetch_content_and_filter( - max_retries=config.max_retries, give_up_threshold=config.give_up_threshold - ) - UploadHandler(aws, data_handler, table=SYNC_TABLE).write_and_upload_content(dry_run) - log.info("app_data sync run completed successfully") diff --git a/src/sync/common.py b/src/sync/common.py deleted file mode 100644 index c3a11195..00000000 --- a/src/sync/common.py +++ /dev/null @@ -1,20 +0,0 @@ -"""Shared methods between both sync scripts.""" - -from src.logger import set_log -from src.models.tables import SyncTable -from src.post.aws import AWSClient - -log = set_log(__name__) - - -def last_sync_block(aws: AWSClient, table: SyncTable, genesis_block: int = 0) -> int: - """Attempts to get last sync block from AWS Bucket files, otherwise uses genesis""" - try: - block_from = aws.last_sync_block(table) - except FileNotFoundError: - log.warning( - f"last sync could not be evaluated from AWS, using genesis block {genesis_block}" - ) - block_from = genesis_block - - return block_from diff --git a/src/sync/config.py b/src/sync/config.py deleted file mode 100644 index bbd663c6..00000000 --- a/src/sync/config.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Configuration details for sync jobs""" -from __future__ import annotations - -from dataclasses import dataclass -from pathlib import Path - - -@dataclass -class SyncConfig: - """ - This data class contains all the credentials and volume paths - required to sync with both a persistent volume and Dune's S3 Buckets. - """ - - volume_path: Path - # File System - sync_file: str = "sync_block.csv" - sync_column: str = "last_synced_block" - - -@dataclass -class AppDataSyncConfig(SyncConfig): - """Additional data field for app data sync.""" - - # Maximum number of retries on a single run - max_retries: int = 3 - # Total number of accumulated attempts before we assume no content - give_up_threshold: int = 100 - # Persisted file where we store the missing results and number of attempts. 
- missing_files_name: str = "missing_app_hashes.json" diff --git a/src/sync/order_rewards.py b/src/sync/order_rewards.py deleted file mode 100644 index 5c2e2596..00000000 --- a/src/sync/order_rewards.py +++ /dev/null @@ -1,125 +0,0 @@ -"""Main Entry point for app_hash sync""" -from typing import Any - -from dune_client.file.interface import FileIO - -from src.fetch.orderbook import OrderbookFetcher -from src.logger import set_log -from src.models.batch_rewards_schema import BatchRewards -from src.models.block_range import BlockRange -from src.models.order_rewards_schema import OrderRewards -from src.models.tables import SyncTable -from src.post.aws import AWSClient -from src.sync.common import last_sync_block -from src.sync.config import SyncConfig -from src.sync.record_handler import RecordHandler -from src.sync.upload_handler import UploadHandler - -log = set_log(__name__) - - -class OrderbookDataHandler( - RecordHandler -): # pylint:disable=too-few-public-methods,too-many-arguments - """ - This class is responsible for consuming new dune records and missing values from previous runs - it attempts to fetch content for them and filters them into "found" and "not found" as necessary - """ - - def __init__( - self, - file_manager: FileIO, - block_range: BlockRange, - sync_table: SyncTable, - config: SyncConfig, - data_list: list[dict[str, Any]], - ): - super().__init__(block_range, sync_table, config) - log.info(f"Handling {len(data_list)} new records") - self.file_manager = file_manager - self.data_list = data_list - - def num_records(self) -> int: - return len(self.data_list) - - def write_found_content(self) -> None: - self.file_manager.write_ndjson(data=self.data_list, name=self.content_filename) - - def write_sync_data(self) -> None: - # Only write these if upload was successful. 
- self.file_manager.write_csv( - data=[{self.config.sync_column: str(self.block_range.block_to)}], - name=self.config.sync_file, - ) - - -def sync_order_rewards( - aws: AWSClient, fetcher: OrderbookFetcher, config: SyncConfig, dry_run: bool -) -> None: - """Order Rewards Data Sync Logic""" - sync_table = SyncTable.ORDER_REWARDS - block_range = BlockRange( - block_from=last_sync_block( - aws, - table=sync_table, - genesis_block=15719994, # First Recorded Order Reward block - ), - block_to=fetcher.get_latest_block(), - ) - sync_orderbook_data( - aws, - block_range, - config, - dry_run, - sync_table=sync_table, - data_list=OrderRewards.from_pdf_to_dune_records( - fetcher.get_order_rewards(block_range) - ), - ) - - -def sync_batch_rewards( - aws: AWSClient, fetcher: OrderbookFetcher, config: SyncConfig, dry_run: bool -) -> None: - """Batch Reward Sync Logic""" - sync_table = SyncTable.BATCH_REWARDS - block_range = BlockRange( - block_from=last_sync_block( - aws, - table=sync_table, - genesis_block=16862919, # First Recorded Batch Reward block - ), - block_to=fetcher.get_latest_block(), - ) - sync_orderbook_data( - aws, - block_range, - config, - dry_run, - sync_table, - data_list=BatchRewards.from_pdf_to_dune_records( - fetcher.get_batch_rewards(block_range) - ), - ) - - -def sync_orderbook_data( # pylint:disable=too-many-arguments - aws: AWSClient, - block_range: BlockRange, - config: SyncConfig, - dry_run: bool, - sync_table: SyncTable, - data_list: list[dict[str, Any]], -) -> None: - """Generic Orderbook Sync Logic""" - record_handler = OrderbookDataHandler( - file_manager=FileIO(config.volume_path / str(sync_table)), - block_range=block_range, - config=config, - data_list=data_list, - sync_table=sync_table, - ) - UploadHandler(aws, record_handler, table=sync_table).write_and_upload_content( - dry_run - ) - log.info(f"{sync_table} sync run completed successfully") diff --git a/src/sync/record_handler.py b/src/sync/record_handler.py deleted file mode 100644 index cca9e573..00000000 --- a/src/sync/record_handler.py +++ /dev/null @@ -1,45 +0,0 @@ -""" -Abstraction for New Content Handling -provides a framework for writing new content to disk and posting to AWS -""" -from abc import ABC, abstractmethod - -from src.logger import set_log -from src.models.block_range import BlockRange -from src.models.tables import SyncTable -from src.sync.config import SyncConfig - -log = set_log(__name__) - - -class RecordHandler(ABC): # pylint: disable=too-few-public-methods - - """ - This class is responsible for consuming new dune records and missing values from previous runs - it attempts to fetch content for them and filters them into "found" and "not found" as necessary - """ - - def __init__( - self, - block_range: BlockRange, - table: SyncTable, - config: SyncConfig, - ): - self.config = config - self.block_range = block_range - - self.name = str(table) - self.file_path = config.volume_path / self.name - self.content_filename = f"cow_{block_range.block_to}.json" - - @abstractmethod - def num_records(self) -> int: - """Returns number of records to handle""" - - @abstractmethod - def write_found_content(self) -> None: - """Writes content to disk""" - - @abstractmethod - def write_sync_data(self) -> None: - """Records last synced content file""" diff --git a/src/sync/token_imbalance.py b/src/sync/token_imbalance.py deleted file mode 100644 index fe0ca4f3..00000000 --- a/src/sync/token_imbalance.py +++ /dev/null @@ -1,47 +0,0 @@ -"""Main Entry point for token_imbalance sync""" -from 
dune_client.file.interface import FileIO - -from src.fetch.postgres import PostgresFetcher -from src.logger import set_log -from src.models.block_range import BlockRange -from src.models.tables import SyncTable -from src.models.token_imbalance_schema import TokenImbalance -from src.post.aws import AWSClient -from src.sync.common import last_sync_block -from src.sync.config import SyncConfig -from src.sync.order_rewards import OrderbookDataHandler -from src.sync.upload_handler import UploadHandler - -log = set_log(__name__) - - -def sync_internal_imbalance( - aws: AWSClient, fetcher: PostgresFetcher, config: SyncConfig, dry_run: bool -) -> None: - """Token Imbalance Sync Logic""" - sync_table = SyncTable.INTERNAL_IMBALANCE - block_range = BlockRange( - block_from=last_sync_block( - aws, - table=sync_table, - # The first block for which solver competitions - # are available in production orderbook: - # select * from solver_competitions where id = 1; - genesis_block=15173540, - ), - block_to=fetcher.get_latest_block(), - ) - # TODO - Gap Detection (find missing txHashes and ensure they are accounted for!) - record_handler = OrderbookDataHandler( - file_manager=FileIO(config.volume_path / str(sync_table)), - block_range=block_range, - config=config, - data_list=TokenImbalance.from_pdf_to_dune_records( - fetcher.get_internal_imbalances(block_range) - ), - sync_table=sync_table, - ) - UploadHandler(aws, record_handler, table=sync_table).write_and_upload_content( - dry_run - ) - log.info(f"{sync_table} sync run completed successfully") diff --git a/src/sync/upload_handler.py b/src/sync/upload_handler.py deleted file mode 100644 index 498d042d..00000000 --- a/src/sync/upload_handler.py +++ /dev/null @@ -1,76 +0,0 @@ -"""Upload handler responsible for local file updates and aws uploads""" -import os -import sys - -from s3transfer import S3UploadFailedError - -from src.logger import set_log -from src.models.tables import SyncTable -from src.post.aws import AWSClient -from src.sync.record_handler import RecordHandler - -log = set_log(__name__) - - -class UploadHandler: # pylint: disable=too-few-public-methods - """ - Given an instance of Record handler, ensures that - - files are written locally, - - uploaded to AWS and - - sync block is recorded - in the appropriate order. - """ - - def __init__(self, aws: AWSClient, record_handler: RecordHandler, table: SyncTable): - self.aws = aws - self.record_handler = record_handler - self.table = str(table) - - def _aws_login_and_upload(self) -> bool: - """Creates AWS client session and attempts to upload file""" - path, filename = ( - self.record_handler.file_path, - self.record_handler.content_filename, - ) - try: - return self.aws.upload_file( - filename=os.path.join(path, filename), - object_key=f"{self.table}/{filename}", - ) - except S3UploadFailedError as err: - log.error(err) - sys.exit(1) - - def write_and_upload_content(self, dry_run: bool) -> None: - """ - - Writes record handlers content to persistent volume, - - attempts to upload to AWS and - - records last sync block on volume. - When dryrun flag is enabled, does not upload to IPFS. 
- """ - record_handler = self.record_handler - num_records, block_range, name = ( - record_handler.num_records(), - record_handler.block_range, - record_handler.name, - ) - - record_handler.write_found_content() - if num_records > 0: - log.info( - f"attempting to post {num_records} new {name} records for block range {block_range}" - ) - if dry_run: - log.info( - "DRY-RUN-ENABLED: New records written to volume, but not posted to AWS." - ) - else: - self._aws_login_and_upload() - log.info( - f"{name} sync for block range {block_range} complete: " - f"synced {num_records} records" - ) - else: - log.info(f"No new {name} for block range {block_range}: no sync necessary") - - record_handler.write_sync_data() diff --git a/src/text_io.py b/src/text_io.py new file mode 100644 index 00000000..0f602e63 --- /dev/null +++ b/src/text_io.py @@ -0,0 +1,76 @@ +"""This is taken from https://hakibenita.com/fast-load-data-python-postgresql""" +# pylint: skip-file +import io +from typing import Iterator, Optional + + +class StringIteratorIO(io.TextIOBase): + def __init__(self, iter: Iterator[str]): + self._iter = iter + self._buff = "" + + def readable(self) -> bool: + return True + + def _read1(self, n: Optional[int] = None) -> str: + while not self._buff: + try: + self._buff = next(self._iter) + except StopIteration: + break + ret = self._buff[:n] + self._buff = self._buff[len(ret) :] + return ret + + def read(self, n: Optional[int] = None) -> str: + line = [] + if n is None or n < 0: + while True: + m = self._read1() + if not m: + break + line.append(m) + else: + while n > 0: + m = self._read1(n) + if not m: + break + n -= len(m) + line.append(m) + return "".join(line) + + +class BytesIteratorIO(io.BufferedIOBase): + def __init__(self, iter: Iterator[bytes]): + self._iter = iter + self._buff = b"" + + def readable(self) -> bool: + return True + + def _read1(self, n: Optional[int] = None) -> bytes: + while not self._buff: + try: + self._buff = next(self._iter) + except StopIteration: + break + ret = self._buff[:n] + self._buff = self._buff[len(ret) :] + return ret + + def read(self, n: Optional[int] = None) -> bytes: + line = [] + if n is None or n < 0: + while True: + m = self._read1() + if not m: + break + line.append(m) + else: + while n > 0: + m = self._read1(n) + if not m: + break + n -= len(m) + line.append(m) + return b"".join(line) diff --git a/src/utils.py b/src/utils.py deleted file mode 100644 index a601e8c9..00000000 --- a/src/utils.py +++ /dev/null @@ -1,15 +0,0 @@ -"""Basic reusable utility functions""" -import os - -from src.environment import QUERY_PATH - - -def open_query(filename: str) -> str: - """Opens `filename` and returns as string""" - with open(query_file(filename), "r", encoding="utf-8") as file: - return file.read() - - -def query_file(filename: str) -> str: - """Returns proper path for filename in QUERY_PATH""" - return os.path.join(QUERY_PATH, filename) diff --git a/tests/e2e/test_sync_app_data.py b/tests/e2e/test_sync_app_data.py deleted file mode 100644 index e6aab234..00000000 --- a/tests/e2e/test_sync_app_data.py +++ /dev/null @@ -1,80 +0,0 @@ -import os -import shutil -import unittest -from pathlib import Path -from unittest import IsolatedAsyncioTestCase - -from dotenv import load_dotenv -from dune_client.file.interface import FileIO - -from src.fetch.dune import DuneFetcher -from src.models.block_range import BlockRange -from src.sync.app_data import AppDataHandler, SYNC_TABLE -from src.sync.config import AppDataSyncConfig - - -class 
TestSyncAppData(IsolatedAsyncioTestCase): - def setUp(self) -> None: - load_dotenv() - self.dune = DuneFetcher(os.environ["DUNE_API_KEY"]) - self.config = AppDataSyncConfig( - volume_path=Path(os.environ["VOLUME_PATH"]), - missing_files_name="missing_app_hashes.json", - max_retries=2, - give_up_threshold=3, - ) - self.file_manager = FileIO(self.config.volume_path / str(SYNC_TABLE)) - - def tearDown(self) -> None: - shutil.rmtree(self.config.volume_path) - - async def test_fetch_content_and_filter(self): - retries = self.config.max_retries - give_up = self.config.give_up_threshold - missing_files = self.config.missing_files_name - # block numbers - a, b, c = 15582187, 16082187, 16100000 - self.assertTrue(a < b < c) - - block_range_1 = BlockRange( - block_from=a, - block_to=b, - ) - data_handler = AppDataHandler( - file_manager=self.file_manager, - new_rows=await self.dune.get_app_hashes(block_range_1), - block_range=block_range_1, - config=self.config, - missing_file_name=missing_files, - ipfs_access_key=os.environ["IPFS_ACCESS_KEY"], - ) - - print(f"Beginning Content Fetching on {len(data_handler.new_rows)} records") - await data_handler.fetch_content_and_filter(retries, give_up) - data_handler.write_found_content() - self.assertEqual(0, len(data_handler.new_rows)) - - block_range_2 = BlockRange( - block_from=b, - block_to=c, - ) - data_handler = AppDataHandler( - file_manager=self.file_manager, - new_rows=await self.dune.get_app_hashes(block_range_2), - block_range=block_range_2, - config=self.config, - missing_file_name=missing_files, - ) - print( - f"Beginning Second Run Content Fetching on {len(data_handler.new_rows)} records" - ) - await data_handler.fetch_content_and_filter(retries, give_up) - data_handler.write_found_content() - - self.assertEqual(0, len(data_handler.new_rows)) - # Two runs with retries = 2 and give_up = 3 implies no more missing records. 
- self.assertEqual(0, len(data_handler._not_found)) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/integration/test_aws.py b/tests/integration/test_aws.py index c4598902..222d09f8 100644 --- a/tests/integration/test_aws.py +++ b/tests/integration/test_aws.py @@ -6,7 +6,7 @@ import pytest from dotenv import load_dotenv -from src.post.aws import AWSClient, BucketFileObject +from src.aws import AWSClient, BucketFileObject class TestAWSConnection(unittest.TestCase): diff --git a/tests/integration/test_fetch_orderbook.py b/tests/integration/test_fetch_orderbook.py deleted file mode 100644 index e4be7fd6..00000000 --- a/tests/integration/test_fetch_orderbook.py +++ /dev/null @@ -1,130 +0,0 @@ -import unittest - -import pandas as pd - -from src.fetch.orderbook import OrderbookFetcher -from src.models.block_range import BlockRange - - -class TestFetchOrderbook(unittest.TestCase): - def test_latest_block_reasonable(self): - self.assertGreater(OrderbookFetcher.get_latest_block(), 16020300) - - def test_latest_block_increasing(self): - latest_block = OrderbookFetcher.get_latest_block() - self.assertGreaterEqual(OrderbookFetcher.get_latest_block(), latest_block) - - def test_get_order_rewards(self): - block_number = 16000000 - block_range = BlockRange(block_number, block_number + 50) - rewards_df = OrderbookFetcher.get_order_rewards(block_range) - expected = pd.DataFrame( - { - "block_number": [16000018, 16000050], - "order_uid": [ - "0xb52fecfe3df73f0e93f1f9b27c92e3def50322960f9c62d0b97bc2ceee36c07a0639dda84198dc06f5bc91bddbb62cd2e38c2f9a6378140f", - "0xf61cba0b42ed3e956f9db049c0523e123967723c5bcf76ccac0b179a66305b2a7fee439ed7a6bb1b8e7ca1ffdb0a5ca8d993c030637815ad", - ], - "solver": [ - "0x3cee8c7d9b5c8f225a8c36e7d3514e1860309651", - "0xc9ec550bea1c64d779124b23a26292cc223327b6", - ], - "tx_hash": [ - "0xb6f7df8a1114129f7b61f2863b3f81b3620e95f73e5b769a62bb7a87ab6983f4", - "0x2ce77009e78c291cdf39eb6f8ddf7e2c3401b4f962ef1240bdac47e632f8eb7f", - ], - "surplus_fee": ["0", "0"], - "amount": [40.70410, 39.00522], - } - ) - - self.assertIsNone(pd.testing.assert_frame_equal(expected, rewards_df)) - - def test_get_batch_rewards(self): - block_number = 16846500 - block_range = BlockRange(block_number, block_number + 25) - rewards_df = OrderbookFetcher.get_batch_rewards(block_range) - expected = pd.DataFrame( - { - "block_number": pd.Series([16846495, 16846502, pd.NA], dtype="Int64"), - "block_deadline": [16846509, 16846516, 16846524], - "tx_hash": [ - "0x2189c2994dcffcd40cc92245e216b0fda42e0f30573ce4b131341e8ac776ed75", - "0x8328fa642f47adb61f751363cf718d707dafcdc258898fa953945afd42aa020f", - None, - ], - "solver": [ - "0xb20b86c4e6deeb432a22d773a221898bbbd03036", - "0x55a37a2e5e5973510ac9d9c723aec213fa161919", - "0x55a37a2e5e5973510ac9d9c723aec213fa161919", - ], - "execution_cost": [ - "5417013431615490", - "14681404168612460", - "0", - ], - "surplus": [ - "5867838023808109", - "104011002982952097", - "0", - ], - "fee": [ - "7751978767036064", - "10350680045815651", - "0", - ], - "uncapped_payment_eth": [ - "7232682540629268", - "82825156151734420", - "-3527106002507021", - ], - "capped_payment": [ - "7232682540629268", - "24681404168612460", - "-3527106002507021", - ], - "winning_score": [ - "6537976145828389", - "95640781782532198", - "3527282436747751", - ], - "reference_score": [ - "6387134250214905", - "31536526877033328", - "3527106002507021", - ], - "participating_solvers": [ - [ - "0x398890be7c4fac5d766e1aeffde44b2ee99f38ef", - "0xb20b86c4e6deeb432a22d773a221898bbbd03036", - ], - 
[ - "0x55a37a2e5e5973510ac9d9c723aec213fa161919", - "0x97ec0a17432d71a3234ef7173c6b48a2c0940896", - "0xa21740833858985e4d801533a808786d3647fb83", - "0xb20b86c4e6deeb432a22d773a221898bbbd03036", - "0xbff9a1b539516f9e20c7b621163e676949959a66", - "0xc9ec550bea1c64d779124b23a26292cc223327b6", - "0xda869be4adea17ad39e1dfece1bc92c02491504f", - ], - [ - "0x149d0f9282333681ee41d30589824b2798e9fb47", - "0x3cee8c7d9b5c8f225a8c36e7d3514e1860309651", - "0x55a37a2e5e5973510ac9d9c723aec213fa161919", - "0x7a0a8890d71a4834285efdc1d18bb3828e765c6a", - "0x97ec0a17432d71a3234ef7173c6b48a2c0940896", - "0xa21740833858985e4d801533a808786d3647fb83", - "0xb20b86c4e6deeb432a22d773a221898bbbd03036", - "0xbff9a1b539516f9e20c7b621163e676949959a66", - "0xc9ec550bea1c64d779124b23a26292cc223327b6", - "0xda869be4adea17ad39e1dfece1bc92c02491504f", - "0xe9ae2d792f981c53ea7f6493a17abf5b2a45a86b", - ], - ], - }, - ) - self.assertIsNone(pd.testing.assert_frame_equal(expected, rewards_df)) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/integration/test_warehouse_fetcher.py b/tests/integration/test_warehouse_fetcher.py deleted file mode 100644 index a832e2d5..00000000 --- a/tests/integration/test_warehouse_fetcher.py +++ /dev/null @@ -1,42 +0,0 @@ -import os -import unittest - -import pandas as pd -from dotenv import load_dotenv - -from src.fetch.postgres import PostgresFetcher -from src.models.block_range import BlockRange - - -class TestPostgresWarehouseFetching(unittest.TestCase): - def setUp(self) -> None: - load_dotenv() - # TODO - deploy test DB and populate with some records... - self.fetcher = PostgresFetcher(db_url=os.environ["WAREHOUSE_URL"]) - - def test_latest_block_reasonable(self): - self.assertGreater(self.fetcher.get_latest_block(), 17273090) - - def test_get_imbalances(self): - imbalance_df = self.fetcher.get_internal_imbalances( - BlockRange(17236982, 17236983) - ) - expected = pd.DataFrame( - { - "block_number": pd.Series([17236983, 17236983], dtype="int64"), - "tx_hash": [ - "0x9dc611149c7d6a936554b4be0e4fde455c015a9d5c81650af1433df5e904c791", - "0x9dc611149c7d6a936554b4be0e4fde455c015a9d5c81650af1433df5e904c791", - ], - "token": [ - "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", - "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", - ], - "amount": ["-1438513324", "789652004205719637"], - }, - ) - self.assertIsNone(pd.testing.assert_frame_equal(expected, imbalance_df)) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/unit/test_batch_rewards_schema.py b/tests/unit/test_batch_rewards_schema.py deleted file mode 100644 index cfdaaca1..00000000 --- a/tests/unit/test_batch_rewards_schema.py +++ /dev/null @@ -1,101 +0,0 @@ -import unittest - -import pandas -import pandas as pd - -from src.models.batch_rewards_schema import BatchRewards - -ONE_ETH = 1000000000000000000 - - -class TestModelBatchRewards(unittest.TestCase): - def test_order_rewards_transformation(self): - max_uint = 115792089237316195423570985008687907853269984665640564039457584007913129639936 - sample_df = pd.DataFrame( - { - "block_number": pd.Series([123, pandas.NA], dtype="Int64"), - "block_deadline": [789, 1011], - "tx_hash": [ - "0x71", - None, - ], - "solver": [ - "0x51", - "0x52", - ], - "execution_cost": [9999 * ONE_ETH, 1], - "surplus": [2 * ONE_ETH, 3 * ONE_ETH], - "fee": [ - 1000000000000000, - max_uint, - ], - "uncapped_payment_eth": [0, -10 * ONE_ETH], - "capped_payment": [-1000000000000000, 
-1000000000000000], - "winning_score": [123456 * ONE_ETH, 6789 * ONE_ETH], - "reference_score": [ONE_ETH, 2 * ONE_ETH], - "participating_solvers": [ - [ - "0x51", - "0x52", - "0x53", - ], - [ - "0x51", - "0x52", - "0x53", - "0x54", - "0x55", - "0x56", - ], - ], - } - ) - - self.assertEqual( - [ - { - "block_deadline": 789, - "block_number": 123, - "data": { - "capped_payment": -1000000000000000, - "execution_cost": 9999000000000000000000, - "fee": 1000000000000000, - "participating_solvers": ["0x51", "0x52", "0x53"], - "reference_score": 1000000000000000000, - "surplus": 2000000000000000000, - "uncapped_payment_eth": 0, - "winning_score": 123456000000000000000000, - }, - "solver": "0x51", - "tx_hash": "0x71", - }, - { - "block_deadline": 1011, - "block_number": None, - "data": { - "capped_payment": -1000000000000000, - "execution_cost": 1, - "fee": max_uint, - "participating_solvers": [ - "0x51", - "0x52", - "0x53", - "0x54", - "0x55", - "0x56", - ], - "reference_score": 2000000000000000000, - "surplus": 3000000000000000000, - "uncapped_payment_eth": -10000000000000000000, - "winning_score": 6789000000000000000000, - }, - "solver": "0x52", - "tx_hash": None, - }, - ], - BatchRewards.from_pdf_to_dune_records(sample_df), - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/test_ipfs.py b/tests/unit/test_ipfs.py deleted file mode 100644 index e26e2279..00000000 --- a/tests/unit/test_ipfs.py +++ /dev/null @@ -1,215 +0,0 @@ -import os -import unittest -from unittest import IsolatedAsyncioTestCase - -from dotenv import load_dotenv - -from src.fetch.ipfs import Cid - -load_dotenv() -ACCESS_KEY = os.environ["IPFS_ACCESS_KEY"] - - -class TestIPFS(unittest.TestCase): - def test_cid_parsing(self): - self.assertEqual( - "bafybeib5q5w6r7gxbfutjhes24y65mcif7ugm7hmub2vsk4hqueb2yylti", - str( - Cid.old_schema( - "0x3d876de8fcd70969349c92d731eeb0482fe8667ceca075592b8785081d630b9a" - ) - ), - ) - self.assertEqual( - "bafybeia747cvkwz7tqkp67da3ehrl4nfwena3jnr5cvainmcugzocbmnbq", - str( - Cid.old_schema( - "0x1FE7C5555B3F9C14FF7C60D90F15F1A5B11A0DA5B1E8AA043582A1B2E1058D0C" - ) - ), - ) - - def test_new_hash_to_cid(self): - self.assertEqual( - "bafkrwiek6tumtfzvo6yivqq5c7jtdkw6q3ar5pgfcjdujvrbzkbwl3eueq", - str( - Cid( - "0x8af4e8c9973577b08ac21d17d331aade86c11ebcc5124744d621ca8365ec9424" - ) - ), - ) - - def test_cid_constructor(self): - # works with or without 0x prefix: - hex_str = "0x3d876de8fcd70969349c92d731eeb0482fe8667ceca075592b8785081d630b9a" - self.assertEqual(Cid(hex_str), Cid(hex_str[2:])) - self.assertEqual(hex_str, Cid(hex_str).hex) - - def test_no_content(self): - null_cid = Cid( - "0000000000000000000000000000000000000000000000000000000000000000" - ) - - self.assertEqual(None, null_cid.get_content(ACCESS_KEY, max_retries=10)) - - def test_get_content(self): - self.assertEqual( - { - "version": "0.1.0", - "appCode": "CowSwap", - "metadata": { - "referrer": { - "version": "0.1.0", - "address": "0x424a46612794dbb8000194937834250Dc723fFa5", - } - }, - }, - Cid.old_schema( - "3d876de8fcd70969349c92d731eeb0482fe8667ceca075592b8785081d630b9a" - ).get_content(ACCESS_KEY, max_retries=10), - ) - - self.assertEqual( - { - "version": "1.0.0", - "appCode": "CowSwap", - "metadata": { - "referrer": { - "kind": "referrer", - "referrer": "0x8c35B7eE520277D14af5F6098835A584C337311b", - "version": "1.0.0", - } - }, - }, - Cid.old_schema( - "1FE7C5555B3F9C14FF7C60D90F15F1A5B11A0DA5B1E8AA043582A1B2E1058D0C" - ).get_content(ACCESS_KEY), - ) - - -class 
TestAsyncIPFS(IsolatedAsyncioTestCase): - async def test_fetch_many(self): - x = [ - { - "app_hash": "0xa0029a1376a317ea4af7d64c1a15cb02c5b1f33c72645cc8612a8d302a6e2ac8", - "first_seen_block": 13897827, - }, - { - "app_hash": "0xd8e0e541ebb486bfb4fbfeeaf097e56487b5b3ea7190bd1b286693ac02954ecd", - "first_seen_block": 15428678, - }, - { - "app_hash": "0xd93cc4feb701b8b7c8013c33be82f05d28f04c127116eb3764d36881e5cd7d07", - "first_seen_block": 13602894, - }, - { - "app_hash": "0x2c699c8c8b379a0ab6e6b1bc252dd2539b59b50056da1c62b2bcaf4f706d1e81", - "first_seen_block": 13643476, - }, - { - "app_hash": "0xe4096788536b002e3d0af9e3a4ac44cbd5456cbd3a88d96f02c6b23be319fc6b", - "first_seen_block": 15430776, - }, - { - "app_hash": "0xd7d476c0682b033fc4025f69dfb5967afe5ea96be872b1f6a740bbdc7dd97b25", - "first_seen_block": 13671622, - }, - { - "app_hash": "0x37a52cce8b0b5f4b2047b8d34992d27755c9af4341290d38e4cf9278e3b8fcc9", - "first_seen_block": 15104812, - }, - { - "app_hash": "0x6faca2b31493acf30239268b08ef6fa9a54ff093018c5c53a0847eb13ad8181f", - "first_seen_block": 15557705, - }, - { - "app_hash": "0x476e33b8975dd54ada4a99d52c9ea4b3d41bd50763377f6e078c60631d8bc02a", - "first_seen_block": 13783477, - }, - { - "app_hash": "0xd4f33cd977ef095b38aabb1ee7a2f6f69fabb4022601fdc29c9fc78c451f4e12", - "first_seen_block": 13824523, - }, - { - "app_hash": "0x8b98eaf7082a14b3201011c23a39b23706d880c1269e76c154675230daf6af8d", - "first_seen_block": 13709795, - }, - { - "app_hash": "0x5f3b188668cc36ab896f348913668550749c7b7f38304f45b9449cb5bea034b6", - "first_seen_block": 13608189, - }, - { - "app_hash": "0xec3143ebecec4ac0f5471ce611f8220bd0abe5509c29216c6837be29fe573830", - "first_seen_block": 13906769, - }, - { - "app_hash": "0x4c4d75807c15451613cead7de87e465fe9368708644fbad1e04bc442ee015466", - "first_seen_block": 13916779, - }, - { - "app_hash": "0x8ee92d0defae33a7471eb0e4abac92c7be3d6811f256ecd5d439609086a9e353", - "first_seen_block": 15684132, - }, - { - "app_hash": "0x5cf63a36847e6f6c98c8eb74d03cfbad8e234d7388637ae5ca53be9dd90eccca", - "first_seen_block": 15296113, - }, - { - "app_hash": "0x517af09fe972bd26beb08cd951783b64a494ed22b87e088754156f9fbc9b993f", - "first_seen_block": 13745623, - }, - { - "app_hash": "0x385802ecdb00e564d36aed7410454a64ecb3cfcb84792bad1ead79d1447ab090", - "first_seen_block": 13967575, - }, - { - "app_hash": "0x809877e2e366166a0bcbce17b759b90d2d74af0d830a0441a44ea246876ccec0", - "first_seen_block": 13938253, - }, - { - "app_hash": "0xe81711d4f01afa98128ec73441fbda767db1cf84b8c4c1bfb4243a5dddca1155", - "first_seen_block": 13944768, - }, - { - "app_hash": "0x1ec8fdb53a2697ab15bc4228c9e4d3125c955470b618fcb1659175d0f6e6c991", - "first_seen_block": 14624083, - }, - { - "app_hash": "0xa15b953113490d318baa1e35f1c9c33db3a6f400ce58539db287456b17bf3e58", - "first_seen_block": 13886533, - }, - { - "app_hash": "0x039ffb0546992a50e4eeb70b2b6bb04c0a9a82dcad586b1c4f971d6c66109894", - "first_seen_block": 13607684, - }, - { - "app_hash": "0xc56523f1422d97dcd77ab7471f3de724ac223d5a51c3af832c98a25e8ad30ab3", - "first_seen_block": 13664176, - }, - { - "app_hash": "0x690483cfb64d0a175e4cb85c313e8da3e1826966acefc6697026176adacc9b19", - "first_seen_block": 14699411, - }, - { - "app_hash": "0xe86fbb850981b11ab388e077c9a5a6da6cfe386c55f78709bc6549e5b806fc08", - "first_seen_block": 13905438, - }, - { - "app_hash": "0xba00f0857b72309aba6a395694d999e573d6b183cf5a605341b0f3ddfba333de", - "first_seen_block": 13779206, - }, - { - "app_hash": "0xdf93796ffbd982e40c53ea10517cc90eb1355eb3f843f0dafa441165a337557f", - 
"first_seen_block": 13616326, - }, - { - "app_hash": "0x12f79d7c232ee2ef20b7aaefaf677a69d00d179d003cf5257d557d9154427f0f", - "first_seen_block": 13971658, - }, - ] - results = await Cid.fetch_many(x, ACCESS_KEY) - print(results) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/test_order_rewards_schema.py b/tests/unit/test_order_rewards_schema.py deleted file mode 100644 index e8e479b7..00000000 --- a/tests/unit/test_order_rewards_schema.py +++ /dev/null @@ -1,59 +0,0 @@ -import unittest - -import pandas as pd - -from src.models.order_rewards_schema import OrderRewards - - -class TestModelOrderRewards(unittest.TestCase): - def test_order_rewards_transformation(self): - rewards_df = pd.DataFrame( - { - "block_number": [1, 2, 3], - "order_uid": ["0x01", "0x02", "0x03"], - "solver": ["0x51", "0x52", "0x53"], - "tx_hash": ["0x71", "0x72", "0x73"], - "surplus_fee": [12345678910111213, 0, 0], - "amount": [40.70410, 39.00522, 0], - } - ) - - self.assertEqual( - [ - { - "block_number": 1, - "order_uid": "0x01", - "solver": "0x51", - "tx_hash": "0x71", - "data": { - "surplus_fee": "12345678910111213", - "amount": 40.70410, - }, - }, - { - "block_number": 2, - "order_uid": "0x02", - "solver": "0x52", - "tx_hash": "0x72", - "data": { - "surplus_fee": "0", - "amount": 39.00522, - }, - }, - { - "block_number": 3, - "order_uid": "0x03", - "solver": "0x53", - "tx_hash": "0x73", - "data": { - "surplus_fee": "0", - "amount": 0.0, - }, - }, - ], - OrderRewards.from_pdf_to_dune_records(rewards_df), - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/test_token_imbalance_schema.py b/tests/unit/test_token_imbalance_schema.py deleted file mode 100644 index 136e3c37..00000000 --- a/tests/unit/test_token_imbalance_schema.py +++ /dev/null @@ -1,46 +0,0 @@ -import unittest - -import pandas as pd - -from src.models.token_imbalance_schema import TokenImbalance - - -class TestModelTokenImbalance(unittest.TestCase): - def test_token_imbalance_schema(self): - max_uint = 115792089237316195423570985008687907853269984665640564039457584007913129639936 - sample_df = pd.DataFrame( - { - "block_number": pd.Series([123, 456], dtype="int64"), - "tx_hash": [ - "0x71", - "0x72", - ], - "token": [ - "0xa0", - "0xa1", - ], - "amount": [-9999, max_uint], - } - ) - - self.assertEqual( - [ - { - "amount": "-9999", - "block_number": 123, - "token": "0xa0", - "tx_hash": "0x71", - }, - { - "amount": "115792089237316195423570985008687907853269984665640564039457584007913129639936", - "block_number": 456, - "token": "0xa1", - "tx_hash": "0x72", - }, - ], - TokenImbalance.from_pdf_to_dune_records(sample_df), - ) - - -if __name__ == "__main__": - unittest.main()