Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: In each record to filter and transform, publish a local service field holding the original object the record is extracted from #214

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion airbyte_cdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
from .sources.declarative.declarative_stream import DeclarativeStream
from .sources.declarative.decoders import Decoder, JsonDecoder
from .sources.declarative.exceptions import ReadException
from .sources.declarative.extractors import DpathExtractor, RecordSelector
from .sources.declarative.extractors import DpathEnhancingExtractor, DpathExtractor, RecordSelector
from .sources.declarative.extractors.record_extractor import RecordExtractor
from .sources.declarative.extractors.record_filter import RecordFilter
from .sources.declarative.incremental import DatetimeBasedCursor
Expand Down Expand Up @@ -234,6 +234,7 @@
"DefaultPaginator",
"DefaultRequestOptionsProvider",
"DpathExtractor",
"DpathEnhancingExtractor",
"FieldPointer",
"HttpMethod",
"HttpRequester",
Expand Down
34 changes: 34 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,39 @@ definitions:
$parameters:
type: object
additionalProperties: true

DpathEnhancingExtractor:
title: Dpath Enhancing Extractor
description: |
Extract records from a response, navigating a path as an array of fields. Include $parent and $root service fields where:
$root holds the original response;
$parent holds the attributes container object, including its $parent field.
These service fields are local for the filter and transform phases.
type: object
required:
- type
- field_path
properties:
type:
type: string
enum: [DpathEnhancingExtractor]
field_path:
title: Field Path
description: List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).
type: array
items:
- type: string
interpolation_context:
- config
examples:
- ["data"]
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", "*", "record"]
$parameters:
type: object
additionalProperties: true

ResponseToFileExtractor:
title: CSV To File Extractor
description: A record extractor designed for handling large responses that may exceed memory limits (to prevent OOM issues). It downloads a CSV file to disk, reads the data from disk, and deletes the file once it has been fully processed.
Expand Down Expand Up @@ -2770,6 +2803,7 @@ definitions:
anyOf:
- "$ref": "#/definitions/CustomRecordExtractor"
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/DpathEnhancingExtractor"
record_filter:
title: Record Filter
description: Responsible for filtering records to be emitted by the Source.
Expand Down
17 changes: 7 additions & 10 deletions airbyte_cdk/sources/declarative/decoders/json_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,19 @@ def decode(
try:
body_json = response.json()
yield from self.parse_body_json(body_json)
except requests.exceptions.JSONDecodeError:
logger.warning(
f"Response cannot be parsed into json: {response.status_code=}, {response.text=}"
)
yield {}
except requests.exceptions.JSONDecodeError as ex:
logger.warning("Response cannot be parsed into json: %s", ex)
logger.debug("Response to parse: %s", response.text, exc_info=True, stack_info=True)
yield {} # Keep the exiting contract

@staticmethod
def parse_body_json(
body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]],
) -> Generator[MutableMapping[str, Any], None, None]:
if not isinstance(body_json, list):
body_json = [body_json]
if len(body_json) == 0:
yield {}
else:
if isinstance(body_json, list):
yield from body_json
else:
yield from [body_json]


@dataclass
Expand Down
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.extractors.dpath_enhancing_extractor import (
DpathEnhancingExtractor,
)
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
Expand All @@ -15,6 +18,7 @@
"TypeTransformer",
"HttpSelector",
"DpathExtractor",
"DpathEnhancingExtractor",
"RecordFilter",
"RecordSelector",
"ResponseToFileExtractor",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Union

from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import (
SERVICE_KEY_PREFIX,
add_service_key,
is_service_key,
)

# The name of the service key that holds a reference to the original root response
SERVICE_KEY_ROOT = "root"

# The name of the service key that holds a reference to the owner object
SERVICE_KEY_PARENT = "parent"


