Skip to content

Commit

Permalink
Add transformer for AWS DMS to CrateDB SQL
Browse files Browse the repository at this point in the history
- PostgreSQL full-load and CDC
  • Loading branch information
amotl committed Aug 4, 2024
1 parent 4c4ac2c commit 4879136
Show file tree
Hide file tree
Showing 7 changed files with 564 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- Added transformer for AWS DMS to CrateDB SQL

## 2024/07/19 v0.0.2
- Added transformer for MongoDB CDC to CrateDB SQL conversion
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ dynamic = [
"version",
]
dependencies = [
"attrs<24",
"backports-strenum<1.3; python_version<'3.11'",
"simplejson<4",
"toolz<0.13",
]
Expand Down
9 changes: 9 additions & 0 deletions src/commons_codec/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class MessageFormatError(Exception):
pass


class UnknownOperationError(Exception):
def __init__(self, *args, operation=None, record=None, **kwargs):
self.operation = operation
self.record = record
super().__init__(*args, **kwargs)
83 changes: 83 additions & 0 deletions src/commons_codec/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import json
import sys
import typing as t
from enum import auto

if sys.version_info >= (3, 11):
from enum import StrEnum
else:
from backports.strenum import StrEnum

from attrs import define


@define(frozen=True)
class TableAddress:
schema: str
table: str

@property
def fqn(self):
if not self.schema:
raise ValueError("Unable to compute a full-qualified table name without schema name")
return f"{self.quote_identifier(self.schema)}.{self.quote_identifier(self.table)}"

