From a177ec7ae112e23325d6bfdcb33cedf42a5fac7f Mon Sep 17 00:00:00 2001 From: Sebastian Goodman Date: Thu, 17 Oct 2024 10:56:11 -0700 Subject: [PATCH 1/3] Make BulkIndexError and ScanError serializable --- elasticsearch/helpers/errors.py | 7 ++++++ .../test_server/test_helpers.py | 23 ++++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/elasticsearch/helpers/errors.py b/elasticsearch/helpers/errors.py index 359fe87b1..2963a5633 100644 --- a/elasticsearch/helpers/errors.py +++ b/elasticsearch/helpers/errors.py @@ -23,6 +23,9 @@ def __init__(self, message: Any, errors: List[Dict[str, Any]]): super().__init__(message) self.errors: List[Dict[str, Any]] = errors + def __reduce__(self): + return (self.__class__, (self.args[0], self.errors)) + class ScanError(Exception): scroll_id: str @@ -30,3 +33,7 @@ class ScanError(Exception): def __init__(self, scroll_id: str, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.scroll_id = scroll_id + + def __reduce__(self): + return (self.__class__, (self.scroll_id,) + self.args) + diff --git a/test_elasticsearch/test_server/test_helpers.py b/test_elasticsearch/test_server/test_helpers.py index 011803bc9..14274b714 100644 --- a/test_elasticsearch/test_server/test_helpers.py +++ b/test_elasticsearch/test_server/test_helpers.py @@ -16,6 +16,7 @@ # under the License. import json +import pickle from datetime import datetime, timedelta from unittest.mock import call, patch @@ -24,7 +25,7 @@ from elastic_transport import ApiResponseMeta, ObjectApiResponse from elasticsearch import ApiError, helpers -from elasticsearch.helpers import ScanError +from elasticsearch.helpers import BulkIndexError, ScanError class FailingBulkClient: @@ -993,3 +994,23 @@ def test_reindex_index_datastream_op_type_index(sync_client): query={"query": {"bool": {"filter": {"term": {"type": "answers"}}}}}, op_type="_index", ) + +def test_serialize_bulk_index_error(): + msg = "message" + errors = {"error": 1} + error = BulkIndexError(msg, errors) + pickled = pickle.dumps(error) + actual = pickle.loads(pickled) + assert actual.__class__ == BulkIndexError + assert actual.errors == error.errors + assert actual.args == error.args + +def test_serialize_scan_error(): + scroll_id = "message" + args = ("a", "b") + error = ScanError(scroll_id, *args) + pickled = pickle.dumps(error) + actual = pickle.loads(pickled) + assert actual.__class__ == ScanError + assert actual.scroll_id == error.scroll_id + assert actual.args == error.args From 0eef81e1eff6c2ad1a09da5110937039b37d2a57 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Tue, 12 Nov 2024 10:15:58 +0400 Subject: [PATCH 2/3] Simplify constructors, move tests, type reduce function --- elasticsearch/helpers/errors.py | 17 +++++++------- test_elasticsearch/test_helpers.py | 17 ++++++++++++++ .../test_server/test_helpers.py | 23 +------------------ 3 files changed, 27 insertions(+), 30 deletions(-) diff --git a/elasticsearch/helpers/errors.py b/elasticsearch/helpers/errors.py index 2963a5633..dc46322ea 100644 --- a/elasticsearch/helpers/errors.py +++ b/elasticsearch/helpers/errors.py @@ -15,25 +15,26 @@ # specific language governing permissions and limitations # under the License. -from typing import Any, Dict, List +from typing import Any, Dict, List, Tuple, Type class BulkIndexError(Exception): - def __init__(self, message: Any, errors: List[Dict[str, Any]]): + def __init__(self, message: str, errors: List[Dict[str, Any]]): super().__init__(message) self.errors: List[Dict[str, Any]] = errors - def __reduce__(self): + def __reduce__( + self, + ) -> Tuple[Type["BulkIndexError"], Tuple[str, List[Dict[str, Any]]]]: return (self.__class__, (self.args[0], self.errors)) class ScanError(Exception): scroll_id: str - def __init__(self, scroll_id: str, *args: Any, **kwargs: Any) -> None: - super().__init__(*args, **kwargs) + def __init__(self, scroll_id: str, shards_message: str) -> None: + super().__init__(shards_message) self.scroll_id = scroll_id - def __reduce__(self): - return (self.__class__, (self.scroll_id,) + self.args) - + def __reduce__(self) -> Tuple[Type["ScanError"], Tuple[str, str]]: + return (self.__class__, (self.scroll_id, self.args[0])) diff --git a/test_elasticsearch/test_helpers.py b/test_elasticsearch/test_helpers.py index c9284afc5..e30635f44 100644 --- a/test_elasticsearch/test_helpers.py +++ b/test_elasticsearch/test_helpers.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +import pickle import threading import time from unittest import mock @@ -182,3 +183,19 @@ class TestExpandActions: @pytest.mark.parametrize("action", ["whatever", b"whatever"]) def test_string_actions_are_marked_as_simple_inserts(self, action): assert ({"index": {}}, b"whatever") == helpers.expand_action(action) + + +def test_serialize_bulk_index_error(): + error = helpers.BulkIndexError("message", [{"error": 1}]) + pickled = pickle.loads(pickle.dumps(error)) + assert pickled.__class__ == helpers.BulkIndexError + assert pickled.errors == error.errors + assert pickled.args == error.args + + +def test_serialize_scan_error(): + error = helpers.ScanError("scroll_id", "shard_message") + pickled = pickle.loads(pickle.dumps(error)) + assert pickled.__class__ == helpers.ScanError + assert pickled.scroll_id == error.scroll_id + assert pickled.args == error.args diff --git a/test_elasticsearch/test_server/test_helpers.py b/test_elasticsearch/test_server/test_helpers.py index 14274b714..011803bc9 100644 --- a/test_elasticsearch/test_server/test_helpers.py +++ b/test_elasticsearch/test_server/test_helpers.py @@ -16,7 +16,6 @@ # under the License. import json -import pickle from datetime import datetime, timedelta from unittest.mock import call, patch @@ -25,7 +24,7 @@ from elastic_transport import ApiResponseMeta, ObjectApiResponse from elasticsearch import ApiError, helpers -from elasticsearch.helpers import BulkIndexError, ScanError +from elasticsearch.helpers import ScanError class FailingBulkClient: @@ -994,23 +993,3 @@ def test_reindex_index_datastream_op_type_index(sync_client): query={"query": {"bool": {"filter": {"term": {"type": "answers"}}}}}, op_type="_index", ) - -def test_serialize_bulk_index_error(): - msg = "message" - errors = {"error": 1} - error = BulkIndexError(msg, errors) - pickled = pickle.dumps(error) - actual = pickle.loads(pickled) - assert actual.__class__ == BulkIndexError - assert actual.errors == error.errors - assert actual.args == error.args - -def test_serialize_scan_error(): - scroll_id = "message" - args = ("a", "b") - error = ScanError(scroll_id, *args) - pickled = pickle.dumps(error) - actual = pickle.loads(pickled) - assert actual.__class__ == ScanError - assert actual.scroll_id == error.scroll_id - assert actual.args == error.args From 0db50e4cee62542c74c1d5a4431bfaa170b760ce Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Tue, 12 Nov 2024 15:31:10 +0400 Subject: [PATCH 3/3] Restore backwards compatibility --- elasticsearch/helpers/errors.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/elasticsearch/helpers/errors.py b/elasticsearch/helpers/errors.py index dc46322ea..4814ca581 100644 --- a/elasticsearch/helpers/errors.py +++ b/elasticsearch/helpers/errors.py @@ -32,9 +32,9 @@ def __reduce__( class ScanError(Exception): scroll_id: str - def __init__(self, scroll_id: str, shards_message: str) -> None: - super().__init__(shards_message) + def __init__(self, scroll_id: str, *args: Any) -> None: + super().__init__(*args) self.scroll_id = scroll_id def __reduce__(self) -> Tuple[Type["ScanError"], Tuple[str, str]]: - return (self.__class__, (self.scroll_id, self.args[0])) + return (self.__class__, (self.scroll_id,) + self.args)