Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: avro-sdc #18

Merged
merged 3 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@ pipx install git+https://github.com/MeltanoLabs/tap-file.git
| protocol | True | None | The protocol to use to retrieve data. One of `file` or `s3`. |
| filepath | True | None | The path to obtain files from. Example: `/foo/bar`. Or, for `protocol==s3`, use `s3-bucket-name` instead. |
| file_regex | False | None | A regex pattern to only include certain files. Example: `.*\.csv`. |
| file_type | False | detect | Can be any of `csv`, `tsv`, `json`, `avro`, or `detect`. Indicates how to determine a file's type. If set to `detect`, file names containing a matching extension will be read as that type and other files will not be read. If set to a file type, *all* files will be read as that type. |
| compression | False | detect | The encoding to use to decompress data. One of `zip`, `bz2`, `gzip`, `lzma`, `xz`, `none`, or `detect`. |
| delimiter | False | detect | The character used to separate records in a CSV/TSV. Can be any character or the special value `detect`. If a character is provided, all CSV and TSV files will use that value. `detect` will use `,` for CSV files and `\t` for TSV files. |
| quote_character | False | " | The character used to indicate when a record in a CSV contains a delimiter character. |
| file_type | False | delimited | Can be any of `delimited`, `jsonl`, or `avro`. Indicates the type of file to sync, where `delimited` is for CSV/TSV files and similar. Note that *all* files will be read as that type, regardless of file extension. To only read from files with a matching file extension, appropriately configure `file_regex`. |
| compression | False | detect | The encoding to use to decompress data. One of `zip`, `bz2`, `gzip`, `lzma`, `xz`, `none`, or `detect`. If set to `none` or any encoding, that setting will be applied to *all* files, regardless of file extension. If set to `detect`, encodings will be applied based on file extension. |
| additional_info | False | 1 | If `True`, each row in tap's output will have two additional columns: `_sdc_file_name` and `_sdc_line_number`. If `False`, these columns will not be present. |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defaulting to True I think makes more sense here. Does it apply for all file types though? Like avro I'm not sure it does 🤷‍♂️ we'd have to force some kind of implementation for each stream type maybe some can just return n/a or something if it doesn't make sense.

Copy link
Collaborator Author

@sebastianswms sebastianswms Jun 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It already does default to True. required=False, default=1. That's just how --about format=markdown does it.

I think it applies to all file types, or at least all file types we have so far. For avro it just increments _sdc_line_number for each record in a file and resets the count for each new file.

