-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add transformer for AWS DMS to CrateDB SQL
- PostgreSQL full-load and CDC
- Loading branch information
Showing
7 changed files
with
557 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -101,6 +101,7 @@ dynamic = [ | |
"version", | ||
] | ||
dependencies = [ | ||
"attrs<24", | ||
"simplejson<4", | ||
"toolz<0.13", | ||
] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
class MessageFormatError(Exception): | ||
pass | ||
|
||
|
||
class UnknownOperationError(Exception): | ||
def __init__(self, *args, operation=None, record=None, **kwargs): | ||
self.operation = operation | ||
self.record = record | ||
super().__init__(*args, **kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
import json | ||
import typing as t | ||
from enum import StrEnum, auto | ||
|
||
from attrs import define | ||
|
||
|
||
@define(frozen=True) | ||
class TableAddress: | ||
schema: str | ||
table: str | ||
|
||
@property | ||
def fqn(self): | ||
if not self.schema: | ||
raise ValueError("Unable to compute a full-qualified table name without schema name") | ||
return f"{self.quote_identifier(self.schema)}.{self.quote_identifier(self.table)}" | ||
|
||
@staticmethod | ||
def quote_identifier(name: str) -> str: | ||
""" | ||
Poor man's table quoting. | ||
TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable. | ||
""" | ||
if name and '"' not in name: | ||
name = f'"{name}"' | ||
return name | ||
|
||
|
||
class ColumnType(StrEnum): | ||
MAP = auto() | ||
|
||
|
||
@define(frozen=True) | ||
class ColumnTypeMap: | ||
column: str | ||
type: ColumnType | ||
|
||
|
||
class PrimaryKeyStore(dict): | ||
pass | ||
|
||
|
||
class ColumnTypeMapStore(dict): | ||
def add(self, table: TableAddress, column: str, type_: ColumnType): | ||
self.setdefault(table, {}) | ||
self[table][column] = type_ | ||
return self | ||
|
||
def to_dict(self) -> dict: | ||
data = {} | ||
for key, value in self.items(): | ||
tbl = f"{key.schema}:{key.table}" | ||
for column, type_ in value.items(): | ||
key = f"{tbl}:{column}" | ||
data[key] = type_.value | ||
return data | ||
|
||
def to_json(self) -> str: | ||
return json.dumps(self.to_dict()) | ||
|
||
@classmethod | ||
def from_dict(cls, data: dict) -> t.Union["ColumnTypeMapStore", None]: | ||
if not data: | ||
return None | ||
ctms = cls() | ||
for key, type_ in data.items(): | ||
schema, table, column = key.split(":") | ||
ctms.add(TableAddress(schema=schema, table=table), column=column, type_=ColumnType(type_)) | ||
return ctms | ||
|
||
@classmethod | ||
def from_json(cls, payload: str) -> t.Union["ColumnTypeMapStore", None]: | ||
if not payload: | ||
return None | ||
return cls.from_dict(json.loads(payload)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
# Copyright (c) 2023-2024, The Kotori Developers and contributors. | ||
# Distributed under the terms of the LGPLv3 license, see LICENSE. | ||
|
||
# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction | ||
|
||
import logging | ||
import typing as t | ||
|
||
import simplejson as json | ||
|
||
from commons_codec.exception import MessageFormatError, UnknownOperationError | ||
from commons_codec.model import ColumnType, ColumnTypeMapStore, PrimaryKeyStore, TableAddress | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class DMSTranslatorCrateDBRecord: | ||
""" | ||
Translate DMS full-load and cdc events into CrateDB SQL statements. | ||
""" | ||
|
||
# Define name of the column where CDC's record data will get materialized into. | ||
DATA_COLUMN = "data" | ||
|
||
def __init__( | ||
self, | ||
record: t.Dict[str, t.Any], | ||
container: "DMSTranslatorCrateDB", | ||
): | ||
self.record = record | ||
self.container = container | ||
|
||
self.metadata: t.Dict[str, t.Any] = self.record.get("metadata", {}) | ||
self.control: t.Dict[str, t.Any] = self.record.get("control", {}) | ||
self.data: t.Dict[str, t.Any] = self.record.get("data", {}) | ||
|
||
self.operation: t.Union[str, None] = self.metadata.get("operation") | ||
|
||
self.schema: t.Union[str, None] = self.metadata.get("schema-name") | ||
self.table: t.Union[str, None] = self.metadata.get("table-name") | ||
|
||
# Tweaks. | ||
|
||
# Divert special tables like `awsdms_apply_exceptions` to a dedicated schema. | ||
# Relevant CDC events are delivered with an empty table name, so some valid | ||
# name needs to be selected anyway. The outcome of this is that AWS DMS special | ||
# tables will be created within the sink database, like `dms.awsdms_apply_exceptions`. | ||
if self.table and self.table.startswith("awsdms_"): | ||
self.schema = "dms" | ||
|
||
# Sanity checks. | ||
if not self.metadata or not self.operation: | ||
message = "Record not in DMS format: metadata and/or operation is missing" | ||
logger.error(message) | ||
raise MessageFormatError(message) | ||
|
||
if not self.schema or not self.table: | ||
message = f"Schema or table name missing or empty: schema={self.schema}, table={self.table}" | ||
logger.error(message) | ||
raise MessageFormatError(message) | ||
|
||
self.address: TableAddress = TableAddress(schema=self.schema, table=self.table) | ||
|
||
self.container.primary_keys.setdefault(self.address, []) | ||
self.container.column_types.setdefault(self.address, {}) | ||
self.primary_keys: t.List[str] = self.container.primary_keys[self.address] | ||
self.column_types: t.Dict[str, ColumnType] = self.container.column_types[self.address] | ||
|
||
def to_sql(self) -> str: | ||
if self.operation == "create-table": | ||
pks = self.control.get("table-def", {}).get("primary-key") | ||
if pks: | ||
self.primary_keys += pks | ||
# TODO: What about dropping tables first? | ||
return f"CREATE TABLE IF NOT EXISTS {self.address.fqn} ({self.DATA_COLUMN} OBJECT(DYNAMIC));" | ||
|
||
elif self.operation in ["load", "insert"]: | ||
values_clause = self.record_to_values() | ||
sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES ('{values_clause}');" | ||
|
||
elif self.operation == "update": | ||
values_clause = self.record_to_values() | ||
where_clause = self.keys_to_where() | ||
sql = f"UPDATE {self.address.fqn} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};" | ||
|
||
elif self.operation == "delete": | ||
where_clause = self.keys_to_where() | ||
sql = f"DELETE FROM {self.address.fqn} WHERE {where_clause};" | ||
|
||
else: | ||
message = f"Unknown CDC event operation: {self.operation}" | ||
logger.warning(message) | ||
raise UnknownOperationError(message, operation=self.operation, record=self.record) | ||
|
||
return sql | ||
|
||
def record_to_values(self) -> str: | ||
""" | ||
Apply type translations to record, and serialize to JSON. | ||
IN (top-level stripped): | ||
"data": {"age": 30, "attributes": '{"foo": "bar"}', "id": 42, "name": "John"} | ||
OUT: | ||
{"age": 30, "attributes": {"foo": "bar"}, "id": 42, "name": "John"} | ||
""" | ||
for column_name, column_type in self.column_types.items(): | ||
if column_name in self.data: | ||
value = self.data[column_name] | ||
# DMS marshals JSON|JSONB to CLOB, aka. string. Apply a countermeasure. | ||
if column_type is ColumnType.MAP and isinstance(value, str): | ||
value = json.loads(value) | ||
self.data[column_name] = value | ||
return json.dumps(self.data) | ||
|
||
def keys_to_where(self) -> str: | ||
""" | ||
Produce an SQL WHERE clause based on primary key definition and current record's data. | ||
""" | ||
if not self.primary_keys: | ||
raise ValueError("Unable to invoke DML operation without primary key information") | ||
constraints: t.List[str] = [] | ||
for key_name in self.primary_keys: | ||
key_value = self.data.get(key_name) | ||
# FIXME: Does the quoting of the value on the right hand side need to take the data type into account? | ||
constraint = f"{self.DATA_COLUMN}['{key_name}'] = '{key_value}'" | ||
constraints.append(constraint) | ||
return " AND ".join(constraints) | ||
|
||
|
||
class DMSTranslatorCrateDB: | ||
""" | ||
Translate AWS DMS event messages into CrateDB SQL statements that materialize them again. | ||
The SQL DDL schema for CrateDB: | ||
CREATE TABLE <tablename> (data OBJECT(DYNAMIC)); | ||
Blueprint: | ||
https://www.cockroachlabs.com/docs/stable/aws-dms | ||
""" | ||
|
||
def __init__( | ||
self, | ||
primary_keys: PrimaryKeyStore = None, | ||
column_types: ColumnTypeMapStore = None, | ||
): | ||
self.primary_keys = primary_keys or PrimaryKeyStore() | ||
self.column_types = column_types or ColumnTypeMapStore() | ||
|
||
def to_sql(self, record: t.Dict[str, t.Any]) -> str: | ||
""" | ||
Produce INSERT|UPDATE|DELETE SQL statement from load|insert|update|delete CDC event record. | ||
""" | ||
record_decoded = DMSTranslatorCrateDBRecord(record=record, container=self) | ||
return record_decoded.to_sql() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import pytest | ||
from commons_codec.model import ColumnType, ColumnTypeMapStore, TableAddress | ||
|
||
|
||
def test_table_address_success(): | ||
ta = TableAddress(schema="foo", table="bar") | ||
assert ta.fqn == '"foo"."bar"' | ||
|
||
|
||
def test_table_address_failure(): | ||
ta = TableAddress(schema=None, table="bar") | ||
with pytest.raises(ValueError) as ex: | ||
_ = ta.fqn | ||
assert ex.match("adcdc") | ||
|
||
|
||
def test_column_type_map_store_serialize(): | ||
column_types = ColumnTypeMapStore().add( | ||
table=TableAddress(schema="public", table="foo"), | ||
column="attributes", | ||
type_=ColumnType.MAP, | ||
) | ||
assert column_types.to_dict() == {"public:foo:attributes": "map"} | ||
assert column_types.to_json() == '{"public:foo:attributes": "map"}' | ||
|
||
|
||
def test_column_type_map_store_unserialize_data(): | ||
assert ColumnTypeMapStore.from_json('{"public:foo:attributes": "map"}') == ColumnTypeMapStore( | ||
{TableAddress(schema="public", table="foo"): {"attributes": ColumnType.MAP}} | ||
) | ||
|
||
|
||
def test_column_type_map_store_unserialize_empty(): | ||
assert ColumnTypeMapStore.from_json("") is None | ||
assert ColumnTypeMapStore.from_json(None) is None | ||
assert ColumnTypeMapStore.from_dict(None) is None |
Oops, something went wrong.