feat: Publish the response to process in the context of the record's transformation and filtering steps #193

Status: Open — wants to merge 35 commits into base: main

Changes shown from 16 of 35 commits.
df11e5c
Store a reference to the root response in each record extracted from it
rpopov Dec 28, 2024
83a03d7
Added methods to remove the service keys from records to support the …
rpopov Dec 29, 2024
fd17e5a
Change and test the JSON / JSONL decoders to unify their behavior
rpopov Dec 29, 2024
d274539
Renamed a method
rpopov Dec 30, 2024
99a371d
Extracted a convention and 2 global functions to remove/check for ser…
rpopov Dec 30, 2024
1bb3e05
Avoid reusing the maps/dictionaries produced, thus avoid building cyc…
rpopov Dec 30, 2024
c71fd67
Added in-place update of the Mappings of service keys for cases of
rpopov Dec 31, 2024
b5e3650
Added explicit cleanup of the service fields, thus making them:
rpopov Dec 31, 2024
11472ee
Removed the skipping of the service fields in tests of features after…
rpopov Dec 31, 2024
e693c94
Formatted
rpopov Dec 31, 2024
daaea38
Applied automated formatting as of the requirements of GitHub
rpopov Dec 31, 2024
9a05c94
Suppressed the false positive lint findings.
rpopov Dec 31, 2024
6587873
Update RELEASES.md
rpopov Jan 1, 2025
a33f815
Update CONTRIBUTING.md
rpopov Jan 1, 2025
4bc243d
Update CONTRIBUTING.md
rpopov Jan 1, 2025
62cca5e
Update CONTRIBUTING.md
rpopov Jan 2, 2025
b71da46
Updating the development environment setup documentation
rpopov Jan 4, 2025
26367a6
50395 Updating the development environment setup documentation
rpopov Jan 4, 2025
e1c5a47
Update CONTRIBUTING.md
rpopov Jan 5, 2025
4af3c8a
Update CONTRIBUTING.md
rpopov Jan 5, 2025
62d2417
Update record_extractor.py
rpopov Jan 6, 2025
8c636f7
Used the service key name instead of its literal value.
rpopov Jan 8, 2025
a4f16c3
Added the .lock
rpopov Jan 8, 2025
1065a20
Update unit_tests/sources/declarative/extractors/test_dpath_extractor.py
rpopov Jan 8, 2025
b2dab7b
Merge branch 'main' into 50395-1-2
rpopov Jan 8, 2025
118d785
Merged, formatted
rpopov Jan 9, 2025
b49a98d
Update documentation.
rpopov Jan 9, 2025
f7f19be
Update CONTRIBUTING.md
rpopov Jan 9, 2025
e5b9e5c
Handled nitpick comments by rabbit
rpopov Jan 9, 2025
638ed41
Merge branch 'main' into 50395-1-2
rpopov Jan 9, 2025
08366d6
The contract of body parsing and iterating is not clear. New tests sta…
rpopov Jan 10, 2025
ca9026a
Restore the original contract
rpopov Jan 10, 2025
17f59e2
Formatted
rpopov Jan 10, 2025
04e12a3
Resolved the automated nitpick comments
rpopov Jan 10, 2025
8d0c705
Trigger a new build
rpopov Jan 11, 2025
17 changes: 7 additions & 10 deletions airbyte_cdk/sources/declarative/decoders/json_decoder.py
@@ -35,22 +35,19 @@ def decode(
         try:
             body_json = response.json()
             yield from self.parse_body_json(body_json)
-        except requests.exceptions.JSONDecodeError:
-            logger.warning(
-                f"Response cannot be parsed into json: {response.status_code=}, {response.text=}"
-            )
-            yield {}
+        except requests.exceptions.JSONDecodeError as ex:
+            logger.warning("Response cannot be parsed into json: %s", ex)
+            logger.debug("Response to parse: %s", response.text, exc_info=True, stack_info=True)
+            yield from []

     @staticmethod
     def parse_body_json(
         body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]],
     ) -> Generator[MutableMapping[str, Any], None, None]:
-        if not isinstance(body_json, list):
-            body_json = [body_json]
-        if len(body_json) == 0:
-            yield {}
-        else:
+        if isinstance(body_json, list):
             yield from body_json
+        else:
+            yield from [body_json]


@dataclass
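The unified decoder behavior above can be tried in isolation. The following is a simplified standalone sketch (not the CDK class itself), assuming plain dict/list inputs already parsed from JSON:

```python
from typing import Any, Dict, Generator, List, Union


def parse_body_json(
    body_json: Union[Dict[str, Any], List[Dict[str, Any]]],
) -> Generator[Dict[str, Any], None, None]:
    # A list body yields each element; a single mapping yields just itself.
    # Note the behavioral change in this PR: an empty list now yields no
    # records at all, where the old code yielded a single empty mapping {}.
    if isinstance(body_json, list):
        yield from body_json
    else:
        yield from [body_json]


assert list(parse_body_json({"id": 1})) == [{"id": 1}]
assert list(parse_body_json([{"id": 1}, {"id": 2}])) == [{"id": 1}, {"id": 2}]
assert list(parse_body_json([])) == []  # empty list -> no records
```

This makes the JSON and JSONL decoders behave the same way for list, single-object, and empty bodies.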
23 changes: 21 additions & 2 deletions airbyte_cdk/sources/declarative/extractors/dpath_extractor.py
@@ -9,10 +9,25 @@
import requests

from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
-from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
+from airbyte_cdk.sources.declarative.extractors.record_extractor import (
+    SERVICE_KEY_PREFIX,
+    RecordExtractor,
+)
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config

# The name of the service field to bind the response (root) in each record
RECORD_ROOT_KEY = SERVICE_KEY_PREFIX + "root"


def update_record(record: Any, root: Any) -> Any:
if isinstance(record, dict):
copy = {k: v for k, v in record.items()}
copy.update({RECORD_ROOT_KEY: root})
else:
copy = record
return copy


@dataclass
class DpathExtractor(RecordExtractor):
@@ -70,6 +85,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
for body in self.decoder.decode(response):
root_response = body
if len(self._field_path) == 0:
extracted = body
else:
@@ -79,7 +95,10 @@
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
-                yield from extracted
+                for record in extracted:
+                    yield update_record(record, root_response)
+            elif isinstance(extracted, dict):
+                yield update_record(extracted, root_response)
elif extracted:
yield extracted
else:
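The root-binding convention introduced here can be sketched standalone (this is illustrative, not the actual extractor; the `$root` key name follows from `SERVICE_KEY_PREFIX + "root"`):

```python
from typing import Any

SERVICE_KEY_PREFIX = "$"
RECORD_ROOT_KEY = SERVICE_KEY_PREFIX + "root"  # "$root"


def update_record(record: Any, root: Any) -> Any:
    # Only mappings can carry the service field; scalars pass through as-is.
    if isinstance(record, dict):
        copy = dict(record)  # shallow copy, so the parsed body is not mutated
        copy[RECORD_ROOT_KEY] = root
    else:
        copy = record
    return copy


body = {"data": [{"id": 1}, {"id": 2}], "meta": {"page": 1}}
records = [update_record(r, body) for r in body["data"]]
assert records[0] == {"id": 1, "$root": body}
assert body["data"][0] == {"id": 1}  # the original record is untouched
```

Copying instead of mutating is what avoids building cyclic structures (a record inside the body never ends up containing the body that contains it).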
24 changes: 24 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/record_extractor.py
@@ -7,6 +7,30 @@

import requests

# Convention:
# - The record extractors may leave service fields in the extracted records (mappings)
# - The names (keys) of the service fields have the value of SERVICE_KEY_PREFIX as their prefix
# - The service fields may be skipped only to ease the testing
SERVICE_KEY_PREFIX = "$"


def exclude_service_keys(mapping: Mapping[str, Any]) -> Mapping[str, Any]:
return {k: v for k, v in mapping.items() if not is_service_key(k)}


def remove_service_keys(mapping: dict[str, Any]): # type: ignore[no-untyped-def]
for key in list(mapping.keys()):
if is_service_key(key):
mapping.pop(key)


def is_service_key(k: str) -> bool:
return k.find(SERVICE_KEY_PREFIX) == 0


def verify_service_keys_exist(mapping: Mapping[str, Any]): # type: ignore[no-untyped-def]
    assert mapping != exclude_service_keys(mapping), "Expected service keys to be present"


@dataclass
class RecordExtractor:
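The convention's helper functions behave as in this standalone sketch (the `startswith` form is equivalent to the PR's `k.find(SERVICE_KEY_PREFIX) == 0`):

```python
from typing import Any, Mapping

SERVICE_KEY_PREFIX = "$"


def is_service_key(k: str) -> bool:
    # True for keys prefixed with "$", e.g. "$root"
    return k.startswith(SERVICE_KEY_PREFIX)


def exclude_service_keys(mapping: Mapping[str, Any]) -> Mapping[str, Any]:
    # non-destructive: builds a new mapping without the "$"-prefixed keys
    return {k: v for k, v in mapping.items() if not is_service_key(k)}


record = {"id": 7, "name": "a", "$root": {"data": [{"id": 7}]}}
assert exclude_service_keys(record) == {"id": 7, "name": "a"}
assert "$root" in record  # the source record is unchanged
```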
15 changes: 13 additions & 2 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
@@ -8,7 +8,10 @@
import requests

from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
-from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
+from airbyte_cdk.sources.declarative.extractors.record_extractor import (
+    RecordExtractor,
+    exclude_service_keys,
+)
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.models import SchemaNormalization
@@ -108,7 +111,8 @@ def filter_and_transform(
"""
filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
transformed_data = self._transform(filtered_data, stream_state, stream_slice)
-        normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
+        no_service_fields_data = self._remove_service_keys(transformed_data)
+        normalized_data = self._normalize_by_schema(no_service_fields_data, schema=records_schema)
for data in normalized_data:
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)

@@ -156,3 +160,10 @@ def _transform(
stream_slice=stream_slice,
)
yield record

def _remove_service_keys(
self, records: Iterable[Mapping[str, Any]]
) -> Iterable[Mapping[str, Any]]:
for record in records:
clean_record = exclude_service_keys(record)
yield clean_record
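The ordering of these pipeline stages is the point of the change: service fields remain visible to filtering and transformation, but are stripped before schema normalization so they never reach the emitted records. A minimal sketch under those assumptions (standalone, not the CDK's `RecordSelector`):

```python
from typing import Any, Iterable, Mapping


def remove_service_keys(records: Iterable[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]:
    # generator stage: drop "$"-prefixed service fields from each record
    for record in records:
        yield {k: v for k, v in record.items() if not k.startswith("$")}


# Service fields survive filtering but never reach the emitted records:
extracted = [{"id": 1, "$root": {"total": 2}}, {"id": 2, "$root": {"total": 2}}]
filtered = (r for r in extracted if r["$root"]["total"] > 1)  # a filter may use $root
emitted = list(remove_service_keys(filtered))
assert emitted == [{"id": 1}, {"id": 2}]
```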
@@ -62,7 +62,7 @@ def initial_token(self) -> Optional[Any]:
def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
) -> Optional[Any]:
-        decoded_response = next(self.decoder.decode(response))
+        decoded_response = next(self.decoder.decode(response), {})

# The default way that link is presented in requests.Response is a string of various links (last, next, etc). This
# is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links
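This one-character change matters because the decoder may now yield nothing (instead of a single `{}`) for an unparseable body. With a default argument, `next()` returns that default rather than raising `StopIteration`, as this small sketch shows (illustrative names, not the CDK API):

```python
def first_page_or_empty(decoded_pages):
    # next(it, {}) returns {} instead of raising StopIteration when the
    # decoder yields nothing (e.g. an unparseable response body).
    return next(iter(decoded_pages), {})


assert first_page_or_empty([{"next_url": "/page/2"}]) == {"next_url": "/page/2"}
assert first_page_or_empty([]) == {}  # previously: StopIteration
```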
16 changes: 10 additions & 6 deletions airbyte_cdk/sources/declarative/transformations/flatten_fields.py
@@ -5,6 +5,7 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional

from airbyte_cdk.sources.declarative.extractors.record_extractor import is_service_key
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

@@ -32,12 +33,15 @@ def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:

if isinstance(current_record, dict):
for current_key, value in current_record.items():
-                    new_key = (
-                        f"{parent_key}.{current_key}"
-                        if (current_key in transformed_record or force_with_parent_name)
-                        else current_key
-                    )
-                    stack.append((value, new_key))
+                    if not is_service_key(current_key):
+                        new_key = (
+                            f"{parent_key}.{current_key}"
+                            if (current_key in transformed_record or force_with_parent_name)
+                            else current_key
+                        )
+                        stack.append((value, new_key))
+                    else:  # transfer the service fields without change
+                        transformed_record[current_key] = value

elif isinstance(current_record, list):
for i, item in enumerate(current_record):
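The service-field passthrough can be demonstrated with a heavily simplified flatten (dicts only, no list handling and no `force_with_parent_name` flag — an illustrative sketch, not the CDK transformation):

```python
SERVICE_KEY_PREFIX = "$"


def flatten_record(record):
    # Iterative flatten for nested dicts; service fields ("$"-prefixed)
    # are transferred to the output unchanged, never flattened or renamed.
    transformed = {}
    stack = [(record, None)]
    while stack:
        current, parent_key = stack.pop()
        if isinstance(current, dict):
            for key, value in current.items():
                if key.startswith(SERVICE_KEY_PREFIX):
                    transformed[key] = value  # pass service fields through
                else:
                    new_key = f"{parent_key}.{key}" if key in transformed else key
                    stack.append((value, new_key))
        else:
            transformed[parent_key] = current
    return transformed


nested = {"a": {"b": 1}, "c": 2, "$root": {"x": 1}}
assert flatten_record(nested) == {"b": 1, "c": 2, "$root": {"x": 1}}
```

Without the guard, a `$root` field bound by the extractor would itself be flattened into the record, defeating its purpose as an intact reference to the response.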
171 changes: 117 additions & 54 deletions docs/CONTRIBUTING.md
@@ -14,35 +14,132 @@ Thank you for being interested in contributing to Airbyte Python CDK! Here are s
Here are some tips to get started using the project dependencies and development tools:

1. Clone the CDK repo. If you will be testing connectors, you should clone the CDK into the same parent directory as `airbytehq/airbyte`, which contains the connector definitions.
1. Make sure [Poetry is installed](https://python-poetry.org/docs/#).
1. Run `poetry install --all-extras`.
1. Unit tests can be run via `poetry run pytest`.
1. You can use "Poe" tasks to perform common actions such as lint checks (`poetry run poe lint`), autoformatting (`poetry run poe format-fix`), etc. For a list of tasks you can run, try `poetry run poe list`.
1. [Install Poetry](https://python-poetry.org/docs/#).
2. Run `poetry install --all-extras` to install the unit tests' prerequisites and [Poe the Poet](https://poethepoet.natn.io/).

Note that installing all extras is required to run the full suite of unit tests.
## Local development

## Working with Poe Tasks
- Iterate on the CDK code locally.

The Airbyte CDK uses [Poe the Poet](https://poethepoet.natn.io/) to define common development tasks. You can run `poetry run poe list` to see all available tasks. This will work after `poetry install --all-extras` without any additional installations.
### Run Unit Tests

Optionally, you can [pre-install Poe](https://poethepoet.natn.io/installation.html) with `pipx install poethepoet`; then you will be able to run Poe tasks with the shorter `poe TASKNAME` syntax instead of `poetry run poe TASKNAME`.
- `poetry run pytest` to run all unit tests.
- `poetry run pytest -k <suite or test name>` to run specific unit tests.
- `poetry run pytest-fast` to run the subset of PyTest tests which are not flagged as `slow`. (Should take <5 min for fast tests only.)
- `python -m pytest -s unit_tests` if you want to pass pytest options.

## Running tests locally
### Run Code Formatting

- Iterate on the CDK code locally.
- Run tests via `poetry run poe pytest`, or `python -m pytest -s unit_tests` if you want to pass pytest options.
- Run `poetry run pytest-fast` to run the subset of PyTest tests which are not flagged as `slow`. (Should take <5 min for fast tests only.)
- Run `poetry run poe check-local` to lint all code, type-check modified code, and run unit tests with coverage in one command.
- `poetry run ruff check .` to report the formatting issues.
- `poetry run ruff check --fix` to fix the formatting issues.
- `poetry run ruff format` to format your Python code.
- `poetry run poe format-fix` to auto-fix formatting issues.

To see all available scripts, run `poetry run poe`.
### Run Code Linting

## Formatting Code
- `poetry run poe lint` for lint checks.
- `poetry run poe check-local` to lint all code, type-check modified code, and run unit tests with coverage in one command.
- `poetry run mypy --config-file mypy.ini airbyte_cdk` to validate the code. Resolve the reported issues.

### More tools and options

To see all available scripts and options, run:
- `poetry run ruff`
- `poetry run pytest --help`
- `poetry run poe`


## Test CDK Changes against Connectors (integration tests)

When developing a new feature in the CDK, you may find it necessary to run a connector that uses that new feature or to use an existing connector to validate a new feature or fix in the CDK. The [GitHub pipelines](.github/workflows/connector-tests.yml) in this project run such tests against the Shopify source as **integration tests**.

### Option 1: Installing your local CDK into a local Python connector

**Assumptions:**
* The connector to test with is in the root [Airbyte project](https://github.com/airbytehq/airbyte).
* The [Airbyte project](https://github.com/airbytehq/airbyte) is checked out in `airbyte` directory.
* The CDK development project is checked out in the `airbyte-python-cdk` directory - a sibling of the `airbyte` directory.

**Preparation steps**
* Install the connector acceptance tests, as described in:
* [Acceptance Tests Reference](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
* [Connector Acceptance Tests](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/bases/connector-acceptance-test/README.md)

```
cd airbyte-integrations/bases/connector-acceptance-test/
poetry install
```

* Change the current directory to the connector's directory, e.g.
`cd airbyte/airbyte-integrations/connectors/source-shopify`
* Edit the connector's `pyproject.toml` file
* Replace the line with `airbyte_cdk` with the following:

```toml
airbyte_cdk = { path = "../../../../airbyte-python-cdk", develop = true }
```

* Run `poetry update` to reinstall `airbyte_cdk` from your local working directory.

**Steps:**

* Run the connector's tests (see the connector's README.md)
```
cd airbyte/airbyte-integrations/connectors/<connector name>

poetry run <connector name> spec
poetry run <connector name> check --config secrets/config.json
poetry run <connector name> discover --config secrets/config.json
poetry run <connector name> read --config secrets/config.json --catalog integration_tests/<connector name>.json
poetry run pytest
```
e.g.
```
cd airbyte/airbyte-integrations/connectors/source-shopify

poetry run source-shopify spec
poetry run source-shopify check --config secrets/config.json
poetry run source-shopify discover --config secrets/config.json
poetry run source-shopify read --config secrets/config.json --catalog integration_tests/configured_catalog.json
poetry run pytest
```
* Run the acceptance tests:
```
cd airbyte/airbyte-integrations/bases/connector-acceptance-test

poetry run pytest -p connector_acceptance_test.plugin --acceptance-test-config=../../connectors/<connector name> --pdb
```

* When testing is complete, revert your test changes in the Connector.

### Option 2: Build and Test Connectors Using `airbyte-ci --use-local-cdk`

_Pre-requisite: Install the [`airbyte-ci` CLI](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md)_

**Steps:**

* Build your connector image with the local CDK using:

```bash
# from the airbytehq/airbyte base directory
airbyte-ci connectors --use-local-cdk --name=<CONNECTOR> build
```

* Use the `test` command with `--use-local-cdk` to run the full set of connector tests, including connector acceptance tests (CAT) and the connector's own unit tests:

```bash
# from the airbytehq/airbyte base directory
airbyte-ci connectors --use-local-cdk --name=<CONNECTOR> test
```

Note that the local CDK is injected at build time, so if you make changes, you will have to run the build command again to see them reflected.

- Iterate on the CDK code locally.
- Run `poetry run poe format-fix` to auto-fix formatting issues.
- If you only want to format Python code (excluding markdown, yaml, etc.), you can use `poetry run ruff format` to autoformat your Python code.

To see all available `ruff` options, run `poetry run ruff`.
## Working with Poe Tasks

The Airbyte CDK uses [Poe the Poet](https://poethepoet.natn.io/) to define common development tasks. You can run `poetry run poe list` to see all available tasks. This will work after `poetry install --all-extras` without any additional installations.

Optionally, you can [pre-install Poe](https://poethepoet.natn.io/installation.html) with `pipx install poethepoet`; then you will be able to run Poe tasks with the shorter `poe TASKNAME` syntax instead of `poetry run poe TASKNAME`.

## Auto-Generating the Declarative Schema File

@@ -113,41 +210,7 @@ docker run -d --rm -v $(pwd)/secrets/mock_server_config:/config -p 8113:8113 --e

HTTP requests to `localhost:8113/data` should now return the body defined in the expectations file. To test this, the implementer either has to change the code which defines the base URL for Python source or update the `url_base` from low-code. With the Connector Builder running in docker, you will have to use domain `host.docker.internal` instead of `localhost` as the requests are executed within docker.

## Testing Connectors against local CDK Changes

When developing a new feature in the CDK, you will sometimes find it necessary to run a connector that uses that new feature, or to use an existing connector to validate some new feature or fix in the CDK.

### Option 1: Installing your local CDK into a local Python connector

Open the connector's `pyproject.toml` file and replace the line with `airbyte_cdk` with the following:

```toml
airbyte_cdk = { path = "../../../../airbyte-python-cdk", develop = true }
```

Then, running `poetry update` should reinstall `airbyte_cdk` from your local working directory. When testing is complete and you've published the CDK update, remember to revert your change and bump to the latest CDK version before re-publishing the connector.

### Option 2: Build and Test Connectors Using `airbyte-ci --use-local-cdk`

_Pre-requisite: Install the [`airbyte-ci` CLI](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md)_

You can build your connector image with the local CDK using

```bash
# from the airbytehq/airbyte base directory
airbyte-ci connectors --use-local-cdk --name=<CONNECTOR> build
```

Or use the `test` command with `--use-local-cdk` to run the full set of connector tests, including connector acceptance tests (CAT) and the connector's own unit tests:

```bash
# from the airbytehq/airbyte base directory
airbyte-ci connectors --use-local-cdk --name=<CONNECTOR> build
```

Note that the local CDK is injected at build time, so if you make changes, you will have to run the build command again to see them reflected.

#### Running Connector Acceptance Tests for a single connector in Docker with your local CDK installed
## Running Connector Acceptance Tests for a single connector in Docker with your local CDK installed

_Pre-requisite: Install the
[`airbyte-ci` CLI](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md)_