diff --git a/dabapush/Reader/JSONReader.py b/dabapush/Reader/JSONReader.py
new file mode 100644
index 0000000..0dabe72
--- /dev/null
+++ b/dabapush/Reader/JSONReader.py
@@ -0,0 +1,97 @@
+"""JSON Reader plug-in for dabapush."""
+
+# pylint: disable=R,I1101
+from typing import Iterator
+
+import ujson
+
+from ..Configuration.ReaderConfiguration import ReaderConfiguration
+from ..Record import Record
+from ..utils import flatten
+from .Reader import FileReader
+
+
+
+class JSONReader(FileReader):
+    """Reader to read ready to read directories containing multiple json files.
+    It matches files in the path-tree against the pattern and reads the
+    content of each file as JSON.
+
+    Attributes
+    ----------
+    config: JSONReaderConfiguration
+        The configuration file used for reading
+    """
+
+    def __init__(self, config: "JSONReaderConfiguration") -> None:
+        super().__init__(config)
+        self.config = config
+
+    def read(self) -> Iterator[Record]:
+        """reads multiple JSON files and emits them."""
+
+        for file_record in self.records:
+            with file_record.payload.open("rt", encoding="utf8") as json_file:
+                parsed = ujson.load(json_file)
+                record = Record(
+                    uuid=f"{str(file_record.uuid)}",
+                    payload=(
+                        parsed
+                        if not self.config.flatten_dicts
+                        else flatten(parsed)
+                    ),
+                    source=file_record,
+                )
+                if record not in self.back_log:
+                    yield record
+
+
+class JSONReaderConfiguration(ReaderConfiguration):
+    """Read directory containing JSON files.
+
+    Attributes
+    ----------
+    flatten_dicts: bool
+        whether to flatten those nested dicts
+
+    """
+
+    yaml_tag = "!dabapush:JSONReaderConfiguration"
+    """internal tag for pyYAML
+    """
+
+    def __init__(
+        self,
+        name,
+        id=None,  # pylint: disable=W0622
+        read_path: str = ".",
+        pattern: str = "*.json",
+        flatten_dicts=True,
+    ) -> None:
+        """
+        Parameters
+        ----------
+        name: str
+            target pipeline name
+        id : UUID
+            ID of the instance (default value = None, is set by super class)
+        read_path: str
+            path to directory to read
+        pattern: str
+            filename pattern to match files in `read_path` against
+        flatten_dicts: bool
+            whether nested dictionaries are flattened (for details see `dabapush.utils.flatten`)
+
+        """
+        super().__init__(name, id=id, read_path=read_path, pattern=pattern)
+        self.flatten_dicts = flatten_dicts
+
+    def get_instance(self) -> JSONReader:  # pylint: disable=W0221
+        """Get a configured instance of JSONReader
+
+        Returns
+        -------
+        type: JSONReader
+            Configured JSONReader instance
+        """
+        return JSONReader(self)
diff --git a/dabapush/__init__.py b/dabapush/__init__.py
index d36efde..b9ad85a 100644
--- a/dabapush/__init__.py
+++ b/dabapush/__init__.py
@@ -163,7 +163,7 @@
 """
 
-__version__ = "0.4.0-alpha7"
+__version__ = "0.4.0-alpha8"
 
 from .Reader import (
     NDJSONReaderConfiguration,
diff --git a/pyproject.toml b/pyproject.toml
index db1a9db..fd84382 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -47,6 +47,7 @@ build-backend = "poetry.core.masonry.api"
 [tool.poetry.plugins.dabapush_readers]
 "Twacapic" = "dabapush.Reader.TwacapicReader:TwacapicReaderConfiguration"
 "NDJSON" = "dabapush.Reader.NDJSONReader:NDJSONReaderConfiguration"
+"JSON" = "dabapush.Reader.JSONReader:JSONReaderConfiguration"
 "Tegracli" = "dabapush.Reader.tegracli_reader:TegracliReaderConfiguration"
 
 [tool.poetry.plugins.dabapush_writers]
diff --git a/tests/Reader/test_JSONReader.py b/tests/Reader/test_JSONReader.py
new file mode 100644
index 0000000..e4e74b8
--- /dev/null
+++ b/tests/Reader/test_JSONReader.py
@@ -0,0 +1,57 @@
+"""Tests for JSONReader."""
+
+import json
+from pathlib import Path
+
+import pytest
+
+from dabapush.Reader.JSONReader import JSONReader, JSONReaderConfiguration
+
+@pytest.fixture
+def input_json_directory(isolated_test_dir):
+    "Pytest fixture creating a directory with 20 json files."
+    for idx in range(10, 30):
+        file_path = isolated_test_dir / f"test_{idx}.json"
+        with file_path.open("wt") as out_file:
+            json.dump({"test_key": idx}, out_file)
+            out_file.write("\n")
+    return isolated_test_dir
+
+def test_read(input_json_directory: Path):  # pylint: disable=W0621
+    """Should read the data from the file."""
+    reader = JSONReader(
+        JSONReaderConfiguration("test", read_path=str(input_json_directory.resolve()))
+    )
+    records = list(reader.read())
+    assert len(records) == 20
+    for record in records:
+        print(record)
+        assert record.processed_at
+        assert record.payload == {"test_key": int(record.uuid[-7:-5])}
+
+
+def test_read_with_backlog(input_json_directory: Path):  # pylint: disable=W0621
+    """Should only read the new data."""
+    reader = JSONReaderConfiguration(
+        "test", read_path=str(input_json_directory.resolve()), pattern="*.json"
+    ).get_instance()
+
+    def wrapper():
+        n = None
+        for n, record in enumerate(reader.read()):
+            record.done()
+        return n or 0
+
+    n = wrapper()
+
+    assert n + 1 == 20
+
+    reader2 = JSONReaderConfiguration(
+        "test", read_path=str(input_json_directory.resolve())
+    ).get_instance()
+
+    records2 = list(reader2.read())
+    log_path = input_json_directory / ".dabapush/test.jsonl"
+    assert log_path.exists()
+    assert len(reader2.back_log) == 20
+    assert len(records2) == 0