Skip to content

Commit

Permalink
feat(low-code cdk): add component resolver and http component resolver (
Browse files Browse the repository at this point in the history
#88)

Co-authored-by: octavia-squidington-iii <[email protected]>
  • Loading branch information
lazebnyi and octavia-squidington-iii authored Dec 5, 2024
1 parent 3e671b8 commit a07b04a
Show file tree
Hide file tree
Showing 13 changed files with 864 additions and 38 deletions.
10 changes: 7 additions & 3 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,13 @@ def _group_streams(

state_manager = ConnectorStateManager(state=self._state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

name_to_stream_mapping = {
stream["name"]: stream for stream in self.resolved_manifest["streams"]
}
# Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
# and this is validated during the initialization of the source.
streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
self._source_config, config
)

name_to_stream_mapping = {stream["name"]: stream for stream in streams}

for declarative_stream in self.streams(config=config):
# Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
Expand Down
102 changes: 100 additions & 2 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ version: 1.0.0
required:
- type
- check
- streams
- version
anyOf:
- required:
- streams
- required:
- dynamic_streams
properties:
type:
type: string
Expand All @@ -19,6 +23,10 @@ properties:
type: array
items:
"$ref": "#/definitions/DeclarativeStream"
dynamic_streams:
type: array
items:
"$ref": "#/definitions/DynamicDeclarativeStream"
version:
type: string
description: The version of the Airbyte CDK used to build and test the source.
Expand Down Expand Up @@ -1321,7 +1329,7 @@ definitions:
type: array
items:
- type: string
interpolation_content:
interpolation_context:
- config
examples:
- ["data"]
Expand Down Expand Up @@ -2895,6 +2903,96 @@ definitions:
$parameters:
type: object
additionalProperties: true
ComponentMappingDefinition:
title: Component Mapping Definition
description: (This component is experimental. Use at your own risk.) Specifies a mapping definition to update or add fields in a record or configuration. This allows dynamic mapping of data by interpolating values into the template based on provided contexts.
type: object
required:
- type
- field_path
- value
properties:
type:
type: string
enum: [ComponentMappingDefinition]
field_path:
title: Field Path
description: A list of potentially nested fields indicating the full path where value will be added or updated.
type: array
items:
- type: string
interpolation_context:
- config
- components_values
- stream_template_config
examples:
- ["data"]
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", "*", "record"]
value:
title: Value
description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime.
type: string
interpolation_context:
- config
- stream_template_config
- components_values
examples:
- "{{ components_values['updates'] }}"
- "{{ components_values['MetaData']['LastUpdatedTime'] }}"
- "{{ config['segment_id'] }}"
value_type:
title: Value Type
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
"$ref": "#/definitions/ValueType"
$parameters:
type: object
additionalProperties: true
HttpComponentsResolver:
type: object
description: (This component is experimental. Use at your own risk.) Component resolve and populates stream templates with components fetched via an HTTP retriever.
properties:
type:
type: string
enum: [HttpComponentsResolver]
retriever:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages.
anyOf:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
components_mapping:
type: array
items:
"$ref": "#/definitions/ComponentMappingDefinition"
$parameters:
type: object
additionalProperties: true
required:
- type
- retriever
- components_mapping
DynamicDeclarativeStream:
type: object
description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template.
properties:
type:
type: string
enum: [DynamicDeclarativeStream]
stream_template:
title: Stream Template
description: Reference to the stream template.
"$ref": "#/definitions/DeclarativeStream"
components_resolver:
title: Components Resolver
description: Component resolve and populates stream templates with components values.
"$ref": "#/definitions/HttpComponentsResolver"
required:
- type
- stream_template
- components_resolver
interpolation:
variables:
- title: config
Expand Down
55 changes: 53 additions & 2 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import ConnectionDefinition
Expand Down Expand Up @@ -120,7 +121,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self._emit_manifest_debug_message(
extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
)
stream_configs = self._stream_configs(self._source_config)

stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
self._source_config, config
)

source_streams = [
self._constructor.create_component(
Expand Down Expand Up @@ -234,7 +238,8 @@ def _validate_source(self) -> None:
)

streams = self._source_config.get("streams")
if not streams:
dynamic_streams = self._source_config.get("dynamic_streams")
if not (streams or dynamic_streams):
raise ValidationError(
f"A valid manifest should have at least one stream defined. Got {streams}"
)
Expand Down Expand Up @@ -303,5 +308,51 @@ def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
s["type"] = "DeclarativeStream"
return stream_configs

def _dynamic_stream_configs(
self, manifest: Mapping[str, Any], config: Mapping[str, Any]
) -> List[Dict[str, Any]]:
dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
dynamic_stream_configs: List[Dict[str, Any]] = []

for dynamic_definition in dynamic_stream_definitions:
components_resolver_config = dynamic_definition["components_resolver"]

if not components_resolver_config:
raise ValueError(
f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
)

resolver_type = components_resolver_config.get("type")
if not resolver_type:
raise ValueError(
f"Missing 'type' in components resolver configuration: {components_resolver_config}"
)

if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
raise ValueError(
f"Invalid components resolver type '{resolver_type}'. "
f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
)

if "retriever" in components_resolver_config:
components_resolver_config["retriever"]["requester"]["use_cache"] = True

# Create a resolver for dynamic components based on type
components_resolver = self._constructor.create_component(
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config
)

stream_template_config = dynamic_definition["stream_template"]

for dynamic_stream in components_resolver.resolve_components(
stream_template_config=stream_template_config
):
if "type" not in dynamic_stream:
dynamic_stream["type"] = "DeclarativeStream"

dynamic_stream_configs.append(dynamic_stream)

return dynamic_stream_configs

def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
self.logger.debug("declarative source created from manifest", extra=extra_args)
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,37 @@ class WaitUntilTimeFromHeader(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ComponentMappingDefinition(BaseModel):
type: Literal["ComponentMappingDefinition"]
field_path: List[str] = Field(
...,
description="A list of potentially nested fields indicating the full path where value will be added or updated.",
examples=[
["data"],
["data", "records"],
["data", "{{ parameters.name }}"],
["data", "*", "record"],
],
title="Field Path",
)
value: str = Field(
...,
description="The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime.",
examples=[
"{{ components_values['updates'] }}",
"{{ components_values['MetaData']['LastUpdatedTime'] }}",
"{{ config['segment_id'] }}",
],
title="Value",
)
value_type: Optional[ValueType] = Field(
None,
description="The expected data type of the value. If omitted, the type will be inferred from the value provided.",
title="Value Type",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class AddedFieldDefinition(BaseModel):
type: Literal["AddedFieldDefinition"]
path: List[str] = Field(
Expand Down Expand Up @@ -1455,13 +1486,40 @@ class CompositeErrorHandler(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class DeclarativeSource(BaseModel):
class DeclarativeSource1(BaseModel):
class Config:
extra = Extra.forbid

type: Literal["DeclarativeSource"]
check: CheckStream
streams: List[DeclarativeStream]
dynamic_streams: Optional[List[DynamicDeclarativeStream]] = None
version: str = Field(
...,
description="The version of the Airbyte CDK used to build and test the source.",
)
schemas: Optional[Schemas] = None
definitions: Optional[Dict[str, Any]] = None
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
)
description: Optional[str] = Field(
None,
description="A description of the connector. It will be presented on the Source documentation page.",
)


class DeclarativeSource2(BaseModel):
class Config:
extra = Extra.forbid

type: Literal["DeclarativeSource"]
check: CheckStream
streams: Optional[List[DeclarativeStream]] = None
dynamic_streams: List[DynamicDeclarativeStream]
version: str = Field(
...,
description="The version of the Airbyte CDK used to build and test the source.",
Expand All @@ -1480,6 +1538,17 @@ class Config:
)


class DeclarativeSource(BaseModel):
class Config:
extra = Extra.forbid

__root__: Union[DeclarativeSource1, DeclarativeSource2] = Field(
...,
description="An API source that extracts data according to its declarative components.",
title="DeclarativeSource",
)


class SelectiveAuthenticator(BaseModel):
class Config:
extra = Extra.allow
Expand Down Expand Up @@ -1883,8 +1952,32 @@ class SubstreamPartitionRouter(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class HttpComponentsResolver(BaseModel):
type: Literal["HttpComponentsResolver"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
components_mapping: List[ComponentMappingDefinition]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class DynamicDeclarativeStream(BaseModel):
type: Literal["DynamicDeclarativeStream"]
stream_template: DeclarativeStream = Field(
..., description="Reference to the stream template.", title="Stream Template"
)
components_resolver: HttpComponentsResolver = Field(
...,
description="Component resolve and populates stream templates with components values.",
title="Components Resolver",
)


CompositeErrorHandler.update_forward_refs()
DeclarativeSource.update_forward_refs()
DeclarativeSource1.update_forward_refs()
DeclarativeSource2.update_forward_refs()
SelectiveAuthenticator.update_forward_refs()
DeclarativeStream.update_forward_refs()
SessionTokenAuthenticator.update_forward_refs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
# DeclarativeStream
"DeclarativeStream.retriever": "SimpleRetriever",
"DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
# DynamicDeclarativeStream
"DynamicDeclarativeStream.stream_template": "DeclarativeStream",
"DynamicDeclarativeStream.components_resolver": "HttpComponentsResolver",
# HttpComponentsResolver
"HttpComponentsResolver.retriever": "SimpleRetriever",
"HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
# DefaultErrorHandler
"DefaultErrorHandler.response_filters": "HttpResponseFilter",
# DefaultPaginator
Expand Down
Loading

0 comments on commit a07b04a

Please sign in to comment.