From 0dc641e2ae296141bbf2e373b71a03e07a3ef87e Mon Sep 17 00:00:00 2001 From: Alex Petenchea Date: Tue, 11 Jul 2023 13:24:31 +0200 Subject: [PATCH] [DE-544] Adjusting allowRetry implementation (#255) * Refactoring allow_retry implementation * Excluding the newer endpoint format from coverage * Temporarily excluding next_batch_id from coverage --- arango/aql.py | 4 +-- arango/cursor.py | 42 +++++++++++-------------------- docs/cursor.rst | 60 ++++++++++++++++++++++++++++++++++++++------ tests/test_cursor.py | 25 ++++++++++++------ 4 files changed, 88 insertions(+), 43 deletions(-) diff --git a/arango/aql.py b/arango/aql.py index 21ff248c..217664e6 100644 --- a/arango/aql.py +++ b/arango/aql.py @@ -313,7 +313,7 @@ def execute( development to catch issues early. If set to False, warnings are returned with the query result. There is a server configuration option "--query.fail-on-warning" for setting the default value for - this behaviour so it does not need to be set per-query. + this behaviour, so it does not need to be set per-query. :type fail_on_warning: bool :param profile: Return additional profiling details in the cursor, unless the query cache is used. @@ -437,7 +437,7 @@ def execute( def response_handler(resp: Response) -> Cursor: if not resp.is_success: raise AQLQueryExecuteError(resp, request) - return Cursor(self._conn, resp.body) + return Cursor(self._conn, resp.body, allow_retry=allow_retry) return self._execute(request, response_handler) diff --git a/arango/cursor.py b/arango/cursor.py index 5a51a715..83da8881 100644 --- a/arango/cursor.py +++ b/arango/cursor.py @@ -27,6 +27,10 @@ class Cursor: :type init_data: dict :param cursor_type: Cursor type ("cursor" or "export"). :type cursor_type: str + :param allow_retry: If set to True, the cursor will always attempt to fetch + the latest batch from server even if the previous attempt failed. + This option is only available for server versions 3.11 and above. + :type allow_retry: bool """ __slots__ = [ @@ -41,6 +45,7 @@ class Cursor: "_has_more", "_batch", "_next_batch_id", + "_allow_retry", ] def __init__( @@ -48,9 +53,11 @@ def __init__( connection: BaseConnection, init_data: Json, cursor_type: str = "cursor", + allow_retry: bool = False, ) -> None: self._conn = connection self._type = cursor_type + self._allow_retry = allow_retry self._batch: Deque[Any] = deque() self._id = None self._count: Optional[int] = None @@ -103,8 +110,10 @@ def _update(self, data: Json) -> Json: # New in 3.11 if "nextBatchId" in data: - self._next_batch_id = data["nextBatchId"] - result["next_batch_id"] = data["nextBatchId"] + # This is only available for server versions 3.11 and above. + # Currently, we are testing against 3.10.9 + self._next_batch_id = data["nextBatchId"] # pragma: no cover + result["next_batch_id"] = data["nextBatchId"] # pragma: no cover self._has_more = bool(data["hasMore"]) result["has_more"] = data["hasMore"] @@ -280,33 +289,12 @@ def fetch(self) -> Json: """ if self._id is None: raise CursorStateError("cursor ID not set") - request = Request(method="post", endpoint=f"/_api/{self._type}/{self._id}") - resp = self._conn.send_request(request) - - if not resp.is_success: - raise CursorNextError(resp, request) - return self._update(resp.body) - - def retry(self) -> Json: - """Retry fetching the next batch from server and update the cursor. + endpoint = f"/_api/{self._type}/{self._id}" + if self._allow_retry and self._next_batch_id is not None: + endpoint += f"/{self._next_batch_id}" # pragma: no cover - Available only if the ``allowRetry`` query options is enabled. - Introduced in 3.11. - - :return: New batch details. - :rtype: dict - :raise arango.exceptions.CursorNextError: If batch retrieval fails. - :raise arango.exceptions.CursorStateError: If cursor ID is not set. - """ - if self._id is None: - raise CursorStateError("cursor ID not set") - if self._id is None: - raise CursorStateError("nextBatchId not set") - request = Request( - method="post", - endpoint=f"/_api/{self._type}/{self._id}/{self._next_batch_id}", - ) + request = Request(method="post", endpoint=endpoint) resp = self._conn.send_request(request) if not resp.is_success: diff --git a/docs/cursor.rst b/docs/cursor.rst index 6a713733..c427dd37 100644 --- a/docs/cursor.rst +++ b/docs/cursor.rst @@ -33,8 +33,7 @@ number of items in the result set may or may not be known in advance. 'FOR doc IN students FILTER doc.age > @val RETURN doc', bind_vars={'val': 17}, batch_size=2, - count=True, - allow_retry=True + count=True ) # Get the cursor ID. @@ -73,11 +72,7 @@ number of items in the result set may or may not be known in advance. cursor.pop() # Fetch the next batch and add them to the cursor object. - try: - cursor.fetch() - except CursorNextError: - # Retry fetching the latest batch from the cursor. - cursor.retry() + cursor.fetch() # Delete the cursor from the server. cursor.close() @@ -121,3 +116,54 @@ instead. cursor.fetch() while not cursor.empty(): # Pop until nothing is left on the cursor. cursor.pop() + +With ArangoDB 3.11.0 or higher, you can also use the `allow_retry` +parameter of :func:`arango.aql.AQL.execute` to automatically retry +the request if the cursor encountered any issues during the previous +fetch operation. Note that this feature causes the server to cache the +last batch. To allow re-fetching of the very last batch of the query, +the server cannot automatically delete the cursor. Once you have successfully +received the last batch, you should call :func:`arango.cursor.Cursor.close`. + +**Example:** + +.. code-block:: python + + from arango import ArangoClient + + # Initialize the ArangoDB client. + client = ArangoClient() + + # Connect to "test" database as root user. + db = client.db('test', username='root', password='passwd') + + # Set up some test data to query against. + db.collection('students').insert_many([ + {'_key': 'Abby', 'age': 22}, + {'_key': 'John', 'age': 18}, + {'_key': 'Mary', 'age': 21}, + {'_key': 'Suzy', 'age': 23}, + {'_key': 'Dave', 'age': 20} + ]) + + # Execute an AQL query which returns a cursor object. + cursor = db.aql.execute( + 'FOR doc IN students FILTER doc.age > @val RETURN doc', + bind_vars={'val': 17}, + batch_size=2, + count=True, + allow_retry=True + ) + + while cursor.has_more(): + try: + cursor.fetch() + except ConnectionError: + # Retry the request. + continue + + while not cursor.empty(): + cursor.pop() + + # Delete the cursor from the server. + cursor.close() diff --git a/tests/test_cursor.py b/tests/test_cursor.py index 2c370fd1..e03eae32 100644 --- a/tests/test_cursor.py +++ b/tests/test_cursor.py @@ -263,7 +263,7 @@ def test_cursor_manual_fetch_and_pop(db, col, docs): assert err.value.message == "current batch is empty" -def test_cursor_retry_disabled(db, col, db_version): +def test_cursor_retry_disabled(db, col, docs, db_version): cursor = db.aql.execute( f"FOR d IN {col.name} SORT d._key RETURN d", count=True, @@ -275,12 +275,17 @@ def test_cursor_retry_disabled(db, col, db_version): ) result = cursor.fetch() assert result["id"] == cursor.id - cursor._next_batch_id = "2" - if db_version >= version.parse("3.11.0"): - with pytest.raises(CursorNextError) as err: - cursor.retry() - assert err.value.message == "batch id not found" + while not cursor.empty(): + cursor.pop() + + # The next batch ID should have no effect + cursor._next_batch_id = "2" + result = cursor.fetch() + if db_version >= version.parse("3.11.1"): + assert result["next_batch_id"] == "4" + doc = cursor.pop() + assert clean_doc(doc) == docs[2] assert cursor.close(ignore_missing=True) @@ -312,7 +317,7 @@ def test_cursor_retry(db, col, docs, db_version): # Decrease the next batch ID as if the previous fetch failed if db_version >= version.parse("3.11.0"): cursor._next_batch_id = "2" - result = cursor.retry() + result = cursor.fetch() assert result["id"] == cursor.id assert result["next_batch_id"] == "3" doc = cursor.pop() @@ -335,6 +340,12 @@ def test_cursor_retry(db, col, docs, db_version): doc = cursor.pop() assert clean_doc(doc) == docs[-1] + if db_version >= version.parse("3.11.0"): + # We should be able to fetch the last batch again + cursor.fetch() + doc = cursor.pop() + assert clean_doc(doc) == docs[-1] + if db_version >= version.parse("3.11.0"): assert cursor.close() else: