Skip to content

Commit

Permalink
WSS event loop + Kafka topic mapping
Browse files Browse the repository at this point in the history
<!-- ps-id: 420e92a6-0deb-4fbd-92a9-23d535df44d8 -->
  • Loading branch information
WonderBeat committed Aug 20, 2024
1 parent 6be9a2f commit d5f987d
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 21 deletions.
38 changes: 34 additions & 4 deletions blockchainetl/streaming/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
# SOFTWARE.


import asyncio
import logging
import os
from threading import Thread
import time
from typing import Optional

from blockchainetl.streaming.streamer_adapter_stub import StreamerAdapterStub
from blockchainetl.file_utils import smart_open
from ethereumetl.misc.wss_block_listener import WssBlockListener


class Streamer:
Expand All @@ -40,7 +44,9 @@ def __init__(
period_seconds=10,
block_batch_size=10,
retry_errors=True,
pid_file=None):
pid_file=None,
wss_listener: Optional[WssBlockListener]=None
):
self.blockchain_streamer_adapter = blockchain_streamer_adapter
self.last_synced_block_file = last_synced_block_file
self.lag = lag
Expand All @@ -50,6 +56,13 @@ def __init__(
self.block_batch_size = block_batch_size
self.retry_errors = retry_errors
self.pid_file = pid_file
if wss_listener:
self._loop = _get_threaded_loop()
asyncio.ensure_future(
wss_listener.subscribe(),
loop=self._loop)

self.wss_listener=wss_listener

if self.start_block is not None or not os.path.isfile(self.last_synced_block_file):
init_last_synced_block_file((self.start_block or 0) - 1, self.last_synced_block_file)
Expand Down Expand Up @@ -80,10 +93,16 @@ def _do_stream(self):
logging.exception('An exception occurred while syncing block data.')
if not self.retry_errors:
raise e

self.wss_listener
if synced_blocks <= 0:
logging.info('Nothing to sync. Sleeping for {} seconds...'.format(self.period_seconds))
time.sleep(self.period_seconds)
if self.wss_listener:
logging.debug('Waiting for a new block')
asyncio.run_coroutine_threadsafe(
self.wss_listener.wait_for_new_block(),
self._loop).result()
else:
logging.info('Nothing to sync. Sleeping for {} seconds...'.format(self.period_seconds))
time.sleep(self.period_seconds)

def _sync_cycle(self):
current_block = self.blockchain_streamer_adapter.get_current_block_number()
Expand Down Expand Up @@ -137,3 +156,14 @@ def read_last_synced_block(file):
def write_to_file(file, content):
with smart_open(file, 'w') as file_handle:
file_handle.write(content)

def _get_threaded_loop() -> asyncio.AbstractEventLoop:
new_loop = asyncio.new_event_loop()
thread_loop = Thread(target=_start_event_loop, args=(new_loop,), daemon=True)
thread_loop.start()
return new_loop

def _start_event_loop(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()
loop.close()
17 changes: 14 additions & 3 deletions ethereumetl/cli/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from blockchainetl.streaming.streaming_utils import configure_signals, configure_logging
from ethereumetl.enumeration.entity_type import EntityType

from ethereumetl.misc.wss_block_listener import WssBlockListener
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.streaming.item_exporter_creator import create_item_exporters
from ethereumetl.thread_local_proxy import ThreadLocalProxy
Expand All @@ -37,6 +38,9 @@
@click.option('-p', '--provider-uri', default='https://mainnet.infura.io', show_default=True, type=str,
help='The URI of the web3 provider e.g. '
'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io')
@click.option('-p', '--provider-uri-wss', default='', show_default=False, type=str,
help='The URI of the web3 provider e.g. '
'file://$HOME/Library/Ethereum/geth.ipc or wss://mainnet.infura.io')
@click.option('-o', '--output', type=str,
help='Either Google PubSub topic path e.g. projects/your-project/topics/crypto_ethereum; '
'or Postgres connection url e.g. postgresql+pg8000://postgres:[email protected]:5432/ethereum; '
Expand All @@ -53,7 +57,7 @@
@click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The number of workers')
@click.option('--log-file', default=None, show_default=True, type=str, help='Log file')
@click.option('--pid-file', default=None, show_default=True, type=str, help='pid file')
def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
def stream(last_synced_block_file, lag, provider_uri, provider_uri_wss, output, start_block, entity_types,
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
"""Streams all data types to console or Google Pub/Sub."""
configure_logging(log_file)
Expand All @@ -65,6 +69,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit

# TODO: Implement fallback mechanism for provider uris instead of picking randomly
provider_uri = pick_random_provider_uri(provider_uri)
provider_uri_wss = pick_random_provider_uri(provider_uri_wss)
logging.info('Using ' + provider_uri)

streamer_adapter = EthStreamerAdapter(
Expand All @@ -74,14 +79,20 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
max_workers=max_workers,
entity_types=entity_types
)

block_listener = None
if provider_uri_wss:
block_listener = WssBlockListener(provider_uri_wss)

streamer = Streamer(
blockchain_streamer_adapter=streamer_adapter,
last_synced_block_file=last_synced_block_file,
last_synced_block_file=last_synced_block_file,
lag=lag,
start_block=start_block,
period_seconds=period_seconds,
block_batch_size=block_batch_size,
pid_file=pid_file
pid_file=pid_file,
wss_listener=block_listener
)
streamer.stream()

Expand Down
47 changes: 47 additions & 0 deletions ethereumetl/misc/wss_block_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import asyncio
import logging
from websockets import connect


class WssBlockListener:
"""
Listens for new block events via a WebSocket connection.
"""

def __init__(self, wss_uri) -> None:
"""
Initializes the listener with the WebSocket URI.
Args:
wss_uri (str): The WebSocket URI to connect to.
"""
self.logger = logging.getLogger("CompositeItemExporter")
self.wss_uri = wss_uri
self.subscribed = False

async def subscribe(self) -> None:
"""
Subscribes to new block events.
"""
self.new_block_lock = asyncio.Condition()
self.subscribed = True
async with connect(self.wss_uri) as ws:
await ws.send(
'{"id": 1, "method": "eth_subscribe", "params": ["newHeads"]}'
)
subscription_response = await ws.recv()
self.logger.debug("Subscription acquired %s", subscription_response)
while self.subscribed:
await asyncio.wait_for(ws.recv(), timeout=60)
async with self.new_block_lock:
self.new_block_lock.notify_all()

async def wait_for_new_block(self) -> None:
async with self.new_block_lock:
await self.new_block_lock.wait()

def close(self) -> None:
"""
Closes the listener and unsubscribes.
"""
self.subscribed = False
15 changes: 12 additions & 3 deletions ethereumetl/providers/auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@
# SOFTWARE.


import logging
from urllib.parse import urlparse

from web3 import IPCProvider, HTTPProvider
from web3 import IPCProvider, HTTPProvider, WebsocketProvider

from ethereumetl.providers.ipc import BatchIPCProvider
from ethereumetl.providers.rpc import BatchHTTPProvider

DEFAULT_TIMEOUT = 60


def get_provider_from_uri(uri_string, timeout=DEFAULT_TIMEOUT, batch=False):
def get_provider_from_uri(uri_string: str, timeout=DEFAULT_TIMEOUT, batch=False):
uri = urlparse(uri_string)
if uri.scheme == 'file':
if batch:
Expand All @@ -41,9 +42,17 @@ def get_provider_from_uri(uri_string, timeout=DEFAULT_TIMEOUT, batch=False):
elif uri.scheme == 'http' or uri.scheme == 'https':
request_kwargs = {'timeout': timeout}
if batch:
return BatchHTTPProvider(uri_string, request_kwargs=request_kwargs)
return BatchHTTPProvider(endpoint_uri=uri_string, request_kwargs=request_kwargs)
else:
return HTTPProvider(uri_string, request_kwargs=request_kwargs)
elif uri.scheme == 'wss':
if batch:
uri_fixed = uri_string.replace('wss://', 'https://')
logging.getLogger('ProviderResolver').warn(f"WSS endpoint can't batch requests. Trying HTTPS {uri_fixed}")
request_kwargs = {'timeout': timeout}
return BatchHTTPProvider(endpoint_uri=uri_fixed, request_kwargs=request_kwargs)
else:
return WebsocketProvider(uri_string, websocket_timeout=timeout)
else:
raise ValueError('Unknown uri scheme {}'.format(uri_string))

39 changes: 28 additions & 11 deletions ethereumetl/streaming/item_exporter_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from collections.abc import Mapping
import json
import os
from typing import Dict
from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter

Expand Down Expand Up @@ -92,22 +96,35 @@ def array_to_str(val):
item_exporter = ConsoleItemExporter()
elif item_exporter_type == ItemExporterType.KAFKA:
from blockchainetl.jobs.exporters.kafka_exporter import KafkaItemExporter
item_exporter = KafkaItemExporter(output, item_type_to_topic_mapping={
'block': 'blocks',
'transaction': 'transactions',
'log': 'logs',
'token_transfer': 'token_transfers',
'trace': 'traces',
'contract': 'contracts',
'token': 'tokens',
})

item_exporter = KafkaItemExporter(
output, item_type_to_topic_mapping=resolve_topic_mapping()
)
else:
raise ValueError('Unable to determine item exporter type for output ' + output)
raise ValueError("Unable to determine item exporter type for output " + output)

return item_exporter


def resolve_topic_mapping(
default_mapping={
"block": "blocks",
"transaction": "transactions",
"log": "logs",
"token_transfer": "token_transfers",
"trace": "traces",
"contract": "contracts",
"token": "tokens",
}
) -> Dict[str, str]:
"""Reads environment variables with topic mapping. returns defaults if no mapping found"""
env_mapping = os.environ.get("KAFKA_TOPIC_MAPPING_DICT")
if not env_mapping:
return default_mapping
decoded_mapping: Dict[str, str] = json.loads(env_mapping)
assert isinstance(decoded_mapping, Mapping)
return decoded_mapping


def get_bucket_and_path_from_gcs_output(output):
output = output.replace('gs://', '')
bucket_and_path = output.split('/', 1)
Expand Down

0 comments on commit d5f987d

Please sign in to comment.