Refactor log-keeping
pekasen committed Jan 22, 2025
1 parent 1388e93 commit 6df7525
Showing 8 changed files with 79 additions and 108 deletions.
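In short: record deduplication moves out of the readers and into Writer, which now owns the .dabapush/<name>.jsonl log file end to end. The sketch below paraphrases the post-commit flow; it is not code from the commit, and `run_pipeline` is an illustrative name for wiring that the library leaves to its callers.

    # Paraphrase of the post-commit flow; `reader` and `writer` stand for
    # any concrete Reader/Writer pair from this repository.
    def run_pipeline(reader, writer):
        # Readers now yield every record they find on every run ...
        records = reader.read()
        # ... and the writer skips whatever its .dabapush/<name>.jsonl
        # back-log says was already persisted, logging the rest.
        writer.write(records)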
8 changes: 2 additions & 6 deletions dabapush/Reader/JSONReader.py
@@ -11,7 +11,6 @@
 from .Reader import FileReader
 
 
-
 class JSONReader(FileReader):
     """Reader to read ready to read directories containing multiple json files.
     It matches files in the path-tree against the pattern and reads the
@@ -36,14 +35,11 @@ def read(self) -> Iterator[Record]:
                 record = Record(
                     uuid=f"{str(file_record.uuid)}",
                     payload=(
-                        parsed
-                        if not self.config.flatten_dicts
-                        else flatten(parsed)
+                        parsed if not self.config.flatten_dicts else flatten(parsed)
                     ),
                     source=file_record,
                 )
-                if record not in self.back_log:
-                    yield record
+                yield record
 
 
 class JSONReaderConfiguration(ReaderConfiguration):
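With the back_log gate removed, JSONReader.read() now yields a record for every matching file on every run. A minimal sketch of the resulting contract; the read_path and configuration name are made up, while the constructor signature matches the tests in this repository:

    from dabapush.Reader.JSONReader import JSONReaderConfiguration

    # Hypothetical path and name, real keyword arguments.
    reader = JSONReaderConfiguration(
        "example", read_path="./data", pattern="*.json"
    ).get_instance()

    for record in reader.read():
        # Emitted unconditionally; previously this was gated by
        # `if record not in self.back_log`.
        print(record.uuid, record.payload)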
8 changes: 2 additions & 6 deletions dabapush/Reader/NDJSONReader.py
@@ -53,13 +53,9 @@ def read(self) -> Iterator[Record]:
         """reads multiple NDJSON files and emits them line by line"""
 
         for file_record in self.records:
-            filtered_records = filter(
-                lambda x: x not in self.back_log,
-                file_record.split(
-                    func=read_and_split, flatten_records=self.config.flatten_dicts
-                ),
+            yield from file_record.split(
+                func=read_and_split, flatten_records=self.config.flatten_dicts
             )
-            yield from filtered_records
 
 
 class NDJSONReaderConfiguration(ReaderConfiguration):
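The dropped filter() call was NDJSONReader's only use of the reader-side backlog. A standalone before/after illustration, using plain strings in place of dabapush records:

    # Before: the reader filtered against its own back_log.
    back_log = ["a", "b"]
    records = ["a", "b", "c"]
    assert list(filter(lambda x: x not in back_log, records)) == ["c"]

    # After: the reader yields everything; the writer's back_log
    # (see dabapush/Writer/Writer.py below) does the skipping.
    assert list(records) == ["a", "b", "c"]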
27 changes: 1 addition & 26 deletions dabapush/Reader/Reader.py
@@ -4,8 +4,6 @@
 from pathlib import Path
 from typing import Iterator
 
-import ujson
-from loguru import logger as log
 from tqdm.auto import tqdm
 
 from ..Configuration.ReaderConfiguration import ReaderConfiguration
@@ -33,12 +31,6 @@ def __init__(self, config: ReaderConfiguration):
             be a subclass of ReaderConfiguration.
         """
         self.config = config
-        self.back_log = []
-        # initialize file log
-        if not Path(".dabapush/").exists():
-            Path(".dabapush/").mkdir()
-
-        self.log_path = Path(f".dabapush/{config.name}.jsonl")
 
     @abc.abstractmethod
     def read(self) -> Iterator[Record]:
@@ -77,28 +69,11 @@ def read(self) -> Iterator[Record]:
     @property
     def records(self) -> Iterator[Record]:
         """Generator for all files matching the pattern in the read_path."""
-        if self.log_path.exists():
-            log.debug(
-                f"Found log file for {self.config.name} at {self.log_path}. Loading..."
-            )
-            with self.log_path.open("rt", encoding="utf8") as f:
-                self.back_log = [Record(**ujson.loads(_)) for _ in f.readlines()]
-        else:
-            self.log_path.touch()
-
         yield from (
             Record(
                 uuid=str(a),
                 payload=a,
-                event_handlers={"on_done": [self.log]},
+                # event_handlers={"on_done": [self.log]},
            )
            for a in tqdm(list(Path(self.config.read_path).rglob(self.config.pattern)))
        )
-
-    def log(self, record: Record):
-        """Log the record to the persistent record log file."""
-        with self.log_path.open("a", encoding="utf8") as f:
-            for sub_record in record.walk_tree(only_leafs=True):
-                ujson.dump(sub_record.to_log(), f)
-                f.write("\n")
-        log.debug(f"Done with {record.uuid}")
8 changes: 4 additions & 4 deletions dabapush/Record.py
@@ -185,10 +185,10 @@ def done(self):
         # Signal parent that this record is done
         self._state_ = "done"
         log.debug(f"Record {self.uuid} is set as done.")
-        if self.source:
-            self.source.signal_done()
-            log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
-        self.__dispatch_event__("on_done")
+        # if self.source:
+        #     self.source.signal_done()
+        #     log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
+        # self.__dispatch_event__("on_done")
 
     def signal_done(self):
         """Signal that a child record is done."""
40 changes: 39 additions & 1 deletion dabapush/Writer/Writer.py
@@ -6,8 +6,10 @@
 """
 
 import abc
+from pathlib import Path
 from typing import Iterator, List
 
+import ujson
 from loguru import logger as log
 
 from ..Configuration.WriterConfiguration import WriterConfiguration
@@ -27,10 +29,31 @@ def __init__(self, config: WriterConfiguration):
 
         self.config = config
         self.buffer: List[Record] = []
+        self.back_log: List[Record] = []
+        # initialize file log
+        if not Path(".dabapush/").exists():
+            Path(".dabapush/").mkdir()
+
+        self.log_path = Path(f".dabapush/{config.name}.jsonl")
+        if self.log_path.exists():
+            log.debug(
+                f"Found log file for {self.config.name} at {self.log_path}. Loading..."
+            )
+            with self.log_path.open("rt", encoding="utf8") as f:
+                self.back_log = [
+                    Record(**ujson.loads(_))  # pylint: disable=I1101
+                    for _ in f.readlines()
+                ]
+        else:
+            self.log_path.touch()
+        self.log_file = self.log_path.open(  # pylint: disable=R1732
+            "a", encoding="utf8"
+        )
 
     def __del__(self):
         """Ensures the buffer is flushed before the object is destroyed."""
         self._trigger_persist()
+        self.log_file.close()
 
     def write(self, queue: Iterator[Record]) -> None:
         """Consumes items from the provided queue.
@@ -39,16 +62,20 @@ def write(self, queue: Iterator[Record]) -> None:
         Args:
             queue (Iterator[Record]): Items to be consumed.
         """
         for item in queue:
+            if item in self.back_log:
+                continue
             self.buffer.append(item)
             if len(self.buffer) >= self.config.chunk_size:
                 self._trigger_persist()
 
     def _trigger_persist(self):
         self.persist()
-        log.debug(f"Persisted {self.config.chunk_size} records. Setting to done.")
+        log.debug(f"Persisted {len(self.buffer)} records. Setting to done.")
         for record in self.buffer:
             log.debug(f"Setting record {record.uuid} as done.")
             record.done()
+            self.log(record)
+        self.log_file.flush()
         self.buffer = []
 
     @abc.abstractmethod
@@ -72,3 +99,14 @@ def id(self):
             str: The ID of the writer.
         """
         return self.config.id
+
+    def log(self, record: Record):
+        """Log the record to the persistent record log file."""
+        ujson.dump(record.to_log(), self.log_file)  # pylint: disable=I1101
+        self.log_file.write("\n")
+
+        # with self.log_path.open("a", encoding="utf8") as f:
+        #     for sub_record in record.walk_tree(only_leafs=True):
+        #         ujson.dump(sub_record.to_log(), f)
+        #         f.write("\n")
+        log.debug(f"Done with {record.uuid}")
31 changes: 3 additions & 28 deletions tests/Reader/test_JSONReader.py
@@ -7,16 +7,18 @@
 
 from dabapush.Reader.JSONReader import JSONReader, JSONReaderConfiguration
 
+
 @pytest.fixture
 def input_json_directory(isolated_test_dir):
     "Pytest fixture creating a directory with 20 json files."
-    for idx in range(10,30):
+    for idx in range(10, 30):
         file_path = isolated_test_dir / f"test_{idx}.json"
         with file_path.open("wt") as out_file:
             json.dump({"test_key": idx}, out_file)
             out_file.write("\n")
     return isolated_test_dir
 
+
 def test_read(input_json_directory: Path):  # pylint: disable=W0621
     """Should read the data from the file."""
     reader = JSONReader(
@@ -28,30 +30,3 @@ def test_read(input_json_directory: Path):  # pylint: disable=W0621
         print(record)
         assert record.processed_at
         assert record.payload == {"test_key": int(record.uuid[-7:-5])}
-
-
-def test_read_with_backlog(input_json_directory: Path):  # pylint: disable=W0621
-    """Should only read the new data."""
-    reader = JSONReaderConfiguration(
-        "test", read_path=str(input_json_directory.resolve()), pattern="*.json"
-    ).get_instance()
-
-    def wrapper():
-        n = None
-        for n, record in enumerate(reader.read()):
-            record.done()
-        return n or 0
-
-    n = wrapper()
-
-    assert n + 1 == 20
-
-    reader2 = JSONReaderConfiguration(
-        "test", read_path=str(input_json_directory.resolve())
-    ).get_instance()
-
-    records2 = list(reader2.read())
-    log_path = input_json_directory / ".dabapush/test.jsonl"
-    assert log_path.exists()
-    assert len(reader2.back_log) == 20
-    assert len(records2) == 0
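test_read_with_backlog is deleted rather than ported because the behaviour it pinned down no longer exists on the reader. If the new contract deserves its own regression test, a hypothetical one (reusing this file's fixture and imports, not part of the commit) might look like:

    # Hypothetical test: the same directory read twice yields everything
    # twice, since readers no longer deduplicate against a backlog.
    def test_read_is_repeatable(input_json_directory):  # pylint: disable=W0621
        def make_reader():
            return JSONReaderConfiguration(
                "test", read_path=str(input_json_directory.resolve()), pattern="*.json"
            ).get_instance()

        assert len(list(make_reader().read())) == 20
        assert len(list(make_reader().read())) == 20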
32 changes: 0 additions & 32 deletions tests/Reader/test_NDJSONReader.py
@@ -30,35 +30,3 @@ def test_read(isolated_test_dir: Path, data):  # pylint: disable=W0621
     for n, record in enumerate(records):
         assert record.processed_at
         assert record.payload == data[n]
-
-
-def test_read_with_backlog(isolated_test_dir: Path, data):  # pylint: disable=W0621
-    """Should only read the new data."""
-    reader = NDJSONReaderConfiguration(
-        "test", read_path=str(isolated_test_dir.resolve()), pattern="*.ndjson"
-    ).get_instance()
-    file_path = isolated_test_dir / "test.ndjson"
-    with file_path.open("wt") as file:
-        for line in data:
-            json.dump(line, file)
-            file.write("\n")
-
-    def wrapper():
-        n = None
-        for n, record in enumerate(reader.read()):
-            record.done()
-        return n or 0
-
-    n = wrapper()
-
-    assert n + 1 == 20
-
-    reader2 = NDJSONReaderConfiguration(
-        "test", read_path=str(isolated_test_dir.resolve())
-    ).get_instance()
-
-    records2 = list(reader2.read())
-    log_path = isolated_test_dir / ".dabapush/test.jsonl"
-    assert log_path.exists()
-    assert len(reader2.back_log) == 20
-    assert len(records2) == 0
33 changes: 28 additions & 5 deletions tests/Writer/test_Writer.py
@@ -1,6 +1,6 @@
 """Tests for the Writer class."""
 
-# pylint: disable=W0621, C0114, C0115, C0116
+# pylint: disable=W0212, W0621, C0114, C0115, C0116
 from pytest import fixture
 
 from dabapush.Configuration.WriterConfiguration import WriterConfiguration
@@ -9,7 +9,8 @@
 
 
 @fixture
-def writer() -> Writer:
+def writer(monkeypatch, tmp_path) -> Writer:
+    monkeypatch.chdir(tmp_path)
     config = WriterConfiguration(name="test")
     return Writer(config)
 
@@ -36,15 +37,37 @@ def __init__(self, config):
 
     def persist(self):
         self.persisted_data.extend((_.payload for _ in self.buffer))
+        self.buffer = []
 
 
-def test_writer_persist_method():
+def test_writer_persist_method(monkeypatch, tmp_path):
     """Should persist the buffer."""
+    monkeypatch.chdir(tmp_path)
+
     config = WriterConfiguration(name="test", id=1, chunk_size=3)
     writer = MyTestWriter(config)
     queue = (Record(uuid=str(i), payload=i) for i in range(10))
     writer.write(queue)
-    writer.persist()
+    writer._trigger_persist()
     assert writer.persisted_data == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
+    assert not writer.buffer
+
+
+def test_writer_persist_method_with_backlog(monkeypatch, tmp_path):
+    """Should persist the buffer."""
+    monkeypatch.chdir(tmp_path)
+
+    config = WriterConfiguration(name="test", id=1, chunk_size=3)
+    queue = [Record(uuid=str(i), payload=i) for i in range(10)]
+    writer = MyTestWriter(config)
+    writer.write((_ for _ in queue))
+    writer._trigger_persist()
+
+    del writer
+
+    writer = MyTestWriter(config)
+    writer._trigger_persist()
+
+    assert not writer.persisted_data
+    assert not writer.buffer
+    assert len(writer.back_log) == 10
+    assert all(_.uuid == str(i) for i, _ in enumerate(writer.back_log))
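The round-trip in test_writer_persist_method_with_backlog leans on `del writer` invoking Writer.__del__ promptly, which holds under CPython's reference counting but is not guaranteed by the language. A deterministic alternative would be an explicit close step; a hypothetical sketch (not part of the commit) using only attributes the diff introduces:

    # Hypothetical convenience: deterministic flushing via a context
    # manager instead of relying on __del__ timing.
    from contextlib import contextmanager

    @contextmanager
    def writing(writer):
        try:
            yield writer
        finally:
            writer._trigger_persist()  # flush the tail of the buffer
            writer.log_file.close()    # close the append-only record log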