@staticmethod
def quote_identifier(name: str) -> str:
"""
Poor man's table quoting.
TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if name and '"' not in name:
name = f'"{name}"'
return name


class ColumnType(StrEnum):
MAP = auto()


@define(frozen=True)
class ColumnTypeMap:
column: str
type: ColumnType


class PrimaryKeyStore(dict):
pass


class ColumnTypeMapStore(dict):
def add(self, table: TableAddress, column: str, type_: ColumnType):
self.setdefault(table, {})
self[table][column] = type_
return self

def to_dict(self) -> dict:
data = {}
for key, value in self.items():
tbl = f"{key.schema}:{key.table}"
for column, type_ in value.items():
key = f"{tbl}:{column}"
data[key] = type_.value
return data

def to_json(self) -> str:
return json.dumps(self.to_dict())

@classmethod
def from_dict(cls, data: dict) -> t.Union["ColumnTypeMapStore", None]:
if not data:
return None
ctms = cls()
for key, type_ in data.items():
schema, table, column = key.split(":")
ctms.add(TableAddress(schema=schema, table=table), column=column, type_=ColumnType(type_))
return ctms

@classmethod
def from_json(cls, payload: str) -> t.Union["ColumnTypeMapStore", None]:
if not payload:
return None
return cls.from_dict(json.loads(payload))
155 changes: 155 additions & 0 deletions src/commons_codec/transform/aws_dms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Copyright (c) 2023-2024, The Kotori Developers and contributors.
# Distributed under the terms of the LGPLv3 license, see LICENSE.

# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction

import logging
import typing as t

import simplejson as json

from commons_codec.exception import MessageFormatError, UnknownOperationError
from commons_codec.model import ColumnType, ColumnTypeMapStore, PrimaryKeyStore, TableAddress

logger = logging.getLogger(__name__)


class DMSTranslatorCrateDBRecord:
"""
Translate DMS full-load and cdc events into CrateDB SQL statements.
"""

# Define name of the column where CDC's record data will get materialized into.
DATA_COLUMN = "data"

def __init__(
self,
record: t.Dict[str, t.Any],
container: "DMSTranslatorCrateDB",
):
self.record = record
self.container = container

self.metadata: t.Dict[str, t.Any] = self.record.get("metadata", {})
self.control: t.Dict[str, t.Any] = self.record.get("control", {})
self.data: t.Dict[str, t.Any] = self.record.get("data", {})

self.operation: t.Union[str, None] = self.metadata.get("operation")

self.schema: t.Union[str, None] = self.metadata.get("schema-name")
self.table: t.Union[str, None] = self.metadata.get("table-name")

# Tweaks.

# Divert special tables like `awsdms_apply_exceptions` to a dedicated schema.
# Relevant CDC events are delivered with an empty table name, so some valid
# name needs to be selected anyway. The outcome of this is that AWS DMS special
# tables will be created within the sink database, like `dms.awsdms_apply_exceptions`.
if self.table and self.table.startswith("awsdms_"):
self.schema = "dms"

# Sanity checks.
if not self.metadata or not self.operation:
message = "Record not in DMS format: metadata and/or operation is missing"
logger.error(message)
raise MessageFormatError(message)

if not self.schema or not self.table:
message = f"Schema or table name missing or empty: schema={self.schema}, table={self.table}"
logger.error(message)
raise MessageFormatError(message)

self.address: TableAddress = TableAddress(schema=self.schema, table=self.table)

self.container.primary_keys.setdefault(self.address, [])
self.container.column_types.setdefault(self.address, {})
self.primary_keys: t.List[str] = self.container.primary_keys[self.address]
self.column_types: t.Dict[str, ColumnType] = self.container.column_types[self.address]

def to_sql(self) -> str:
if self.operation == "create-table":
pks = self.control.get("table-def", {}).get("primary-key")
if pks:
self.primary_keys += pks
# TODO: What about dropping tables first?
return f"CREATE TABLE IF NOT EXISTS {self.address.fqn} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

elif self.operation in ["load", "insert"]:
values_clause = self.record_to_values()
sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES ('{values_clause}');"

elif self.operation == "update":
values_clause = self.record_to_values()
where_clause = self.keys_to_where()
sql = f"UPDATE {self.address.fqn} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};"

elif self.operation == "delete":
where_clause = self.keys_to_where()
sql = f"DELETE FROM {self.address.fqn} WHERE {where_clause};"

else:
message = f"Unknown CDC event operation: {self.operation}"
logger.warning(message)
raise UnknownOperationError(message, operation=self.operation, record=self.record)

return sql

def record_to_values(self) -> str:
"""
Apply type translations to record, and serialize to JSON.
IN (top-level stripped):
"data": {"age": 30, "attributes": '{"foo": "bar"}', "id": 42, "name": "John"}
OUT:
{"age": 30, "attributes": {"foo": "bar"}, "id": 42, "name": "John"}
"""
for column_name, column_type in self.column_types.items():
if column_name in self.data:
value = self.data[column_name]
# DMS marshals JSON|JSONB to CLOB, aka. string. Apply a countermeasure.
if column_type is ColumnType.MAP and isinstance(value, str):
value = json.loads(value)
self.data[column_name] = value
return json.dumps(self.data)

def keys_to_where(self) -> str:
"""
Produce an SQL WHERE clause based on primary key definition and current record's data.
"""
if not self.primary_keys:
raise ValueError("Unable to invoke DML operation without primary key information")
constraints: t.List[str] = []
for key_name in self.primary_keys:
key_value = self.data.get(key_name)
# FIXME: Does the quoting of the value on the right hand side need to take the data type into account?
constraint = f"{self.DATA_COLUMN}['{key_name}'] = '{key_value}'"
constraints.append(constraint)
return " AND ".join(constraints)


class DMSTranslatorCrateDB:
"""
Translate AWS DMS event messages into CrateDB SQL statements that materialize them again.
The SQL DDL schema for CrateDB:
CREATE TABLE <tablename> (data OBJECT(DYNAMIC));
Blueprint:
https://www.cockroachlabs.com/docs/stable/aws-dms
"""

def __init__(
self,
primary_keys: PrimaryKeyStore = None,
column_types: ColumnTypeMapStore = None,
):
self.primary_keys = primary_keys or PrimaryKeyStore()
self.column_types = column_types or ColumnTypeMapStore()

def to_sql(self, record: t.Dict[str, t.Any]) -> str:
"""
Produce INSERT|UPDATE|DELETE SQL statement from load|insert|update|delete CDC event record.
"""
record_decoded = DMSTranslatorCrateDBRecord(record=record, container=self)
return record_decoded.to_sql()
36 changes: 36 additions & 0 deletions tests/test_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import pytest
from commons_codec.model import ColumnType, ColumnTypeMapStore, TableAddress


def test_table_address_success():
ta = TableAddress(schema="foo", table="bar")
assert ta.fqn == '"foo"."bar"'


def test_table_address_failure():
ta = TableAddress(schema=None, table="bar")
with pytest.raises(ValueError) as ex:
_ = ta.fqn
assert ex.match("adcdc")


def test_column_type_map_store_serialize():
column_types = ColumnTypeMapStore().add(
table=TableAddress(schema="public", table="foo"),
column="attributes",
type_=ColumnType.MAP,
)
assert column_types.to_dict() == {"public:foo:attributes": "map"}
assert column_types.to_json() == '{"public:foo:attributes": "map"}'


def test_column_type_map_store_unserialize_data():
assert ColumnTypeMapStore.from_json('{"public:foo:attributes": "map"}') == ColumnTypeMapStore(
{TableAddress(schema="public", table="foo"): {"attributes": ColumnType.MAP}}
)


def test_column_type_map_store_unserialize_empty():
assert ColumnTypeMapStore.from_json("") is None
assert ColumnTypeMapStore.from_json(None) is None
assert ColumnTypeMapStore.from_dict(None) is None
Loading

0 comments on commit 4879136

Please sign in to comment.