Skip to content

Commit

Permalink
feat: add support for all networks from archive
Browse files Browse the repository at this point in the history
  • Loading branch information
tmcgroul committed Dec 29, 2023
1 parent 46b1c6c commit 23ad0ad
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 115 deletions.
30 changes: 23 additions & 7 deletions ape_subsquid/archive.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from typing import Literal, Optional, TypedDict, Union

from requests import Session
from requests import Response, Session
from requests.exceptions import HTTPError

from ape_subsquid.exceptions import ApeSubsquidError, DataIsNotAvailable, NotReadyToServeError

TraceType = Union[Literal["create"], Literal["call"], Literal["reward"], Literal["suicide"]]

Expand Down Expand Up @@ -229,14 +232,27 @@ class Block(TypedDict, total=False):
class Archive:
_session = Session()

def get_worker(self, start_block: int) -> str:
url = f"https://v2.archive.subsquid.io/network/ethereum-mainnet/{start_block}/worker"
def get_worker(self, network: str, start_block: int) -> str:
url = f"https://v2.archive.subsquid.io/network/{network}/{start_block}/worker"
response = self._session.get(url)
response.raise_for_status()
self._check_response(response)
return response.text

def query(self, query: Query) -> list[Block]:
worker_url = self.get_worker(query["fromBlock"])
def query(self, network: str, query: Query) -> list[Block]:
worker_url = self.get_worker(network, query["fromBlock"])
response = self._session.post(worker_url, json=query)
response.raise_for_status()
self._check_response(response)
return response.json()

def _check_response(self, response: Response):
try:
response.raise_for_status()
except HTTPError:
text = response.text

if "not ready to serve block" in text:
raise NotReadyToServeError(text)
elif "is not available" in text:
raise DataIsNotAvailable(text)
else:
raise ApeSubsquidError(text)
20 changes: 20 additions & 0 deletions ape_subsquid/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from ape.exceptions import ApeException


class ApeSubsquidError(ApeException):
"""
A base exception in the ape-subsquid plugin.
"""


class NotReadyToServeError(ApeSubsquidError):
"""
Raised when archive isn't ready to serve a specific block
or a network isn't supported.
"""


class DataIsNotAvailable(ApeSubsquidError):
"""
Raised when a specific worker has no requested data.
"""
82 changes: 82 additions & 0 deletions ape_subsquid/mappings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from hexbytes import HexBytes

from ape_subsquid.archive import BlockHeader, Log, Transaction


def hex_to_int(value: str):
return int(value, 16)


def map_header(value: BlockHeader, transactions: list[Transaction]) -> dict:
return {
"number": value["number"],
"hash": HexBytes(value["hash"]),
"parentHash": HexBytes(value["parentHash"]),
"baseFeePerGas": value["baseFeePerGas"] and hex_to_int(value["baseFeePerGas"]),
"difficulty": hex_to_int(value["difficulty"]),
"totalDifficulty": hex_to_int(value["totalDifficulty"]),
"extraData": HexBytes(value["extraData"]),
"gasLimit": hex_to_int(value["gasLimit"]),
"gasUsed": hex_to_int(value["gasUsed"]),
"logsBloom": HexBytes(value["logsBloom"]),
"miner": value["miner"],
"mixHash": HexBytes(value["mixHash"]),
"nonce": HexBytes(value["nonce"]),
"receiptsRoot": HexBytes(value["receiptsRoot"]),
"sha3Uncles": HexBytes(value["sha3Uncles"]),
"size": value["size"],
"stateRoot": HexBytes(value["stateRoot"]),
"timestamp": int(value["timestamp"]),
"transactionsRoot": HexBytes(value["transactionsRoot"]),
"transactions": transactions,
}


def map_receipt(
value: Transaction,
block_number: int,
block_hash: HexBytes,
logs: list[dict],
) -> dict:
return {
"blockNumber": block_number,
"blockHash": block_hash,
"from": value["from"],
"to": value["to"],
"hash": HexBytes(value["hash"]),
"status": value["status"],
"chainId": value["chainId"],
"contractAddress": value["contractAddress"],
"cumulativeGasUsed": hex_to_int(value["cumulativeGasUsed"]),
"effectiveGasPrice": hex_to_int(value["effectiveGasPrice"]),
"gas": hex_to_int(value["gas"]),
"gasPrice": hex_to_int(value["gasPrice"]),
"gasUsed": hex_to_int(value["gasUsed"]),
"input": HexBytes(value["input"]),
"maxFeePerGas": value["maxFeePerGas"] and hex_to_int(value["maxFeePerGas"]),
"maxPriorityFeePerGas": value["maxPriorityFeePerGas"]
and hex_to_int(value["maxPriorityFeePerGas"]),
"nonce": value["nonce"],
"v": hex_to_int(value["v"]),
"r": HexBytes(value["r"]),
"s": HexBytes(value["s"]),
"transactionIndex": value["transactionIndex"],
"type": value["type"],
"value": hex_to_int(value["value"]),
"yParity": value["yParity"],
"transactionHash": HexBytes(value["hash"]),
"logs": logs,
}


