From 9063aae1d884118f98d2de085614201748ab43cd Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 15 Feb 2023 15:39:55 +0100 Subject: [PATCH 1/2] Async backup restoration with error handling --- karapace/schema_backup.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/karapace/schema_backup.py b/karapace/schema_backup.py index d30443ec0..99c8bd2ef 100644 --- a/karapace/schema_backup.py +++ b/karapace/schema_backup.py @@ -7,7 +7,7 @@ from enum import Enum from kafka import KafkaConsumer, KafkaProducer from kafka.admin import KafkaAdminClient -from kafka.errors import KafkaError, TopicAlreadyExistsError +from kafka.errors import TopicAlreadyExistsError from kafka.structs import PartitionMetadata from karapace import constants from karapace.anonymize_schemas import anonymize_avro @@ -292,6 +292,8 @@ def __init__(self, config: Config, backup_path: str, topic_option: Optional[str] self.timeout_ms = 1000 self.timeout_kafka_producer = 5 + self.producer_exception: Optional[Exception] = None + # Schema key formatter self.key_formatter = None if self.topic_name == constants.DEFAULT_SCHEMA_TOPIC or self.config.get("force_key_correction", False): @@ -311,19 +313,23 @@ def restore_backup(self) -> None: self._restore_backup_version_2(producer, fp) else: self._restore_backup_version_1_single_array(producer, fp) + producer.flush(timeout=self.timeout_kafka_producer) + if self.producer_exception: + raise BackupError("Error while producing restored messages") from self.producer_exception + + def producer_error_callback(self, exception: Exception): + self.producer_exception = exception def _handle_restore_message(self, producer: KafkaProducer, item: Tuple[str, str]) -> None: key = self.encode_key(item[0]) value = encode_value(item[1]) - LOG.debug("Trying to send kafka msg key: %r, value: %r", key, value) - try: - msg = producer.send(self.topic_name, key=key, value=value, partition=PARTITION_ZERO) - producer.flush(timeout=self.timeout_kafka_producer) - metadata = msg.get(timeout=self.timeout_kafka_producer) - except KafkaError as ex: - raise BackupError("Error while producing restored message") from ex - else: - LOG.debug("Sent kafka msg key: %r, value: %r, offset: %r", key, value, metadata.offset) + LOG.debug("Sending kafka msg key: %r, value: %r", key, value) + producer.send( + self.topic_name, + key=key, + value=value, + partition=PARTITION_ZERO, + ).add_errback(self.producer_error_callback) def _restore_backup_version_1_single_array(self, producer: KafkaProducer, fp: IO) -> None: raw_msg = fp.read() From 81469951f99e2e320b8a812b016a01fac80b465d Mon Sep 17 00:00:00 2001 From: giuseppelillo Date: Wed, 15 Feb 2023 16:05:48 +0100 Subject: [PATCH 2/2] Update karapace/schema_backup.py Co-authored-by: Richard Fussenegger --- karapace/schema_backup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/schema_backup.py b/karapace/schema_backup.py index 99c8bd2ef..ca1291983 100644 --- a/karapace/schema_backup.py +++ b/karapace/schema_backup.py @@ -314,7 +314,7 @@ def restore_backup(self) -> None: else: self._restore_backup_version_1_single_array(producer, fp) producer.flush(timeout=self.timeout_kafka_producer) - if self.producer_exception: + if self.producer_exception is not None: raise BackupError("Error while producing restored messages") from self.producer_exception def producer_error_callback(self, exception: Exception):