Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reader for JSON files. #69

Merged
merged 3 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions dabapush/Reader/JSONReader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""JSON reader plug-in for dabapush"""

# pylint: disable=R,I1101
from typing import Iterator

import ujson

from ..Configuration.ReaderConfiguration import ReaderConfiguration
from ..Record import Record
from ..utils import flatten
from .Reader import FileReader



class JSONReader(FileReader):
    """Reader for directories containing multiple JSON files.

    Matches files in the path-tree against the configured pattern and
    parses the content of each matching file as a single JSON document.

    Attributes
    ----------
    config: JSONReaderConfiguration
        The configuration used for reading.
    """

    def __init__(self, config: "JSONReaderConfiguration") -> None:
        super().__init__(config)
        self.config = config

    def read(self) -> Iterator[Record]:
        """Read multiple JSON files and emit one Record per file.

        Yields
        ------
        Record
            A record whose payload is the parsed (and, if configured,
            flattened) JSON document. Records already present in the
            back-log are skipped.
        """
        for file_record in self.records:
            # file_record.payload is the Path of the matched file
            with file_record.payload.open("rt", encoding="utf8") as json_file:
                parsed = ujson.load(json_file)
            record = Record(
                uuid=str(file_record.uuid),
                payload=flatten(parsed) if self.config.flatten_dicts else parsed,
                source=file_record,
            )
            # skip records that were already processed in a previous run
            if record not in self.back_log:
                yield record


class JSONReaderConfiguration(ReaderConfiguration):
    """Read a directory containing JSON files.

    Attributes
    ----------
    flatten_dicts: bool
        whether to flatten nested dicts in the parsed payloads

    """

    yaml_tag = "!dabapush:JSONReaderConfiguration"
    """internal tag for pyYAML
    """

    def __init__(
        self,
        name,
        id=None,  # pylint: disable=W0622
        read_path: str = ".",
        pattern: str = "*.json",
        flatten_dicts: bool = True,
    ) -> None:
        """
        Parameters
        ----------
        name: str
            target pipeline name
        id : UUID
            ID of the instance (default value = None, is set by super class)
        read_path: str
            path to directory to read
        pattern: str
            filename pattern to match files in `read_path` against
        flatten_dicts: bool
            whether nested dictionaries are flattened
            (for details see `dabapush.utils.flatten`)

        """
        super().__init__(name, id=id, read_path=read_path, pattern=pattern)
        self.flatten_dicts = flatten_dicts

    def get_instance(self) -> JSONReader:  # pylint: disable=W0221
        """Get a configured instance of JSONReader.

        Returns
        -------
        type: JSONReader
            Configured JSONReader instance
        """
        return JSONReader(self)
2 changes: 1 addition & 1 deletion dabapush/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
"""

__version__ = "0.4.0-alpha7"
__version__ = "0.4.0-alpha8"

from .Reader import (
NDJSONReaderConfiguration,
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry.plugins.dabapush_readers]
"Twacapic" = "dabapush.Reader.TwacapicReader:TwacapicReaderConfiguration"
"NDJSON" = "dabapush.Reader.NDJSONReader:NDJSONReaderConfiguration"
"JSON" = "dabapush.Reader.JSONReader:JSONReaderConfiguration"
"Tegracli" = "dabapush.Reader.tegracli_reader:TegracliReaderConfiguration"

[tool.poetry.plugins.dabapush_writers]
Expand Down
57 changes: 57 additions & 0 deletions tests/Reader/test_JSONReader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Tests for JSONReader."""

import json
from pathlib import Path

import pytest

from dabapush.Reader.JSONReader import JSONReader, JSONReaderConfiguration

@pytest.fixture
def input_json_directory(isolated_test_dir):
    """Pytest fixture populating a directory with 20 JSON files."""
    for number in range(10, 30):
        target = isolated_test_dir / f"test_{number}.json"
        with target.open("wt") as handle:
            handle.write(json.dumps({"test_key": number}))
            handle.write("\n")
    return isolated_test_dir

def test_read(input_json_directory: Path):  # pylint: disable=W0621
    """Should read the data from the file."""
    config = JSONReaderConfiguration(
        "test", read_path=str(input_json_directory.resolve())
    )
    reader = JSONReader(config)
    records = list(reader.read())
    assert len(records) == 20
    for record in records:
        print(record)
        assert record.processed_at
        # the uuid ends in "_NN.json"; slice out the two-digit index
        assert record.payload == {"test_key": int(record.uuid[-7:-5])}


def test_read_with_backlog(input_json_directory: Path):  # pylint: disable=W0621
    """Should only read the new data."""
    reader = JSONReaderConfiguration(
        "test", read_path=str(input_json_directory.resolve()), pattern="*.json"
    ).get_instance()

    def consume_all():
        # return the highest enumeration index seen (0 if nothing was read)
        last_index = None
        for last_index, record in enumerate(reader.read()):
            record.done()
        return last_index or 0

    n = consume_all()

    assert n + 1 == 20

    reader2 = JSONReaderConfiguration(
        "test", read_path=str(input_json_directory.resolve())
    ).get_instance()

    remaining = list(reader2.read())
    log_path = input_json_directory / ".dabapush/test.jsonl"
    assert log_path.exists()
    assert len(reader2.back_log) == 20
    assert len(remaining) == 0
Loading