Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: In each record to filter and transform, publish a local service field holding the original object the record is extracted from #214

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
17 changes: 7 additions & 10 deletions airbyte_cdk/sources/declarative/decoders/json_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,19 @@ def decode(
try:
body_json = response.json()
yield from self.parse_body_json(body_json)
except requests.exceptions.JSONDecodeError:
logger.warning(
f"Response cannot be parsed into json: {response.status_code=}, {response.text=}"
)
yield {}
except requests.exceptions.JSONDecodeError as ex:
logger.warning("Response cannot be parsed into json: %s", ex)
logger.debug("Response to parse: %s", response.text, exc_info=True, stack_info=True)
yield {} # Keep the exiting contract

@staticmethod
def parse_body_json(
body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]],
) -> Generator[MutableMapping[str, Any], None, None]:
if not isinstance(body_json, list):
body_json = [body_json]
if len(body_json) == 0:
yield {}
else:
if isinstance(body_json, list):
yield from body_json
else:
yield from [body_json]


@dataclass
Expand Down
49 changes: 36 additions & 13 deletions airbyte_cdk/sources/declarative/extractors/dpath_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,25 @@
import requests

from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import (
SERVICE_KEY_PREFIX,
RecordExtractor,
)
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config

# The name of the service field to bind the response (root) in each record
RECORD_ROOT_KEY = SERVICE_KEY_PREFIX + "root"


def update_record(record: Any, root: Any) -> Any:
if isinstance(record, dict):
copy = {k: v for k, v in record.items()}
copy.update({RECORD_ROOT_KEY: root})
else:
copy = record
return copy


@dataclass
class DpathExtractor(RecordExtractor):
Expand Down Expand Up @@ -70,17 +85,25 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
for body in self.decoder.decode(response):
if len(self._field_path) == 0:
extracted = body
if body == {}:
# An empty/invalid JSON parsed, keep the contract
yield {}
else:
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.values(body, path)
root_response = body
if len(self._field_path) == 0:
extracted = body
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
yield from extracted
elif extracted:
yield extracted
else:
yield from []
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.values(body, path)
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
for record in extracted:
yield update_record(record, root_response)
elif isinstance(extracted, dict):
yield update_record(extracted, root_response)
elif extracted:
yield extracted
else:
yield from []
27 changes: 27 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/record_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,33 @@

import requests

# Convention:
# - The record extractors may leave service fields bound in the extracted records (mappings).
# - The names (keys) of the service fields have the value of SERVICE_KEY_PREFIX as their prefix.
# - The service fields are kept only during the record's filtering and transormation.
SERVICE_KEY_PREFIX = "$"


def exclude_service_keys(mapping: Mapping[str, Any]) -> Mapping[str, Any]:
return {k: v for k, v in mapping.items() if not is_service_key(k)}


def remove_service_keys(mapping: dict[str, Any]): # type: ignore[no-untyped-def]
"""
Modify the parameter by removing the service keys from it.
"""
for key in list(mapping.keys()):
if is_service_key(key):
mapping.pop(key)


def is_service_key(key: str) -> bool:
return key.find(SERVICE_KEY_PREFIX) == 0


def assert_service_keys_exist(mapping: Mapping[str, Any]): # type: ignore[no-untyped-def]
assert mapping != exclude_service_keys(mapping), "The mapping should contain service keys"


@dataclass
class RecordExtractor:
Expand Down
15 changes: 13 additions & 2 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import requests

from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import (
RecordExtractor,
exclude_service_keys,
)
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.type_transformer import (
TypeTransformer as DeclarativeTypeTransformer,
Expand Down Expand Up @@ -106,7 +109,8 @@ def filter_and_transform(
"""
filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
transformed_data = self._transform(filtered_data, stream_state, stream_slice)
normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
no_service_fields_data = self._remove_service_keys(transformed_data)
normalized_data = self._normalize_by_schema(no_service_fields_data, schema=records_schema)
for data in normalized_data:
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)

Expand Down Expand Up @@ -154,3 +158,10 @@ def _transform(
stream_slice=stream_slice,
)
yield record

def _remove_service_keys(
self, records: Iterable[Mapping[str, Any]]
) -> Iterable[Mapping[str, Any]]:
for record in records:
clean_record = exclude_service_keys(record)
yield clean_record
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def next_page_token(
last_record: Optional[Record],
last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
decoded_response = next(self.decoder.decode(response))
decoded_response = next(self.decoder.decode(response), {})

# The default way that link is presented in requests.Response is a string of various links (last, next, etc). This
# is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links
Expand Down
16 changes: 10 additions & 6 deletions airbyte_cdk/sources/declarative/transformations/flatten_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional

from airbyte_cdk.sources.declarative.extractors.record_extractor import is_service_key
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

Expand Down Expand Up @@ -32,12 +33,15 @@ def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:

if isinstance(current_record, dict):
for current_key, value in current_record.items():
new_key = (
f"{parent_key}.{current_key}"
if (current_key in transformed_record or force_with_parent_name)
else current_key
)
stack.append((value, new_key))
if not is_service_key(current_key):
new_key = (
f"{parent_key}.{current_key}"
if (current_key in transformed_record or force_with_parent_name)
else current_key
)
stack.append((value, new_key))
else: # transfer the service fields without change
transformed_record[current_key] = value

elif isinstance(current_record, list):
for i, item in enumerate(current_record):
Expand Down
Loading
Loading