Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transform: MongoDB CDC event to CrateDB SQL #4

Merged
merged 1 commit into from
Jul 19, 2024
Merged

Transform: MongoDB CDC event to CrateDB SQL #4

merged 1 commit into from
Jul 19, 2024

Conversation

amotl
Copy link
Member

@amotl amotl commented Jul 16, 2024

About

Similar to the transformer for DynamoDB CDC events, this patch adds a little converter which takes care of MongoDB Change Stream events. In this case, it translates ingress CDC events into SQL statements suitable for CrateDB.

Documentation

The Relay MongoDB Change Stream into CrateDB document describes how to get started using the basic relay example program.

Copy link

codecov bot commented Jul 16, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 100.00%. Comparing base (f5975b1) to head (9c0bdc8).

Additional details and impacted files
@@            Coverage Diff            @@
##              main        #4   +/-   ##
=========================================
  Coverage   100.00%   100.00%           
=========================================
  Files            7         8    +1     
  Lines          231       289   +58     
=========================================
+ Hits           231       289   +58     
Flag Coverage Δ
main ?
mongodb 100.00% <100.00%> (?)
vanilla 100.00% <ø> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Comment on lines +15 to +192
return (
f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));"
)

def to_sql(self, record: t.Dict[str, t.Any]) -> str:
"""
Produce INSERT|UPDATE|DELETE SQL statement from insert|update|replace|delete CDC event record.
"""

if "operationType" in record and record["operationType"]:
operation_type: str = str(record["operationType"])
else:
raise ValueError(f"Operation Type missing or empty: {record}")

if operation_type == "insert":
oid: str = self.get_document_key(record)
full_document = self.get_full_document(record)
values_clause = self.full_document_to_values(full_document)
sql = (
f"INSERT INTO {self.table_name} "
f"({self.ID_COLUMN}, {self.DATA_COLUMN}) "
f"VALUES ('{oid}', '{values_clause}');"
)

# In order to use "full document" representations from "update" events,
# you need to use `watch(full_document="updateLookup")`.
# https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations
elif operation_type in ["update", "replace"]:
full_document = self.get_full_document(record)
values_clause = self.full_document_to_values(full_document)
where_clause = self.where_clause(record)
sql = f"UPDATE {self.table_name} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};"

elif operation_type == "delete":
where_clause = self.where_clause(record)
sql = f"DELETE FROM {self.table_name} WHERE {where_clause};"

# TODO: Enable applying the "drop" operation conditionally when enabled.
elif operation_type == "drop":
logger.info("Received 'drop' operation, but skipping to apply 'DROP TABLE'")
sql = ""

elif operation_type == "invalidate":
logger.info("Ignoring 'invalidate' CDC operation")
sql = ""

else:
raise ValueError(f"Unknown CDC operation type: {operation_type}")

return sql

@staticmethod
def get_document_key(record: t.Dict[str, t.Any]) -> str:
"""
Return value of document key (MongoDB document OID) from CDC record.

"documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")}
"""
return str(record.get("documentKey", {}).get("_id"))

@staticmethod
def get_full_document(record: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
"""
return `fullDocument` representation from record.
"""
return t.cast(dict, record.get("fullDocument"))

def full_document_to_values(self, document: t.Dict[str, t.Any]) -> str:
"""
Serialize CDC event's "fullDocument" representation to a `VALUES` clause in CrateDB SQL syntax.

IN (top-level stripped):
"fullDocument": {
"_id": ObjectId("669683c2b0750b2c84893f3e"),
"id": "5F9E",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
}

OUT:
{"_id": {"$oid": "669683c2b0750b2c84893f3e"},
"id": "5F9E",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"},
}
"""
return json.dumps(self.deserialize_item(document))

def where_clause(self, record: t.Dict[str, t.Any]) -> str:
"""
When converging an oplog of a MongoDB collection, the primary key is always the MongoDB document OID.

IN (top-level stripped):
"documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")}

OUT:
WHERE oid = '669683c2b0750b2c84893f3e'
"""
oid = self.get_document_key(record)
return f"oid = '{oid}'"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dear @hlcianfagna, @hammerhead, @surister, and @wierdvanderhaar,

may I kindly ask you to review those routines? If you worked with DynamoDB CDC convergence already, they might feel familiar to you.

Thanks in advance,
Andreas.

Copy link
Member Author

@amotl amotl Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DDL schema

In particular, I am interested if you agree with the SQL DDL schema and the corresponding update strategy, or if you would prefer a different one, and how that would look like.

CREATE TABLE <tablename> (oid TEXT, data OBJECT(DYNAMIC));

Value serialization

Currently, typed values are retained in their nested-JSON forms, so a sample content of the data column looks like outlined below. Here, I would be interested what you think about the nestings of "timestamp": {"$date": "2024-07-11T23:17:42Z"} and friends, how MongoDB's special values are serialized like.

  • Is it good to go for a first iteration?
  • Should there be a subsequent iteration to address any shortcomings you may report, or wish, because crafting SQL queries which take those nestings into consideration will be totally silly or even impossible?
{

  // Coming from MongoDB.
  // Can also be omitted, because the OID will be stored into a dedicated `oid` column anyway.
  // Right now, we retained the `fullDocument` payload 1:1, but that will be easy to spice up
  // by introducing corresponding options/flavours.
  "_id": {
    "$oid": "6696c937f6e1770586480277"
  },

  // Coming from user.
  "id": "5F9E",
  "meta": {

    // Scalar types will not cause any headaches.
    "device": "foo",

    // Currently, special data types are only value-unmarshalled, while retaining the structure
    // as conveyed by MongoDB 1:1. Optionally, or when there is demand, the routine may
    // also unmarshal the nested substructure.
    "timestamp": {
      "$date": "2024-07-11T23:17:42Z"
    }
  },
  "data": {
    "humidity": 84.84,
    "temperature": 42.42
  }
}

@amotl amotl changed the title Add transformer for MongoDB CDC to CrateDB SQL conversion Transform: MongoDB CDC event to CrateDB SQL Jul 16, 2024
@amotl amotl merged commit 3979f9a into main Jul 19, 2024
15 checks passed
@amotl amotl deleted the mongodb branch July 19, 2024 19:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant