diff --git a/tests/transform/test_dynamodb_cdc.py b/tests/transform/test_dynamodb_cdc.py index a4d7add..92d1c37 100644 --- a/tests/transform/test_dynamodb_cdc.py +++ b/tests/transform/test_dynamodb_cdc.py @@ -4,7 +4,7 @@ import pytest from commons_codec.model import SQLOperation, UniversalRecord -from commons_codec.transform.dynamodb import CrateDBTypeDeserializer +from commons_codec.transform.dynamodb import CrateDBTypeDeserializer, DynamoDBCDCTranslator, DynamoDBFullLoadTranslator pytestmark = pytest.mark.dynamodb @@ -344,3 +344,33 @@ def test_deserialize_list_objects(): ] } ) + + +@pytest.mark.integration +def test_to_sql_cratedb(caplog, cratedb, dynamodb_full_translator_foo): + """ + Verify converging NewImage CDC INSERT event to CrateDB. + """ + + # Use the full-load translator only for running the SQL DDL. + translator = DynamoDBFullLoadTranslator( + table_name="from.dynamodb", primary_key_schema=dynamodb_full_translator_foo.primary_key_schema + ) + cratedb.database.run_sql(translator.sql_ddl) + + # Compute CrateDB operation (SQL+parameters) from DynamoDB record. + translator = DynamoDBCDCTranslator(table_name="from.dynamodb") + operation = translator.to_sql(MSG_INSERT_NESTED) + + # Insert into CrateDB. + cratedb.database.run_sql(operation.statement, operation.parameters) + + # Verify data in target database. + assert cratedb.database.table_exists("from.dynamodb") is True + assert cratedb.database.refresh_table("from.dynamodb") is True + assert cratedb.database.count_records("from.dynamodb") == 1 + + results = cratedb.database.run_sql('SELECT * FROM "from".dynamodb;', records=True) # noqa: S608 + assert results[0]["pk"]["id"] == MSG_INSERT_NESTED["dynamodb"]["NewImage"]["id"]["S"] + assert results[0]["data"]["meta"]["timestamp"] == "2024-07-12T01:17:42" + assert results[0]["aux"] == {}