diff --git a/CHANGES.md b/CHANGES.md index 4c07594..6b35309 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,7 @@ # Changelog ## Unreleased +- Added transformer for AWS DMS to CrateDB SQL ## 2024/07/19 v0.0.2 - Added transformer for MongoDB CDC to CrateDB SQL conversion diff --git a/pyproject.toml b/pyproject.toml index 1db7ee3..446e52b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,6 +101,8 @@ dynamic = [ "version", ] dependencies = [ + "attrs<24", + "backports-strenum<1.3; python_version<'3.11'", "simplejson<4", "toolz<0.13", ] diff --git a/src/commons_codec/exception.py b/src/commons_codec/exception.py new file mode 100644 index 0000000..8140ef9 --- /dev/null +++ b/src/commons_codec/exception.py @@ -0,0 +1,9 @@ +class MessageFormatError(Exception): + pass + + +class UnknownOperationError(Exception): + def __init__(self, *args, operation=None, record=None, **kwargs): + self.operation = operation + self.record = record + super().__init__(*args, **kwargs) diff --git a/src/commons_codec/model.py b/src/commons_codec/model.py new file mode 100644 index 0000000..c86f422 --- /dev/null +++ b/src/commons_codec/model.py @@ -0,0 +1,83 @@ +import json +import sys +import typing as t +from enum import auto + +if sys.version_info >= (3, 11): + from enum import StrEnum +else: + from backports.strenum import StrEnum + +from attrs import define + + +@define(frozen=True) +class TableAddress: + schema: str + table: str + + @property + def fqn(self): + if not self.schema: + raise ValueError("Unable to compute a full-qualified table name without schema name") + return f"{self.quote_identifier(self.schema)}.{self.quote_identifier(self.table)}" + + @staticmethod + def quote_identifier(name: str) -> str: + """ + Poor man's table quoting. + + TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable. + """ + if name and '"' not in name: + name = f'"{name}"' + return name + + +class ColumnType(StrEnum): + MAP = auto() + + +@define(frozen=True) +class ColumnTypeMap: + column: str + type: ColumnType + + +class PrimaryKeyStore(dict): + pass + + +class ColumnTypeMapStore(dict): + def add(self, table: TableAddress, column: str, type_: ColumnType): + self.setdefault(table, {}) + self[table][column] = type_ + return self + + def to_dict(self) -> dict: + data = {} + for key, value in self.items(): + tbl = f"{key.schema}:{key.table}" + for column, type_ in value.items(): + key = f"{tbl}:{column}" + data[key] = type_.value + return data + + def to_json(self) -> str: + return json.dumps(self.to_dict()) + + @classmethod + def from_dict(cls, data: dict) -> t.Union["ColumnTypeMapStore", None]: + if not data: + return None + ctms = cls() + for key, type_ in data.items(): + schema, table, column = key.split(":") + ctms.add(TableAddress(schema=schema, table=table), column=column, type_=ColumnType(type_)) + return ctms + + @classmethod + def from_json(cls, payload: str) -> t.Union["ColumnTypeMapStore", None]: + if not payload: + return None + return cls.from_dict(json.loads(payload)) diff --git a/src/commons_codec/transform/aws_dms.py b/src/commons_codec/transform/aws_dms.py new file mode 100644 index 0000000..05d63f0 --- /dev/null +++ b/src/commons_codec/transform/aws_dms.py @@ -0,0 +1,155 @@ +# Copyright (c) 2023-2024, The Kotori Developers and contributors. +# Distributed under the terms of the LGPLv3 license, see LICENSE. + +# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction + +import logging +import typing as t + +import simplejson as json + +from commons_codec.exception import MessageFormatError, UnknownOperationError +from commons_codec.model import ColumnType, ColumnTypeMapStore, PrimaryKeyStore, TableAddress + +logger = logging.getLogger(__name__) + + +class DMSTranslatorCrateDBRecord: + """ + Translate DMS full-load and cdc events into CrateDB SQL statements. + """ + + # Define name of the column where CDC's record data will get materialized into. + DATA_COLUMN = "data" + + def __init__( + self, + record: t.Dict[str, t.Any], + container: "DMSTranslatorCrateDB", + ): + self.record = record + self.container = container + + self.metadata: t.Dict[str, t.Any] = self.record.get("metadata", {}) + self.control: t.Dict[str, t.Any] = self.record.get("control", {}) + self.data: t.Dict[str, t.Any] = self.record.get("data", {}) + + self.operation: t.Union[str, None] = self.metadata.get("operation") + + self.schema: t.Union[str, None] = self.metadata.get("schema-name") + self.table: t.Union[str, None] = self.metadata.get("table-name") + + # Tweaks. + + # Divert special tables like `awsdms_apply_exceptions` to a dedicated schema. + # Relevant CDC events are delivered with an empty table name, so some valid + # name needs to be selected anyway. The outcome of this is that AWS DMS special + # tables will be created within the sink database, like `dms.awsdms_apply_exceptions`. + if self.table and self.table.startswith("awsdms_"): + self.schema = "dms" + + # Sanity checks. + if not self.metadata or not self.operation: + message = "Record not in DMS format: metadata and/or operation is missing" + logger.error(message) + raise MessageFormatError(message) + + if not self.schema or not self.table: + message = f"Schema or table name missing or empty: schema={self.schema}, table={self.table}" + logger.error(message) + raise MessageFormatError(message) + + self.address: TableAddress = TableAddress(schema=self.schema, table=self.table) + + self.container.primary_keys.setdefault(self.address, []) + self.container.column_types.setdefault(self.address, {}) + self.primary_keys: t.List[str] = self.container.primary_keys[self.address] + self.column_types: t.Dict[str, ColumnType] = self.container.column_types[self.address] + + def to_sql(self) -> str: + if self.operation == "create-table": + pks = self.control.get("table-def", {}).get("primary-key") + if pks: + self.primary_keys += pks + # TODO: What about dropping tables first? + return f"CREATE TABLE IF NOT EXISTS {self.address.fqn} ({self.DATA_COLUMN} OBJECT(DYNAMIC));" + + elif self.operation in ["load", "insert"]: + values_clause = self.record_to_values() + sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES ('{values_clause}');" + + elif self.operation == "update": + values_clause = self.record_to_values() + where_clause = self.keys_to_where() + sql = f"UPDATE {self.address.fqn} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};" + + elif self.operation == "delete": + where_clause = self.keys_to_where() + sql = f"DELETE FROM {self.address.fqn} WHERE {where_clause};" + + else: + message = f"Unknown CDC event operation: {self.operation}" + logger.warning(message) + raise UnknownOperationError(message, operation=self.operation, record=self.record) + + return sql + + def record_to_values(self) -> str: + """ + Apply type translations to record, and serialize to JSON. + + IN (top-level stripped): + "data": {"age": 30, "attributes": '{"foo": "bar"}', "id": 42, "name": "John"} + + OUT: + {"age": 30, "attributes": {"foo": "bar"}, "id": 42, "name": "John"} + """ + for column_name, column_type in self.column_types.items(): + if column_name in self.data: + value = self.data[column_name] + # DMS marshals JSON|JSONB to CLOB, aka. string. Apply a countermeasure. + if column_type is ColumnType.MAP and isinstance(value, str): + value = json.loads(value) + self.data[column_name] = value + return json.dumps(self.data) + + def keys_to_where(self) -> str: + """ + Produce an SQL WHERE clause based on primary key definition and current record's data. + """ + if not self.primary_keys: + raise ValueError("Unable to invoke DML operation without primary key information") + constraints: t.List[str] = [] + for key_name in self.primary_keys: + key_value = self.data.get(key_name) + # FIXME: Does the quoting of the value on the right hand side need to take the data type into account? + constraint = f"{self.DATA_COLUMN}['{key_name}'] = '{key_value}'" + constraints.append(constraint) + return " AND ".join(constraints) + + +class DMSTranslatorCrateDB: + """ + Translate AWS DMS event messages into CrateDB SQL statements that materialize them again. + + The SQL DDL schema for CrateDB: + CREATE TABLE (data OBJECT(DYNAMIC)); + + Blueprint: + https://www.cockroachlabs.com/docs/stable/aws-dms + """ + + def __init__( + self, + primary_keys: PrimaryKeyStore = None, + column_types: ColumnTypeMapStore = None, + ): + self.primary_keys = primary_keys or PrimaryKeyStore() + self.column_types = column_types or ColumnTypeMapStore() + + def to_sql(self, record: t.Dict[str, t.Any]) -> str: + """ + Produce INSERT|UPDATE|DELETE SQL statement from load|insert|update|delete CDC event record. + """ + record_decoded = DMSTranslatorCrateDBRecord(record=record, container=self) + return record_decoded.to_sql() diff --git a/tests/test_model.py b/tests/test_model.py new file mode 100644 index 0000000..7c09538 --- /dev/null +++ b/tests/test_model.py @@ -0,0 +1,36 @@ +import pytest +from commons_codec.model import ColumnType, ColumnTypeMapStore, TableAddress + + +def test_table_address_success(): + ta = TableAddress(schema="foo", table="bar") + assert ta.fqn == '"foo"."bar"' + + +def test_table_address_failure(): + ta = TableAddress(schema=None, table="bar") + with pytest.raises(ValueError) as ex: + _ = ta.fqn + assert ex.match("adcdc") + + +def test_column_type_map_store_serialize(): + column_types = ColumnTypeMapStore().add( + table=TableAddress(schema="public", table="foo"), + column="attributes", + type_=ColumnType.MAP, + ) + assert column_types.to_dict() == {"public:foo:attributes": "map"} + assert column_types.to_json() == '{"public:foo:attributes": "map"}' + + +def test_column_type_map_store_unserialize_data(): + assert ColumnTypeMapStore.from_json('{"public:foo:attributes": "map"}') == ColumnTypeMapStore( + {TableAddress(schema="public", table="foo"): {"attributes": ColumnType.MAP}} + ) + + +def test_column_type_map_store_unserialize_empty(): + assert ColumnTypeMapStore.from_json("") is None + assert ColumnTypeMapStore.from_json(None) is None + assert ColumnTypeMapStore.from_dict(None) is None diff --git a/tests/transform/test_aws_dms.py b/tests/transform/test_aws_dms.py new file mode 100644 index 0000000..3fce7a9 --- /dev/null +++ b/tests/transform/test_aws_dms.py @@ -0,0 +1,278 @@ +# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction +import json + +import pytest +from commons_codec.exception import MessageFormatError, UnknownOperationError +from commons_codec.model import ColumnType, ColumnTypeMapStore, TableAddress +from commons_codec.transform.aws_dms import DMSTranslatorCrateDB + +RECORD_INSERT = {"age": 31, "attributes": {"baz": "qux"}, "id": 46, "name": "Jane"} +RECORD_UPDATE = {"age": 33, "attributes": {"foo": "bar"}, "id": 42, "name": "John"} + +MSG_UNKNOWN_SHAPE = { + "unknown": "foo:bar", +} +MSG_SCHEMA_TABLE_MISSING = { + "control": {}, + "metadata": { + "operation": "insert", + }, +} +MSG_UNKNOWN_OPERATION = { + "control": {}, + "metadata": { + "operation": "FOOBAR", + "schema-name": "public", + "table-name": "foo", + }, +} + +MSG_CONTROL_DROP_TABLE = { + "control": {}, + "metadata": { + "operation": "drop-table", + "partition-key-type": "task-id", + "partition-key-value": "serv-res-id-1722195358878-yhru", + "record-type": "control", + "schema-name": "public", + "table-name": "foo", + "timestamp": "2024-07-29T00:30:47.258815Z", + }, +} + +MSG_CONTROL_CREATE_TABLE = { + "control": { + "table-def": { + "columns": { + "age": {"nullable": True, "type": "INT32"}, + "attributes": {"nullable": True, "type": "STRING"}, + "id": {"nullable": False, "type": "INT32"}, + "name": {"nullable": True, "type": "STRING"}, + }, + "primary-key": ["id"], + } + }, + "metadata": { + "operation": "create-table", + "partition-key-type": "task-id", + "partition-key-value": "serv-res-id-1722195358878-yhru", + "record-type": "control", + "schema-name": "public", + "table-name": "foo", + "timestamp": "2024-07-29T00:30:47.266581Z", + }, +} + +MSG_DATA_LOAD = { + "data": {"age": 30, "attributes": '{"foo": "bar"}', "id": 42, "name": "John"}, + "metadata": { + "operation": "load", + "partition-key-type": "primary-key", + "partition-key-value": "public.foo.42", + "record-type": "data", + "schema-name": "public", + "table-name": "foo", + "timestamp": "2024-07-29T00:57:35.691762Z", + }, +} + +MSG_DATA_INSERT = { + "data": {"age": 31, "attributes": '{"baz": "qux"}', "id": 46, "name": "Jane"}, + "metadata": { + "commit-timestamp": "2024-07-29T00:58:17.974340Z", + "operation": "insert", + "partition-key-type": "schema-table", + "record-type": "data", + "schema-name": "public", + "stream-position": "00000002/7C007178.3.00000002/7C007178", + "table-name": "foo", + "timestamp": "2024-07-29T00:58:17.983670Z", + "transaction-id": 1139, + "transaction-record-id": 1, + }, +} + +MSG_DATA_UPDATE_VALUE = { + "before-image": {}, + "data": {"age": 33, "attributes": '{"foo": "bar"}', "id": 42, "name": "John"}, + "metadata": { + "commit-timestamp": "2024-07-29T00:58:44.886717Z", + "operation": "update", + "partition-key-type": "schema-table", + "prev-transaction-id": 1139, + "prev-transaction-record-id": 1, + "record-type": "data", + "schema-name": "public", + "stream-position": "00000002/7C007328.2.00000002/7C007328", + "table-name": "foo", + "timestamp": "2024-07-29T00:58:44.895275Z", + "transaction-id": 1140, + "transaction-record-id": 1, + }, +} + +MSG_DATA_UPDATE_PK = { + "before-image": {"id": 46}, + "data": {"age": 31, "attributes": '{"baz": "qux"}', "id": 45, "name": "Jane"}, + "metadata": { + "commit-timestamp": "2024-07-29T00:59:07.678294Z", + "operation": "update", + "partition-key-type": "schema-table", + "prev-transaction-id": 1140, + "prev-transaction-record-id": 1, + "record-type": "data", + "schema-name": "public", + "stream-position": "00000002/7C0073F8.2.00000002/7C0073F8", + "table-name": "foo", + "timestamp": "2024-07-29T00:59:07.686557Z", + "transaction-id": 1141, + "transaction-record-id": 1, + }, +} + +MSG_DATA_DELETE = { + "data": {"age": None, "attributes": None, "id": 45, "name": None}, + "metadata": { + "commit-timestamp": "2024-07-29T01:09:25.366257Z", + "operation": "delete", + "partition-key-type": "schema-table", + "prev-transaction-id": 1141, + "prev-transaction-record-id": 1, + "record-type": "data", + "schema-name": "public", + "stream-position": "00000002/840001D8.2.00000002/840001D8", + "table-name": "foo", + "timestamp": "2024-07-29T01:09:25.375525Z", + "transaction-id": 1144, + "transaction-record-id": 1, + }, +} + +MSG_CONTROL_AWSDMS = { + "control": { + "table-def": { + "columns": { + "ERROR": {"nullable": False, "type": "STRING"}, + "ERROR_TIME": {"nullable": False, "type": "TIMESTAMP"}, + "STATEMENT": {"nullable": False, "type": "STRING"}, + "TABLE_NAME": {"length": 128, "nullable": False, "type": "STRING"}, + "TABLE_OWNER": {"length": 128, "nullable": False, "type": "STRING"}, + "TASK_NAME": {"length": 128, "nullable": False, "type": "STRING"}, + } + } + }, + "metadata": { + "operation": "create-table", + "partition-key-type": "task-id", + "partition-key-value": "7QBLNBTPCNDEBG7CHI3WA73YFA", + "record-type": "control", + "schema-name": "", + "table-name": "awsdms_apply_exceptions", + "timestamp": "2024-08-04T10:50:10.584772Z", + }, +} + + +@pytest.fixture +def cdc(): + """ + Provide fresh translator instance. + """ + column_types = ColumnTypeMapStore().add( + table=TableAddress(schema="public", table="foo"), + column="attributes", + type_=ColumnType.MAP, + ) + return DMSTranslatorCrateDB(column_types=column_types) + + +def test_decode_cdc_unknown_source(cdc): + with pytest.raises(MessageFormatError) as ex: + cdc.to_sql(MSG_UNKNOWN_SHAPE) + assert ex.match("Record not in DMS format: metadata and/or operation is missing") + + +def test_decode_cdc_missing_schema_or_table(cdc): + with pytest.raises(MessageFormatError) as ex: + cdc.to_sql(MSG_SCHEMA_TABLE_MISSING) + assert ex.match("Schema or table name missing or empty: schema=None, table=None") + + +def test_decode_cdc_unknown_event(cdc): + with pytest.raises(UnknownOperationError) as ex: + cdc.to_sql(MSG_UNKNOWN_OPERATION) + assert ex.match("Unknown CDC event operation: FOOBAR") + assert ex.value.operation == "FOOBAR" + assert ex.value.record == { + "control": {}, + "metadata": {"operation": "FOOBAR", "schema-name": "public", "table-name": "foo"}, + } + + +def test_decode_cdc_sql_ddl_regular(cdc): + assert cdc.to_sql(MSG_CONTROL_CREATE_TABLE) == 'CREATE TABLE IF NOT EXISTS "public"."foo" (data OBJECT(DYNAMIC));' + + +def test_decode_cdc_sql_ddl_awsdms(cdc): + assert ( + cdc.to_sql(MSG_CONTROL_AWSDMS) + == 'CREATE TABLE IF NOT EXISTS "dms"."awsdms_apply_exceptions" (data OBJECT(DYNAMIC));' + ) + + +def test_decode_cdc_insert(cdc): + assert ( + cdc.to_sql(MSG_DATA_INSERT) == 'INSERT INTO "public"."foo" (data) VALUES ' f"('{json.dumps(RECORD_INSERT)}');" + ) + + +def test_decode_cdc_update_success(cdc): + """ + Update statements need schema knowledge about primary keys. + """ + # Seed translator with control message, describing the table schema. + cdc.to_sql(MSG_CONTROL_CREATE_TABLE) + + # Emulate an UPDATE operation. + assert ( + cdc.to_sql(MSG_DATA_UPDATE_VALUE) == 'UPDATE "public"."foo" ' + f"SET data = '{json.dumps(RECORD_UPDATE)}' " + "WHERE data['id'] = '42';" + ) + + +def test_decode_cdc_update_failure(): + """ + Update statements without schema knowledge are not possible. + + When no `create-table` statement has been processed yet, + the machinery doesn't know about primary keys. + """ + # Emulate an UPDATE operation without seeding the translator. + with pytest.raises(ValueError) as ex: + DMSTranslatorCrateDB().to_sql(MSG_DATA_UPDATE_VALUE) + assert ex.match("Unable to invoke DML operation without primary key information") + + +def test_decode_cdc_delete_success(cdc): + """ + Delete statements need schema knowledge about primary keys. + """ + # Seed translator with control message, describing the table schema. + cdc.to_sql(MSG_CONTROL_CREATE_TABLE) + + # Emulate a DELETE operation. + assert cdc.to_sql(MSG_DATA_DELETE) == 'DELETE FROM "public"."foo" ' "WHERE data['id'] = '45';" + + +def test_decode_cdc_delete_failure(cdc): + """ + Delete statements without schema knowledge are not possible. + + When no `create-table` statement has been processed yet, + the machinery doesn't know about primary keys. + """ + # Emulate an DELETE operation without seeding the translator. + with pytest.raises(ValueError) as ex: + DMSTranslatorCrateDB().to_sql(MSG_DATA_DELETE) + assert ex.match("Unable to invoke DML operation without primary key information")