Merge pull request #18 from sebastianswms/avro-sdc
feat: avro-sdc
visch authored Jul 12, 2023
2 parents 21b8d12 + 856dc41 commit e083c0d
Showing 18 changed files with 1,597 additions and 72 deletions.
19 changes: 12 additions & 7 deletions README.md
@@ -24,15 +24,20 @@ pipx install git+https://github.com/MeltanoLabs/tap-file.git
| protocol | True | None | The protocol to use to retrieve data. One of `file` or `s3`. |
| filepath | True | None | The path to obtain files from. Example: `/foo/bar`. Or, for `protocol==s3`, use `s3-bucket-name` instead. |
| file_regex | False | None | A regex pattern to only include certain files. Example: `.*\.csv`. |
| file_type | False | detect | Can be any of `csv`, `tsv`, `json`, `avro`, or `detect`. Indicates how to determine a file's type. If set to `detect`, file names containing a matching extension will be read as that type and other files will not be read. If set to a file type, *all* files will be read as that type. |
| compression | False | detect | The encoding to use to decompress data. One of `zip`, `bz2`, `gzip`, `lzma`, `xz`, `none`, or `detect`. |
| delimiter | False | detect | The character used to separate records in a CSV/TSV. Can be any character or the special value `detect`. If a character is provided, all CSV and TSV files will use that value. `detect` will use `,` for CSV files and `\t` for TSV files. |
| quote_character | False | " | The character used to indicate when a record in a CSV contains a delimiter character. |
| file_type | False | delimited | Can be any of `delimited`, `jsonl`, or `avro`. Indicates the type of file to sync, where `delimited` is for CSV/TSV files and similar. Note that *all* files will be read as that type, regardless of file extension. To only read from files with a matching file extension, appropriately configure `file_regex`. |
| compression | False | detect | The encoding to use to decompress data. One of `zip`, `bz2`, `gzip`, `lzma`, `xz`, `none`, or `detect`. If set to `none` or any encoding, that setting will be applied to *all* files, regardless of file extension. If set to `detect`, encodings will be applied based on file extension. |
| additional_info | False | 1 | If `True`, each row in the tap's output will have two additional columns: `_sdc_file_name` and `_sdc_line_number`. If `False`, these columns will not be present. |
| delimited_delimiter | False | detect | The character used to separate records in a delimited file. Can be any character or the special value `detect`. If a character is provided, all delimited files will use that value. `detect` will use `,` for `.csv` files, `\t` for `.tsv` files, and fail if other file types are present. |
| delimited_quote_character | False | " | The character used to indicate when a record in a delimited file contains a delimiter character. |
| delimited_header_skip | False | 0 | The number of initial rows to skip at the beginning of each delimited file. |
| delimited_footer_skip | False | 0 | The number of trailing rows to skip at the end of each delimited file. |
| delimited_override_headers | False | None | An optional array of headers used to override the default column name in delimited files, allowing for headerless files to be correctly read. |
| jsonl_sampling_strategy | False | first | The strategy determining how to read the keys in a JSONL file. Must be one of `first` or `all`. Currently, only `first` is supported, which will assume that the first record in a file is representative of all keys. |
| jsonl_type_coercion_strategy | False | any | The strategy determining how to construct the schema for JSONL files when the types represented are ambiguous. Must be one of `any`, `string`, or `blob`. `any` will provide a generic schema for all keys, allowing them to be any valid JSON type. `string` will require all keys to be strings and will convert other values accordingly. `blob` will deliver each JSONL row as a JSON object with no internal schema. Currently, only `any` and `string` are supported. |
| jsonl_type_coercion_strategy | False | any | The strategy determining how to construct the schema for JSONL files when the types represented are ambiguous. Must be one of `any`, `string`, or `envelope`. `any` will provide a generic schema for all keys, allowing them to be any valid JSON type. `string` will require all keys to be strings and will convert other values accordingly. `envelope` will deliver each JSONL row as a JSON object with no internal schema. |
| avro_type_coercion_strategy | False | convert | The strategy determining how to construct the schema for Avro files when conversion between schema types is ambiguous. Must be one of `convert` or `envelope`. `convert` will attempt to convert from Avro Schema to JSON Schema and will fail if a type can't be easily coerced. `envelope` will wrap each record in an object without providing an internal schema for the record. |
| s3_anonymous_connection | False | 0 | Whether to use an anonymous S3 connection, without any credentials. Ignored if `protocol!=s3`. |
| AWS_ACCESS_KEY_ID | False | $AWS_ACCESS_KEY_ID | The access key to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
| AWS_SECRET_ACCESS_KEY | False | $AWS_SECRET_ACCESS_KEY | The access key secret to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
| caching_strategy | False | once | *DEVELOPERS ONLY* The caching method to use when `protocol!=file`. One of `none`, `once`, or `persistent`. `none` does not use caching at all. `once` (the default) will cache all files for the duration of the tap's invocation, then discard them upon completion. `persistent` will allow caches to persist between invocations of the tap, storing them in your OS's temp directory. It is recommended that you do not modify this setting. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
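Per the `delimited_delimiter` row above, `detect` resolves the delimiter from the file extension: `,` for `.csv`, `\t` for `.tsv`, and failure otherwise. A minimal sketch of that logic (`detect_delimiter` is a hypothetical helper, not the tap's actual code):

```python
def detect_delimiter(file_name: str, configured: str = "detect") -> str:
    """Resolve the delimiter for a file per the `delimited_delimiter` setting.

    A configured character applies to all files; `detect` maps `.csv` to a
    comma and `.tsv` to a tab, and fails for any other extension.
    """
    if configured != "detect":
        return configured
    if file_name.endswith(".csv"):
        return ","
    if file_name.endswith(".tsv"):
        return "\t"
    msg = f"Could not detect delimiter for: {file_name}"
    raise ValueError(msg)
```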
13 changes: 11 additions & 2 deletions meltano.yml
@@ -22,10 +22,19 @@ plugins:
- name: file_regex
- name: file_type
- name: compression
- name: delimiter
- name: quote_character
- name: additional_info
  kind: boolean
- name: delimited_delimiter
- name: delimited_quote_character
- name: delimited_header_skip
  kind: integer
- name: delimited_footer_skip
  kind: integer
- name: delimited_override_headers
  kind: array
- name: jsonl_sampling_strategy
- name: jsonl_type_coercion_strategy
- name: avro_type_coercion_strategy
- name: s3_anonymous_connection
- name: AWS_ACCESS_KEY_ID
  kind: password
44 changes: 30 additions & 14 deletions poetry.lock


1 change: 1 addition & 0 deletions pyproject.toml
@@ -16,6 +16,7 @@ singer-sdk = { version="^0.28.0" }
fsspec = "^2023.5.0"
s3fs = { version = "^2023.5.0", optional = true}
fs-s3fs = { version = "^1.1.1", optional = true}
avro = "^1.11.1"

[tool.poetry.extras]
s3=["s3fs", "fs-s3fs"]
44 changes: 44 additions & 0 deletions tap_file/client.py
@@ -30,6 +30,38 @@ def filesystem(self) -> fsspec.AbstractFileSystem:
"""
return FilesystemManager(self.config, self.logger).get_filesystem()

    @cached_property
    def schema(self) -> dict:
        """Orchestrates schema creation for all streams.

        Returns:
            A schema constructed using the get_properties() method of whichever
            stream is currently in use.
        """
        properties = self.get_properties()
        additional_info = self.config["additional_info"]
        if additional_info:
            properties.update({"_sdc_file_name": {"type": ["string"]}})
            properties.update({"_sdc_line_number": {"type": ["integer"]}})
        return {"properties": properties}

    def add_additional_info(self, row: dict, file_name: str, line_number: int) -> dict:
        """Adds _sdc-prefixed additional columns to a row, dependent on config.

        Args:
            row: The row to add info to.
            file_name: The name of the file that the row came from.
            line_number: The line number of the row within its file.

        Returns:
            A dictionary representing a row containing additional information columns.
        """
        additional_info = self.config["additional_info"]
        if additional_info:
            row.update({"_sdc_file_name": file_name})
            row.update({"_sdc_line_number": line_number})
        return row
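Standalone, the behavior of the two methods above can be sketched with plain functions (hypothetical names, with the config flag passed explicitly rather than read from `self.config`):

```python
def build_schema(properties: dict, additional_info: bool) -> dict:
    """Mirror of the `schema` property: append _sdc columns when enabled."""
    props = dict(properties)
    if additional_info:
        props["_sdc_file_name"] = {"type": ["string"]}
        props["_sdc_line_number"] = {"type": ["integer"]}
    return {"properties": props}


def add_additional_info(row: dict, file_name: str, line_number: int, enabled: bool) -> dict:
    """Mirror of add_additional_info: annotate a row with its provenance."""
    if enabled:
        row["_sdc_file_name"] = file_name
        row["_sdc_line_number"] = line_number
    return row
```

Because the same flag gates both the schema columns and the row values, records always match the emitted schema whether `additional_info` is on or off.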

    def get_files(self) -> Generator[str, None, None]:
        """Gets file names to be synced.
@@ -72,6 +104,18 @@ def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]:
msg = "get_rows must be implemented by subclass."
raise NotImplementedError(msg)

    def get_properties(self) -> dict:
        """Gets properties for the purpose of schema generation.

        Raises:
            NotImplementedError: This must be implemented by a subclass.

        Returns:
            A dictionary representing a series of properties for schema generation.
        """
        msg = "get_properties must be implemented by subclass."
        raise NotImplementedError(msg)
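A concrete stream subclass is expected to override `get_properties` with its file-type-specific schema logic. A minimal hypothetical example (`FileStream` here is a simplified stand-in for the tap's base class, and `HeaderOnlyCSVStream` is illustrative, not part of the tap):

```python
class FileStream:
    """Simplified stand-in for the tap's base stream class."""

    def get_properties(self) -> dict:
        msg = "get_properties must be implemented by subclass."
        raise NotImplementedError(msg)


class HeaderOnlyCSVStream(FileStream):
    """Hypothetical stream that types every known column as a nullable string."""

    def __init__(self, headers: list) -> None:
        self.headers = headers

    def get_properties(self) -> dict:
        # One schema property per header; values stay untyped beyond "string".
        return {name: {"type": ["null", "string"]} for name in self.headers}
```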

    def get_compression(self, file: str) -> str | None:  # noqa: PLR0911
        """Determines what compression encoding is appropriate for a given file.
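The body of `get_compression` is collapsed in this diff. Based on the README's `compression` setting, its `detect` mode plausibly maps file extensions to encodings, along these lines (`detect_compression` is a hypothetical stand-in, not the tap's actual implementation):

```python
from __future__ import annotations


def detect_compression(file_name: str) -> str | None:
    """Map a file extension to a compression encoding, or None for plain files."""
    extensions = {
        ".zip": "zip",
        ".bz2": "bz2",
        ".gz": "gzip",
        ".gzip": "gzip",
        ".lzma": "lzma",
        ".xz": "xz",
    }
    for suffix, encoding in extensions.items():
        if file_name.endswith(suffix):
            return encoding
    return None
```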
