feat: Support for TSVs, JSONL, and caching #9

Merged · 7 commits · Jun 23, 2023
37 changes: 22 additions & 15 deletions README.md
@@ -18,19 +18,26 @@ pipx install git+https://github.com/MeltanoLabs/tap-file.git

### Accepted Config Options

| Setting | Required | Default | Description |
|:-----------------------|:--------:|:-------:|:------------|
| protocol | True | None | The protocol to use to retrieve data. One of `file` or `s3`. |
| filepath | True | None | The path to obtain files from. Example: `/foo/bar`. Or, for `protocol==s3`, `s3-bucket-name`. |
| file_regex | False | None | A regex pattern to only include certain files. Example: `.*\.csv`. Note that if you want to sync a subdirectory, use the `filepath` setting instead. |
| s3_anonymous_connection| False | 0 | Whether to use an anonymous S3 connection, without any credentials. Ignored if `protocol!=s3`. |
| AWS_ACCESS_KEY_ID | False | $AWS_ACCESS_KEY_ID | The access key to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
| AWS_SECRET_ACCESS_KEY | False | $AWS_SECRET_ACCESS_KEY | The access key secret to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
| cache_mode | False | once | *DEVELOPERS ONLY* The caching method to use when `protocol!=file`. `none` does not use caching at all. `once` (the default) will cache all files for the duration of the tap's invocation, then discard them upon completion. `persistent` will allow caches to persist between invocations of the tap, storing them in your OS's temp directory. It is recommended that you do not modify this setting. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. | <!-- Manually added entries begin below. -->
| Setting | Required | Default | Description |
|:----------------------------|:--------:|:-------:|:------------|
| stream_name | False | file | The name of the stream that is output by the tap. |
| protocol | True | None | The protocol to use to retrieve data. One of `file` or `s3`. |
| filepath | True | None | The path to obtain files from. Example: `/foo/bar`. Or, for `protocol==s3`, use `s3-bucket-name` instead. |
| file_regex | False | None | A regex pattern to only include certain files. Example: `.*\.csv`. |
| file_type | False | detect | Can be any of `csv`, `tsv`, `json`, `avro`, or `detect`. Indicates how to determine a file's type. If set to `detect`, file names containing a matching extension will be read as that type and other files will not be read. If set to a file type, *all* files will be read as that type. |
| compression | False | detect | The encoding to use to decompress data. One of `zip`, `bz2`, `gzip`, `lzma`, `xz`, `none`, or `detect`. |
| delimiter | False | detect | The character used to separate records in a CSV/TSV. Can be any character or the special value `detect`. If a character is provided, all CSV and TSV files will use that value. `detect` will use `,` for CSV files and `\t` for TSV files. |
| quote_character | False | " | The character used to indicate when a record in a CSV contains a delimiter character. |
| jsonl_sampling_strategy | False | first | The strategy determining how to read the keys in a JSONL file. Must be one of `first` or `all`. Currently, only `first` is supported, which will assume that the first record in a file is representative of all keys. |
| jsonl_type_coercion_strategy| False | any | The strategy determining how to construct the schema for JSONL files when the types represented are ambiguous. Must be one of `any`, `string`, or `blob`. `any` will provide a generic schema for all keys, allowing them to be any valid JSON type. `string` will require all keys to be strings and will convert other values accordingly. `blob` will deliver each JSONL row as a JSON object with no internal schema. Currently, only `any` and `string` are supported. |
| s3_anonymous_connection | False | 0 | Whether to use an anonymous S3 connection, without any credentials. Ignored if `protocol!=s3`. |
| AWS_ACCESS_KEY_ID | False | $AWS_ACCESS_KEY_ID | The access key to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
| AWS_SECRET_ACCESS_KEY | False | $AWS_SECRET_ACCESS_KEY | The access key secret to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
| caching_strategy | False | once | *DEVELOPERS ONLY* The caching method to use when `protocol!=file`. One of `none`, `once`, or `persistent`. `none` does not use caching at all. `once` (the default) will cache all files for the duration of the tap's invocation, then discard them upon completion. `persistent` will allow caches to persist between invocations of the tap, storing them in your OS's temp directory. It is recommended that you do not modify this setting. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. | <!-- Manually added entries begin below. -->
| batch_config | False | None | Object containing batch configuration information, as specified in the [Meltano documentation](https://sdk.meltano.com/en/latest/batch.html). Has two child objects: `encoding` and `storage`. |
| batch_config.encoding | False | None | Object containing information about how to encode batch information. Has two child entries: `format` and `compression`. |
| batch_config.storage | False | None | Object containing information about how batch files should be stored. Has two child entries: `root` and `prefix`. |
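
For orientation, a minimal sketch of a config that reads gzipped TSVs from S3 might look like the following (setting names come from the table above; the bucket name and key values are placeholders, not part of this PR):

```python
{
    # Illustrative values only -- replace with your own.
    "protocol": "s3",
    "filepath": "example-bucket-name",
    "file_regex": ".*\\.tsv\\.gz$",
    "file_type": "detect",       # .tsv extension => read as TSV
    "compression": "detect",     # .gz extension => gzip decompression
    "delimiter": "detect",       # resolves to "\t" for TSV files
    "s3_anonymous_connection": False,
    "AWS_ACCESS_KEY_ID": "<your-access-key-id>",
    "AWS_SECRET_ACCESS_KEY": "<your-secret-access-key>",
}
```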
@@ -57,9 +64,9 @@ config:
root: file:///foo/bar
prefix: batch-
```
```json
```python
{
// ... other config options ...
# ... other config options ...
"batch_config": {
"encoding": {
"format": "jsonl",
11 changes: 9 additions & 2 deletions meltano.yml
@@ -8,23 +8,30 @@ plugins:
extractors:
- name: tap-file
namespace: tap_file
pip_url: -e .[s3]
pip_url: -e .
**Member:** we don't want to install for s3 by default anymore?

**Collaborator (Author):** Is that what we ended up deciding? I don't remember. When I saw that it was installed by default I thought it was an error that I'd forgotten to remove.

capabilities:
- state
- catalog
- discover
- about
- stream-maps
settings:
- name: stream_name
- name: protocol
- name: filepath
- name: file_regex
- name: file_type
- name: compression
- name: delimiter
- name: quote_character
- name: jsonl_sampling_strategy
- name: jsonl_type_coercion_strategy
- name: s3_anonymous_connection
- name: AWS_ACCESS_KEY_ID
kind: password
- name: AWS_SECRET_ACCESS_KEY
kind: password
- name: cache_mode
- name: caching_strategy
loaders:
- name: target-jsonl
variant: andyh1203
31 changes: 29 additions & 2 deletions tap_file/client.py
@@ -55,8 +55,8 @@ def get_files(self) -> Generator[str, None, None]:

if empty:
msg = (
"No files found. Choose a different `filepath` or try a more "
"lenient `file_regex`."
"No files found. Choose a different `filepath` or try a more lenient "
"`file_regex`."
)
raise RuntimeError(msg)

@@ -72,6 +72,33 @@ def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]:
msg = "get_rows must be implemented by subclass."
raise NotImplementedError(msg)

def get_compression(self, file: str) -> str | None: # noqa: PLR0911
"""Determines what compression encoding is appropraite for a given file.

Args:
file: The file to determine the encoding of.

Returns:
A string representing the appropriate compression encoding, or `None` if no
compression is needed or if a compression encoding can't be determined.
"""
compression: str = self.config["compression"]
if compression == "none":
return None
if compression != "detect":
return compression
if re.match(".*\\.zip$", file):
return "zip"
if re.match(".*\\.bz2$", file):
return "bz2"
if re.match(".*\\.gz(ip)?$", file):
return "gzip"
if re.match(".*\\.lzma$", file):
return "lzma"
if re.match(".*\\.xz$", file):
return "xz"
return None

def get_records(
self,
context: dict | None, # noqa: ARG002
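
As a quick illustration of the detection branches above, the filename-to-encoding mapping can be sketched standalone (this is a reimplementation for illustration, not code from the diff):

```python
import re

# Mirrors the regex branches in get_compression when compression="detect".
PATTERNS = {
    "zip": r".*\.zip$",
    "bz2": r".*\.bz2$",
    "gzip": r".*\.gz(ip)?$",
    "lzma": r".*\.lzma$",
    "xz": r".*\.xz$",
}

def detect_compression(file: str) -> str | None:
    for encoding, pattern in PATTERNS.items():
        if re.match(pattern, file):
            return encoding
    return None  # no known suffix: treat the file as uncompressed

assert detect_compression("rows.csv.gz") == "gzip"
assert detect_compression("rows.csv.gzip") == "gzip"
assert detect_compression("archive.zip") == "zip"
assert detect_compression("rows.csv") is None
```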
14 changes: 7 additions & 7 deletions tap_file/files.py
@@ -35,25 +35,25 @@ def get_filesystem(self) -> fsspec.AbstractFileSystem:
"""
self._check_config()
protocol = self.config["protocol"]
cache_mode = self.config["cache_mode"]
caching_strategy = self.config["caching_strategy"]

if protocol == "file":
return fsspec.filesystem("file")

if cache_mode == "once":
if caching_strategy == "once":
return fsspec.filesystem(
"filecache",
target_protocol=self.config["protocol"],
target_options=self._get_args(),
)
if cache_mode == "persistent":
if caching_strategy == "persistent":
return fsspec.filesystem(
"filecache",
target_protocol=self.config["protocol"],
target_options=self._get_args(),
cache_storage=tempfile.gettempdir(),
)
if cache_mode == "none":
if caching_strategy == "none":
return fsspec.filesystem(
protocol=self.config["protocol"],
**self._get_args(),
@@ -79,12 +79,12 @@ def _get_args(self) -> dict[str, Any]:

def _check_config(self) -> None:
protocol = self.config["protocol"]
cache_mode = self.config["cache_mode"]
caching_strategy = self.config["caching_strategy"]

if protocol not in {"file", "s3"}:
msg = f"Protocol '{protocol}' is not valid."
raise ValueError(msg)

if cache_mode not in {"none", "once", "persistent"}:
msg = f"Cache mode '{cache_mode}' is not valid."
if caching_strategy not in {"none", "once", "persistent"}:
msg = f"Caching strategy '{caching_strategy}' is not valid."
raise ValueError(msg)
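
For context, the three strategies map onto fsspec filesystems roughly as follows (a sketch; the `anon` option is an illustrative stand-in for whatever `_get_args` returns):

```python
import tempfile
import fsspec

target_options = {"anon": True}  # illustrative stand-in for _get_args()

# "once": whole-file cache in a transient directory, dropped when the tap exits.
fs_once = fsspec.filesystem(
    "filecache", target_protocol="s3", target_options=target_options
)

# "persistent": same cache, pinned to the OS temp dir so reruns can reuse it.
fs_persistent = fsspec.filesystem(
    "filecache",
    target_protocol="s3",
    target_options=target_options,
    cache_storage=tempfile.gettempdir(),
)

# "none": no local copies; every read goes to the remote filesystem.
fs_none = fsspec.filesystem("s3", anon=True)
```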
142 changes: 124 additions & 18 deletions tap_file/streams.py
@@ -3,45 +3,151 @@
from __future__ import annotations

import csv
import json
import re
from functools import cached_property
from typing import Any, Generator

from tap_file.client import FileStream


class CSVStream(FileStream):
"""Stream for reading CSVs."""

name = "CSV"
class DelimitedStream(FileStream):
"""Stream for reading CSVs and TSVs."""

def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]:
"""Retrive all rows from all CSVs.
"""Retrive all rows from all *SVs.

Yields:
A dictionary containing information about a row in a CSV.
A dictionary containing information about a row in a *SV.
"""
for file in self.get_files():
with self.filesystem.open(path=file, mode="rt") as f:
reader = csv.DictReader(f)
for row in reader:
yield row
for reader in self._get_readers():
for row in reader:
yield row

@cached_property
def schema(self) -> dict[str, dict]:
"""Create a schema for a CSV file.
"""Create a schema for a *SV file.

Each column in the CSV will have its own entry in the schema. All entries will
Each column in the *SV will have its own entry in the schema. All entries will
be of the form: `'FIELD_NAME': {'type': ['null', 'string']}`

Returns:
A schema representing a CSV.
A schema representing a *SV.
"""
properties = {}

for reader in self._get_readers():
for field in reader.fieldnames:
properties.update({field: {"type": ["null", "string"]}})

return {"properties": properties}

def _get_readers(self) -> Generator[csv.DictReader[str], None, None]:
quote_character: str = self.config["quote_character"]

for file in self.get_files():
if self.config["delimiter"] == "detect":
if re.match(".*\\.csv.*", file):
delimiter = ","
elif re.match(".*\\.tsv.*", file):
delimiter = "\t"
else:
msg = (
"Configuration option 'delimiter' is set to 'detect' but a "
"non-csv non-tsv file is present. Please manually specify "
"'delimiter'."
)
raise RuntimeError(msg)
else:
delimiter = self.config["delimiter"]

with self.filesystem.open(
path=file,
mode="rt",
compression=self.get_compression(file=file),
) as f:
yield csv.DictReader(f, delimiter=delimiter, quotechar=quote_character)
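
To make the `detect` rule concrete: a `.tsv` file is parsed with a tab delimiter, and the quote character still protects embedded delimiters, as in this small illustration (data invented):

```python
import csv
import io

# A two-column TSV with a tab inside a quoted field.
tsv = io.StringIO('id\tname\n1\t"Smith\tJane"\n')
reader = csv.DictReader(tsv, delimiter="\t", quotechar='"')
assert list(reader) == [{"id": "1", "name": "Smith\tJane"}]
```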


class JSONLStream(FileStream):
"""Stream for reading JSON files."""

def get_rows(self) -> Generator[dict[str, Any], None, None]:
"""Retrive all rows from all JSONL files.

Yields:
A dictionary containing information about a row in a JSONL file.
"""
for file in self.get_files():
with self.filesystem.open(path=file, mode="rt") as f:
reader = csv.DictReader(f)
for field in reader.fieldnames:
properties.update({field: {"type": ["null", "string"]}})
with self.filesystem.open(
path=file,
mode="rt",
compression=self.get_compression(file=file),
) as f:
for row in f:
yield self._pre_process(json.loads(row))

@cached_property
def schema(self) -> dict[str, dict]:
"""Create a schema for a JSONL file.

The format of the schema will depend on the jsonl_type_coercion_strategy config
option, but will always be a dictionary of field names and associated types.

Returns:
A schema representing a JSONL file.
"""
properties = {}
for field in self._get_fields():
properties.update(self._get_property(field=field))
return {"properties": properties}

def _get_property(self, field: str) -> dict[str, dict[str, list[str]]]:
strategy = self.config["jsonl_type_coercion_strategy"]
if strategy == "any":
return {
field: {
"type": [
"null",
"boolean",
"integer",
"number",
"string",
"array",
"object",
],
},
}
if strategy == "string":
return {field: {"type": ["null", "string"]}}
if strategy == "blob":
return {field: {"type": ["null", "object"]}}
msg = f"The coercion strategy '{strategy}' is not valid."
raise ValueError(msg)
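
Concretely, for a field named `foo` the three branches above produce these properties (shapes copied from the code, shown side by side):

```python
# "any" -- the field may hold any JSON type:
{"foo": {"type": ["null", "boolean", "integer", "number", "string", "array", "object"]}}

# "string" -- the field must be a string; values are coerced in _pre_process:
{"foo": {"type": ["null", "string"]}}

# "blob" -- each row is delivered whole as a single object:
{"foo": {"type": ["null", "object"]}}
```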

def _get_fields(self) -> Generator[str, None, None]:
strategy = self.config["jsonl_sampling_strategy"]
if strategy == "first":
try:
yield from next(self.get_rows())
except StopIteration:
return
return
if strategy == "all":
msg = f"The sampling strategy '{strategy}' has not been implemented."
raise NotImplementedError(msg)
msg = f"The sampling strategy '{strategy}' is not valid."
raise ValueError(msg)

def _pre_process(self, row: dict[str, Any]) -> dict[str, Any]:
strategy = self.config["jsonl_type_coercion_strategy"]
if strategy == "any":
return row
if strategy == "string":
for entry in row:
row[entry] = str(row[entry])
return row
if strategy == "blob":
return {"record": row}
msg = f"The coercion strategy '{strategy}' is not valid."
raise ValueError(msg)
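
And the corresponding effect of `_pre_process` on a sample row (sample data invented for illustration):

```python
row = {"id": 7, "tags": ["a", "b"], "active": True}

# "any": passed through unchanged.
#   -> {"id": 7, "tags": ["a", "b"], "active": True}

# "string": every value run through str().
#   -> {"id": "7", "tags": "['a', 'b']", "active": "True"}

# "blob": the whole row nested under a single "record" key.
#   -> {"record": {"id": 7, "tags": ["a", "b"], "active": True}}
```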