Basic refactoring + unit tests #14

Merged · 2 commits · Oct 18, 2023
6 changes: 1 addition & 5 deletions README.md
@@ -11,7 +11,7 @@ data-toolset is designed to simplify your data processing tasks by providing a m

## Installation

Python 3.9 and 3.10 are supported and tested (to some extent).
Python 3.8, 3.9, and 3.10 are supported and tested (to some extent).

```bash
python -m pip install --user data-toolset
```
@@ -97,9 +97,5 @@ Contributions are welcome! If you have any suggestions, bug reports, or feature

# TODO

- make parquet validation work with avsc schemas?
- create schema_evolution function
- mature create_sample function
- optimizations [TBD]
- support 3.11+
- benchmarking
6 changes: 6 additions & 0 deletions src/data_toolset/main.py
@@ -94,23 +94,29 @@ def init_args() -> Namespace:
to_json_parser = subparsers.add_parser("to_json", help="Convert a file to JSON format")
to_json_parser.add_argument("file_path", type=Path, action="store", help="Path to the file to convert")
to_json_parser.add_argument("output_path", type=Path, action="store", help="Path to the output JSON file")
to_json_parser.add_argument("--pretty", default=False, type=bool, action="store")

# data-toolset to_csv
to_csv_parser = subparsers.add_parser("to_csv", help="Convert a file to CSV format")
to_csv_parser.add_argument("file_path", type=Path, action="store", help="Path to the file to convert")
to_csv_parser.add_argument("output_path", type=Path, action="store", help="Path to the output CSV file")
to_csv_parser.add_argument("--has_header", default=True, type=bool, action="store")
to_csv_parser.add_argument("--delimiter", default=",", type=str, action="store", help="The delimiter character used in the CSV file.")
to_csv_parser.add_argument("--line_terminator", default="\n", type=str, action="store")
to_csv_parser.add_argument("--quote", default="\"", type=str, action="store")

# data-toolset to_avro
to_avro_parser = subparsers.add_parser("to_avro", help="Convert a file to Avro format")
to_avro_parser.add_argument("file_path", type=Path, action="store", help="Path to the file to convert")
to_avro_parser.add_argument("output_path", type=Path, action="store", help="Path to the output Avro file")
to_avro_parser.add_argument("--compression", choices=["uncompressed", "snappy", "deflate"], default="uncompressed", action="store")

# data-toolset to_parquet
to_parquet_parser = subparsers.add_parser("to_parquet", help="Convert a file to Parquet format")
to_parquet_parser.add_argument("file_path", type=Path, action="store", help="Path to the file to convert")
to_parquet_parser.add_argument("output_path", type=Path, action="store", help="Path to the output Parquet file")
to_parquet_parser.add_argument("--compression", choices=[
"lz4", "uncompressed", "snappy", "gzip", "lzo", "brotli", "zstd"], default="uncompressed", action="store")

# data-toolset random_sample
random_sample_parser = subparsers.add_parser("random_sample", help="Randomly sample records from a file")
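For reference, the reworked flags can be exercised in isolation. A minimal, self-contained sketch of the `to_json` flag handling (the subcommand and flag come from the diff above; the file names are made up):

```python
# Standalone sketch mirroring the to_json subparser above; file names are
# hypothetical. With action="store_true", --pretty defaults to False and
# flips to True only when the flag is present on the command line.
from argparse import ArgumentParser
from pathlib import Path

parser = ArgumentParser(prog="data-toolset")
subparsers = parser.add_subparsers(dest="command")
to_json_parser = subparsers.add_parser("to_json")
to_json_parser.add_argument("file_path", type=Path)
to_json_parser.add_argument("output_path", type=Path)
to_json_parser.add_argument("--pretty", action="store_true")

args = parser.parse_args(["to_json", "in.avro", "out.json", "--pretty"])
assert args.command == "to_json" and args.pretty is True
```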
84 changes: 59 additions & 25 deletions src/data_toolset/utils/avro.py
@@ -27,19 +27,6 @@ def to_arrow_table(cls, file_path: Path) -> pa.Table:
table = pa.Table.from_pylist(list(avro_reader))
return table

@classmethod
def write_arrow_table(cls, table: pa.Table, file_path: Path) -> None:
"""
Write an Arrow Table to an Avro file.

:param table: Arrow Table to write to the Avro file.
:type table: pa.Table
:param file_path: Path to the Avro file to write.
:type file_path: Path
"""
with open(file_path, "wb") as f:
fastavro.writer(f, table.schema, table.to_pydict().values())

@classmethod
def validate_format(cls, file_path: Path) -> None:
"""
@@ -237,41 +224,88 @@ def validate(cls, file_path: Path, schema_path: Path = None) -> None:
logging.info("File is a valid Avro file.")

@classmethod
def to_json(cls, file_path: Path, output_path: Path) -> None:
def to_json(cls, file_path: Path, output_path: Path, pretty: bool = False) -> None:
"""
Convert an Avro file to a JSON file.

:param file_path: Path to the Avro file to convert.
:type file_path: Path
:param output_path: Path to the output JSON file.
:type output_path: Path
:param pretty: Whether to format the JSON file with indentation (default is False).
:type pretty: bool
"""
with open(file_path, "rb") as f:
data = list(fastavro.reader(f))
with open(output_path, "w") as json_file:
json.dump(data, json_file, sort_keys=True, indent=4)
avro_reader = fastavro.reader(f)
df = polars.from_records(list(avro_reader))
df.write_json(file=output_path, pretty=pretty, row_oriented=True)

@classmethod
def to_csv(cls, file_path: Path, output_path: Path, has_header: bool = True, delimiter: str = ",") -> None:
def to_csv(cls, file_path: Path, output_path: Path, has_header: bool = True, delimiter: str = ",",
line_terminator: str = "\n", quote: str = '\"') -> None:
"""
Convert an Avro file to a CSV file.

:param file_path: Path to the Avro file to convert.
:type file_path: Path
:param output_path: Path to the output CSV file.
:type output_path: Path
:param has_header: Whether the CSV file should include a header row (default is True).
:type has_header: bool
:param delimiter: The character used to separate fields in the CSV (default is ',').
:type delimiter: str
:param line_terminator: The character(s) used to terminate lines in the CSV (default is '\n').
:type line_terminator: str
:param quote: The character used to enclose fields in quotes (default is '\"').
:type quote: str
"""
with open(file_path, "rb") as f:
avro_reader = fastavro.reader(f)
df = polars.from_records(list(avro_reader))
df.write_csv(file=output_path, has_header=has_header, separator=delimiter)
df.write_csv(file=output_path, has_header=has_header, separator=delimiter, line_terminator=line_terminator,
quote=quote)

@classmethod
def to_avro(cls, file_path: Path, output_path: Path) -> None:
pass
def to_parquet(cls, file_path: Path, output_path: Path,
compression: T.Literal[
"lz4", "uncompressed", "snappy", "gzip", "lzo", "brotli", "zstd"] = "uncompressed") -> None:
"""
Convert an Avro file to a Parquet file.

@classmethod
def to_parquet(cls, file_path: Path, output_path: Path) -> None:
:param file_path: Path to the Avro file to convert.
:type file_path: Path
:param output_path: Path to the output Parquet file.
:type output_path: Path
:param compression: The compression method to use for the Parquet file (default is 'uncompressed').
:type compression: str
"""
table = cls.to_arrow_table(file_path)
df = polars.from_arrow(table)
df.write_parquet(file=output_path, compression=compression)

@classmethod
def random_sample(cls, file_path: Path, output_path: Path, n: int, fraction: float = None) -> None:
def random_sample(cls, file_path: Path, output_path: Path, n: int = None, fraction: float = None,
with_replacement: bool = False, shuffle: bool = False, seed: T.Any = None) -> None:
"""
Create a random sample from an Avro file and save it as an Avro file.

:param file_path: Path to the Avro file to sample from.
:type file_path: Path
:param output_path: Path to the output Avro file for the random sample.
:type output_path: Path
:param n: The number of records to include in the random sample.
:type n: int
:param fraction: The fraction of records to include in the random sample (alternative to 'n').
:type fraction: float
:param with_replacement: Whether to sample with replacement (default is False).
:type with_replacement: bool
:param shuffle: Whether to shuffle the input data before sampling (default is False).
:type shuffle: bool
:param seed: The seed for the random number generator (optional).
:type seed: Any
"""
with open(file_path, "rb") as f:
avro_reader = fastavro.reader(f)
df = polars.from_records(list(avro_reader))
sample_df = df.sample(n=n, fraction=fraction)
sample_df = df.sample(n=n, fraction=fraction, with_replacement=with_replacement, shuffle=shuffle, seed=seed)
sample_df.write_avro(output_path)
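Taken together, the avro.py changes route every conversion through polars. A hedged usage sketch, assuming the classmethods above live on an `AvroUtils` class (the class name and the file paths are assumptions, not shown in this diff):

```python
# Hypothetical usage sketch: the AvroUtils class name and file paths are
# assumptions; the method signatures match the diff above.
from pathlib import Path

from data_toolset.utils.avro import AvroUtils  # assumed class name

# Row-oriented JSON, pretty-printed via polars' write_json
AvroUtils.to_json(Path("events.avro"), Path("events.json"), pretty=True)

# Semicolon-delimited CSV with a custom quote character
AvroUtils.to_csv(Path("events.avro"), Path("events.csv"), delimiter=";", quote="'")

# Zstd-compressed Parquet output
AvroUtils.to_parquet(Path("events.avro"), Path("events.parquet"), compression="zstd")

# Reproducible 1% sample via the new seed parameter
AvroUtils.random_sample(Path("events.avro"), Path("sample.avro"), fraction=0.01, seed=42)
```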
26 changes: 12 additions & 14 deletions src/data_toolset/utils/base.py
@@ -15,11 +15,6 @@ class BaseUtils(ABC, UtilMixin):
def to_arrow_table(cls, file_path: Path) -> pa.Table:
...

@classmethod
@abstractmethod
def write_arrow_table(cls, table: pa.Table, file_path: Path) -> None:
...

@classmethod
@abstractmethod
def validate_format(cls, file_path: Path) -> None:
@@ -67,23 +62,25 @@ def validate(cls, file_path: Path, schema_path: Path = None) -> None:

@classmethod
@abstractmethod
def to_json(cls, file_path: Path, output_path: Path) -> None:
def to_json(cls, file_path: Path, output_path: Path, pretty: bool = False) -> None:
...

@classmethod
@abstractmethod
def to_csv(cls, file_path: Path, output_path: Path, *, has_header: bool = True, delimiter: str = ",") -> None:
def to_csv(cls, file_path: Path, output_path: Path, has_header: bool = True, delimiter: str = ",",
line_terminator: str = "\n", quote: str = '\"') -> None:
...

@classmethod
@abstractmethod
def to_avro(cls, file_path: Path, output_path: Path) -> None:
...
def to_avro(cls, file_path: Path, output_path: Path,
compression: T.Literal["uncompressed", "snappy", "deflate"] = "uncompressed") -> None:
pass

@classmethod
@abstractmethod
def to_parquet(cls, file_path: Path, output_path: Path) -> None:
...
def to_parquet(cls, file_path: Path, output_path: Path,
compression: T.Literal[
"lz4", "uncompressed", "snappy", "gzip", "lzo", "brotli", "zstd"] = "uncompressed") -> None:
pass

@classmethod
def query(cls, file_path: Path, query_expression: str, *, chunk_size: int = 1000000) -> polars.DataFrame:
@@ -131,5 +128,6 @@ def query(cls, file_path: Path, query_expression: str, *, chunk_size: int = 1000
return df

@classmethod
def random_sample(cls, file_path: Path, output_path: Path, *, n: int, fraction: float = None):
@abstractmethod
def random_sample(cls, file_path: Path, output_path: Path, n: int = None, fraction: float = None,
with_replacement: bool = False, shuffle: bool = False, seed: T.Any = None) -> None:
...
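With `write_arrow_table` removed and `random_sample` now abstract, base.py spells out the full surface a format backend must implement. A minimal sketch of a hypothetical backend, assuming the package layout above (`JsonlUtils` and its body are invented here for illustration):

```python
# Illustrative only: a hypothetical newline-delimited-JSON backend showing
# the abstract surface. BaseUtils comes from the diff; JsonlUtils is invented.
from pathlib import Path

import polars
import pyarrow as pa

from data_toolset.utils.base import BaseUtils


class JsonlUtils(BaseUtils):
    @classmethod
    def to_arrow_table(cls, file_path: Path) -> pa.Table:
        # Read newline-delimited JSON into an Arrow table via polars.
        return polars.read_ndjson(file_path).to_arrow()

    # validate_format, to_json, to_csv, to_avro, to_parquet, and
    # random_sample would be implemented along the lines of avro.py.
```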
15 changes: 0 additions & 15 deletions src/data_toolset/utils/mixins.py
@@ -21,21 +21,6 @@ def has_comparison_methods(cls, obj):
except TypeError:
return False

@staticmethod
def _print_table(table):
# Convert table to dictionary of lists
data_dict = table.to_pydict()

# Get list of column names
column_names = table.column_names

# Iterate over rows and print as dictionaries
for i in range(len(data_dict[column_names[0]])):
row_dict = {}
for col_name in column_names:
row_dict[col_name] = data_dict[col_name][i]
print(row_dict)

@staticmethod
def print_metadata(schema, metadata, codec, serialized_size):
print(f"Schema: {schema}")
71 changes: 47 additions & 24 deletions src/data_toolset/utils/parquet.py
@@ -25,18 +25,6 @@ def to_arrow_table(cls, file_path: Path) -> pa.Table:
table = pa.parquet.read_table(file_path)
return table

@classmethod
def write_arrow_table(cls, table: pa.Table, file_path: Path) -> None:
"""
Write an Arrow Table to a Parquet file.

:param table: Arrow Table to write to the Parquet file.
:type table: pa.Table
:param file_path: Path to the Parquet file to write.
:type file_path: Path
"""
pa.parquet.write_table(table, file_path)

@classmethod
def validate_format(cls, file_path: Path) -> None:
"""
@@ -217,47 +205,82 @@ def validate(cls, file_path: Path, schema_path: Path = None) -> None:
logging.info("File is a valid Parquet file.")

@classmethod
def to_json(cls, file_path: Path, output_path: Path) -> None:
def to_json(cls, file_path: Path, output_path: Path, pretty: bool = False) -> None:
"""
Convert a Parquet file to a JSON file.

:param file_path: Path to the Parquet file to convert.
:type file_path: Path
:param output_path: Path to the output JSON file.
:type output_path: Path
:param pretty: Whether to format the JSON file with indentation (default is False).
:type pretty: bool
"""
table = cls.to_arrow_table(file_path)
df = polars.from_arrow(table)
df.write_json(file=output_path, row_oriented=True)
df.write_json(file=output_path, pretty=pretty, row_oriented=True)

@classmethod
def to_csv(cls, file_path: Path, output_path: Path, has_header: bool = True, delimiter: str = ",") -> None:
def to_csv(cls, file_path: Path, output_path: Path, has_header: bool = True, delimiter: str = ",",
line_terminator: str = "\n", quote: str = '\"') -> None:
"""
Convert a Parquet file to a CSV file.

:param file_path: Path to the Parquet file to convert.
:type file_path: Path
:param output_path: Path to the output CSV file.
:type output_path: Path
:param delimiter: The delimiter character used in the CSV file (default is comma).
:param has_header: Whether the CSV file should include a header row (default is True).
:type has_header: bool
:param delimiter: The character used to separate fields in the CSV (default is ',').
:type delimiter: str
:param line_terminator: The character(s) used to terminate lines in the CSV (default is '\n').
:type line_terminator: str
:param quote: The character used to enclose fields in quotes (default is '\"').
:type quote: str
"""
table = cls.to_arrow_table(file_path)
df = polars.from_arrow(table)
df.write_csv(file=output_path, has_header=has_header, separator=delimiter)
df.write_csv(file=output_path, has_header=has_header, separator=delimiter, line_terminator=line_terminator, quote=quote)

@classmethod
def to_avro(cls, file_path: Path, output_path: Path) -> None:
def to_avro(cls, file_path: Path, output_path: Path,
compression: T.Literal["uncompressed", "snappy", "deflate"] = "uncompressed") -> None:
"""
Convert a Parquet file to an Avro file.

:param file_path: Path to the Parquet file to convert.
:type file_path: Path
:param output_path: Path to the output Avro file.
:type output_path: Path
:param compression: The compression method to use for the Avro file (default is 'uncompressed').
:type compression: str
"""
# @TODO(kirillb): not supporting timestamps at the moment
df = polars.read_parquet(source=file_path)
df.write_avro(file=output_path)
df.write_avro(file=output_path, compression=compression)

@classmethod
def to_parquet(cls, file_path: Path, output_path: Path) -> None:
pass
def random_sample(cls, file_path: Path, output_path: Path, n: int = None, fraction: float = None,
with_replacement: bool = False, shuffle: bool = False, seed: T.Any = None) -> None:
"""
Create a random sample from a Parquet file and save it as a Parquet file.

@classmethod
def random_sample(cls, file_path: Path, output_path: Path, n: int, fraction: float = None) -> None:
:param file_path: Path to the Parquet file to sample from.
:type file_path: Path
:param output_path: Path to the output Parquet file for the random sample.
:type output_path: Path
:param n: The number of records to include in the random sample.
:type n: int
:param fraction: The fraction of records to include in the random sample (alternative to 'n').
:type fraction: float
:param with_replacement: Whether to sample with replacement (default is False).
:type with_replacement: bool
:param shuffle: Whether to shuffle the input data before sampling (default is False).
:type shuffle: bool
:param seed: The seed for the random number generator (optional).
:type seed: Any
"""
df = polars.read_parquet(source=file_path)
sample_df = df.sample(n=n, fraction=fraction)
sample_df = df.sample(n=n, fraction=fraction, with_replacement=with_replacement, shuffle=shuffle, seed=seed)
sample_df.write_parquet(output_path)
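The parquet.py changes mirror avro.py. Note that the Avro and Parquet codec sets genuinely differ (Avro has 'deflate'; Parquet has 'gzip', 'brotli', 'zstd', and friends), which is why the two `--compression` choice lists are not the same. A usage sketch under the same assumptions (`ParquetUtils` and the file paths are hypothetical):

```python
# Hypothetical usage sketch; the ParquetUtils class name and paths are
# assumptions, while the signatures match the diff above.
from pathlib import Path

from data_toolset.utils.parquet import ParquetUtils  # assumed class name

# Parquet -> Avro using Avro's deflate codec
ParquetUtils.to_avro(Path("events.parquet"), Path("events.avro"), compression="deflate")

# Reproducible 100-record sample, shuffled before writing
ParquetUtils.random_sample(Path("events.parquet"), Path("sample.parquet"),
                           n=100, shuffle=True, seed=7)
```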