@dataclass
class DpathEnhancingExtractor(DpathExtractor):
"""
Navigate a path through the JSON structure to the records to retrieve. Extend the records with service fields
applicable to their filtering and transformation.

Like the DpathExtractor, extract records from a response by following a path of names of nested objects,
while adding specific service fields to the extracted records to facilitate the further processing.

Service fields:
root: Binds the original response body, the record was extracted from. This allows the record access any attribute
in any nested object, navigating from the root field.

parent: Binds a map of the parent object's attributes, including its "parent" service field. This way the extracted
record has access to the attributes of any object This is especially useful when the records are extracted from
nested lists.

Example:
body: {"a":1, "b":2, "c":{"d":4}}\n
path: {c}\n
record: {"d":4,"parent": { "a":1, "b":2}, "root": { "a":1, "b":2, "c":{"d":4}}}\n
access: {{ record.d }}, {{ record["parent"].a }}, {{ record["parent"].b }}, {{ record.["root"].a }}...

Example:
body: {"a":1, "b":2, "c":[{"d":4},{"e",5}]}\n
path: {c, *}\n
record 1: {"d":4, "parent":{ "a":1, "b":2}, "root":{ "a":1, "b":2, "c":[{"d":4},{"e",5}]})\n
record 2: {"e",5, "parent":{ "a":1, "b":2}, "root":{ "a":1, "b":2, "c":[{"d":4},{"e",5}]})\n
access: {{ record.d }}, {{ record["parent"].a }}, {{ record["parent"].b }}, {{ record.["root"].a }}...

Example:
body: { "a":1, "b":2, "c":{"d":4, "e":{"f":6}}}\n
path: {c,e}\n
record: {"f":6, "parent": {"d":4, parent: { "a":1, "b":2}},"root":{ "a":1, "b":2, "c":{"d":4, "e":{"f":6}}}}\n
access: {{ record.f }}, {{ record["parent"].d }}, {{ record["parent"]["parent"].a }},\n
{{ record["parent"]["parent"].b }},{{ record.["root"].a }}, {{ record.["root"].a.c.d }}...

Note:
The names of the service fields have a specific prefix like $ set in SERVICE_KEY_PREFIX.\n
When the result record is the body object itself, then the "parent" service field is not set (as it is None).\n
When the parent contains no attributes and no parent service field, the parent field is not bound.\n
The "root" service field is always set in the result record.
"""

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
"""
See DpathExtractor
"""
super().__post_init__(parameters)

def update_body(self, body: Any) -> Any:
"""
In each nested object in the body add a service key "parent" to refer to the owner object.
For the root object/body the owner is None.
Example:
body = { "a":1, "b":2, "c":{"d":4}}
result = { "a":1,
"b":2,
"c":{"d":4,
parent: { "a":1, "b":2}}}

Example:
body = { "a":1, "b":2, "c":[{"d":4},{"e",5}]}
result = { "a":1,
"b":2,
"c":[{"d":4, "parent":{ "a":1, "b":2}},
{"e",5, "parent":{ "a":1, "b":2}}],
}

Example:
body = { "a":1, "b":2, "c":{"d":4, "e":{"f":6}}}
result = { "a":1,
"b":2,
"c":{"d":4,
parent: { "a":1, "b":2},
"e":{"f":6,
"parent": {"d":4,
parent: { "a":1, "b":2}} }}}

:param body: the original response body. Not to be changed
:return: a copy of the body, where the nested objects have the "parent" service field bound to the map of the
parent object's attributes (including its "parent" service fields). This way any record that will be
extracted from the nested objects will have access to any parent's attributes still avoiding loops
in the JSON structure.
"""
return self._set_parent(body, None)

def _set_parent(self, body: Any, parent: Any) -> Any:
"""
:param body: the original response body. Not to be changed
:param parent: none or the parent object that owns/has as nested the body object
:return: a copy of the body enhanced in a subclass-specific way. None only when body is None.
"""
if isinstance(body, dict):
result: dict[str, Any] = dict()
if parent:
result = add_service_key(result, SERVICE_KEY_PARENT, parent)
attributes_only = dict(result)
attributes_only.update(
{
k: v
for k, v in body.items()
if v and not isinstance(v, dict) and not isinstance(v, list)
}
)
for k, v in body.items():
result[k] = self._set_parent(v, attributes_only)
return result
elif isinstance(body, list):
return [self._set_parent(v, parent) for v in body]
else:
return body