| delimited_delimiter | False | detect | The character used to separate records in a delimited file. Can be any character or the special value `detect`. If a character is provided, all delimited files will use that value. `detect` will use `,` for `.csv` files, `\t` for `.tsv` files, and fail if other file types are present. |
| delimited_quote_character | False | " | The character used to indicate when a record in a delimited file contains a delimiter character. |
| delimited_header_skip | False | 0 | The number of initial rows to skip at the beginning of each delimited file. |
| delimited_footer_skip | False | 0 | The number of initial rows to skip at the end of each delimited file. |
| delimited_override_headers | False | None | An optional array of headers used to override the default column name in delimited files, allowing for headerless files to be correctly read. |
| jsonl_sampling_strategy | False | first | The strategy determining how to read the keys in a JSONL file. Must be one of `first` or `all`. Currently, only `first` is supported, which will assume that the first record in a file is representative of all keys. |
| jsonl_type_coercion_strategy| False | any | The strategy determining how to construct the schema for JSONL files when the types represented are ambiguous. Must be one of `any`, `string`, or `blob`. `any` will provide a generic schema for all keys, allowing them to be any valid JSON type. `string` will require all keys to be strings and will convert other values accordingly. `blob` will deliver each JSONL row as a JSON object with no internal schema. Currently, only `any` and `string` are supported. |
| jsonl_type_coercion_strategy| False | any | The strategy determining how to construct the schema for JSONL files when the types represented are ambiguous. Must be one of `any`, `string`, or `envelope`. `any` will provide a generic schema for all keys, allowing them to be any valid JSON type. `string` will require all keys to be strings and will convert other values accordingly. `envelope` will deliver each JSONL row as a JSON object with no internal schema. |
| avro_type_coercion_strategy | False | convert | The strategy determining how to construct the schema for Avro files when conversion between schema types is ambiguous. Must be one of `convert` or `envelope`. `convert` will attempt to convert from Avro Schema to JSON Schema and will fail if a type can't be easily coerced. `envelope` will wrap each record in an object without providing an internal schema for the record. |
| s3_anonymous_connection | False | 0 | Whether to use an anonymous S3 connection, without any credentials. Ignored if `protocol!=s3`. |
| AWS_ACCESS_KEY_ID | False | $AWS_ACCESS_KEY_ID | The access key to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
| AWS_SECRET_ACCESS_KEY | False | $AWS_SECRET_ACCESS_KEY | The access key secret to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
| AWS_ACCESS_KEY_ID | False | $AWS_ACCESS_KEY_ID | The access key to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
| AWS_SECRET_ACCESS_KEY | False | $AWS_SECRET_ACCESS_KEY | The access key secret to use when authenticating to S3. Ignored if `protocol!=s3` or `s3_anonymous_connection=True`. Defaults to the value of the environment variable of the same name. |
| caching_strategy | False | once | *DEVELOPERS ONLY* The caching method to use when `protocol!=file`. One of `none`, `once`, or `persistent`. `none` does not use caching at all. `once` (the default) will cache all files for the duration of the tap's invocation, then discard them upon completion. `peristent` will allow caches to persist between invocations of the tap, storing them in your OS's temp directory. It is recommended that you do not modify this setting. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
Expand Down
13 changes: 11 additions & 2 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,19 @@ plugins:
- name: file_regex
- name: file_type
- name: compression
- name: delimiter
- name: quote_character
- name: additional_info
kind: boolean
- name: delimited_delimiter
- name: delimited_quote_character
- name: delimited_header_skip
kind: integer
- name: delimited_footer_skip
kind: integer
- name: delimited_override_headers
kind: array
- name: jsonl_sampling_strategy
- name: jsonl_type_coercion_strategy
- name: avro_type_coercion_strategy
- name: s3_anonymous_connection
- name: AWS_ACCESS_KEY_ID
kind: password
Expand Down
44 changes: 30 additions & 14 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ singer-sdk = { version="^0.28.0" }
fsspec = "^2023.5.0"
s3fs = { version = "^2023.5.0", optional = true}
fs-s3fs = { version = "^1.1.1", optional = true}
avro = "^1.1.11"

[tool.poetry.extras]
s3=["s3fs", "fs-s3fs"]
Expand Down
44 changes: 44 additions & 0 deletions tap_file/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,38 @@ def filesystem(self) -> fsspec.AbstractFileSystem:
"""
return FilesystemManager(self.config, self.logger).get_filesystem()

@cached_property
def schema(self) -> dict:
"""Orchestrates schema creation for all streams.

Returns:
A schema constructed using the get_properties() method of whichever stream
is currently in use.
"""
properties = self.get_properties()
additional_info = self.config["additional_info"]
if additional_info:
properties.update({"_sdc_file_name": {"type": ["string"]}})
properties.update({"_sdc_line_number": {"type": ["integer"]}})
return {"properties": properties}

def add_additional_info(self, row: dict, file_name: str, line_number: int) -> dict:
"""Adds _sdc-prefixed additional columns to a row, dependent on config.

Args:
row: The row to add info to.
file_name: The name of the file that the row came from.
line_number: The line number of the row within its file.

Returns:
A dictionary representing a row containing additional information columns.
"""
additional_info = self.config["additional_info"]
if additional_info:
row.update({"_sdc_file_name": file_name})
row.update({"_sdc_line_number": line_number})
return row

def get_files(self) -> Generator[str, None, None]:
"""Gets file names to be synced.

Expand Down Expand Up @@ -72,6 +104,18 @@ def get_rows(self) -> Generator[dict[str | Any, str | Any], None, None]:
msg = "get_rows must be implemented by subclass."
raise NotImplementedError(msg)

def get_properties(self) -> dict:
"""Gets properties for the purpose of schema generation.

Raises:
NotImplementedError: This must be implemented by a subclass.

Returns:
A dictionary representing a series of properties for schema generation.
"""
msg = "get_properties must be implemented by subclass."
raise NotImplementedError(msg)

def get_compression(self, file: str) -> str | None: # noqa: PLR0911
"""Determines what compression encoding is appropraite for a given file.

Expand Down
Loading