diff --git a/dabapush/Writer/Writer.py b/dabapush/Writer/Writer.py index 29a83f0..7efd0d5 100644 --- a/dabapush/Writer/Writer.py +++ b/dabapush/Writer/Writer.py @@ -30,7 +30,7 @@ def __init__(self, config: WriterConfiguration): def __del__(self): """Ensures the buffer is flushed before the object is destroyed.""" - self.persist() + self._trigger_persist() def write(self, queue: Iterator[Record]) -> None: """Consumes items from the provided queue. @@ -41,14 +41,15 @@ def write(self, queue: Iterator[Record]) -> None: for item in queue: self.buffer.append(item) if len(self.buffer) >= self.config.chunk_size: - self.persist() - log.debug( - f"Persisted {self.config.chunk_size} records. Setting to done." - ) - for record in self.buffer: - log.debug(f"Setting record {record.uuid} as done.") - record.done() - self.buffer = [] + self._trigger_persist() + + def _trigger_persist(self): + self.persist() + log.debug(f"Persisted {self.config.chunk_size} records. Setting to done.") + for record in self.buffer: + log.debug(f"Setting record {record.uuid} as done.") + record.done() + self.buffer = [] @abc.abstractmethod def persist(self) -> None: