Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add DateTimeNormalizerFields transformation #187

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
31 changes: 31 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,35 @@ definitions:
$parameters:
type: object
additionalProperties: true
# Record transformation that rewrites the datetime values found at `field_pointers`
# into RFC 3339 strings.
DateTimeNormalizer:
  title: DateTime Normalizer
  description: Transformation which normalizes datetime fields in a record to RFC3339. The path of the datetime fields can be nested.
  type: object
  required:
    - type
    - field_pointers
  properties:
    type:
      type: string
      enum: [DateTimeNormalizer]
    field_pointers:
      title: Field Paths
      description: Array of paths defining the field to normalize.
      type: array
      items:
        # Each path is itself an array of keys; declaring the type rejects bare strings.
        type: array
        items:
          type: string
      # Every example is a complete `field_pointers` value, i.e. a list of paths.
      examples:
        - [["created_at"]]
        - [["content", "created_at"]]
    datetime_format:
      title: DateTime Format
      description: Expected datetime format
      type: string
      examples:
        - "%Y-%m-%d %H:%M:%S %Z"
        - "%m/%d/%Y %I:%M %p"
        - "%A, %B %d, %Y %I:%M:%S %p %Z"
JwtAuthenticator:
title: JWT Authenticator
description: Authenticator for requests using JWT authentication flow.
Expand Down Expand Up @@ -1237,6 +1266,7 @@ definitions:
anyOf:
- "$ref": "#/definitions/AddFields"
- "$ref": "#/definitions/CustomTransformation"
- "$ref": "#/definitions/DateTimeNormalizer"
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
Expand Down Expand Up @@ -1781,6 +1811,7 @@ definitions:
anyOf:
- "$ref": "#/definitions/AddFields"
- "$ref": "#/definitions/CustomTransformation"
- "$ref": "#/definitions/DateTimeNormalizer"
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,26 @@ class Config:
type: Optional[Literal["LegacyToPerPartitionStateMigration"]] = None


# Manifest model of the `DateTimeNormalizer` transformation: the datetime values found at
# `field_pointers` are normalized, optionally guided by `datetime_format`.
# NOTE(review): this module looks generated from the YAML component schema — confirm, and
# mirror the example fix there so a regeneration does not revert it.
class DateTimeNormalizer(BaseModel):
    type: Literal["DateTimeNormalizer"]
    # Each entry is one path (a list of keys) into the record; nested fields are allowed.
    field_pointers: List[List[str]] = Field(
        ...,
        description="Array of paths defining the field to normalize.",
        # Every example is a complete `field_pointers` value, i.e. a list of paths.
        examples=[[["created_at"]], [["content", "created_at"]]],
        title="Field Paths",
    )
    # Optional format of the incoming datetime strings.
    datetime_format: Optional[str] = Field(
        None,
        description="Expected datetime format",
        examples=[
            "%Y-%m-%d %H:%M:%S %Z",
            "%m/%d/%Y %I:%M %p",
            "%A, %B %d, %Y %I:%M:%S %p %Z",
        ],
        title="DateTime Format",
    )


class Algorithm(Enum):
HS256 = "HS256"
HS384 = "HS384"
Expand Down Expand Up @@ -1208,6 +1228,8 @@ class ComponentMappingDefinition(BaseModel):
"{{ components_values['updates'] }}",
"{{ components_values['MetaData']['LastUpdatedTime'] }}",
"{{ config['segment_id'] }}",
"{{ stream_slice['parent_id'] }}",
"{{ stream_slice['extra_fields']['name'] }}",
],
title="Value",
)
Expand Down Expand Up @@ -1674,6 +1696,7 @@ class Config:
Union[
AddFields,
CustomTransformation,
DateTimeNormalizer,
RemoveFields,
KeysToLower,
KeysToSnakeCase,
Expand Down Expand Up @@ -1848,6 +1871,7 @@ class DynamicSchemaLoader(BaseModel):
Union[
AddFields,
CustomTransformation,
DateTimeNormalizer,
RemoveFields,
KeysToLower,
KeysToSnakeCase,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DatetimeBasedCursor as DatetimeBasedCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DateTimeNormalizer as DateTimeNormalizerModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DeclarativeStream as DeclarativeStreamModel,
)
Expand Down Expand Up @@ -396,6 +399,7 @@
RemoveFields,
)
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.declarative.transformations.datetime_normalizer import DateTimeNormalizer
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
)
Expand Down Expand Up @@ -473,6 +477,7 @@ def _init_mappings(self) -> None:
CustomPartitionRouterModel: self.create_custom_component,
CustomTransformationModel: self.create_custom_component,
DatetimeBasedCursorModel: self.create_datetime_based_cursor,
DateTimeNormalizerModel: self.create_datetime_normalizer,
DeclarativeStreamModel: self.create_declarative_stream,
DefaultErrorHandlerModel: self.create_default_error_handler,
DefaultPaginatorModel: self.create_default_paginator,
Expand Down Expand Up @@ -1222,6 +1227,16 @@ def create_datetime_based_cursor(
parameters=model.parameters or {},
)

