From 6df7525c7b57aa4efd58585d68281f7b0cae32a3 Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Wed, 22 Jan 2025 11:56:26 +0100
Subject: [PATCH] Refactor log-keeping

Move per-record logging and back-log deduplication out of the Readers
and into the Writer. Readers now simply yield every record they parse,
while the Writer loads .dabapush/<name>.jsonl on construction, skips
incoming records that are already in its back_log, and appends each
record to the log file once it has been persisted and marked as done.
The on_done dispatch in Record.done() is disabled accordingly, and the
Reader-level back-log tests are replaced by a Writer-level test
covering the same behaviour.
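
A minimal sketch of the resulting resume behaviour, modelled on the new
test_writer_persist_method_with_backlog; ListWriter and the ad-hoc wiring
below are illustrative only and are not part of dabapush:

    # Illustrative sketch, assuming the Writer changes in this patch.
    # Run from a scratch directory: the Writer creates .dabapush/demo.jsonl
    # in the current working directory.
    from dabapush.Configuration.WriterConfiguration import WriterConfiguration
    from dabapush.Record import Record
    from dabapush.Writer.Writer import Writer

    class ListWriter(Writer):
        """Hypothetical writer that keeps persisted payloads in memory."""

        def __init__(self, config):
            super().__init__(config)
            self.persisted_data = []

        def persist(self):
            # _trigger_persist() clears the buffer afterwards, so only copy here.
            self.persisted_data.extend(record.payload for record in self.buffer)

    config = WriterConfiguration(name="demo", id=1, chunk_size=3)

    # First run: all records are persisted and appended to .dabapush/demo.jsonl.
    first = ListWriter(config)
    first.write(Record(uuid=str(i), payload=i) for i in range(10))
    first._trigger_persist()  # flush the last partial chunk, as the test does
    del first  # __del__ flushes the buffer and closes the log file

    # Second run: the back_log loaded from .dabapush/demo.jsonl filters out
    # records that were already written, so nothing reaches persist() again.
    second = ListWriter(config)
    second.write(Record(uuid=str(i), payload=i) for i in range(10))
    second._trigger_persist()
    assert second.persisted_data == []
    assert len(second.back_log) == 10
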
---
dabapush/Reader/JSONReader.py | 8 ++-----
dabapush/Reader/NDJSONReader.py | 8 ++-----
dabapush/Reader/Reader.py | 27 +--------------------
dabapush/Record.py | 8 +++----
dabapush/Writer/Writer.py | 40 ++++++++++++++++++++++++++++++-
tests/Reader/test_JSONReader.py | 31 +++---------------------
tests/Reader/test_NDJSONReader.py | 32 -------------------------
tests/Writer/test_Writer.py | 33 +++++++++++++++++++++----
8 files changed, 79 insertions(+), 108 deletions(-)
diff --git a/dabapush/Reader/JSONReader.py b/dabapush/Reader/JSONReader.py
index 0dabe72..9282100 100644
--- a/dabapush/Reader/JSONReader.py
+++ b/dabapush/Reader/JSONReader.py
@@ -11,7 +11,6 @@
from .Reader import FileReader
-
class JSONReader(FileReader):
"""Reader to read ready to read directories containing multiple json files.
It matches files in the path-tree against the pattern and reads the
@@ -36,14 +35,11 @@ def read(self) -> Iterator[Record]:
record = Record(
uuid=f"{str(file_record.uuid)}",
payload=(
- parsed
- if not self.config.flatten_dicts
- else flatten(parsed)
+ parsed if not self.config.flatten_dicts else flatten(parsed)
),
source=file_record,
)
- if record not in self.back_log:
- yield record
+ yield record
class JSONReaderConfiguration(ReaderConfiguration):
diff --git a/dabapush/Reader/NDJSONReader.py b/dabapush/Reader/NDJSONReader.py
index 29ff064..a8ac217 100644
--- a/dabapush/Reader/NDJSONReader.py
+++ b/dabapush/Reader/NDJSONReader.py
@@ -53,13 +53,9 @@ def read(self) -> Iterator[Record]:
"""reads multiple NDJSON files and emits them line by line"""
for file_record in self.records:
- filtered_records = filter(
- lambda x: x not in self.back_log,
- file_record.split(
- func=read_and_split, flatten_records=self.config.flatten_dicts
- ),
+ yield from file_record.split(
+ func=read_and_split, flatten_records=self.config.flatten_dicts
)
- yield from filtered_records
class NDJSONReaderConfiguration(ReaderConfiguration):
diff --git a/dabapush/Reader/Reader.py b/dabapush/Reader/Reader.py
index 09a81fd..94df09a 100644
--- a/dabapush/Reader/Reader.py
+++ b/dabapush/Reader/Reader.py
@@ -4,8 +4,6 @@
from pathlib import Path
from typing import Iterator
-import ujson
-from loguru import logger as log
from tqdm.auto import tqdm
from ..Configuration.ReaderConfiguration import ReaderConfiguration
@@ -33,12 +31,6 @@ def __init__(self, config: ReaderConfiguration):
be a subclass of ReaderConfiguration.
"""
self.config = config
- self.back_log = []
- # initialize file log
- if not Path(".dabapush/").exists():
- Path(".dabapush/").mkdir()
-
- self.log_path = Path(f".dabapush/{config.name}.jsonl")
@abc.abstractmethod
def read(self) -> Iterator[Record]:
@@ -77,28 +69,11 @@ def read(self) -> Iterator[Record]:
@property
def records(self) -> Iterator[Record]:
"""Generator for all files matching the pattern in the read_path."""
- if self.log_path.exists():
- log.debug(
- f"Found log file for {self.config.name} at {self.log_path}. Loading..."
- )
- with self.log_path.open("rt", encoding="utf8") as f:
- self.back_log = [Record(**ujson.loads(_)) for _ in f.readlines()]
- else:
- self.log_path.touch()
-
yield from (
Record(
uuid=str(a),
payload=a,
- event_handlers={"on_done": [self.log]},
+ # event_handlers={"on_done": [self.log]},
)
for a in tqdm(list(Path(self.config.read_path).rglob(self.config.pattern)))
)
-
- def log(self, record: Record):
- """Log the record to the persistent record log file."""
- with self.log_path.open("a", encoding="utf8") as f:
- for sub_record in record.walk_tree(only_leafs=True):
- ujson.dump(sub_record.to_log(), f)
- f.write("\n")
- log.debug(f"Done with {record.uuid}")
diff --git a/dabapush/Record.py b/dabapush/Record.py
index 382118a..d4a3273 100644
--- a/dabapush/Record.py
+++ b/dabapush/Record.py
@@ -185,10 +185,10 @@ def done(self):
# Signal parent that this record is done
self._state_ = "done"
log.debug(f"Record {self.uuid} is set as done.")
- if self.source:
- self.source.signal_done()
- log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
- self.__dispatch_event__("on_done")
+ # if self.source:
+ # self.source.signal_done()
+ # log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
+ # self.__dispatch_event__("on_done")
def signal_done(self):
"""Signal that a child record is done."""
diff --git a/dabapush/Writer/Writer.py b/dabapush/Writer/Writer.py
index 7efd0d5..7baf827 100644
--- a/dabapush/Writer/Writer.py
+++ b/dabapush/Writer/Writer.py
@@ -6,8 +6,10 @@
"""
import abc
+from pathlib import Path
from typing import Iterator, List
+import ujson
from loguru import logger as log
from ..Configuration.WriterConfiguration import WriterConfiguration
@@ -27,10 +29,31 @@ def __init__(self, config: WriterConfiguration):
self.config = config
self.buffer: List[Record] = []
+ self.back_log: List[Record] = []
+ # initialize file log
+ if not Path(".dabapush/").exists():
+ Path(".dabapush/").mkdir()
+
+ self.log_path = Path(f".dabapush/{config.name}.jsonl")
+ if self.log_path.exists():
+ log.debug(
+ f"Found log file for {self.config.name} at {self.log_path}. Loading..."
+ )
+ with self.log_path.open("rt", encoding="utf8") as f:
+ self.back_log = [
+ Record(**ujson.loads(_)) # pylint: disable=I1101
+                for _ in f
+ ]
+ else:
+ self.log_path.touch()
+ self.log_file = self.log_path.open( # pylint: disable=R1732
+ "a", encoding="utf8"
+ )
def __del__(self):
"""Ensures the buffer is flushed before the object is destroyed."""
self._trigger_persist()
+ self.log_file.close()
def write(self, queue: Iterator[Record]) -> None:
"""Consumes items from the provided queue.
@@ -39,16 +62,20 @@ def write(self, queue: Iterator[Record]) -> None:
queue (Iterator[Record]): Items to be consumed.
"""
for item in queue:
+ if item in self.back_log:
+ continue
self.buffer.append(item)
if len(self.buffer) >= self.config.chunk_size:
self._trigger_persist()
def _trigger_persist(self):
self.persist()
- log.debug(f"Persisted {self.config.chunk_size} records. Setting to done.")
+ log.debug(f"Persisted {len(self.buffer)} records. Setting to done.")
for record in self.buffer:
log.debug(f"Setting record {record.uuid} as done.")
record.done()
+ self.log(record)
+ self.log_file.flush()
self.buffer = []
@abc.abstractmethod
@@ -72,3 +99,14 @@ def id(self):
str: The ID of the writer.
"""
return self.config.id
+
+ def log(self, record: Record):
+ """Log the record to the persistent record log file."""
+ ujson.dump(record.to_log(), self.log_file) # pylint: disable=I1101
+ self.log_file.write("\n")
+
+ # with self.log_path.open("a", encoding="utf8") as f:
+ # for sub_record in record.walk_tree(only_leafs=True):
+ # ujson.dump(sub_record.to_log(), f)
+ # f.write("\n")
+ log.debug(f"Done with {record.uuid}")
diff --git a/tests/Reader/test_JSONReader.py b/tests/Reader/test_JSONReader.py
index e4e74b8..797ccf3 100644
--- a/tests/Reader/test_JSONReader.py
+++ b/tests/Reader/test_JSONReader.py
@@ -7,16 +7,18 @@
from dabapush.Reader.JSONReader import JSONReader, JSONReaderConfiguration
+
@pytest.fixture
def input_json_directory(isolated_test_dir):
"Pytest fixture creating a directory with 20 json files."
- for idx in range(10,30):
+ for idx in range(10, 30):
file_path = isolated_test_dir / f"test_{idx}.json"
with file_path.open("wt") as out_file:
json.dump({"test_key": idx}, out_file)
out_file.write("\n")
return isolated_test_dir
+
def test_read(input_json_directory: Path): # pylint: disable=W0621
"""Should read the data from the file."""
reader = JSONReader(
@@ -28,30 +30,3 @@ def test_read(input_json_directory: Path): # pylint: disable=W0621
print(record)
assert record.processed_at
assert record.payload == {"test_key": int(record.uuid[-7:-5])}
-
-
-def test_read_with_backlog(input_json_directory: Path): # pylint: disable=W0621
- """Should only read the new data."""
- reader = JSONReaderConfiguration(
- "test", read_path=str(input_json_directory.resolve()), pattern="*.json"
- ).get_instance()
-
- def wrapper():
- n = None
- for n, record in enumerate(reader.read()):
- record.done()
- return n or 0
-
- n = wrapper()
-
- assert n + 1 == 20
-
- reader2 = JSONReaderConfiguration(
- "test", read_path=str(input_json_directory.resolve())
- ).get_instance()
-
- records2 = list(reader2.read())
- log_path = input_json_directory / ".dabapush/test.jsonl"
- assert log_path.exists()
- assert len(reader2.back_log) == 20
- assert len(records2) == 0
diff --git a/tests/Reader/test_NDJSONReader.py b/tests/Reader/test_NDJSONReader.py
index 124af67..d567b40 100644
--- a/tests/Reader/test_NDJSONReader.py
+++ b/tests/Reader/test_NDJSONReader.py
@@ -30,35 +30,3 @@ def test_read(isolated_test_dir: Path, data): # pylint: disable=W0621
for n, record in enumerate(records):
assert record.processed_at
assert record.payload == data[n]
-
-
-def test_read_with_backlog(isolated_test_dir: Path, data): # pylint: disable=W0621
- """Should only read the new data."""
- reader = NDJSONReaderConfiguration(
- "test", read_path=str(isolated_test_dir.resolve()), pattern="*.ndjson"
- ).get_instance()
- file_path = isolated_test_dir / "test.ndjson"
- with file_path.open("wt") as file:
- for line in data:
- json.dump(line, file)
- file.write("\n")
-
- def wrapper():
- n = None
- for n, record in enumerate(reader.read()):
- record.done()
- return n or 0
-
- n = wrapper()
-
- assert n + 1 == 20
-
- reader2 = NDJSONReaderConfiguration(
- "test", read_path=str(isolated_test_dir.resolve())
- ).get_instance()
-
- records2 = list(reader2.read())
- log_path = isolated_test_dir / ".dabapush/test.jsonl"
- assert log_path.exists()
- assert len(reader2.back_log) == 20
- assert len(records2) == 0
diff --git a/tests/Writer/test_Writer.py b/tests/Writer/test_Writer.py
index 7852289..abb7298 100644
--- a/tests/Writer/test_Writer.py
+++ b/tests/Writer/test_Writer.py
@@ -1,6 +1,6 @@
"""Tests for the Writer class."""
-# pylint: disable=W0621, C0114, C0115, C0116
+# pylint: disable=W0212, W0621, C0114, C0115, C0116
from pytest import fixture
from dabapush.Configuration.WriterConfiguration import WriterConfiguration
@@ -9,7 +9,8 @@
@fixture
-def writer() -> Writer:
+def writer(monkeypatch, tmp_path) -> Writer:
+ monkeypatch.chdir(tmp_path)
config = WriterConfiguration(name="test")
return Writer(config)
@@ -36,15 +37,37 @@ def __init__(self, config):
def persist(self):
self.persisted_data.extend((_.payload for _ in self.buffer))
- self.buffer = []
-def test_writer_persist_method():
+def test_writer_persist_method(monkeypatch, tmp_path):
"""Should persist the buffer."""
+ monkeypatch.chdir(tmp_path)
+
config = WriterConfiguration(name="test", id=1, chunk_size=3)
writer = MyTestWriter(config)
queue = (Record(uuid=str(i), payload=i) for i in range(10))
writer.write(queue)
- writer.persist()
+ writer._trigger_persist()
assert writer.persisted_data == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert not writer.buffer
+
+
+def test_writer_persist_method_with_backlog(monkeypatch, tmp_path):
+    """Should skip records that are already in the back-log."""
+ monkeypatch.chdir(tmp_path)
+
+ config = WriterConfiguration(name="test", id=1, chunk_size=3)
+ queue = [Record(uuid=str(i), payload=i) for i in range(10)]
+ writer = MyTestWriter(config)
+ writer.write((_ for _ in queue))
+ writer._trigger_persist()
+
+ del writer
+
+ writer = MyTestWriter(config)
+ writer._trigger_persist()
+
+ assert not writer.persisted_data
+ assert not writer.buffer
+ assert len(writer.back_log) == 10
+ assert all(_.uuid == str(i) for i, _ in enumerate(writer.back_log))