def map_log(value: Log, block_number: int, block_hash: HexBytes) -> dict:
return {
"blockNumber": block_number,
"blockHash": block_hash,
"address": value["address"],
"transactionIndex": value["transactionIndex"],
"transactionHash": HexBytes(value["transactionHash"]),
"logIndex": value["logIndex"],
"data": HexBytes(value["data"]),
"topics": [HexBytes(topic) for topic in value["topics"]],
}
146 changes: 38 additions & 108 deletions ape_subsquid/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,114 +18,11 @@
Archive,
Block,
BlockFieldSelection,
BlockHeader,
Log,
LogFieldSelection,
Query,
Transaction,
TxFieldSelection,
)

T = TypeVar("T")


def all_fields(cls: Type[T]) -> T:
fields = cls.__annotations__
for field in fields:
fields[field] = True
return cast(T, fields)


def hex_to_int(value: str):
return int(value, 16)


def map_header(value: BlockHeader, transactions: list[Transaction]) -> dict:
return {
"number": value["number"],
"hash": HexBytes(value["hash"]),
"parentHash": HexBytes(value["parentHash"]),
"baseFeePerGas": value["baseFeePerGas"] and hex_to_int(value["baseFeePerGas"]),
"difficulty": hex_to_int(value["difficulty"]),
"totalDifficulty": hex_to_int(value["totalDifficulty"]),
"extraData": HexBytes(value["extraData"]),
"gasLimit": hex_to_int(value["gasLimit"]),
"gasUsed": hex_to_int(value["gasUsed"]),
"logsBloom": HexBytes(value["logsBloom"]),
"miner": value["miner"],
"mixHash": HexBytes(value["mixHash"]),
"nonce": HexBytes(value["nonce"]),
"receiptsRoot": HexBytes(value["receiptsRoot"]),
"sha3Uncles": HexBytes(value["sha3Uncles"]),
"size": value["size"],
"stateRoot": HexBytes(value["stateRoot"]),
"timestamp": int(value["timestamp"]),
"transactionsRoot": HexBytes(value["transactionsRoot"]),
"transactions": transactions,
}


def map_receipt(
value: Transaction,
block_number: int,
block_hash: HexBytes,
logs: list[dict],
) -> dict:
return {
"blockNumber": block_number,
"blockHash": block_hash,
"from": value["from"],
"to": value["to"],
"hash": HexBytes(value["hash"]),
"status": value["status"],
"chainId": value["chainId"],
"contractAddress": value["contractAddress"],
"cumulativeGasUsed": hex_to_int(value["cumulativeGasUsed"]),
"effectiveGasPrice": hex_to_int(value["effectiveGasPrice"]),
"gas": hex_to_int(value["gas"]),
"gasPrice": hex_to_int(value["gasPrice"]),
"gasUsed": hex_to_int(value["gasUsed"]),
"input": HexBytes(value["input"]),
"maxFeePerGas": value["maxFeePerGas"] and hex_to_int(value["maxFeePerGas"]),
"maxPriorityFeePerGas": value["maxPriorityFeePerGas"]
and hex_to_int(value["maxPriorityFeePerGas"]),
"nonce": value["nonce"],
"v": hex_to_int(value["v"]),
"r": HexBytes(value["r"]),
"s": HexBytes(value["s"]),
"transactionIndex": value["transactionIndex"],
"type": value["type"],
"value": hex_to_int(value["value"]),
"yParity": value["yParity"],
"transactionHash": HexBytes(value["hash"]),
"logs": logs,
}


