Skip to content

Feature: Parallel message processing #796

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c3d49ff
feat: get_next_pending_messages_by_address model accessors
1yam May 21, 2025
d555cf8
feat: process message in parallel based on address
1yam May 21, 2025
2987b94
fix: mypy error type
1yam May 21, 2025
c6df15c
Merge branch 'main' into 1yam-message-parallel
1yam May 21, 2025
5f7de9c
fix: freeze on test
1yam May 21, 2025
6cd74b8
fix: black formatting (space)
1yam May 21, 2025
9237aea
fix: more debug info around processing message
1yam May 22, 2025
8f73e21
fix: async engine init
1yam May 26, 2025
5359014
Feat: async version of get_next_pending_messages_by_address
1yam May 26, 2025
42b3469
feat: add async session factory to PendingMessageProcessor
1yam May 26, 2025
ab9632f
Fix: use async_get_next_pending_messages_by_address and async_session…
1yam May 26, 2025
b3029b7
Refactor: simplify batch processing and single message
1yam May 26, 2025
d035421
feat: Cancel message batch if RetryMessageException is found during …
1yam May 26, 2025
55e2722
fix: these parts are not needed anymore
1yam May 26, 2025
a935a22
fix: linting issue (import not sorted)
1yam May 26, 2025
a6aafe4
Refactor: Usage of AsyncDB instead of sync
1yam May 30, 2025
ccbc549
Fix: Unit Test should now use AsynchronousDB
1yam May 30, 2025
29a2458
fix: fmt
1yam May 30, 2025
fb724d8
Refactor: db accessors aggregates should use AsyncDbSessions
1yam May 30, 2025
b1fe4cc
Feat: Message parallelization
1yam May 30, 2025
a625580
fix: db request for distinct sender
1yam May 30, 2025
930fd68
fix: wip process_pending_messages
1yam May 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/aleph/api_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import aleph.config
from aleph.chains.signature_verifier import SignatureVerifier
from aleph.db.connection import make_engine, make_session_factory
from aleph.db.connection import make_async_engine, make_async_session_factory
from aleph.services.cache.node_cache import NodeCache
from aleph.services.ipfs import IpfsService
from aleph.services.p2p import init_p2p_client
Expand All @@ -34,12 +34,12 @@ async def configure_aiohttp_app(
with sentry_sdk.start_transaction(name="init-api-server"):
p2p_client = await init_p2p_client(config, service_name="api-server-aiohttp")

engine = make_engine(
engine = make_async_engine(
config,
echo=config.logging.level.value == logging.DEBUG,
application_name="aleph-api",
)
session_factory = make_session_factory(engine)
session_factory = make_async_session_factory(engine)

node_cache = NodeCache(
redis_host=config.redis.host.value, redis_port=config.redis.port.value
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/chains/bsc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
from aleph.chains.chain_data_service import PendingTxPublisher
from aleph.chains.indexer_reader import AlephIndexerReader
from aleph.types.chain_sync import ChainEventType
from aleph.types.db_session import DbSessionFactory
from aleph.types.db_session import AsyncDbSessionFactory


class BscConnector(ChainReader):
def __init__(
self,
session_factory: DbSessionFactory,
session_factory: AsyncDbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
):
self.indexer_reader = AlephIndexerReader(
Expand Down
26 changes: 13 additions & 13 deletions src/aleph/chains/chain_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,22 @@
from aleph.storage import StorageService
from aleph.toolkit.timestamp import utc_now
from aleph.types.chain_sync import ChainSyncProtocol
from aleph.types.db_session import DbSession, DbSessionFactory
from aleph.types.db_session import AsyncDbSession, AsyncDbSessionFactory
from aleph.types.files import FileType
from aleph.utils import get_sha256


class ChainDataService:
def __init__(
self,
session_factory: DbSessionFactory,
session_factory: AsyncDbSessionFactory,
storage_service: StorageService,
):
self.session_factory = session_factory
self.storage_service = storage_service

async def prepare_sync_event_payload(
self, session: DbSession, messages: List[MessageDb]
self, session: AsyncDbSession, messages: List[MessageDb]
) -> OffChainSyncEventPayload:
"""
Returns the payload of a sync event to be published on chain.
Expand Down Expand Up @@ -129,22 +129,22 @@ async def _get_tx_messages_off_chain_protocol(
LOGGER.info("Got bulk data with %d items" % len(messages))
if config.ipfs.enabled.value:
try:
with self.session_factory() as session:
async with self.session_factory() as session:
# Some chain data files are duplicated, and can be treated in parallel,
# hence the upsert.
upsert_file(
await upsert_file(
session=session,
file_hash=sync_file_content.hash,
file_type=FileType.FILE,
size=len(sync_file_content.raw_value),
)
upsert_tx_file_pin(
await upsert_tx_file_pin(
session=session,
file_hash=file_hash,
tx_hash=tx.hash,
created=utc_now(),
)
session.commit()
await session.commit()

# Some IPFS fetches can take a while, hence the large timeout.
await asyncio.wait_for(
Expand Down Expand Up @@ -246,17 +246,17 @@ def __init__(self, pending_tx_exchange: aio_pika.abc.AbstractExchange):
self.pending_tx_exchange = pending_tx_exchange

@staticmethod
def add_pending_tx(session: DbSession, tx: ChainTxDb):
upsert_chain_tx(session=session, tx=tx)
upsert_pending_tx(session=session, tx_hash=tx.hash)
async def add_pending_tx(session: AsyncDbSession, tx: ChainTxDb):
await upsert_chain_tx(session=session, tx=tx)
await upsert_pending_tx(session=session, tx_hash=tx.hash)

async def publish_pending_tx(self, tx: ChainTxDb):
message = aio_pika.Message(body=tx.hash.encode("utf-8"))
await self.pending_tx_exchange.publish(
message=message, routing_key=f"{tx.chain.value}.{tx.publisher}.{tx.hash}"
)

async def add_and_publish_pending_tx(self, session: DbSession, tx: ChainTxDb):
async def add_and_publish_pending_tx(self, session: AsyncDbSession, tx: ChainTxDb):
"""
Add an event published on one of the supported chains.
Adds the tx to the database, creates a pending tx entry in the pending tx table
Expand All @@ -265,8 +265,8 @@ async def add_and_publish_pending_tx(self, session: DbSession, tx: ChainTxDb):
Note that this function commits changes to the database for consistency
between the DB and the message queue.
"""
self.add_pending_tx(session=session, tx=tx)
session.commit()
await self.add_pending_tx(session=session, tx=tx)
await session.commit()
await self.publish_pending_tx(tx)

@classmethod
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/chains/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from aleph_message.models import Chain
from configmanager import Config

from aleph.types.db_session import DbSessionFactory
from aleph.types.db_session import AsyncDbSessionFactory

from .abc import ChainReader, ChainWriter
from .bsc import BscConnector
Expand All @@ -29,7 +29,7 @@ class ChainConnector:

def __init__(
self,
session_factory: DbSessionFactory,
session_factory: AsyncDbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
chain_data_service: ChainDataService,
):
Expand Down
28 changes: 14 additions & 14 deletions src/aleph/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from aleph.schemas.chains.tx_context import TxContext
from aleph.toolkit.timestamp import utc_now
from aleph.types.chain_sync import ChainEventType
from aleph.types.db_session import DbSessionFactory
from aleph.types.db_session import AsyncDbSessionFactory
from aleph.utils import run_in_executor

from .abc import ChainWriter
Expand Down Expand Up @@ -77,7 +77,7 @@ class EthereumVerifier(EVMVerifier):
class EthereumConnector(ChainWriter):
def __init__(
self,
session_factory: DbSessionFactory,
session_factory: AsyncDbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
chain_data_service: ChainDataService,
):
Expand All @@ -93,8 +93,8 @@ def __init__(

async def get_last_height(self, sync_type: ChainEventType) -> int:
"""Returns the last height for which we already have the ethereum data."""
with self.session_factory() as session:
last_height = get_last_height(
async with self.session_factory() as session:
last_height = await get_last_height(
session=session, chain=Chain.ETH, sync_type=sync_type
)

Expand Down Expand Up @@ -209,15 +209,15 @@ async def _request_transactions(
# block height to do next requests from there.
last_height = event_data.blockNumber
if last_height:
with self.session_factory() as session:
upsert_chain_sync_status(
async with self.session_factory() as session:
await upsert_chain_sync_status(
session=session,
chain=Chain.ETH,
sync_type=ChainEventType.SYNC,
height=last_height,
update_datetime=utc_now(),
)
session.commit()
await session.commit()

async def fetch_ethereum_sync_events(self, config: Config):
last_stored_height = await self.get_last_height(sync_type=ChainEventType.SYNC)
Expand All @@ -236,11 +236,11 @@ async def fetch_ethereum_sync_events(self, config: Config):
config, web3, contract, abi, last_stored_height
):
tx = ChainTxDb.from_sync_tx_context(tx_context=context, tx_data=jdata)
with self.session_factory() as session:
async with self.session_factory() as session:
await self.pending_tx_publisher.add_and_publish_pending_tx(
session=session, tx=tx
)
session.commit()
await session.commit()

async def fetch_sync_events_task(self, config: Config):
while True:
Expand Down Expand Up @@ -295,10 +295,10 @@ async def packer(self, config: Config):
i = 0
gas_price = web3.eth.generate_gas_price()
while True:
with self.session_factory() as session:
async with self.session_factory() as session:
# Wait for sync operations to complete
if (count_pending_txs(session=session, chain=Chain.ETH)) or (
count_pending_messages(session=session, chain=Chain.ETH)
if (await count_pending_txs(session=session, chain=Chain.ETH)) or (
await count_pending_messages(session=session, chain=Chain.ETH)
) > 1000:
await asyncio.sleep(30)
continue
Expand All @@ -317,7 +317,7 @@ async def packer(self, config: Config):
nonce = web3.eth.get_transaction_count(account.address)

messages = list(
get_unconfirmed_messages(
await get_unconfirmed_messages(
session=session, limit=10000, chain=Chain.ETH
)
)
Expand All @@ -332,7 +332,7 @@ async def packer(self, config: Config):
)
)
# Required to apply update to the files table in get_chaindata
session.commit()
await session.commit()
response = await run_in_executor(
None,
self._broadcast_content,
Expand Down
22 changes: 12 additions & 10 deletions src/aleph/chains/indexer_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from aleph.toolkit.range import MultiRange, Range
from aleph.toolkit.timestamp import timestamp_to_datetime
from aleph.types.chain_sync import ChainEventType, ChainSyncProtocol
from aleph.types.db_session import DbSession, DbSessionFactory
from aleph.types.db_session import AsyncDbSession, AsyncDbSessionFactory

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -246,7 +246,7 @@ class AlephIndexerReader:
def __init__(
self,
chain: Chain,
session_factory: DbSessionFactory,
session_factory: AsyncDbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
):
self.chain = chain
Expand All @@ -257,7 +257,7 @@ def __init__(

async def fetch_range(
self,
session: DbSession,
session: AsyncDbSession,
indexer_client: AlephIndexerClient,
chain: Chain,
event_type: ChainEventType,
Expand Down Expand Up @@ -295,7 +295,9 @@ async def fetch_range(
LOGGER.info("%d new txs", len(txs))
# Events are listed in reverse order in the indexer response
for tx in txs:
self.pending_tx_publisher.add_pending_tx(session=session, tx=tx)
await self.pending_tx_publisher.add_pending_tx(
session=session, tx=tx
)

if nb_events_fetched >= limit:
last_event_datetime = txs[-1].datetime
Expand All @@ -320,7 +322,7 @@ async def fetch_range(
str(synced_range),
)

add_indexer_range(
await add_indexer_range(
session=session,
chain=chain,
event_type=event_type,
Expand All @@ -329,7 +331,7 @@ async def fetch_range(

# Committing periodically reduces the size of DB transactions for large numbers
# of events.
session.commit()
await session.commit()

# Now that the txs are committed to the DB, add them to the pending tx message queue
for tx in txs:
Expand All @@ -347,7 +349,7 @@ async def fetch_range(

async def fetch_new_events(
self,
session: DbSession,
session: AsyncDbSession,
indexer_url: str,
smart_contract_address: str,
event_type: ChainEventType,
Expand All @@ -372,7 +374,7 @@ async def fetch_new_events(
]
)

multirange_to_sync = get_missing_indexer_datetime_multirange(
multirange_to_sync = await get_missing_indexer_datetime_multirange(
session=session,
chain=self.chain,
event_type=event_type,
Expand All @@ -399,14 +401,14 @@ async def fetcher(
):
while True:
try:
with self.session_factory() as session:
async with self.session_factory() as session:
await self.fetch_new_events(
session=session,
indexer_url=indexer_url,
smart_contract_address=smart_contract_address,
event_type=event_type,
)
session.commit()
await session.commit()
except Exception:
LOGGER.exception(
"An unexpected exception occurred, "
Expand Down
Loading
Loading