@staticmethod
def create_datetime_normalizer(
    model: DateTimeNormalizerModel, config: Config, **kwargs: Any
) -> DateTimeNormalizer:
    """
    Build the runtime DateTimeNormalizer transformation from its parsed manifest model.

    :param model: parsed manifest model providing `field_pointers` and `datetime_format`
    :param config: connector configuration; not used by this component
    :param kwargs: extra factory arguments; ignored
    :return: the configured DateTimeNormalizer transformation
    """
    # Only the two model attributes are forwarded; the component always receives
    # an empty parameters mapping.
    normalizer_arguments: Dict[str, Any] = {
        "field_pointers": model.field_pointers,
        "datetime_format": model.datetime_format,
        "parameters": {},
    }
    return DateTimeNormalizer(**normalizer_arguments)

def create_declarative_stream(
self, model: DeclarativeStreamModel, config: Config, **kwargs: Any
) -> DeclarativeStream:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from dataclasses import InitVar, dataclass
from typing import Any, Dict, List, Mapping, Optional

import dateparser
import dpath

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, FieldPointer, StreamSlice, StreamState

logger = logging.getLogger("airbyte")


@dataclass
class DateTimeNormalizer(RecordTransformation):
    """
    A transformation which normalizes the specified datetime fields of a record to RFC3339 format.

    Fields are addressed by pointers (lists of keys), so nested fields are supported. The record
    is modified in place. Fields that are missing, null, not a string or not parseable are left
    untouched.

    Usage syntax:

    ```yaml
        my_stream:
            <other parameters..>
            transformations:
                - type: DateTimeNormalizer
                  field_pointers:
                    - ["path", "to", "field1"]
                    - ["path2"]
    ```

    Attributes:
        field_pointers (List[FieldPointer]): pointers to the fields that should be normalized
        parameters (Mapping[str, Any]): init-only argument; not used by this transformation
        datetime_format (Optional[str]): optional format passed to dateparser as the only
            entry of `date_formats`
    """

    field_pointers: List[FieldPointer]
    parameters: InitVar[Mapping[str, Any]]
    datetime_format: Optional[str] = None

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Normalize, in place, every configured datetime field found in the record.

        :param record: The record to be transformed; it is mutated in place
        :param config: unused
        :param stream_state: unused
        :param stream_slice: unused
        :return: None
        """
        for pointer in self.field_pointers:
            try:
                current_value = dpath.get(record, pointer)  # type: ignore [arg-type]
            except (KeyError, dpath.exceptions.PathNotFound):
                # dpath.get signals a missing path with KeyError; if the (potentially
                # nested) property does not exist, silently skip
                continue

            if current_value is None:
                # a null datetime has nothing to normalize
                continue
            if not isinstance(current_value, str):
                # dateparser only accepts strings; leave other values untouched instead of failing
                logger.warning(
                    "Skipping datetime normalization of field %s: expected a string but got %s",
                    pointer,
                    type(current_value).__name__,
                )
                continue

            normalized_value = self.parse_datetime_to_rfc3339(current_value)
            if normalized_value != current_value:
                dpath.set(record, pointer, normalized_value)

    def parse_datetime_to_rfc3339(self, datetime_value: str) -> str:
        """
        Parse a datetime string and render it as an ISO 8601 / RFC3339 timestamp.

        :param datetime_value: the raw datetime string taken from the record
        :return: `isoformat()` of the parsed, timezone-aware datetime, or the input unchanged
            (after logging a warning) when it cannot be parsed
        """
        parsed = dateparser.parse(
            datetime_value,
            # when a datetime_format is configured it is passed as the only explicit format
            date_formats=[self.datetime_format] if self.datetime_format else None,
            settings={"TIMEZONE": "UTC", "RETURN_AS_TIMEZONE_AWARE": True},
        )
        if parsed is None:
            logger.warning("Could not parse datetime value %s", datetime_value)
            return datetime_value
        return parsed.isoformat()
Loading
Loading