Skip to content

Commit

Permalink
refactor logging trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
pekasen committed Jan 21, 2025
1 parent 42b3677 commit af25c5d
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions dabapush/Writer/Writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, config: WriterConfiguration):

def __del__(self):
"""Ensures the buffer is flushed before the object is destroyed."""
self.persist()
self._trigger_persist()

def write(self, queue: Iterator[Record]) -> None:
"""Consumes items from the provided queue.
Expand All @@ -41,14 +41,15 @@ def write(self, queue: Iterator[Record]) -> None:
for item in queue:
self.buffer.append(item)
if len(self.buffer) >= self.config.chunk_size:
self.persist()
log.debug(
f"Persisted {self.config.chunk_size} records. Setting to done."
)
for record in self.buffer:
log.debug(f"Setting record {record.uuid} as done.")
record.done()
self.buffer = []
self._trigger_persist()

def _trigger_persist(self):
self.persist()
log.debug(f"Persisted {self.config.chunk_size} records. Setting to done.")
for record in self.buffer:
log.debug(f"Setting record {record.uuid} as done.")
record.done()
self.buffer = []

@abc.abstractmethod
def persist(self) -> None:
Expand Down

0 comments on commit af25c5d

Please sign in to comment.