diff --git a/README.md b/README.md
index 875d1bc..9d4474d 100644
--- a/README.md
+++ b/README.md
@@ -18,19 +18,26 @@ pipx install git+https://github.com/MeltanoLabs/tap-file.git
 
 ### Accepted Config Options
 
-| Setting | Required | Default | Description |
-|:-----------------------|:--------:|:-------:|:------------|
-| protocol | True | None | The protocol to use to retrieve data. One of `file` or `s3`. |
-| filepath | True | None | The path to obtain files from. Example: `/foo/bar`. Or, for `protocol==s3`, `s3-bucket-name`. |
-| file_regex | False | None | A regex pattern to only include certain files. Example: `.*\.csv`. Note that if you want to sync a subdirectory, use the `filepath` setting instead. |
-| s3_anonymous_connection| False | 0 | Whether to use an anonymous S3 connection, without any credentials. Ignored if `protocol!=s3`. |
-| AWS_ACCESS_KEY_ID | False | $AWS_ACCESS_KEY_ID | The access key to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
-| AWS_SECRET_ACCESS_KEY | False | $AWS_SECRET_ACCESS_KEY | The access key secret to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
-| cache_mode | False | once | *DEVELOPERS ONLY* The caching method to use when `protocol!=file`. `none` does not use caching at all. `once` (the default) will cache all files for the duration of the tap's invocation, then discard them upon completion. `peristent` will allow caches to persist between invocations of the tap, storing them in your OS's temp directory. It is recommended that you do not modify this setting. |
-| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
-| stream_map_config | False | None | User-defined config values to be used within map expressions. |
-| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
-| flattening_max_depth | False | None | The max depth to flatten schemas. |
+| Setting | Required | Default | Description |
+|:----------------------------|:--------:|:-------:|:------------|
+| stream_name | False | file | The name of the stream that is output by the tap. |
+| protocol | True | None | The protocol to use to retrieve data. One of `file` or `s3`. |
+| filepath | True | None | The path to obtain files from. Example: `/foo/bar`. Or, for `protocol==s3`, use `s3-bucket-name` instead. |
+| file_regex | False | None | A regex pattern to only include certain files. Example: `.*\.csv`. |
+| file_type | False | delimited | Can be any of `delimited`, `jsonl`, or `avro`. Indicates the type of file to sync, where `delimited` is for CSV/TSV files and similar. Note that *all* files will be read as that type, regardless of file extension. To only read from files with a matching file extension, appropriately configure `file_regex`. |
+| compression | False | detect | The encoding to use to decompress data. One of `zip`, `bz2`, `gzip`, `lzma`, `xz`, `none`, or `detect`. |
+| delimiter | False | detect | The character used to separate records in a CSV/TSV. Can be any character or the special value `detect`. If a character is provided, all CSV and TSV files will use that value. `detect` will use `,` for CSV files and `\t` for TSV files, and will fail if other file types are present. |
+| quote_character | False | " | The character used to indicate when a record in a CSV contains a delimiter character. |
+| jsonl_sampling_strategy | False | first | The strategy determining how to read the keys in a JSONL file. Must be one of `first` or `all`. Currently, only `first` is supported, which will assume that the first record in a file is representative of all keys. |
+| jsonl_type_coercion_strategy | False | any | The strategy determining how to construct the schema for JSONL files when the types represented are ambiguous. Must be one of `any`, `string`, or `blob`. `any` will provide a generic schema for all keys, allowing them to be any valid JSON type. `string` will require all keys to be strings and will convert other values accordingly. `blob` will deliver each JSONL row as a JSON object with no internal schema. Currently, only `any` and `string` are supported. |
+| s3_anonymous_connection | False | 0 | Whether to use an anonymous S3 connection, without any credentials. Ignored if `protocol!=s3`. |
+| AWS_ACCESS_KEY_ID | False | $AWS_ACCESS_KEY_ID | The access key to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
+| AWS_SECRET_ACCESS_KEY | False | $AWS_SECRET_ACCESS_KEY | The access key secret to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
+| caching_strategy | False | once | *DEVELOPERS ONLY* The caching method to use when `protocol!=file`. One of `none`, `once`, or `persistent`. `none` does not use caching at all. `once` (the default) will cache all files for the duration of the tap's invocation, then discard them upon completion. `persistent` will allow caches to persist between invocations of the tap, storing them in your OS's temp directory. It is recommended that you do not modify this setting. |
+| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
+| stream_map_config | False | None | User-defined config values to be used within map expressions. |
+| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
+| flattening_max_depth | False | None | The max depth to flatten schemas. |
 | batch_config | False | None | Object containing batch configuration information, as specified in the [Meltano documentation](https://sdk.meltano.com/en/latest/batch.html). Has two child objects: `encoding` and `storage`. |
 | batch_config.encoding | False | None | Object containing information about how to encode batch information. Has two child entries: `format` and `compression`. |
 | batch_config.storage | False | None | Object containing information about how batch files should be stored. Has two child entries: `root` and `prefix`. |
@@ -57,9 +64,9 @@ config:
         root: file:///foo/bar
         prefix: batch-
 ```
-```json
+```python
 {
-  // ... other config options ...
+  # ... other config options ...
   "batch_config": {
     "encoding": {
       "format": "jsonl",
diff --git a/meltano.yml b/meltano.yml
index 76194f5..672278b 100644
--- a/meltano.yml
+++ b/meltano.yml
@@ -8,7 +8,7 @@ plugins:
   extractors:
   - name: tap-file
     namespace: tap_file
-    pip_url: -e .[s3]
+    pip_url: -e .
     capabilities:
     - state
     - catalog
@@ -16,15 +16,22 @@ plugins:
     - about
     - stream-maps
     settings:
+    - name: stream_name
     - name: protocol
     - name: filepath
     - name: file_regex
+    - name: file_type
+    - name: compression
+    - name: delimiter
+    - name: quote_character
+    - name: jsonl_sampling_strategy
+    - name: jsonl_type_coercion_strategy
     - name: s3_anonymous_connection
     - name: AWS_ACCESS_KEY_ID
       kind: password
     - name: AWS_SECRET_ACCESS_KEY
       kind: password
-    - name: cache_mode
+    - name: caching_strategy
   loaders:
   - name: target-jsonl
     variant: andyh1203
diff --git a/tap_file/client.py b/tap_file/client.py
index 49b1c96..30473a5 100644
--- a/tap_file/client.py
+++ b/tap_file/client.py
@@ -55,8 +55,8 @@ def get_files(self) -> Generator[str, None, None]:
 
         if empty:
             msg = (
-                "No files found. Choose a different `filepath` or try a more "
-                "lenient `file_regex`."
+                "No files found. Choose a different `filepath` or try a more lenient "
+                "`file_regex`."
             )
             raise RuntimeError(msg)
 
@@ -72,6 +72,33 @@ def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]:
         msg = "get_rows must be implemented by subclass."
         raise NotImplementedError(msg)
 
+    def get_compression(self, file: str) -> str | None:  # noqa: PLR0911
+        """Determines what compression encoding is appropriate for a given file.
+
+        Args:
+            file: The file to determine the encoding of.
+
+        Returns:
+            A string representing the appropriate compression encoding, or `None` if no
+            compression is needed or if a compression encoding can't be determined.
+        """
+        compression: str = self.config["compression"]
+        if compression == "none":
+            return None
+        if compression != "detect":
+            return compression
+        if re.match(".*\\.zip$", file):
+            return "zip"
+        if re.match(".*\\.bz2$", file):
+            return "bz2"
+        if re.match(".*\\.gz(ip)?$", file):
+            return "gzip"
+        if re.match(".*\\.lzma$", file):
+            return "lzma"
+        if re.match(".*\\.xz$", file):
+            return "xz"
+        return None
+
     def get_records(
         self,
         context: dict | None,  # noqa: ARG002
diff --git a/tap_file/files.py b/tap_file/files.py
index 6c8332f..5a74c50 100644
--- a/tap_file/files.py
+++ b/tap_file/files.py
@@ -35,25 +35,25 @@ def get_filesystem(self) -> fsspec.AbstractFileSystem:
         """
         self._check_config()
         protocol = self.config["protocol"]
-        cache_mode = self.config["cache_mode"]
+        caching_strategy = self.config["caching_strategy"]
 
         if protocol == "file":
             return fsspec.filesystem("file")
 
-        if cache_mode == "once":
+        if caching_strategy == "once":
             return fsspec.filesystem(
                 "filecache",
                 target_protocol=self.config["protocol"],
                 target_options=self._get_args(),
             )
-        if cache_mode == "persistent":
+        if caching_strategy == "persistent":
             return fsspec.filesystem(
                 "filecache",
                 target_protocol=self.config["protocol"],
                 target_options=self._get_args(),
                 cache_storage=tempfile.gettempdir(),
             )
-        if cache_mode == "none":
+        if caching_strategy == "none":
             return fsspec.filesystem(
                 protocol=self.config["protocol"],
                 **self._get_args(),
@@ -79,12 +79,12 @@ def _get_args(self) -> dict[str, Any]:
 
     def _check_config(self) -> None:
         protocol = self.config["protocol"]
-        cache_mode = self.config["cache_mode"]
+        caching_strategy = self.config["caching_strategy"]
 
         if protocol not in {"file", "s3"}:
             msg = f"Protocol '{protocol}' is not valid."
             raise ValueError(msg)
 
-        if cache_mode not in {"none", "once", "persistent"}:
-            msg = f"Cache mode '{cache_mode}' is not valid."
+        if caching_strategy not in {"none", "once", "persistent"}:
+            msg = f"Caching strategy '{caching_strategy}' is not valid."
             raise ValueError(msg)
diff --git a/tap_file/streams.py b/tap_file/streams.py
index f084a65..7df3b4d 100644
--- a/tap_file/streams.py
+++ b/tap_file/streams.py
@@ -3,45 +3,151 @@
 from __future__ import annotations
 
 import csv
+import json
+import re
 from functools import cached_property
 from typing import Any, Generator
 
 from tap_file.client import FileStream
 
 
-class CSVStream(FileStream):
-    """Stream for reading CSVs."""
-
-    name = "CSV"
+class DelimitedStream(FileStream):
+    """Stream for reading CSVs and TSVs."""
 
     def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]:
-        """Retrive all rows from all CSVs.
+        """Retrieve all rows from all *SVs.
 
         Yields:
-            A dictionary containing information about a row in a CSV.
+            A dictionary containing information about a row in a *SV.
         """
-        for file in self.get_files():
-            with self.filesystem.open(path=file, mode="rt") as f:
-                reader = csv.DictReader(f)
-                for row in reader:
-                    yield row
+        for reader in self._get_readers():
+            for row in reader:
+                yield row
 
     @cached_property
     def schema(self) -> dict[str, dict]:
-        """Create a schema for a CSV file.
+        """Create a schema for a *SV file.
 
-        Each column in the CSV will have its own entry in the schema. All entries will
+        Each column in the *SV will have its own entry in the schema. All entries will
         be of the form: `'FIELD_NAME': {'type': ['null', 'string']}`
 
         Returns:
-            A schema representing a CSV.
+            A schema representing a *SV.
         """
         properties = {}
+        for reader in self._get_readers():
+            for field in reader.fieldnames:
+                properties.update({field: {"type": ["null", "string"]}})
+
+        return {"properties": properties}
+
+    def _get_readers(self) -> Generator[csv.DictReader[str], None, None]:
+        quote_character: str = self.config["quote_character"]
+
+        for file in self.get_files():
+            if self.config["delimiter"] == "detect":
+                if re.match(".*\\.csv.*", file):
+                    delimiter = ","
+                elif re.match(".*\\.tsv.*", file):
+                    delimiter = "\t"
+                else:
+                    msg = (
+                        "Configuration option 'delimiter' is set to 'detect' but a "
+                        "non-csv non-tsv file is present. Please manually specify "
+                        "'delimiter'."
+                    )
+                    raise RuntimeError(msg)
+            else:
+                delimiter = self.config["delimiter"]
+
+            with self.filesystem.open(
+                path=file,
+                mode="rt",
+                compression=self.get_compression(file=file),
+            ) as f:
+                yield csv.DictReader(f, delimiter=delimiter, quotechar=quote_character)
+
+
+class JSONLStream(FileStream):
+    """Stream for reading JSONL files."""
+
+    def get_rows(self) -> Generator[dict[str, Any], None, None]:
+        """Retrieve all rows from all JSONL files.
+
+        Yields:
+            A dictionary containing information about a row in a JSONL file.
+        """
         for file in self.get_files():
-            with self.filesystem.open(path=file, mode="rt") as f:
-                reader = csv.DictReader(f)
-                for field in reader.fieldnames:
-                    properties.update({field: {"type": ["null", "string"]}})
+            with self.filesystem.open(
+                path=file,
+                mode="rt",
+                compression=self.get_compression(file=file),
+            ) as f:
+                for row in f:
+                    yield self._pre_process(json.loads(row))
+
+    @cached_property
+    def schema(self) -> dict[str, dict]:
+        """Create a schema for a JSONL file.
+
+        The format of the schema will depend on the jsonl_type_coercion_strategy config
+        option, but will always be a dictionary of field names and associated types.
+
+        Returns:
+            A schema representing a JSONL file.
+        """
+        properties = {}
+        for field in self._get_fields():
+            properties.update(self._get_property(field=field))
 
         return {"properties": properties}
+
+    def _get_property(self, field: str) -> dict[str, dict[str, list[str]]]:
+        strategy = self.config["jsonl_type_coercion_strategy"]
+        if strategy == "any":
+            return {
+                field: {
+                    "type": [
+                        "null",
+                        "boolean",
+                        "integer",
+                        "number",
+                        "string",
+                        "array",
+                        "object",
+                    ],
+                },
+            }
+        if strategy == "string":
+            return {field: {"type": ["null", "string"]}}
+        if strategy == "blob":
+            return {field: {"type": ["null", "object"]}}
+        msg = f"The coercion strategy '{strategy}' is not valid."
+        raise ValueError(msg)
+
+    def _get_fields(self) -> Generator[str, None, None]:
+        strategy = self.config["jsonl_sampling_strategy"]
+        if strategy == "first":
+            try:
+                yield from next(self.get_rows())
+            except StopIteration:
+                return
+            return
+        if strategy == "all":
+            msg = f"The sampling strategy '{strategy}' has not been implemented."
+            raise NotImplementedError(msg)
+        msg = f"The sampling strategy '{strategy}' is not valid."
+        raise ValueError(msg)
+
+    def _pre_process(self, row: dict[str, Any]) -> dict[str, Any]:
+        strategy = self.config["jsonl_type_coercion_strategy"]
+        if strategy == "any":
+            return row
+        if strategy == "string":
+            for entry in row:
+                row[entry] = str(row[entry])
+            return row
+        if strategy == "blob":
+            return {"record": row}
+        msg = f"The coercion strategy '{strategy}' is not valid."
+        raise ValueError(msg)
diff --git a/tap_file/tap.py b/tap_file/tap.py
index 25b29fe..5d620ea 100644
--- a/tap_file/tap.py
+++ b/tap_file/tap.py
@@ -16,6 +16,13 @@ class TapFile(Tap):
     name = "tap-file"
 
     config_jsonschema = th.PropertiesList(
+        th.Property(
+            "stream_name",
+            th.StringType,
+            required=False,
+            default="file",
+            description="The name of the stream that is output by the tap.",
+        ),
         th.Property(
             "protocol",
             th.StringType,
@@ -36,9 +43,80 @@ class TapFile(Tap):
             "file_regex",
             th.RegexType,
             description=(
-                "A regex pattern to only include certain files. Example: `.*\\.csv$`. "
-                "Note that if you want to sync a subdirectory, use the `filepath` "
-                "setting instead."
+                "A regex pattern to only include certain files. Example: `.*\\.csv`."
+            ),
+        ),
+        th.Property(
+            "file_type",
+            th.StringType,
+            default="delimited",
+            description=(
+                "Can be any of `delimited`, `jsonl`, or `avro`. Indicates the type of "
+                "file to sync, where `delimited` is for CSV/TSV files and similar. "
+                "Note that *all* files will be read as that type, regardless of file "
+                "extension. To only read from files with a matching file extension, "
+                "appropriately configure `file_regex`."
+            ),
+        ),
+        th.Property(
+            "compression",
+            th.StringType,
+            allowed_values=["none", "zip", "bz2", "gzip", "lzma", "xz", "detect"],
+            default="detect",
+            description=(
+                "The encoding to use to decompress data. One of `zip`, `bz2`, `gzip`, "
+                "`lzma`, `xz`, `none`, or `detect`. If set to `none` or any encoding, "
+                "that setting will be applied to *all* files, regardless of file "
+                "extension. If set to `detect`, encodings will be applied based on "
+                "file extension."
+            ),
+        ),
+        th.Property(
+            "delimiter",
+            th.StringType,
+            default="detect",
+            description=(
+                "The character used to separate records in a delimited file. Can be "
+                "any character or the special value `detect`. If a character is "
+                "provided, all delimited files will use that value. `detect` will use "
+                "`,` for `.csv` files, `\\t` for `.tsv` files, and fail if other file "
+                "types are present."
+            ),
+        ),
+        th.Property(
+            "quote_character",
+            th.StringType,
+            default='"',
+            description=(
+                "The character used to indicate when a record in a CSV contains a "
+                "delimiter character."
+            ),
+        ),
+        th.Property(
+            "jsonl_sampling_strategy",
+            th.StringType,
+            allowed_values=["first", "all"],
+            default="first",
+            description=(
+                "The strategy determining how to read the keys in a JSONL file. Must "
+                "be one of `first` or `all`. Currently, only `first` is supported, "
+                "which will assume that the first record in a file is representative "
+                "of all keys."
+            ),
+        ),
+        th.Property(
+            "jsonl_type_coercion_strategy",
+            th.StringType,
+            allowed_values=["any", "string", "blob"],
+            default="any",
+            description=(
+                "The strategy determining how to construct the schema for JSONL files "
+                "when the types represented are ambiguous. Must be one of `any`, "
+                "`string`, or `blob`. `any` will provide a generic schema for all "
+                "keys, allowing them to be any valid JSON type. `string` will require "
+                "all keys to be strings and will convert other values accordingly. "
+                "`blob` will deliver each JSONL row as a JSON object with no internal "
+                "schema. Currently, only `any` and `string` are supported."
             ),
         ),
         th.Property(
@@ -71,7 +149,7 @@ class TapFile(Tap):
             ),
         ),
         th.Property(
-            "cache_mode",
+            "caching_strategy",
             th.StringType,
             default="once",
             allowed_values=["none", "once", "persistent"],
@@ -93,9 +171,20 @@ def discover_streams(self) -> list[streams.FileStream]:
         Returns:
             A list of discovered streams.
         """
-        return [
-            streams.CSVStream(self),
-        ]
+        name = self.config["stream_name"]
+        file_type = self.config["file_type"]
+        if file_type == "delimited":
+            return [streams.DelimitedStream(self, name=name)]
+        if file_type == "jsonl":
+            return [streams.JSONLStream(self, name=name)]
+        if file_type == "avro":
+            msg = "avro has not yet been implemented."
+            raise NotImplementedError(msg)
+        if file_type in {"csv", "tsv"}:
+            msg = f"{file_type} is not a valid 'file_type'. Did you mean 'delimited'?"
+            raise ValueError(msg)
+        msg = f"{file_type} is not a valid 'file_type'."
+        raise ValueError(msg)
 
 
 if __name__ == "__main__":
diff --git a/tests/test_core.py b/tests/test_core.py
index 703a231..9b7e9b7 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -6,7 +6,7 @@
 
 SAMPLE_CONFIG = {
     "protocol": "s3",
-    "filepath": "tap-file-taptesting/alchemy",
+    "filepath": "tap-file-taptesting/grocery",
     # S3 access key and secret are retrieved from environment variables.
 }
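For reference, the new settings compose as follows. Below is a minimal sketch of a `config.json` for syncing gzipped TSVs from S3; the bucket name, path, and file pattern are hypothetical:

```json
{
  "protocol": "s3",
  "filepath": "my-example-bucket/exports",
  "file_regex": ".*\\.tsv\\.gz$",
  "file_type": "delimited",
  "delimiter": "\t",
  "compression": "gzip",
  "stream_name": "exports"
}
```

Because `delimiter` and `compression` default to `detect`, both could also be omitted here: the `.tsv` and `.gz` extensions would select the tab delimiter and gzip decompression automatically.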