Skip to content

Commit

Permalink
DynamoDB: Fix String Set and Number Set representation to CrateDB.
Browse files Browse the repository at this point in the history
  • Loading branch information
surister authored and Andreas Motl committed Aug 22, 2024
1 parent 1ee5217 commit f45530f
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 14 deletions.
29 changes: 26 additions & 3 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,50 @@
# Copyright (c) 2023-2024, The Kotori Developers and contributors.
# Distributed under the terms of the LGPLv3 license, see LICENSE.
import decimal

# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction

import logging
import typing as t

import simplejson as json
import toolz

from commons_codec.util.data import is_container, is_number
from commons_codec.vendor.boto3.dynamodb.types import TypeDeserializer
from commons_codec.vendor.boto3.dynamodb.types import DYNAMODB_CONTEXT, TypeDeserializer

logger = logging.getLogger(__name__)


# Inhibit Inexact Exceptions
DYNAMODB_CONTEXT.traps[decimal.Inexact] = False
# Inhibit Rounded Exceptions
DYNAMODB_CONTEXT.traps[decimal.Rounded] = False


class CrateDBTypeDeserializer(TypeDeserializer):
def _deserialize_n(self, value):
return float(super()._deserialize_n(value))

def _deserialize_b(self, value):
return value

def _deserialize_ns(self, value):
return list(super()._deserialize_ns(value))

def _deserialize_ss(self, value):
return list(super()._deserialize_ss(value))

def _deserialize_bs(self, value):
return list(super()._deserialize_bs(value))


class DynamoCDCTranslatorBase:
"""
Translate DynamoDB CDC events into different representations.
"""

def __init__(self):
self.deserializer = TypeDeserializer()
self.deserializer = CrateDBTypeDeserializer()