def map_log(value: Log, block_number: int, block_hash: HexBytes) -> dict:
return {
"blockNumber": block_number,
"blockHash": block_hash,
"address": value["address"],
"transactionIndex": value["transactionIndex"],
"transactionHash": HexBytes(value["transactionHash"]),
"logIndex": value["logIndex"],
"data": HexBytes(value["data"]),
"topics": [HexBytes(topic) for topic in value["topics"]],
}


def archive_ingest(archive: Archive, query: Query) -> Iterator[list[Block]]:
while True:
data = archive.query(query)
yield data

last_block = data[-1]
if "toBlock" in query:
if last_block["header"]["number"] == query["toBlock"]:
break

query["fromBlock"] = last_block["header"]["number"] + 1
from ape_subsquid.mappings import map_header, map_log, map_receipt


class SubsquidQueryEngine(QueryAPI):
Expand Down Expand Up @@ -159,6 +56,7 @@ def perform_query(self, query: QueryType) -> Iterator: # type: ignore[override]

@perform_query.register
def perform_block_query(self, query: BlockQuery) -> Iterator[BlockAPI]:
network = get_network(self)
q: Query = {
"fromBlock": query.start_block,
"toBlock": query.stop_block,
Expand All @@ -167,7 +65,7 @@ def perform_block_query(self, query: BlockQuery) -> Iterator[BlockAPI]:
"transactions": [{}],
}

for data in archive_ingest(self._archive, q):
for data in archive_ingest(self._archive, network, q):
for block in data:
header_data = map_header(block["header"], block["transactions"])
yield self.provider.network.ecosystem.decode_block(header_data)
Expand All @@ -176,6 +74,7 @@ def perform_block_query(self, query: BlockQuery) -> Iterator[BlockAPI]:
def perform_account_transaction_query(
self, query: AccountTransactionQuery
) -> Iterator[ReceiptAPI]:
network = get_network(self)
q: Query = {
"fromBlock": 0,
"fields": {
Expand All @@ -192,7 +91,7 @@ def perform_account_transaction_query(
],
}

for data in archive_ingest(self._archive, q):
for data in archive_ingest(self._archive, network, q):
for block in data:
for tx in block["transactions"]:
assert tx["nonce"] >= query.start_nonce
Expand All @@ -214,6 +113,7 @@ def perform_account_transaction_query(

@perform_query.register
def perform_contract_creation_query(self, query: ContractCreationQuery) -> Iterator[ReceiptAPI]:
network = get_network(self)
q: Query = {
"fromBlock": query.start_block,
"toBlock": query.stop_block,
Expand All @@ -228,7 +128,7 @@ def perform_contract_creation_query(self, query: ContractCreationQuery) -> Itera
"traces": [{"type": ["create"], "transaction": True, "transactionLogs": True}],
}

for data in archive_ingest(self._archive, q):
for data in archive_ingest(self._archive, network, q):
for block in data:
for trace in block["traces"]:
if "result" in trace:
Expand All @@ -252,6 +152,7 @@ def perform_contract_creation_query(self, query: ContractCreationQuery) -> Itera

@perform_query.register
def perform_contract_event_query(self, query: ContractEventQuery) -> Iterator[ContractLog]:
network = get_network(self)
if isinstance(query.contract, list):
address = [address.lower() for address in query.contract]
else:
Expand All @@ -264,10 +165,39 @@ def perform_contract_event_query(self, query: ContractEventQuery) -> Iterator[Co
"logs": [{"address": address}],
}

for data in archive_ingest(self._archive, q):
for data in archive_ingest(self._archive, network, q):
for block in data:
block_number = block["header"]["number"]
block_hash = HexBytes(block["header"]["hash"])
logs = [map_log(log, block_number, block_hash) for log in block["logs"]]

yield from self.provider.network.ecosystem.decode_logs(logs, query.event)


T = TypeVar("T")


def all_fields(cls: Type[T]) -> T:
fields = cls.__annotations__
for field in fields:
fields[field] = True
return cast(T, fields)


def archive_ingest(archive: Archive, network: str, query: Query) -> Iterator[list[Block]]:
while True:
data = archive.query(network, query)
yield data

last_block = data[-1]
if "toBlock" in query:
if last_block["header"]["number"] == query["toBlock"]:
break

query["fromBlock"] = last_block["header"]["number"] + 1


def get_network(engine: QueryAPI) -> str:
ecosystem_name = engine.network_manager.ecosystem.name
network_name = engine.network_manager.network.name
return f"{ecosystem_name}-{network_name}"

0 comments on commit 23ad0ad

Please sign in to comment.