Skip to content

Commit

Permalink
Merge pull request #546 from aiven/giuseppelillo/restore-backup-async…
Browse files Browse the repository at this point in the history
…-send

Async backup restoration with error handling
  • Loading branch information
Fleshgrinder authored Feb 15, 2023
2 parents 4fd802d + 8146995 commit 8624bc6
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions karapace/schema_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from enum import Enum
from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient
from kafka.errors import KafkaError, TopicAlreadyExistsError
from kafka.errors import TopicAlreadyExistsError
from kafka.structs import PartitionMetadata
from karapace import constants
from karapace.anonymize_schemas import anonymize_avro
Expand Down Expand Up @@ -292,6 +292,8 @@ def __init__(self, config: Config, backup_path: str, topic_option: Optional[str]
self.timeout_ms = 1000
self.timeout_kafka_producer = 5

self.producer_exception: Optional[Exception] = None

# Schema key formatter
self.key_formatter = None
if self.topic_name == constants.DEFAULT_SCHEMA_TOPIC or self.config.get("force_key_correction", False):
Expand All @@ -311,19 +313,23 @@ def restore_backup(self) -> None:
self._restore_backup_version_2(producer, fp)
else:
self._restore_backup_version_1_single_array(producer, fp)
producer.flush(timeout=self.timeout_kafka_producer)
if self.producer_exception is not None:
raise BackupError("Error while producing restored messages") from self.producer_exception

def producer_error_callback(self, exception: Exception):
self.producer_exception = exception

def _handle_restore_message(self, producer: KafkaProducer, item: Tuple[str, str]) -> None:
key = self.encode_key(item[0])
value = encode_value(item[1])
LOG.debug("Trying to send kafka msg key: %r, value: %r", key, value)
try:
msg = producer.send(self.topic_name, key=key, value=value, partition=PARTITION_ZERO)
producer.flush(timeout=self.timeout_kafka_producer)
metadata = msg.get(timeout=self.timeout_kafka_producer)
except KafkaError as ex:
raise BackupError("Error while producing restored message") from ex
else:
LOG.debug("Sent kafka msg key: %r, value: %r, offset: %r", key, value, metadata.offset)
LOG.debug("Sending kafka msg key: %r, value: %r", key, value)
producer.send(
self.topic_name,
key=key,
value=value,
partition=PARTITION_ZERO,
).add_errback(self.producer_error_callback)

def _restore_backup_version_1_single_array(self, producer: KafkaProducer, fp: IO) -> None:
raw_msg = fp.read()
Expand Down

0 comments on commit 8624bc6

Please sign in to comment.