Merge pull request #9 from sebastianswms/tsv-jsonl-caching
feat: Support for TSVs, JSONL, and caching
visch authored Jun 23, 2023
2 parents 7c9f4c7 + e609331 commit 21b8d12
Showing 7 changed files with 288 additions and 52 deletions.
37 changes: 22 additions & 15 deletions README.md
@@ -18,19 +18,26 @@ pipx install git+https://github.com/MeltanoLabs/tap-file.git
 
 ### Accepted Config Options
 
-| Setting | Required | Default | Description |
-|:-----------------------|:--------:|:-------:|:------------|
-| protocol | True | None | The protocol to use to retrieve data. One of `file` or `s3`. |
-| filepath | True | None | The path to obtain files from. Example: `/foo/bar`. Or, for `protocol==s3`, `s3-bucket-name`. |
-| file_regex | False | None | A regex pattern to only include certain files. Example: `.*\.csv`. Note that if you want to sync a subdirectory, use the `filepath` setting instead. |
-| s3_anonymous_connection| False | 0 | Whether to use an anonymous S3 connection, without any credentials. Ignored if `protocol!=s3`. |
-| AWS_ACCESS_KEY_ID | False | $AWS_ACCESS_KEY_ID | The access key to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
-| AWS_SECRET_ACCESS_KEY | False | $AWS_SECRET_ACCESS_KEY | The access key secret to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
-| cache_mode | False | once | *DEVELOPERS ONLY* The caching method to use when `protocol!=file`. `none` does not use caching at all. `once` (the default) will cache all files for the duration of the tap's invocation, then discard them upon completion. `peristent` will allow caches to persist between invocations of the tap, storing them in your OS's temp directory. It is recommended that you do not modify this setting. |
-| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
-| stream_map_config | False | None | User-defined config values to be used within map expressions. |
-| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
-| flattening_max_depth | False | None | The max depth to flatten schemas. | <!-- Manually added entries begin below. -->
+| Setting | Required | Default | Description |
+|:----------------------------|:--------:|:-------:|:------------|
+| stream_name | False | file | The name of the stream that is output by the tap. |
+| protocol | True | None | The protocol to use to retrieve data. One of `file` or `s3`. |
+| filepath | True | None | The path to obtain files from. Example: `/foo/bar`. Or, for `protocol==s3`, use `s3-bucket-name` instead. |
+| file_regex | False | None | A regex pattern to only include certain files. Example: `.*\.csv`. |
+| file_type | False | detect | Can be any of `csv`, `tsv`, `json`, `avro`, or `detect`. Indicates how to determine a file's type. If set to `detect`, file names containing a matching extension will be read as that type and other files will not be read. If set to a file type, *all* files will be read as that type. |
+| compression | False | detect | The encoding to use to decompress data. One of `zip`, `bz2`, `gzip`, `lzma`, `xz`, `none`, or `detect`. |
+| delimiter | False | detect | The character used to separate fields in a CSV/TSV. Can be any character or the special value `detect`. If a character is provided, all CSV and TSV files will use that value. `detect` will use `,` for CSV files and `\t` for TSV files. |
+| quote_character | False | " | The character used to indicate when a field in a CSV contains the delimiter character. |
+| jsonl_sampling_strategy | False | first | The strategy determining how to read the keys in a JSONL file. Must be one of `first` or `all`. Currently, only `first` is supported, which will assume that the first record in a file is representative of all keys. |
+| jsonl_type_coercion_strategy| False | any | The strategy determining how to construct the schema for JSONL files when the types represented are ambiguous. Must be one of `any`, `string`, or `blob`. `any` will provide a generic schema for all keys, allowing them to be any valid JSON type. `string` will require all keys to be strings and will convert other values accordingly. `blob` will deliver each JSONL row as a JSON object with no internal schema. Currently, only `any` and `string` are supported. |
+| s3_anonymous_connection | False | 0 | Whether to use an anonymous S3 connection, without any credentials. Ignored if `protocol!=s3`. |
+| AWS_ACCESS_KEY_ID | False | $AWS_ACCESS_KEY_ID | The access key to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
+| AWS_SECRET_ACCESS_KEY | False | $AWS_SECRET_ACCESS_KEY | The access key secret to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
+| caching_strategy | False | once | *DEVELOPERS ONLY* The caching method to use when `protocol!=file`. One of `none`, `once`, or `persistent`. `none` does not use caching at all. `once` (the default) will cache all files for the duration of the tap's invocation, then discard them upon completion. `persistent` will allow caches to persist between invocations of the tap, storing them in your OS's temp directory. It is recommended that you do not modify this setting. |
+| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
+| stream_map_config | False | None | User-defined config values to be used within map expressions. |
+| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
+| flattening_max_depth | False | None | The max depth to flatten schemas. | <!-- Manually added entries begin below. -->
 | batch_config | False | None | Object containing batch configuration information, as specified in the [Meltano documentation](https://sdk.meltano.com/en/latest/batch.html). Has two child objects: `encoding` and `storage`. |
 | batch_config.encoding | False | None | Object containing information about how to encode batch information. Has two child entries: `format` and `compression`. |
 | batch_config.storage | False | None | Object containing information about how batch files should be stored. Has two child entries: `root` and `prefix`. |
@@ -57,9 +64,9 @@ config:
       root: file:///foo/bar
       prefix: batch-
 ```
-```json
+```python
 {
-  // ... other config options ...
+  # ... other config options ...
   "batch_config": {
     "encoding": {
       "format": "jsonl",
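To see how the new settings compose, here is a hypothetical minimal config for pulling TSVs from a public bucket (values are illustrative, not defaults, and not part of this PR):

```python
# Hypothetical config exercising the new settings; all values are illustrative.
{
    "protocol": "s3",
    "filepath": "s3-bucket-name",      # bucket to scan for files
    "file_regex": ".*\\.tsv$",         # only pick up TSVs
    "file_type": "detect",             # infer each file's type from its extension
    "delimiter": "detect",             # resolves to "\t" for .tsv files
    "s3_anonymous_connection": True,   # no credentials required
    "caching_strategy": "once",        # cache remote files for one invocation
}
```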
11 changes: 9 additions & 2 deletions meltano.yml
@@ -8,23 +8,30 @@ plugins:
   extractors:
   - name: tap-file
     namespace: tap_file
-    pip_url: -e .[s3]
+    pip_url: -e .
     capabilities:
     - state
     - catalog
     - discover
     - about
     - stream-maps
     settings:
+    - name: stream_name
     - name: protocol
     - name: filepath
     - name: file_regex
+    - name: file_type
+    - name: compression
+    - name: delimiter
+    - name: quote_character
+    - name: jsonl_sampling_strategy
+    - name: jsonl_type_coercion_strategy
     - name: s3_anonymous_connection
     - name: AWS_ACCESS_KEY_ID
       kind: password
     - name: AWS_SECRET_ACCESS_KEY
       kind: password
-    - name: cache_mode
+    - name: caching_strategy
   loaders:
   - name: target-jsonl
     variant: andyh1203
31 changes: 29 additions & 2 deletions tap_file/client.py
@@ -55,8 +55,8 @@ def get_files(self) -> Generator[str, None, None]:
 
         if empty:
             msg = (
-                "No files found. Choose a different `filepath` or try a more "
-                "lenient `file_regex`."
+                "No files found. Choose a different `filepath` or try a more lenient "
+                "`file_regex`."
             )
             raise RuntimeError(msg)
 
@@ -72,6 +72,33 @@ def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]:
         msg = "get_rows must be implemented by subclass."
         raise NotImplementedError(msg)
 
+    def get_compression(self, file: str) -> str | None:  # noqa: PLR0911
+        """Determines what compression encoding is appropriate for a given file.
+
+        Args:
+            file: The file to determine the encoding of.
+
+        Returns:
+            A string representing the appropriate compression encoding, or `None` if no
+            compression is needed or if a compression encoding can't be determined.
+        """
+        compression: str = self.config["compression"]
+        if compression == "none":
+            return None
+        if compression != "detect":
+            return compression
+        if re.match(".*\\.zip$", file):
+            return "zip"
+        if re.match(".*\\.bz2$", file):
+            return "bz2"
+        if re.match(".*\\.gz(ip)?$", file):
+            return "gzip"
+        if re.match(".*\\.lzma$", file):
+            return "lzma"
+        if re.match(".*\\.xz$", file):
+            return "xz"
+        return None
+
     def get_records(
         self,
         context: dict | None,  # noqa: ARG002
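As a standalone illustration of the `detect` branch of `get_compression` above, the extension-to-encoding mapping works out as follows (the helper name here is ours, not the tap's):

```python
from __future__ import annotations

import re

# Extension-to-encoding mapping mirroring get_compression's "detect" branch.
_PATTERNS = {
    "zip": r".*\.zip$",
    "bz2": r".*\.bz2$",
    "gzip": r".*\.gz(ip)?$",
    "lzma": r".*\.lzma$",
    "xz": r".*\.xz$",
}


def detect_compression(file: str) -> str | None:
    """Return the compression encoding implied by a file's name, or None."""
    for encoding, pattern in _PATTERNS.items():
        if re.match(pattern, file):
            return encoding
    return None  # uncompressed, or encoding can't be determined


assert detect_compression("data.csv.gz") == "gzip"
assert detect_compression("data.tsv") is None
```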
14 changes: 7 additions & 7 deletions tap_file/files.py
@@ -35,25 +35,25 @@ def get_filesystem(self) -> fsspec.AbstractFileSystem:
         """
         self._check_config()
         protocol = self.config["protocol"]
-        cache_mode = self.config["cache_mode"]
+        caching_strategy = self.config["caching_strategy"]
 
         if protocol == "file":
             return fsspec.filesystem("file")
 
-        if cache_mode == "once":
+        if caching_strategy == "once":
             return fsspec.filesystem(
                 "filecache",
                 target_protocol=self.config["protocol"],
                 target_options=self._get_args(),
             )
-        if cache_mode == "persistent":
+        if caching_strategy == "persistent":
             return fsspec.filesystem(
                 "filecache",
                 target_protocol=self.config["protocol"],
                 target_options=self._get_args(),
                 cache_storage=tempfile.gettempdir(),
             )
-        if cache_mode == "none":
+        if caching_strategy == "none":
             return fsspec.filesystem(
                 protocol=self.config["protocol"],
                 **self._get_args(),
@@ -79,12 +79,12 @@ def _get_args(self) -> dict[str, Any]:
 
     def _check_config(self) -> None:
         protocol = self.config["protocol"]
-        cache_mode = self.config["cache_mode"]
+        caching_strategy = self.config["caching_strategy"]
 
         if protocol not in {"file", "s3"}:
             msg = f"Protocol '{protocol}' is not valid."
             raise ValueError(msg)
 
-        if cache_mode not in {"none", "once", "persistent"}:
-            msg = f"Cache mode '{cache_mode}' is not valid."
+        if caching_strategy not in {"none", "once", "persistent"}:
+            msg = f"Caching strategy '{caching_strategy}' is not valid."
             raise ValueError(msg)
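For context, the three caching strategies map directly onto fsspec filesystems. A sketch of what `once` resolves to for an S3 source (assumes the s3fs dependency is installed and that anonymous access suffices; the bucket path is hypothetical):

```python
import fsspec

# caching_strategy == "once": wrap the remote filesystem in fsspec's
# "filecache", which keeps local copies for the life of this object.
fs = fsspec.filesystem(
    "filecache",
    target_protocol="s3",
    target_options={"anon": True},  # anonymous connection, no credentials
)

with fs.open("s3-bucket-name/example.csv", mode="rt") as f:  # hypothetical path
    print(f.readline())
```

The `persistent` branch differs only in passing `cache_storage=tempfile.gettempdir()`; as we understand fsspec's behavior, the default cache location is session-scoped, so an explicit directory is what lets the cache survive between invocations.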
142 changes: 124 additions & 18 deletions tap_file/streams.py
@@ -3,45 +3,151 @@
 from __future__ import annotations
 
 import csv
+import json
+import re
 from functools import cached_property
 from typing import Any, Generator
 
 from tap_file.client import FileStream
 
 
-class CSVStream(FileStream):
-    """Stream for reading CSVs."""
-
-    name = "CSV"
+class DelimitedStream(FileStream):
+    """Stream for reading CSVs and TSVs."""
 
     def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]:
-        """Retrive all rows from all CSVs.
+        """Retrieve all rows from all *SVs.
 
         Yields:
-            A dictionary containing information about a row in a CSV.
+            A dictionary containing information about a row in a *SV.
         """
-        for file in self.get_files():
-            with self.filesystem.open(path=file, mode="rt") as f:
-                reader = csv.DictReader(f)
-                for row in reader:
-                    yield row
+        for reader in self._get_readers():
+            for row in reader:
+                yield row
 
     @cached_property
     def schema(self) -> dict[str, dict]:
-        """Create a schema for a CSV file.
+        """Create a schema for a *SV file.
 
-        Each column in the CSV will have its own entry in the schema. All entries will
+        Each column in the *SV will have its own entry in the schema. All entries will
         be of the form: `'FIELD_NAME': {'type': ['null', 'string']}`
 
         Returns:
-            A schema representing a CSV.
+            A schema representing a *SV.
         """
         properties = {}
 
-        for file in self.get_files():
-            with self.filesystem.open(path=file, mode="rt") as f:
-                reader = csv.DictReader(f)
-                for field in reader.fieldnames:
-                    properties.update({field: {"type": ["null", "string"]}})
+        for reader in self._get_readers():
+            for field in reader.fieldnames:
+                properties.update({field: {"type": ["null", "string"]}})
 
         return {"properties": properties}
+
+    def _get_readers(self) -> Generator[csv.DictReader[str], None, None]:
+        quote_character: str = self.config["quote_character"]
+
+        for file in self.get_files():
+            if self.config["delimiter"] == "detect":
+                if re.match(".*\\.csv.*", file):
+                    delimiter = ","
+                elif re.match(".*\\.tsv.*", file):
+                    delimiter = "\t"
+                else:
+                    msg = (
+                        "Configuration option 'delimiter' is set to 'detect' but a "
+                        "non-csv non-tsv file is present. Please manually specify "
+                        "'delimiter'."
+                    )
+                    raise RuntimeError(msg)
+            else:
+                delimiter = self.config["delimiter"]
+
+            with self.filesystem.open(
+                path=file,
+                mode="rt",
+                compression=self.get_compression(file=file),
+            ) as f:
+                yield csv.DictReader(f, delimiter=delimiter, quotechar=quote_character)
+
+
+class JSONLStream(FileStream):
+    """Stream for reading JSON files."""
+
+    def get_rows(self) -> Generator[dict[str, Any], None, None]:
+        """Retrieve all rows from all JSONL files.
+
+        Yields:
+            A dictionary containing information about a row in a JSONL file.
+        """
+        for file in self.get_files():
+            with self.filesystem.open(
+                path=file,
+                mode="rt",
+                compression=self.get_compression(file=file),
+            ) as f:
+                for row in f:
+                    yield self._pre_process(json.loads(row))
+
+    @cached_property
+    def schema(self) -> dict[str, dict]:
+        """Create a schema for a JSONL file.
+
+        The format of the schema will depend on the jsonl_type_coercion_strategy config
+        option, but will always be a dictionary of field names and associated types.
+
+        Returns:
+            A schema representing a JSONL file.
+        """
+        properties = {}
+        for field in self._get_fields():
+            properties.update(self._get_property(field=field))
+        return {"properties": properties}
+
+    def _get_property(self, field: str) -> dict[str, dict[str, list[str]]]:
+        strategy = self.config["jsonl_type_coercion_strategy"]
+        if strategy == "any":
+            return {
+                field: {
+                    "type": [
+                        "null",
+                        "boolean",
+                        "integer",
+                        "number",
+                        "string",
+                        "array",
+                        "object",
+                    ],
+                },
+            }
+        if strategy == "string":
+            return {field: {"type": ["null", "string"]}}
+        if strategy == "blob":
+            return {field: {"type": ["null", "object"]}}
+        msg = f"The coercion strategy '{strategy}' is not valid."
+        raise ValueError(msg)
+
+    def _get_fields(self) -> Generator[str, None, None]:
+        strategy = self.config["jsonl_sampling_strategy"]
+        if strategy == "first":
+            try:
+                yield from next(self.get_rows())
+            except StopIteration:
+                return
+            return
+        if strategy == "all":
+            msg = f"The sampling strategy '{strategy}' has not been implemented."
+            raise NotImplementedError(msg)
+        msg = f"The sampling strategy '{strategy}' is not valid."
+        raise ValueError(msg)
+
+    def _pre_process(self, row: dict[str, Any]) -> dict[str, Any]:
+        strategy = self.config["jsonl_type_coercion_strategy"]
+        if strategy == "any":
+            return row
+        if strategy == "string":
+            for entry in row:
+                row[entry] = str(row[entry])
+            return row
+        if strategy == "blob":
+            return {"record": row}
+        msg = f"The coercion strategy '{strategy}' is not valid."
+        raise ValueError(msg)
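To make `DelimitedStream` concrete: once `detect` resolves a `.tsv` file to a tab delimiter, each row comes back as a `csv.DictReader` dict keyed by the header line. A standalone sketch with made-up data:

```python
import csv
import io

# Contents of a hypothetical .tsv file; "detect" resolves its delimiter to "\t".
sample = "id\tname\n1\tAda\n2\tGrace\n"

reader = csv.DictReader(io.StringIO(sample), delimiter="\t", quotechar='"')
for row in reader:
    print(row)  # {'id': '1', 'name': 'Ada'}, then {'id': '2', 'name': 'Grace'}
```

Every value arrives as a string, which is why `schema` types each field as `['null', 'string']`.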
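And a quick illustration of the three `jsonl_type_coercion_strategy` options, mirroring `_pre_process` above on one made-up row (recall that only `any` and `string` are currently supported):

```python
import json

row = json.loads('{"id": 1, "tags": ["a", "b"]}')  # one hypothetical JSONL line

# "any": the row passes through unchanged.
print(row)                                  # {'id': 1, 'tags': ['a', 'b']}

# "string": every value is stringified to match the all-string schema.
print({k: str(v) for k, v in row.items()})  # {'id': '1', 'tags': "['a', 'b']"}

# "blob": the whole row is nested under a single "record" key.
print({"record": row})                      # {'record': {'id': 1, 'tags': ['a', 'b']}}
```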