def update_record(self, record: Any, root: Any) -> Any:
"""
Change the extracted record in a subclass-specific way. Override in subclasses.
:param record: the original extracted record. Not to be changed. Not None.
:param root: the original body the record is extracted from.
:return: a copy of the record changed or enanced in a subclass-specific way.
"""
return add_service_key(record, SERVICE_KEY_ROOT, root)
51 changes: 39 additions & 12 deletions airbyte_cdk/sources/declarative/extractors/dpath_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,44 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
for body in self.decoder.decode(response):
if len(self._field_path) == 0:
extracted = body
if body == {}:
# An empty/invalid JSON parsed, keep the contract
yield {}
else:
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.values(body, path)
root_response = body
body = self.update_body(root_response)

if len(self._field_path) == 0:
extracted = body
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
yield from extracted
elif extracted:
yield extracted
else:
yield from []
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.values(body, path)
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
for record in extracted:
yield self.update_record(record, root_response)
elif isinstance(extracted, dict):
yield self.update_record(extracted, root_response)
elif extracted:
yield extracted
else:
yield from []

def update_body(self, body: Any) -> Any:
"""
Change the original response in a subclass-specific way. Override in subclasses.
:param body: the original response body. Not to be changed
:return: a copy of the body enhanced in a subclass-specific way. None only when body is None.
"""
return body

def update_record(self, record: Any, root: Any) -> Any:
"""
Change the extracted record in a subclass-specific way. Override in subclasses.
:param record: the original extracted record. Not to be changed. Not None.
:param root: the original body the record is extracted from.
:return: a copy of the record changed or enanced in a subclass-specific way.
"""
return record
40 changes: 40 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/record_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,46 @@

import requests

# Convention:
# - The record extractors may leave service fields bound in the extracted records (mappings).
# - The names (keys) of the service fields have the value of SERVICE_KEY_PREFIX as their prefix.
# - The service fields are kept only during the record's filtering and transformation.
SERVICE_KEY_PREFIX = "$"


def add_service_key(mapping: Mapping[str, Any], key: str, value: Any) -> dict[str, Any]:
"""
:param mapping: non-null mapping
:param key: the name of the key, not including any specific prefixes
:param value: the value to bind
:return: a non-null copy of the mappibg including a new key-value pair, where the key is prefixed as service field.
"""
result = dict(mapping)
result[SERVICE_KEY_PREFIX + key] = value
return result


def exclude_service_keys(struct: Any) -> Any:
"""
:param struct: any object/JSON structure
:return: a copy of struct without any service fields at any level of nesting
"""
if isinstance(struct, dict):
return {k: exclude_service_keys(v) for k, v in struct.items() if not is_service_key(k)}
elif isinstance(struct, list):
return [exclude_service_keys(v) for v in struct]
else:
return struct


def is_service_key(key: str) -> bool:
return key.find(SERVICE_KEY_PREFIX) == 0


def remove_service_keys(records: Iterable[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]:
for record in records:
yield exclude_service_keys(record)


@dataclass
class RecordExtractor:
Expand Down
9 changes: 6 additions & 3 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import requests

from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import (
RecordExtractor,
remove_service_keys,
)
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.type_transformer import (
TypeTransformer as DeclarativeTypeTransformer,
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.models import SchemaNormalization
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.utils.transform import TypeTransformer
Expand Down Expand Up @@ -106,7 +108,8 @@ def filter_and_transform(
"""
filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
transformed_data = self._transform(filtered_data, stream_state, stream_slice)
normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
no_service_fields_data = remove_service_keys(transformed_data)
normalized_data = self._normalize_by_schema(no_service_fields_data, schema=records_schema)
for data in normalized_data:
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)

Expand Down
Loading
Loading