Skip to content

Commit

Permalink
indexer: add option to configure timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
fracek committed Jul 17, 2023
1 parent e248876 commit 92c8510
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
9 changes: 9 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
Changelog
=========

Apibara Python SDK 0.7.2 (UNRELEASED)
--------------------------------------

Changed
^^^^^^^

- Add option to :code:`IndexerRunner` to configure timeout.


Apibara Python SDK 0.7.1 (2023-07-14)
--------------------------------------

Expand Down
6 changes: 5 additions & 1 deletion src/apibara/indexer/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class IndexerRunner(Generic[UserContext, Filter]):
options to set the input stream and connection string.
client_options:
list of options passed to the gRPC channel.
timeout:
custom timeout for a message to arrive.
"""

def __init__(
Expand All @@ -64,6 +66,7 @@ def __init__(
reset_state: bool = False,
config: Optional[IndexerRunnerConfiguration] = None,
client_options: Optional[List[Tuple[str, Any]]] = None,
timeout: Optional[int] = None,
) -> None:
if config is None:
config = IndexerRunnerConfiguration()
Expand All @@ -73,6 +76,7 @@ def __init__(
self._config = config
self._indexer_id = None
self._indexer_storage = None
self._timeout = timeout
self._client_options = client_options

async def run(self, indexer: Indexer, *, ctx: Optional[UserContext] = None):
Expand Down Expand Up @@ -120,7 +124,7 @@ async def _do_run(self, indexer: Indexer, ctx: Optional[UserContext] = None):

async def _connect_and_stream(self, indexer: Indexer, ctx: Optional[UserContext]):
channel = self._channel()
(client, stream) = StreamService(channel).stream_data()
(client, stream) = StreamService(channel).stream_data(timeout=self._timeout)

config = indexer.initial_configuration()
has_stored = self._indexer_storage.update_with_stored_configuration(config)
Expand Down

0 comments on commit 92c8510

Please sign in to comment.