Skip to content

Commit

Permalink
[DE-544] Adjusting allowRetry implementation (#255)
Browse files Browse the repository at this point in the history
* Refactoring allow_retry implementation

* Excluding the newer endpoint format from coverage

* Temporarily excluding next_batch_id from coverage
  • Loading branch information
apetenchea authored Jul 11, 2023
1 parent b82de8c commit 0dc641e
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 43 deletions.
4 changes: 2 additions & 2 deletions arango/aql.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def execute(
development to catch issues early. If set to False, warnings are
returned with the query result. There is a server configuration
option "--query.fail-on-warning" for setting the default value for
this behaviour so it does not need to be set per-query.
this behaviour, so it does not need to be set per-query.
:type fail_on_warning: bool
:param profile: Return additional profiling details in the cursor,
unless the query cache is used.
Expand Down Expand Up @@ -437,7 +437,7 @@ def execute(
def response_handler(resp: Response) -> Cursor:
if not resp.is_success:
raise AQLQueryExecuteError(resp, request)
return Cursor(self._conn, resp.body)
return Cursor(self._conn, resp.body, allow_retry=allow_retry)

return self._execute(request, response_handler)

Expand Down
42 changes: 15 additions & 27 deletions arango/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class Cursor:
:type init_data: dict
:param cursor_type: Cursor type ("cursor" or "export").
:type cursor_type: str
:param allow_retry: If set to True, the cursor will always attempt to fetch
the latest batch from server even if the previous attempt failed.
This option is only available for server versions 3.11 and above.
:type allow_retry: bool
"""

__slots__ = [
Expand All @@ -41,16 +45,19 @@ class Cursor:
"_has_more",
"_batch",
"_next_batch_id",
"_allow_retry",
]

def __init__(
self,
connection: BaseConnection,
init_data: Json,
cursor_type: str = "cursor",
allow_retry: bool = False,
) -> None:
self._conn = connection
self._type = cursor_type
self._allow_retry = allow_retry
self._batch: Deque[Any] = deque()
self._id = None
self._count: Optional[int] = None
Expand Down Expand Up @@ -103,8 +110,10 @@ def _update(self, data: Json) -> Json:

# New in 3.11
if "nextBatchId" in data:
self._next_batch_id = data["nextBatchId"]
result["next_batch_id"] = data["nextBatchId"]
# This is only available for server versions 3.11 and above.
# Currently, we are testing against 3.10.9
self._next_batch_id = data["nextBatchId"] # pragma: no cover
result["next_batch_id"] = data["nextBatchId"] # pragma: no cover

self._has_more = bool(data["hasMore"])
result["has_more"] = data["hasMore"]
Expand Down Expand Up @@ -280,33 +289,12 @@ def fetch(self) -> Json:
"""
if self._id is None:
raise CursorStateError("cursor ID not set")
request = Request(method="post", endpoint=f"/_api/{self._type}/{self._id}")
resp = self._conn.send_request(request)

if not resp.is_success:
raise CursorNextError(resp, request)

return self._update(resp.body)

def retry(self) -> Json:
"""Retry fetching the next batch from server and update the cursor.
endpoint = f"/_api/{self._type}/{self._id}"
if self._allow_retry and self._next_batch_id is not None:
endpoint += f"/{self._next_batch_id}" # pragma: no cover

Available only if the ``allowRetry`` query options is enabled.
Introduced in 3.11.
:return: New batch details.
:rtype: dict
:raise arango.exceptions.CursorNextError: If batch retrieval fails.
:raise arango.exceptions.CursorStateError: If cursor ID is not set.
"""
if self._id is None:
raise CursorStateError("cursor ID not set")
if self._id is None:
raise CursorStateError("nextBatchId not set")
request = Request(
method="post",
endpoint=f"/_api/{self._type}/{self._id}/{self._next_batch_id}",
)
request = Request(method="post", endpoint=endpoint)
resp = self._conn.send_request(request)

if not resp.is_success:
Expand Down
60 changes: 53 additions & 7 deletions docs/cursor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ number of items in the result set may or may not be known in advance.
'FOR doc IN students FILTER doc.age > @val RETURN doc',
bind_vars={'val': 17},
batch_size=2,
count=True,
allow_retry=True
count=True
)

# Get the cursor ID.
Expand Down Expand Up @@ -73,11 +72,7 @@ number of items in the result set may or may not be known in advance.
cursor.pop()

# Fetch the next batch and add them to the cursor object.
try:
cursor.fetch()
except CursorNextError:
# Retry fetching the latest batch from the cursor.
cursor.retry()
cursor.fetch()

# Delete the cursor from the server.
cursor.close()
Expand Down Expand Up @@ -121,3 +116,54 @@ instead.
cursor.fetch()
while not cursor.empty(): # Pop until nothing is left on the cursor.
cursor.pop()

With ArangoDB 3.11.0 or higher, you can also use the `allow_retry`
parameter of :func:`arango.aql.AQL.execute` to automatically retry
the request if the cursor encountered any issues during the previous
fetch operation. Note that this feature causes the server to cache the
last batch. To allow re-fetching of the very last batch of the query,
the server cannot automatically delete the cursor. Once you have successfully
received the last batch, you should call :func:`arango.cursor.Cursor.close`.

**Example:**

.. code-block:: python
from arango import ArangoClient
# Initialize the ArangoDB client.
client = ArangoClient()
# Connect to "test" database as root user.
db = client.db('test', username='root', password='passwd')
# Set up some test data to query against.
db.collection('students').insert_many([
{'_key': 'Abby', 'age': 22},
{'_key': 'John', 'age': 18},
{'_key': 'Mary', 'age': 21},
{'_key': 'Suzy', 'age': 23},
{'_key': 'Dave', 'age': 20}
])
# Execute an AQL query which returns a cursor object.
cursor = db.aql.execute(
'FOR doc IN students FILTER doc.age > @val RETURN doc',
bind_vars={'val': 17},
batch_size=2,
count=True,
allow_retry=True
)
while cursor.has_more():
try:
cursor.fetch()
except ConnectionError:
# Retry the request.
continue
while not cursor.empty():
cursor.pop()
# Delete the cursor from the server.
cursor.close()
25 changes: 18 additions & 7 deletions tests/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def test_cursor_manual_fetch_and_pop(db, col, docs):
assert err.value.message == "current batch is empty"


def test_cursor_retry_disabled(db, col, db_version):
def test_cursor_retry_disabled(db, col, docs, db_version):
cursor = db.aql.execute(
f"FOR d IN {col.name} SORT d._key RETURN d",
count=True,
Expand All @@ -275,12 +275,17 @@ def test_cursor_retry_disabled(db, col, db_version):
)
result = cursor.fetch()
assert result["id"] == cursor.id
cursor._next_batch_id = "2"

if db_version >= version.parse("3.11.0"):
with pytest.raises(CursorNextError) as err:
cursor.retry()
assert err.value.message == "batch id not found"
while not cursor.empty():
cursor.pop()

# The next batch ID should have no effect
cursor._next_batch_id = "2"
result = cursor.fetch()
if db_version >= version.parse("3.11.1"):
assert result["next_batch_id"] == "4"
doc = cursor.pop()
assert clean_doc(doc) == docs[2]

assert cursor.close(ignore_missing=True)

Expand Down Expand Up @@ -312,7 +317,7 @@ def test_cursor_retry(db, col, docs, db_version):
# Decrease the next batch ID as if the previous fetch failed
if db_version >= version.parse("3.11.0"):
cursor._next_batch_id = "2"
result = cursor.retry()
result = cursor.fetch()
assert result["id"] == cursor.id
assert result["next_batch_id"] == "3"
doc = cursor.pop()
Expand All @@ -335,6 +340,12 @@ def test_cursor_retry(db, col, docs, db_version):
doc = cursor.pop()
assert clean_doc(doc) == docs[-1]

if db_version >= version.parse("3.11.0"):
# We should be able to fetch the last batch again
cursor.fetch()
doc = cursor.pop()
assert clean_doc(doc) == docs[-1]

if db_version >= version.parse("3.11.0"):
assert cursor.close()
else:
Expand Down

0 comments on commit 0dc641e

Please sign in to comment.