-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Transform: MongoDB CDC event to CrateDB SQL #4
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@ Coverage Diff @@
## main #4 +/- ##
=========================================
Coverage 100.00% 100.00%
=========================================
Files 7 8 +1
Lines 231 289 +58
=========================================
+ Hits 231 289 +58
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
return ( | ||
f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));" | ||
) | ||
|
||
def to_sql(self, record: t.Dict[str, t.Any]) -> str:
    """
    Produce INSERT|UPDATE|DELETE SQL statement from insert|update|replace|delete CDC event record.

    The document OID and the serialized document are embedded as single-quoted
    SQL string literals. Embedded single quotes are doubled, so that payloads
    like `{"name": "O'Brien"}` neither break the statement nor inject SQL.

    :param record: CDC event record. Must include a non-empty `operationType`.
    :return: The SQL statement. An empty string is returned for the `drop`
             and `invalidate` operations, which are only logged.
    :raises ValueError: When `operationType` is missing, empty, or unknown.
    """

    def quote(value: str) -> str:
        # SQL string literal: wrap in single quotes, double any embedded ones.
        return "'" + value.replace("'", "''") + "'"

    if "operationType" in record and record["operationType"]:
        operation_type: str = str(record["operationType"])
    else:
        raise ValueError(f"Operation Type missing or empty: {record}")

    if operation_type == "insert":
        oid: str = self.get_document_key(record)
        full_document = self.get_full_document(record)
        values_clause = self.full_document_to_values(full_document)
        sql = (
            f"INSERT INTO {self.table_name} "
            f"({self.ID_COLUMN}, {self.DATA_COLUMN}) "
            f"VALUES ({quote(oid)}, {quote(values_clause)});"
        )

    # In order to use "full document" representations from "update" events,
    # you need to use `watch(full_document="updateLookup")`.
    # https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations
    elif operation_type in ["update", "replace"]:
        full_document = self.get_full_document(record)
        values_clause = self.full_document_to_values(full_document)
        where_clause = self.where_clause(record)
        sql = f"UPDATE {self.table_name} SET {self.DATA_COLUMN} = {quote(values_clause)} WHERE {where_clause};"

    elif operation_type == "delete":
        where_clause = self.where_clause(record)
        sql = f"DELETE FROM {self.table_name} WHERE {where_clause};"

    # TODO: Enable applying the "drop" operation conditionally when enabled.
    elif operation_type == "drop":
        logger.info("Received 'drop' operation, but skipping to apply 'DROP TABLE'")
        sql = ""

    elif operation_type == "invalidate":
        logger.info("Ignoring 'invalidate' CDC operation")
        sql = ""

    else:
        raise ValueError(f"Unknown CDC operation type: {operation_type}")

    return sql
|
||
@staticmethod | ||
def get_document_key(record: t.Dict[str, t.Any]) -> str: | ||
""" | ||
Return value of document key (MongoDB document OID) from CDC record. | ||
|
||
"documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")} | ||
""" | ||
return str(record.get("documentKey", {}).get("_id")) | ||
|
||
@staticmethod | ||
def get_full_document(record: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: | ||
""" | ||
return `fullDocument` representation from record. | ||
""" | ||
return t.cast(dict, record.get("fullDocument")) | ||
|
||
def full_document_to_values(self, document: t.Dict[str, t.Any]) -> str:
    """
    Render a CDC event's "fullDocument" as a JSON string for use in a CrateDB SQL `VALUES` clause.

    The document is first passed through `self.deserialize_item`, then encoded with `json.dumps`.

    Input (top-level stripped):
      "fullDocument": {
        "_id": ObjectId("669683c2b0750b2c84893f3e"),
        "id": "5F9E",
        "data": {"temperature": 42.42, "humidity": 84.84},
        "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
      }

    Output:
      {"_id": {"$oid": "669683c2b0750b2c84893f3e"},
       "id": "5F9E",
       "data": {"temperature": 42.42, "humidity": 84.84},
       "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"},
      }
    """
    converted = self.deserialize_item(document)
    serialized = json.dumps(converted)
    return serialized
|
||
def where_clause(self, record: t.Dict[str, t.Any]) -> str:
    """
    Build the row-matching condition for UPDATE and DELETE statements.

    When converging an oplog of a MongoDB collection, the primary key is always the MongoDB document OID.
    The condition addresses the same identifier column (`self.ID_COLUMN`) that the INSERT
    statement populates. The OID is embedded as a single-quoted SQL string literal, with
    embedded single quotes doubled.

    IN (top-level stripped):
      "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")}

    OUT (the caller adds the `WHERE` keyword):
      oid = '669683c2b0750b2c84893f3e'

    :param record: CDC event record carrying the `documentKey`.
    :return: Condition expression, without the leading `WHERE` keyword.
    """
    oid = self.get_document_key(record)
    # Escape per SQL string literal syntax, because `_id` may be an arbitrary string.
    escaped_oid = oid.replace("'", "''")
    return f"{self.ID_COLUMN} = '{escaped_oid}'"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dear @hlcianfagna, @hammerhead, @surister, and @wierdvanderhaar,
may I kindly ask you to review those routines? If you have already worked with DynamoDB CDC convergence, they might feel familiar to you.
Thanks in advance,
Andreas.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DDL schema
In particular, I am interested in whether you agree with the SQL DDL schema and the corresponding update strategy, or if you would prefer a different one, and what that would look like.
CREATE TABLE <tablename> (oid TEXT, data OBJECT(DYNAMIC));
Value serialization
Currently, typed values are retained in their nested-JSON forms, so sample content of the data
column looks as outlined below. Here, I would be interested in what you think about nestings like "timestamp": {"$date": "2024-07-11T23:17:42Z"}
and friends, i.e. how MongoDB's special values are serialized.
- Is it good to go for a first iteration?
- Should there be a subsequent iteration to address any shortcomings you may report, or wish, because crafting SQL queries which take those nestings into consideration will be totally silly or even impossible?
{
// Coming from MongoDB.
// Can also be omitted, because the OID will be stored into a dedicated `oid` column anyway.
// Right now, we retain the `fullDocument` payload 1:1, but that will be easy to spice up
// by introducing corresponding options/flavours.
"_id": {
"$oid": "6696c937f6e1770586480277"
},
// Coming from user.
"id": "5F9E",
"meta": {
// Scalar types will not cause any headaches.
"device": "foo",
// Currently, special data types are only value-unmarshalled, while retaining the structure
// as conveyed by MongoDB 1:1. Optionally, or when there is demand, the routine may
// also unmarshal the nested substructure.
"timestamp": {
"$date": "2024-07-11T23:17:42Z"
}
},
"data": {
"humidity": 84.84,
"temperature": 42.42
}
}
About
Similar to the transformer for DynamoDB CDC events, this patch adds a little converter which takes care of MongoDB Change Stream events. In this case, it translates ingress CDC events into SQL statements suitable for CrateDB.
Documentation
The Relay MongoDB Change Stream into CrateDB document describes how to get started using the basic relay example program.