def deserialize_item(self, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, str]:
"""
Expand Down
89 changes: 78 additions & 11 deletions tests/transform/test_dynamodb.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import decimal
from collections import Counter
from decimal import Decimal

import pytest

from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB
from commons_codec.transform.dynamodb import CrateDBTypeDeserializer, DynamoCDCTranslatorCrateDB

READING_BASIC = {"device": "foo", "temperature": 42.42, "humidity": 84.84}

Expand All @@ -29,6 +30,10 @@
"temperature": {"N": "42.42"},
"device": {"S": "foo"},
"timestamp": {"S": "2024-07-12T01:17:42"},
"string_set": {"SS": ["location_1"]},
"number_set": {"NS": [1, 2, 3, 4]},
"binary_set": {"BS": ["U3Vubnk="]},
# "varied_list": {'L': [1, 0.23123, "Hello world"]}
},
"SizeBytes": 99,
"ApproximateCreationDateTimePrecision": "MICROSECOND",
Expand All @@ -49,6 +54,9 @@
"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"},
"data": {"M": {"temperature": {"N": "42.42"}, "humidity": {"N": "84.84"}}},
"meta": {"M": {"timestamp": {"S": "2024-07-12T01:17:42"}, "device": {"S": "foo"}}},
"string_set": {"SS": ["location_1"]},
"number_set": {"NS": [1, 2, 3, 0.34]},
"binary_set": {"BS": ["U3Vubnk="]},
},
"SizeBytes": 156,
"ApproximateCreationDateTimePrecision": "MICROSECOND",
Expand All @@ -71,6 +79,9 @@
"device": {"S": "bar"},
"location": {"S": "Sydney"},
"timestamp": {"S": "2024-07-12T01:17:42"},
"string_set": {"SS": ["location_1"]},
"number_set": {"NS": [1, 2, 3, 0.34]},
"binary_set": {"BS": ["U3Vubnk="]},
},
"OldImage": {
"humidity": {"N": "84.84"},
Expand Down Expand Up @@ -100,6 +111,9 @@
"empty_map": {"M": {}},
"empty_list": {"L": []},
"timestamp": {"S": "2024-07-12T01:17:42"},
"string_set": {"SS": ["location_1"]},
"number_set": {"NS": [1, 2, 3, 0.34]},
"binary_set": {"BS": ["U3Vubnk="]},
},
"OldImage": {
"humidity": {"N": "84.84"},
Expand Down Expand Up @@ -128,6 +142,9 @@
"temperature": {"N": "55.66"},
"device": {"S": "bar"},
"timestamp": {"S": "2024-07-12T01:17:42"},
"string_set": {"SS": ["location_1"]},
"number_set": {"NS": [1, 2, 3, 0.34]},
"binary_set": {"BS": ["U3Vubnk="]},
},
"SizeBytes": 99,
"ApproximateCreationDateTimePrecision": "MICROSECOND",
Expand All @@ -137,9 +154,7 @@


def test_decode_ddb_deserialize_type():
assert DynamoCDCTranslatorCrateDB(table_name="foo").deserialize_item({"foo": {"N": "84.84"}}) == {
"foo": decimal.Decimal("84.84")
}
assert DynamoCDCTranslatorCrateDB(table_name="foo").deserialize_item({"foo": {"N": "84.84"}}) == {"foo": 84.84}


def test_decode_cdc_sql_ddl():
Expand All @@ -161,7 +176,8 @@ def test_decode_cdc_unknown_event():
def test_decode_cdc_insert_basic():
assert (
DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT_BASIC) == 'INSERT INTO "foo" (data) '
'VALUES (\'{"humidity": 84.84, "temperature": 42.42, "device": "foo", "timestamp": "2024-07-12T01:17:42"}\');'
'VALUES (\'{"humidity": 84.84, "temperature": 42.42, "device": "foo", "timestamp": "2024-07-12T01:17:42",'
' "string_set": ["location_1"], "number_set": [1.0, 2.0, 3.0, 4.0], "binary_set": ["U3Vubnk="]}\');'
)


Expand All @@ -170,23 +186,28 @@ def test_decode_cdc_insert_nested():
DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT_NESTED)
== 'INSERT INTO "foo" (data) VALUES (\'{"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", '
'"data": {"temperature": 42.42, "humidity": 84.84}, '
'"meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"}}\');'
'"meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"},'
' "string_set": ["location_1"], "number_set": [0.34, 1.0, 2.0, 3.0],'
' "binary_set": ["U3Vubnk="]}\');'
)


def test_decode_cdc_modify_basic():
assert (
DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_MODIFY_BASIC) == 'UPDATE "foo" '
"SET data['humidity'] = 84.84, data['temperature'] = 55.66, data['location'] = 'Sydney' "
"WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';"
"SET data['humidity'] = 84.84, data['temperature'] = 55.66, data['location'] = 'Sydney', "
"data['string_set'] = ['location_1'], data['number_set'] = [0.34, 1.0, 2.0, 3.0],"
" data['binary_set'] = ['U3Vubnk=']"
" WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';"
)


def test_decode_cdc_modify_nested():
assert (
DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_MODIFY_NESTED) == 'UPDATE "foo" '
"SET data['tags'] = ['foo', 'bar'], data['empty_map'] = {}, data['empty_list'] = [] "
"WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';"
"SET data['tags'] = ['foo', 'bar'], data['empty_map'] = {}, data['empty_list'] = [],"
" data['string_set'] = ['location_1'], data['number_set'] = [0.34, 1.0, 2.0, 3.0],"
" data['binary_set'] = ['U3Vubnk='] WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';"
)


Expand All @@ -195,3 +216,49 @@ def test_decode_cdc_remove():
DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REMOVE) == 'DELETE FROM "foo" '
"WHERE data['device'] = 'bar' AND data['timestamp'] = '2024-07-12T01:17:42';"
)


def test_deserialize_number_set():
deserializer = CrateDBTypeDeserializer()
assert deserializer.deserialize({"NS": ["1", "1.25"]}) == [
Decimal("1"),
Decimal("1.25"),
]


def test_deserialize_string_set():
deserializer = CrateDBTypeDeserializer()
# We us Counter because when the set is transformed into a list, it loses order.
assert Counter(deserializer.deserialize({"SS": ["foo", "bar"]})) == Counter(
[
"foo",
"bar",
]
)


def test_deserialize_binary_set():
deserializer = CrateDBTypeDeserializer()
assert Counter(deserializer.deserialize({"BS": [b"\x00", b"\x01"]})) == Counter([b"\x00", b"\x01"])


def test_deserialize_list_objects():
deserializer = CrateDBTypeDeserializer()
assert deserializer.deserialize(
{
"L": [
{
"M": {
"foo": {"S": "mystring"},
"bar": {"M": {"baz": {"N": "1"}}},
}
},
{
"M": {
"foo": {"S": "other string"},
"bar": {"M": {"baz": {"N": "2"}}},
}
},
]
}
)

0 comments on commit f45530f

Please sign in to comment.