From 870e15e7d66baf5737efe74d0201255a6ff04f91 Mon Sep 17 00:00:00 2001 From: Sebastian Smiley Date: Fri, 16 Jun 2023 14:32:27 -0400 Subject: [PATCH 1/5] Remove redundant GitHub actions code. --- .github/workflows/test.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 522abda..6373818 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -21,10 +21,6 @@ jobs: AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} steps: - uses: actions/checkout@v3 - - name: Check environment variables. - run: | - echo "secret : $TESTING_SECRET" - echo "variable : $TESTING_VARIABLE" - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 with: From bb9be1a68fbfdfdbeb876bc2d11a478e5e2b06a7 Mon Sep 17 00:00:00 2001 From: Sebastian Smiley Date: Fri, 16 Jun 2023 15:21:19 -0400 Subject: [PATCH 2/5] Custom stream name. --- meltano.yml | 1 + tap_file/streams.py | 2 -- tap_file/tap.py | 14 ++++++++++++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/meltano.yml b/meltano.yml index b477db3..ad15fa3 100644 --- a/meltano.yml +++ b/meltano.yml @@ -16,6 +16,7 @@ plugins: - about - stream-maps settings: + - name: stream_name - name: protocol - name: filepath - name: file_regex diff --git a/tap_file/streams.py b/tap_file/streams.py index f084a65..cc023ca 100644 --- a/tap_file/streams.py +++ b/tap_file/streams.py @@ -12,8 +12,6 @@ class CSVStream(FileStream): """Stream for reading CSVs.""" - name = "CSV" - def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]: """Retrive all rows from all CSVs. 
diff --git a/tap_file/tap.py b/tap_file/tap.py index ac0e37a..511cd00 100644 --- a/tap_file/tap.py +++ b/tap_file/tap.py @@ -17,6 +17,13 @@ class TapFile(Tap): name = "tap-file" config_jsonschema = th.PropertiesList( + th.Property( + "stream_name", + th.StringType, + required=False, + default="file", + description="The name of the stream that is output by the tap.", + ), th.Property( "protocol", th.StringType, @@ -28,7 +35,9 @@ class TapFile(Tap): "filepath", th.StringType, required=True, - description="The path to obtain files from. Example: `/foo/bar`.", + description=( + "The path to obtain files from. Examples: `/foo/bar`, `s3-bucket-name`." + ), ), th.Property( "file_regex", @@ -84,8 +93,9 @@ def discover_streams(self) -> list[streams.FileStream]: Returns: A list of discovered streams. """ + name = self.config["stream_name"] return [ - streams.CSVStream(self), + streams.CSVStream(self, name = name), ] From f01842b944fc80efe3a56b2f75bd49767eba50ca Mon Sep 17 00:00:00 2001 From: Sebastian Smiley Date: Mon, 19 Jun 2023 13:48:49 -0400 Subject: [PATCH 3/5] Adds compression, delimiter, and quote_character. csv and tsv are valid file types. Scaffolding in place for json and avro. 
--- meltano.yml | 4 +++ tap_file/client.py | 31 ++++++++++++++++- tap_file/streams.py | 83 +++++++++++++++++++++++++++++++++++---------- tap_file/tap.py | 45 +++++++++++++++++++++++- 4 files changed, 144 insertions(+), 19 deletions(-) diff --git a/meltano.yml b/meltano.yml index ad15fa3..0ad1d6e 100644 --- a/meltano.yml +++ b/meltano.yml @@ -20,7 +20,11 @@ plugins: - name: protocol - name: filepath - name: file_regex + - name: file_type - name: cache_filepath + - name: compression + - name: delimiter + - name: quote_character - name: s3_anonymous_connection - name: AWS_ACCESS_KEY_ID kind: password diff --git a/tap_file/client.py b/tap_file/client.py index 2ed0a59..bd5a8e5 100644 --- a/tap_file/client.py +++ b/tap_file/client.py @@ -102,7 +102,7 @@ def filesystem(self) -> fsspec.AbstractFileSystem: # noqa: PLR0911 msg = f"Protocol '{protocol}' is not valid." raise ValueError(msg) - def get_files(self) -> Generator[str, None, None]: + def get_files(self, regex: str | None = None) -> Generator[str, None, None]: """Gets file names to be synced. Yields: @@ -118,6 +118,8 @@ def get_files(self) -> Generator[str, None, None]: Path(file).name, ): continue + if regex is not None and not re.match(regex, Path(file).name): + continue yield file def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]: @@ -132,6 +134,33 @@ def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]: msg = "get_rows must be implemented by subclass." raise NotImplementedError(msg) + def get_compression(self, file: str) -> str | None: # noqa: PLR0911 + """Determines what compression encoding is appropriate for a given file. + + Args: + file: The file to determine the encoding of. + + Returns: + A string representing the appropriate compression encoding, or `None` if no + compression is needed or if a compression encoding can't be determined. 
+ """ + compression: str = self.config["compression"] + if compression == "none": + return None + if compression != "detect": + return compression + if re.match(".*\\.zip$", file): + return "zip" + if re.match(".*\\.bz2$", file): + return "bz2" + if re.match(".*\\.(gzip|gz)$", file): + return "gzip" + if re.match(".*\\.lzma$", file): + return "lzma" + if re.match(".*\\.xz$", file): + return "xz" + return None + def get_records( self, context: dict | None, # noqa: ARG002 diff --git a/tap_file/streams.py b/tap_file/streams.py index cc023ca..41b6ec5 100644 --- a/tap_file/streams.py +++ b/tap_file/streams.py @@ -9,37 +9,86 @@ from tap_file.client import FileStream -class CSVStream(FileStream): - """Stream for reading CSVs.""" +class SeparatedValuesStream(FileStream): + """Stream for reading CSVs and TSVs.""" def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]: - """Retrive all rows from all CSVs. + """Retrive all rows from all *SVs. Yields: - A dictionary containing information about a row in a CSV. + A dictionary containing information about a row in a *SV. """ - for file in self.get_files(): - with self.filesystem.open(path=file, mode="rt") as f: - reader = csv.DictReader(f) - for row in reader: - yield row + for reader in self.get_readers(): + for row in reader: + yield row @cached_property def schema(self) -> dict[str, dict]: - """Create a schema for a CSV file. + """Create a schema for a *SV file. - Each column in the CSV will have its own entry in the schema. All entries will + Each column in the *SV will have its own entry in the schema. All entries will be of the form: `'FIELD_NAME': {'type': ['null', 'string']}` Returns: - A schema representing a CSV. + A schema representing a *SV. 
""" properties = {} - for file in self.get_files(): - with self.filesystem.open(path=file, mode="rt") as f: - reader = csv.DictReader(f) - for field in reader.fieldnames: - properties.update({field: {"type": ["null", "string"]}}) + for reader in self.get_readers(): + for field in reader.fieldnames: + properties.update({field: {"type": ["null", "string"]}}) return {"properties": properties} + + def get_readers(self) -> Generator[csv.DictReader[str], None, None]: + """Get all *SV readers. + + Raises: + StopIteration: To end iteration early if no more readers exist. + + Yields: + A *SV reader. + """ + file_type: str = self.config["file_type"] + delimiter: str = self.config["delimiter"] + delimiters: dict[str, str] = { + "csv": "," if delimiter == "detect" else delimiter, + "tsv": "\t" if delimiter == "detect" else delimiter, + } + + if file_type == "detect": + for reader in self.get_readers_helper( + delimiter == delimiters["csv"], + regex=".*\\.csv.*", + ): + yield reader + for reader in self.get_readers_helper( + delimiter == delimiters["tsv"], + regex=".*\\.tsv.*", + ): + yield reader + elif file_type in {"csv", "tsv"}: + for reader in self.get_readers_helper(delimiter=delimiters[file_type]): + yield reader + else: + raise StopIteration + + def get_readers_helper( + self, + delimiter: str, + regex: str | None = None, + ) -> Generator[csv.DictReader[str], None, None]: + """Get a subset of *SV readers matching certain criteria. + + Yields: + A *SV reader. + """ + quote_character: str = self.config["quote_character"] + + for file in self.get_files(regex=regex): + with self.filesystem.open( + path=file, + mode="rt", + compression=self.get_compression(file=file), + ) as f: + yield csv.DictReader(f, delimiter=delimiter, quotechar=quote_character) diff --git a/tap_file/tap.py b/tap_file/tap.py index 511cd00..576d4ab 100644 --- a/tap_file/tap.py +++ b/tap_file/tap.py @@ -46,6 +46,18 @@ class TapFile(Tap): "A regex pattern to only include certain files. 
Example: `.*\\.csv`." ), ), + th.Property( + "file_type", + th.RegexType, + default="detect", + description=( + "Can be any of `csv`, `tsv`, `json`, `avro`, or `detect`. Indicates " + "how to determine a file's type. If set to `detect`, file names " + "containing a matching extension will be read as that type and other " + "files will not be read. If set to a file type, *all* files will be " + "read as that type." + ), + ), th.Property( "cache_filepath", th.StringType, @@ -56,6 +68,37 @@ class TapFile(Tap): "will be fetched for each read operation." ), ), + th.Property( + "compression", + th.StringType, + required=True, + allowed_values=["none", "zip", "bz2", "gzip", "lzma", "xz", "detect"], + default="none", + description=( + "The encoding to use to decompress data. One of `zip`, `bz2`, `gzip`, " + "`lzma`, `xz`, `none`, or `detect`." + ), + ), + th.Property( + "delimiter", + th.StringType, + default="detect", + description=( + "The character used to separate records in a CSV. Can be any single " + "character or the special value `detect`. If a value is provided, all " + "CSV and TSV files will use that value. Otherwise, `,` will be used " + "for CSV files and `\\t` will be used for TSV files." + ), + ), + th.Property( + "quote_character", + th.StringType, + default='"', + description=( + "The character used to indicate when a record in a CSV contains a " + 'delimiter character. Defaults to `"`.' + ), + ), th.Property( "s3_anonymous_connection", th.BooleanType, @@ -95,7 +138,7 @@ def discover_streams(self) -> list[streams.FileStream]: """ name = self.config["stream_name"] return [ - streams.CSVStream(self, name = name), + streams.SeparatedValuesStream(self, name=name), ] From 6e5e91330feb7ee56c8645c509ab01205b250301 Mon Sep 17 00:00:00 2001 From: Sebastian Smiley Date: Tue, 20 Jun 2023 23:30:57 -0400 Subject: [PATCH 4/5] jsonl support. Caching config rename. Documentation fixes. 
--- README.md | 42 +++++++++--------- meltano.yml | 6 ++- tap_file/client.py | 3 +- tap_file/files.py | 14 +++--- tap_file/streams.py | 106 ++++++++++++++++++++++++++++++++++++++++---- tap_file/tap.py | 38 +++++++++++++--- 6 files changed, 165 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index ddf4404..9d4474d 100644 --- a/README.md +++ b/README.md @@ -18,24 +18,26 @@ pipx install git+https://github.com/MeltanoLabs/tap-file.git ### Accepted Config Options -| Setting | Required | Default | Description | -|:-----------------------|:--------:|:-------:|:------------| -| stream_name | False | file | The name of the stream that is output by the tap. | -| protocol | True | None | The protocol to use to retrieve data. One of `file` or `s3`. | -| filepath | True | None | The path to obtain files from. Example: `/foo/bar`. Or, for `protocol==s3`, use `s3-bucket-name` instead. | -| file_regex | False | None | A regex pattern to only include certain files. Example: `.*\.csv`. | -| file_type | False | detect | Can be any of `csv`, `tsv`, `json`, `avro`, or `detect`. Indicates how to determine a file's type. If set to `detect`, file names containing a matching extension will be read as that type and other files will not be read. If set to a file type, *all* files will be read as that type. | -| compression | False | detect | The encoding to use to decompress data. One of `zip`, `bz2`, `gzip`, `lzma`, `xz`, `none`, or `detect`. | -| delimiter | False | detect | The character used to separate records in a CSV/TSV. Can be any character or the special value `detect`. If a value is provided, all CSV and TSV files will use that value. Otherwise, `,` will be used for CSV files and `\t` will be used for TSV files. | -| quote_character | False | " | The character used to indicate when a record in a CSV contains a delimiter character. Defaults to `"`. | -| s3_anonymous_connection| False | 0 | Whether to use an anonymous S3 connection, without any credentials. 
Ignored if `protocol!=s3`. | -| AWS_ACCESS_KEY_ID | False | $AWS_ACCESS_KEY_ID | The access key to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. | -| AWS_SECRET_ACCESS_KEY | False | $AWS_SECRET_ACCESS_KEY | The access key secret to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. | -| cache_mode | False | once | *DEVELOPERS ONLY* The caching method to use when `protocol!=file`. One of `none`, `once`, or `persistent`. `none` does not use caching at all. `once` (the default) will cache all files for the duration of the tap's invocation, then discard them upon completion. `peristent` will allow caches to persist between invocations of the tap, storing them in your OS's temp directory. It is recommended that you do not modify this setting. | -| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | -| stream_map_config | False | None | User-defined config values to be used within map expressions. | -| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | -| flattening_max_depth | False | None | The max depth to flatten schemas. | +| Setting | Required | Default | Description | +|:----------------------------|:--------:|:-------:|:------------| +| stream_name | False | file | The name of the stream that is output by the tap. | +| protocol | True | None | The protocol to use to retrieve data. One of `file` or `s3`. | +| filepath | True | None | The path to obtain files from. Example: `/foo/bar`. Or, for `protocol==s3`, use `s3-bucket-name` instead. | +| file_regex | False | None | A regex pattern to only include certain files. Example: `.*\.csv`. 
| +| file_type | False | detect | Can be any of `csv`, `tsv`, `json`, `avro`, or `detect`. Indicates how to determine a file's type. If set to `detect`, file names containing a matching extension will be read as that type and other files will not be read. If set to a file type, *all* files will be read as that type. | +| compression | False | detect | The encoding to use to decompress data. One of `zip`, `bz2`, `gzip`, `lzma`, `xz`, `none`, or `detect`. | +| delimiter | False | detect | The character used to separate records in a CSV/TSV. Can be any character or the special value `detect`. If a character is provided, all CSV and TSV files will use that value. `detect` will use `,` for CSV files and `\t` for TSV files. | +| quote_character | False | " | The character used to indicate when a record in a CSV contains a delimiter character. | +| jsonl_sampling_strategy | False | first | The strategy determining how to read the keys in a JSONL file. Must be one of `first` or `all`. Currently, only `first` is supported, which will assume that the first record in a file is representative of all keys. | +| jsonl_type_coercion_strategy| False | any | The strategy determining how to construct the schema for JSONL files when the types represented are ambiguous. Must be one of `any`, `string`, or `blob`. `any` will provide a generic schema for all keys, allowing them to be any valid JSON type. `string` will require all keys to be strings and will convert other values accordingly. `blob` will deliver each JSONL row as a JSON object with no internal schema. Currently, only `any` and `string` are supported. | +| s3_anonymous_connection | False | 0 | Whether to use an anonymous S3 connection, without any credentials. Ignored if `protocol!=s3`. | +| AWS_ACCESS_KEY_ID | False | $AWS_ACCESS_KEY_ID | The access key to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. 
| +| AWS_SECRET_ACCESS_KEY | False | $AWS_SECRET_ACCESS_KEY | The access key secret to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. | +| caching_strategy | False | once | *DEVELOPERS ONLY* The caching method to use when `protocol!=file`. One of `none`, `once`, or `persistent`. `none` does not use caching at all. `once` (the default) will cache all files for the duration of the tap's invocation, then discard them upon completion. `persistent` will allow caches to persist between invocations of the tap, storing them in your OS's temp directory. It is recommended that you do not modify this setting. | +| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | +| stream_map_config | False | None | User-defined config values to be used within map expressions. | +| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | +| flattening_max_depth | False | None | The max depth to flatten schemas. | | batch_config | False | None | Object containing batch configuration information, as specified in the [Meltano documentation](https://sdk.meltano.com/en/latest/batch.html). Has two child objects: `encoding` and `storage`. | | batch_config.encoding | False | None | Object containing information about how to encode batch information. Has two child entries: `format` and `compression`. | | batch_config.storage | False | None | Object containing information about how batch files should be stored. Has two child entries: `root` and `prefix`. | @@ -62,9 +64,9 @@ config: root: file:///foo/bar prefix: batch- ``` -```json +```python { - // ... other config options ... + # ... other config options ... 
"batch_config": { "encoding": { "format": "jsonl", diff --git a/meltano.yml b/meltano.yml index 2a4c95a..672278b 100644 --- a/meltano.yml +++ b/meltano.yml @@ -8,7 +8,7 @@ plugins: extractors: - name: tap-file namespace: tap_file - pip_url: -e .[s3] + pip_url: -e . capabilities: - state - catalog @@ -24,12 +24,14 @@ plugins: - name: compression - name: delimiter - name: quote_character + - name: jsonl_sampling_strategy + - name: jsonl_type_coercion_strategy - name: s3_anonymous_connection - name: AWS_ACCESS_KEY_ID kind: password - name: AWS_SECRET_ACCESS_KEY kind: password - - name: cache_mode + - name: caching_strategy loaders: - name: target-jsonl variant: andyh1203 diff --git a/tap_file/client.py b/tap_file/client.py index 70beacf..39294e2 100644 --- a/tap_file/client.py +++ b/tap_file/client.py @@ -50,9 +50,9 @@ def get_files(self, regex: str | None = None) -> Generator[str, None, None]: Path(file["name"]).name, ): continue + empty = False if regex is not None and not re.match(regex, Path(file["name"]).name): continue - empty = False yield file["name"] if empty: @@ -62,7 +62,6 @@ def get_files(self, regex: str | None = None) -> Generator[str, None, None]: ) raise RuntimeError(msg) - def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]: """Gets rows of all files that should be synced. 
diff --git a/tap_file/files.py b/tap_file/files.py index 6c8332f..5a74c50 100644 --- a/tap_file/files.py +++ b/tap_file/files.py @@ -35,25 +35,25 @@ def get_filesystem(self) -> fsspec.AbstractFileSystem: """ self._check_config() protocol = self.config["protocol"] - cache_mode = self.config["cache_mode"] + caching_strategy = self.config["caching_strategy"] if protocol == "file": return fsspec.filesystem("file") - if cache_mode == "once": + if caching_strategy == "once": return fsspec.filesystem( "filecache", target_protocol=self.config["protocol"], target_options=self._get_args(), ) - if cache_mode == "persistent": + if caching_strategy == "persistent": return fsspec.filesystem( "filecache", target_protocol=self.config["protocol"], target_options=self._get_args(), cache_storage=tempfile.gettempdir(), ) - if cache_mode == "none": + if caching_strategy == "none": return fsspec.filesystem( protocol=self.config["protocol"], **self._get_args(), @@ -79,12 +79,12 @@ def _get_args(self) -> dict[str, Any]: def _check_config(self) -> None: protocol = self.config["protocol"] - cache_mode = self.config["cache_mode"] + caching_strategy = self.config["caching_strategy"] if protocol not in {"file", "s3"}: msg = f"Protocol '{protocol}' is not valid." raise ValueError(msg) - if cache_mode not in {"none", "once", "persistent"}: - msg = f"Cache mode '{cache_mode}' is not valid." + if caching_strategy not in {"none", "once", "persistent"}: + msg = f"Caching strategy '{caching_strategy}' is not valid." 
raise ValueError(msg) diff --git a/tap_file/streams.py b/tap_file/streams.py index fe7ce12..f4f0cfc 100644 --- a/tap_file/streams.py +++ b/tap_file/streams.py @@ -3,8 +3,9 @@ from __future__ import annotations import csv +import json from functools import cached_property -from typing import Any, Generator +from typing import Any, Generator, Iterable from tap_file.client import FileStream @@ -18,7 +19,7 @@ def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]: Yields: A dictionary containing information about a row in a *SV. """ - for reader in self.get_readers(): + for reader in self._get_readers(): for row in reader: yield row @@ -34,13 +35,13 @@ def schema(self) -> dict[str, dict]: """ properties = {} - for reader in self.get_readers(): + for reader in self._get_readers(): for field in reader.fieldnames: properties.update({field: {"type": ["null", "string"]}}) return {"properties": properties} - def get_readers(self) -> Generator[csv.DictReader[str], None, None]: + def _get_readers(self) -> Generator[csv.DictReader[str], None, None]: """Get all *SV readers. 
Raises: @@ -57,23 +58,23 @@ def get_readers(self) -> Generator[csv.DictReader[str], None, None]: } if file_type == "detect": - for reader in self.get_readers_helper( + for reader in self._get_readers_helper( delimiter=delimiters["csv"], regex=".*\\.csv.*", ): yield reader - for reader in self.get_readers_helper( + for reader in self._get_readers_helper( delimiter=delimiters["tsv"], regex=".*\\.tsv.*", ): yield reader elif file_type in {"csv", "tsv"}: - for reader in self.get_readers_helper(delimiter=delimiters[file_type]): + for reader in self._get_readers_helper(delimiter=delimiters[file_type]): yield reader else: raise StopIteration - def get_readers_helper( + def _get_readers_helper( self, delimiter: str, regex: str | None = None, @@ -92,3 +93,92 @@ def get_readers_helper( compression=self.get_compression(file=file), ) as f: yield csv.DictReader(f, delimiter=delimiter, quotechar=quote_character) + + +class JSONLStream(FileStream): + """Stream for reading JSON files.""" + + def get_rows(self) -> Generator[dict[str, Any], None, None]: + """Retrive all rows from all JSONL files. + + Yields: + A dictionary containing information about a row in a JSONL file. + """ + file_type: str = self.config["file_type"] + + if file_type == "detect": + regex = None + elif file_type == "jsonl": + regex = ".*\\.jsonl.*" + else: + raise StopIteration + + for file in self.get_files(regex=regex): + with self.filesystem.open( + path=file, + mode="rt", + compression=self.get_compression(file=file), + ) as f: + for row in f: + yield self._pre_process(json.loads(row)) + + @cached_property + def schema(self) -> dict[str, dict]: + """Create a schema for a JSONL file. + + The format of the schema will depend on the jsonl_type_coercion_strategy config + option, but will always be a dictionary of field names and associated types. + + Returns: + A schema representing a JSONL file. 
+ """ + properties = {} + for field in self._get_fields(): + properties.update(self._get_property(field=field)) + return {"properties": properties} + + def _get_property(self, field: str) -> dict[str, dict[str, list[str]]]: + strategy = self.config["jsonl_type_coercion_strategy"] + if strategy == "any": + return { + field: { + "type": [ + "null", + "object", + "integer", + "array", + "number", + "boolean", + "string", + ], + }, + } + if strategy == "string": + return {field: {"type": ["null", "string"]}} + if strategy == "blob": + return {field: {"type": ["null", "object"]}} + msg = f"The coercion strategy '{strategy}' is not valid." + raise ValueError(msg) + + def _get_fields(self) -> Iterable[str]: + strategy = self.config["jsonl_sampling_strategy"] + if strategy == "first": + return list(next(self.get_rows())) + if strategy == "all": + msg = f"The sampling strategy '{strategy}' has not been implemented." + raise NotImplementedError(msg) + msg = f"The sampling strategy '{strategy}' is not valid." + raise ValueError(msg) + + def _pre_process(self, row: dict[str, Any]) -> dict[str, Any]: + strategy = self.config["jsonl_type_coercion_strategy"] + if strategy == "any": + return row + if strategy == "string": + for entry in row: + row[entry] = str(row[entry]) + return row + if strategy == "blob": + return {"record": row} + msg = f"The coercion strategy '{strategy}' is not valid." + raise ValueError(msg) diff --git a/tap_file/tap.py b/tap_file/tap.py index 0d7dd79..3f739f9 100644 --- a/tap_file/tap.py +++ b/tap_file/tap.py @@ -74,9 +74,9 @@ class TapFile(Tap): default="detect", description=( "The character used to separate records in a CSV/TSV. Can be any " - "character or the special value `detect`. If a value is provided, all " - "CSV and TSV files will use that value. Otherwise, `,` will be used " - "for CSV files and `\\t` will be used for TSV files." + "character or the special value `detect`. 
If a character is provided, " + "all CSV and TSV files will use that value. `detect` will use `,` for " + " CSV files and `\\t` for TSV files." ), ), th.Property( @@ -85,7 +85,34 @@ class TapFile(Tap): default='"', description=( "The character used to indicate when a record in a CSV contains a " - 'delimiter character. Defaults to `"`.' + "delimiter character." + ), + ), + th.Property( + "jsonl_sampling_strategy", + th.StringType, + allowed_values=["first", "all"], + default="first", + description=( + "The strategy determining how to read the keys in a JSONL file. Must " + "be one of `first` or `all`. Currently, only `first` is supported, " + "which will assume that the first record in a file is representative " + "of all keys." + ), + ), + th.Property( + "jsonl_type_coercion_strategy", + th.StringType, + allowed_values=["any", "string", "blob"], + default="any", + description=( + "The strategy determining how to construct the schema for JSONL files " + "when the types represented are ambiguous. Must be one of `any`, " + "`string`, or `blob`. `any` will provide a generic schema for all " + "keys, allowing them to be any valid JSON type. `string` will require " + "all keys to be strings and will convert other values accordingly. " + "`blob` will deliver each JSONL row as a JSON object with no internal " + "schema. Currently, only `any` and `string` are supported." ), ), th.Property( @@ -118,7 +145,7 @@ class TapFile(Tap): ), ), th.Property( - "cache_mode", + "caching_strategy", th.StringType, default="once", allowed_values=["none", "once", "persistent"], @@ -143,6 +170,7 @@ def discover_streams(self) -> list[streams.FileStream]: name = self.config["stream_name"] return [ streams.SeparatedValuesStream(self, name=name), + streams.JSONLStream(self, name=name), ] From e609331e18a90386b7a05fe4db04f9f05816402c Mon Sep 17 00:00:00 2001 From: Sebastian Smiley Date: Thu, 22 Jun 2023 14:30:44 -0400 Subject: [PATCH 5/5] Remove file_type=detect to fix tests. 
--- tap_file/client.py | 4 +- tap_file/streams.py | 89 +++++++++++++++------------------------------ tap_file/tap.py | 43 ++++++++++++++-------- 3 files changed, 58 insertions(+), 78 deletions(-) diff --git a/tap_file/client.py b/tap_file/client.py index 39294e2..30473a5 100644 --- a/tap_file/client.py +++ b/tap_file/client.py @@ -30,7 +30,7 @@ def filesystem(self) -> fsspec.AbstractFileSystem: """ return FilesystemManager(self.config, self.logger).get_filesystem() - def get_files(self, regex: str | None = None) -> Generator[str, None, None]: + def get_files(self) -> Generator[str, None, None]: """Gets file names to be synced. Yields: @@ -51,8 +51,6 @@ def get_files(self, regex: str | None = None) -> Generator[str, None, None]: ): continue empty = False - if regex is not None and not re.match(regex, Path(file["name"]).name): - continue yield file["name"] if empty: diff --git a/tap_file/streams.py b/tap_file/streams.py index f4f0cfc..7df3b4d 100644 --- a/tap_file/streams.py +++ b/tap_file/streams.py @@ -4,13 +4,14 @@ import csv import json +import re from functools import cached_property -from typing import Any, Generator, Iterable +from typing import Any, Generator from tap_file.client import FileStream -class SeparatedValuesStream(FileStream): +class DelimitedStream(FileStream): """Stream for reading CSVs and TSVs.""" def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]: @@ -42,51 +43,24 @@ def schema(self) -> dict[str, dict]: return {"properties": properties} def _get_readers(self) -> Generator[csv.DictReader[str], None, None]: - """Get all *SV readers. - - Raises: - StopIteration: To end iteration early if no more readers exist. - - Yields: - A *SV reader. 
- """ - file_type: str = self.config["file_type"] - delimiter: str = self.config["delimiter"] - delimiters: dict[str, str] = { - "csv": "," if delimiter == "detect" else delimiter, - "tsv": "\t" if delimiter == "detect" else delimiter, - } - - if file_type == "detect": - for reader in self._get_readers_helper( - delimiter=delimiters["csv"], - regex=".*\\.csv.*", - ): - yield reader - for reader in self._get_readers_helper( - delimiter=delimiters["tsv"], - regex=".*\\.tsv.*", - ): - yield reader - elif file_type in {"csv", "tsv"}: - for reader in self._get_readers_helper(delimiter=delimiters[file_type]): - yield reader - else: - raise StopIteration - - def _get_readers_helper( - self, - delimiter: str, - regex: str | None = None, - ) -> Generator[csv.DictReader[str], None, None]: - """Get a subset of *SV readers matching certain criteria. - - Yields: - A *SV reader. - """ quote_character: str = self.config["quote_character"] - for file in self.get_files(regex=regex): + for file in self.get_files(): + if self.config["delimiter"] == "detect": + if re.match(".*\\.csv.*", file): + delimiter = "," + elif re.match(".*\\.tsv.*", file): + delimiter = "\t" + else: + msg = ( + "Configuration option 'delimiter' is set to 'detect' but a " + "non-csv non-tsv file is present. Please manually specify " + "'delimiter'." + ) + raise RuntimeError(msg) + else: + delimiter = self.config["delimiter"] + with self.filesystem.open( path=file, mode="rt", @@ -104,16 +78,7 @@ def get_rows(self) -> Generator[dict[str, Any], None, None]: Yields: A dictionary containing information about a row in a JSONL file. 
""" - file_type: str = self.config["file_type"] - - if file_type == "detect": - regex = None - elif file_type == "jsonl": - regex = ".*\\.jsonl.*" - else: - raise StopIteration - - for file in self.get_files(regex=regex): + for file in self.get_files(): with self.filesystem.open( path=file, mode="rt", @@ -144,12 +109,12 @@ def _get_property(self, field: str) -> dict[str, dict[str, list[str]]]: field: { "type": [ "null", - "object", + "boolean", "integer", - "array", "number", - "boolean", "string", + "array", + "object", ], }, } @@ -160,10 +125,14 @@ def _get_property(self, field: str) -> dict[str, dict[str, list[str]]]: msg = f"The coercion strategy '{strategy}' is not valid." raise ValueError(msg) - def _get_fields(self) -> Iterable[str]: + def _get_fields(self) -> Generator[str, None, None]: strategy = self.config["jsonl_sampling_strategy"] if strategy == "first": - return list(next(self.get_rows())) + try: + yield from next(self.get_rows()) + except StopIteration: + return + return if strategy == "all": msg = f"The sampling strategy '{strategy}' has not been implemented." raise NotImplementedError(msg) diff --git a/tap_file/tap.py b/tap_file/tap.py index 3f739f9..5d620ea 100644 --- a/tap_file/tap.py +++ b/tap_file/tap.py @@ -49,13 +49,13 @@ class TapFile(Tap): th.Property( "file_type", th.RegexType, - default="detect", + default="delimited", description=( - "Can be any of `csv`, `tsv`, `json`, `avro`, or `detect`. Indicates " - "how to determine a file's type. If set to `detect`, file names " - "containing a matching extension will be read as that type and other " - "files will not be read. If set to a file type, *all* files will be " - "read as that type." + "Can be any of `delimited`, `jsonl`, or `avro`. Indicates the type of " + "file to sync, where `delimited` is for CSV/TSV files and similar. " + "Note that *all* files will be read as that type, regardless of file " + "extension. 
To only read from files with a matching file extension, " + "appropriately configure `file_regex`." ), ), th.Property( @@ -65,7 +65,10 @@ class TapFile(Tap): default="detect", description=( "The encoding to use to decompress data. One of `zip`, `bz2`, `gzip`, " - "`lzma`, `xz`, `none`, or `detect`." + "`lzma`, `xz`, `none`, or `detect`. If set to `none` or any encoding, " + "that setting will be applied to *all* files, regardless of file " + "extension. If set to `detect`, encodings will be applied based on " + "file extension." ), ), th.Property( @@ -73,10 +76,11 @@ class TapFile(Tap): th.StringType, default="detect", description=( - "The character used to separate records in a CSV/TSV. Can be any " - "character or the special value `detect`. If a character is provided, " - "all CSV and TSV files will use that value. `detect` will use `,` for " - " CSV files and `\\t` for TSV files." + "The character used to separate records in a delimited file. Can be " + "any character or the special value `detect`. If a character is " + "provided, all delimited files will use that value. `detect` will use " + "`,` for `.csv` files, `\\t` for `.tsv` files, and fail if other file " + "types are present." ), ), th.Property( @@ -168,10 +172,19 @@ def discover_streams(self) -> list[streams.FileStream]: A list of discovered streams. """ name = self.config["stream_name"] - return [ - streams.SeparatedValuesStream(self, name=name), - streams.JSONLStream(self, name=name), - ] + file_type = self.config["file_type"] + if file_type == "delimited": + return [streams.DelimitedStream(self, name=name)] + if file_type == "jsonl": + return [streams.JSONLStream(self, name=name)] + if file_type == "avro": + msg = "avro has not yet been implemented." + raise NotImplementedError(msg) + if file_type in {"csv", "tsv"}: + msg = f"{file_type} is not a valid 'file_type'. Did you mean 'delimited'?" + raise ValueError(msg) + msg = f"{file_type} is not a valid 'file_type'." 
+ raise ValueError(msg) if __name__ == "__main__":