Skip to content

Commit

Permalink
Merge pull request #69 from Leibniz-HBI/json-dir
Browse files Browse the repository at this point in the history
Add reader for JSON files.
  • Loading branch information
pekasen authored Jan 21, 2025
2 parents ddfb283 + dacf269 commit 7fa04d6
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 1 deletion.
97 changes: 97 additions & 0 deletions dabapush/Reader/JSONReader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""JSON reader plug-in for dabapush"""

# pylint: disable=R,I1101
from typing import Iterator

import ujson

from ..Configuration.ReaderConfiguration import ReaderConfiguration
from ..Record import Record
from ..utils import flatten
from .Reader import FileReader



class JSONReader(FileReader):
    """Reader for directories containing multiple JSON files.

    Matches files in the path-tree against the configured pattern and
    parses the content of each matched file as a single JSON document.

    Attributes
    ----------
    config: JSONReaderConfiguration
        The configuration used for reading.
    """

    def __init__(self, config: "JSONReaderConfiguration") -> None:
        super().__init__(config)
        self.config = config

    def read(self) -> Iterator[Record]:
        """Read multiple JSON files and emit one record per file.

        Yields
        ------
        Record
            Carries the parsed (and, if ``flatten_dicts`` is set,
            flattened) JSON payload; records already present in the
            back-log are skipped.
        """
        for file_record in self.records:
            # Close the file handle as soon as parsing is done instead of
            # keeping it open while the consumer processes the record.
            with file_record.payload.open("rt", encoding="utf8") as json_file:
                parsed = ujson.load(json_file)
            record = Record(
                # str() is sufficient; the former f"{str(...)}" converted twice.
                uuid=str(file_record.uuid),
                payload=(
                    flatten(parsed) if self.config.flatten_dicts else parsed
                ),
                source=file_record,
            )
            if record not in self.back_log:
                yield record


class JSONReaderConfiguration(ReaderConfiguration):
    """Configuration for reading a directory containing JSON files.

    Attributes
    ----------
    flatten_dicts: bool
        Whether nested dictionaries in each payload are flattened.
    """

    yaml_tag = "!dabapush:JSONReaderConfiguration"
    """internal tag for pyYAML
    """

    def __init__(
        self,
        name,
        id=None,  # pylint: disable=W0622
        read_path: str = ".",
        pattern: str = "*.json",
        flatten_dicts=True,
    ) -> None:
        """
        Parameters
        ----------
        name: str
            target pipeline name
        id : UUID
            ID of the instance (default value = None, is set by super class)
        read_path: str
            path to directory to read
        pattern: str
            filename pattern to match files in `read_path` against
        flatten_dicts: bool
            whether nested dictionaries are flattened (for details see `dabapush.utils.flatten`)
        """
        super().__init__(name, id=id, read_path=read_path, pattern=pattern)
        self.flatten_dicts = flatten_dicts

    def get_instance(self) -> JSONReader:  # pylint: disable=W0221
        """Get a configured instance of JSONReader.

        Returns
        -------
        type: JSONReader
            Configured JSONReader instance
        """
        return JSONReader(self)
2 changes: 1 addition & 1 deletion dabapush/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
"""

__version__ = "0.4.0-alpha7"
__version__ = "0.4.0-alpha8"

from .Reader import (
NDJSONReaderConfiguration,
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry.plugins.dabapush_readers]
"Twacapic" = "dabapush.Reader.TwacapicReader:TwacapicReaderConfiguration"
"NDJSON" = "dabapush.Reader.NDJSONReader:NDJSONReaderConfiguration"
"JSON" = "dabapush.Reader.JSONReader:JSONReaderConfiguration"
"Tegracli" = "dabapush.Reader.tegracli_reader:TegracliReaderConfiguration"

[tool.poetry.plugins.dabapush_writers]
Expand Down
57 changes: 57 additions & 0 deletions tests/Reader/test_JSONReader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Tests for JSONReader."""

import json
from pathlib import Path

import pytest

from dabapush.Reader.JSONReader import JSONReader, JSONReaderConfiguration

@pytest.fixture
def input_json_directory(isolated_test_dir):
    "Pytest fixture creating a directory with 20 json files."
    # Indices 10-29 keep every filename at two digits, which the tests
    # rely on when slicing the record uuid.
    for idx in range(10, 30):
        target = isolated_test_dir / f"test_{idx}.json"
        target.write_text(json.dumps({"test_key": idx}) + "\n")
    return isolated_test_dir

def test_read(input_json_directory: Path):  # pylint: disable=W0621
    """Should read the data from the file."""
    config = JSONReaderConfiguration(
        "test", read_path=str(input_json_directory.resolve())
    )
    reader = JSONReader(config)
    records = list(reader.read())

    assert len(records) == 20
    for record in records:
        print(record)
        assert record.processed_at
        # The two digits before ".json" in the uuid are the index
        # written into the payload by the fixture.
        file_index = int(record.uuid[-7:-5])
        assert record.payload == {"test_key": file_index}


def test_read_with_backlog(input_json_directory: Path):  # pylint: disable=W0621
    """Should only read the new data."""
    reader = JSONReaderConfiguration(
        "test", read_path=str(input_json_directory.resolve()), pattern="*.json"
    ).get_instance()

    def wrapper():
        last_index = None
        for last_index, record in enumerate(reader.read()):
            record.done()
        return last_index or 0

    processed = wrapper()
    assert processed + 1 == 20

    # A second reader over the same directory should find everything in
    # the back-log and therefore emit nothing.
    reader2 = JSONReaderConfiguration(
        "test", read_path=str(input_json_directory.resolve())
    ).get_instance()
    records2 = list(reader2.read())

    log_path = input_json_directory / ".dabapush/test.jsonl"
    assert log_path.exists()
    assert len(reader2.back_log) == 20
    assert not records2

0 comments on commit 7fa04d6

Please sign in to comment.