Skip to content

Commit

Permalink
feat(cdc): parse debezium schema event for mysql (#17707)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Aug 2, 2024
1 parent 911578c commit c09d264
Show file tree
Hide file tree
Showing 14 changed files with 585 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public DbzCdcEngine(
sourceId,
heartbeatTopicPrefix,
transactionTopic,
topicPrefix,
new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY));

// Builds a debezium engine but not start it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.connector.cdc.debezium.internal.DebeziumOffset;
import com.risingwave.connector.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.risingwave.connector.source.common.CdcConnectorException;
import com.risingwave.proto.ConnectorServiceProto.CdcMessage;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import io.debezium.connector.postgresql.PostgresOffsetContext;
Expand All @@ -43,6 +44,7 @@ enum EventType {
HEARTBEAT,
TRANSACTION,
DATA,
SCHEMA_CHANGE,
}

public class DbzChangeEventConsumer
Expand All @@ -57,6 +59,7 @@ public class DbzChangeEventConsumer
private final JsonConverter keyConverter;
private final String heartbeatTopicPrefix;
private final String transactionTopic;
private final String schemaChangeTopic;

private volatile DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>>
currentRecordCommitter;
Expand All @@ -66,12 +69,14 @@ public class DbzChangeEventConsumer
long sourceId,
String heartbeatTopicPrefix,
String transactionTopic,
String schemaChangeTopic,
BlockingQueue<GetEventStreamResponse> queue) {
this.connector = connector;
this.sourceId = sourceId;
this.outputChannel = queue;
this.heartbeatTopicPrefix = heartbeatTopicPrefix;
this.transactionTopic = transactionTopic;
this.schemaChangeTopic = schemaChangeTopic;
LOG.info("heartbeat topic: {}, trnx topic: {}", heartbeatTopicPrefix, transactionTopic);

// The default JSON converter will output the schema field in the JSON which is unnecessary
Expand Down Expand Up @@ -105,6 +110,8 @@ private EventType getEventType(SourceRecord record) {
return EventType.HEARTBEAT;
} else if (isTransactionMetaEvent(record)) {
return EventType.TRANSACTION;
} else if (isSchemaChangeEvent(record)) {
return EventType.SCHEMA_CHANGE;
} else {
return EventType.DATA;
}
Expand All @@ -122,6 +129,11 @@ private boolean isTransactionMetaEvent(SourceRecord record) {
return topic != null && topic.equals(transactionTopic);
}

/**
 * Tells whether a record was published on the schema change topic.
 *
 * @param record the source record to inspect
 * @return {@code true} only when the record has a non-null topic equal to {@code
 *     schemaChangeTopic}
 */
private boolean isSchemaChangeEvent(SourceRecord record) {
    final String recordTopic = record.topic();
    if (recordTopic == null) {
        // A record without a topic can never match.
        return false;
    }
    return recordTopic.equals(schemaChangeTopic);
}

@Override
public void handleBatch(
List<ChangeEvent<SourceRecord, SourceRecord>> events,
Expand Down Expand Up @@ -155,7 +167,8 @@ var record = event.value();
switch (eventType) {
case HEARTBEAT:
{
var message = msgBuilder.build();
var message =
msgBuilder.setMsgType(CdcMessage.CdcMessageType.HEARTBEAT).build();
LOG.debug("heartbeat => {}", message.getOffset());
respBuilder.addEvents(message);
break;
Expand All @@ -168,14 +181,54 @@ var record = event.value();
record.topic(), record.valueSchema(), record.value());
var message =
msgBuilder
.setIsTransactionMeta(true)
.setMsgType(CdcMessage.CdcMessageType.TRANSACTION_META)
.setPayload(new String(payload, StandardCharsets.UTF_8))
.setSourceTsMs(trxTs)
.build();
LOG.debug("transaction => {}", message);
respBuilder.addEvents(message);
break;
}

case SCHEMA_CHANGE:
{
var sourceStruct = ((Struct) record.value()).getStruct("source");
if (sourceStruct == null) {
throw new CdcConnectorException(
"source field is missing in schema change event");
}

// upstream event time
long sourceTsMs = sourceStruct.getInt64("ts_ms");
byte[] payload =
payloadConverter.fromConnectData(
record.topic(), record.valueSchema(), record.value());

// We intentionally don't set the fullTableName for schema change event,
// since it doesn't need to be routed to a specific cdc table
var message =
msgBuilder
.setMsgType(CdcMessage.CdcMessageType.SCHEMA_CHANGE)
.setPayload(new String(payload, StandardCharsets.UTF_8))
.setSourceTsMs(sourceTsMs)
.build();
LOG.debug(
"offset => {}, key => {}, payload => {}",
message.getOffset(),
message.getKey(),
message.getPayload());
respBuilder.addEvents(message);

// emit the schema change event as a single response
respBuilder.setSourceId(sourceId);
var response = respBuilder.build();
outputChannel.put(response);

// reset the response builder
respBuilder = GetEventStreamResponse.newBuilder();
break;
}

case DATA:
{
// Topic naming conventions
Expand All @@ -192,10 +245,11 @@ var record = event.value();
}
// get upstream event time from the "source" field
var sourceStruct = ((Struct) record.value()).getStruct("source");
long sourceTsMs =
sourceStruct == null
? System.currentTimeMillis()
: sourceStruct.getInt64("ts_ms");
if (sourceStruct == null) {
throw new CdcConnectorException(
"source field is missing in data change event");
}
long sourceTsMs = sourceStruct.getInt64("ts_ms");
byte[] payload =
payloadConverter.fromConnectData(
record.topic(), record.valueSchema(), record.value());
Expand All @@ -208,6 +262,7 @@ var record = event.value();
String msgKey = key == null ? "" : new String(key, StandardCharsets.UTF_8);
var message =
msgBuilder
.setMsgType(CdcMessage.CdcMessageType.DATA)
.setFullTableName(fullTableName)
.setPayload(msgPayload)
.setKey(msgKey)
Expand Down
11 changes: 10 additions & 1 deletion proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,22 @@ message SinkCoordinatorStreamResponse {
/* Source Service */

message CdcMessage {
enum CdcMessageType {
UNSPECIFIED = 0;
HEARTBEAT = 1;
DATA = 2;
TRANSACTION_META = 3;
SCHEMA_CHANGE = 4;
}

// The value of the Debezium message
string payload = 1;
string partition = 2;
string offset = 3;
string full_table_name = 4;
int64 source_ts_ms = 5;
bool is_transaction_meta = 6;
CdcMessageType msg_type = 6;

// The key of the Debezium message, which only used by `mongodb-cdc` connector.
string key = 7;
}
Expand Down
18 changes: 18 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package ddl_service;
import "catalog.proto";
import "common.proto";
import "meta.proto";
import "plan_common.proto";
import "stream_plan.proto";

option java_package = "com.risingwave.proto";
Expand Down Expand Up @@ -444,6 +445,23 @@ message CommentOnResponse {
uint64 version = 2;
}

// Describes a schema change of a single CDC table.
message TableSchemaChange {
// The kind of change applied to the table.
enum TableChangeType {
UNSPECIFIED = 0;
ALTER = 1;
CREATE = 2;
DROP = 3;
}

// Which kind of change this entry represents.
TableChangeType change_type = 1;
// Name of the CDC table the change applies to.
string cdc_table_name = 2;
// Column catalogs attached to the change.
// NOTE(review): presumably the table's columns after the change is applied; confirm with the producer.
repeated plan_common.ColumnCatalog columns = 3;
}

// Wraps zero or more table schema changes into a single message.
message SchemaChangeEnvelope {
// The individual per-table changes carried by this envelope.
repeated TableSchemaChange table_changes = 1;
}

service DdlService {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse);
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/types/jsonb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,14 @@ impl<'a> JsonbRef<'a> {
.ok_or_else(|| format!("cannot cast jsonb {} to type boolean", self.type_name()))
}

/// Returns an owned copy of the string held by this JSON value.
///
/// # Errors
///
/// Returns a message naming the actual JSON type when the value is not a string.
pub fn as_string(&self) -> Result<String, String> {
    match self.0.as_str() {
        Some(text) => Ok(text.to_owned()),
        None => Err(format!(
            "cannot cast jsonb {} to type string",
            self.type_name()
        )),
    }
}

/// Attempt to read jsonb as a JSON number.
///
/// According to RFC 8259, only number within IEEE 754 binary64 (double precision) has good
Expand Down
3 changes: 3 additions & 0 deletions src/connector/codec/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub enum AccessError {

#[error(transparent)]
NotImplemented(#[from] NotImplemented),
// NOTE: We intentionally don't embed `anyhow::Error` in `AccessError`: access errors occur
// at the record level, and capturing a backtrace each time a new `anyhow::Error` is
// created might be too heavy.
}

pub type AccessResult<T = Datum> = std::result::Result<T, AccessError>;
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/parser/debezium/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
mod avro_parser;
mod debezium_parser;
mod mongo_json_parser;
pub mod schema_change;
pub mod simd_json_parser;

pub use avro_parser::*;
pub use debezium_parser::*;
pub use mongo_json_parser::DebeziumMongoJsonParser;
94 changes: 94 additions & 0 deletions src/connector/src/parser/debezium/schema_change.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::ColumnCatalog;
use risingwave_pb::ddl_service::table_schema_change::TableChangeType as PbTableChangeType;
use risingwave_pb::ddl_service::{
SchemaChangeEnvelope as PbSchemaChangeEnvelope, TableSchemaChange as PbTableSchemaChange,
};

/// A collection of table schema changes, convertible to its protobuf counterpart
/// via `to_protobuf`.
#[derive(Debug)]
pub struct SchemaChangeEnvelope {
/// The per-table changes carried by this envelope.
pub table_changes: Vec<TableSchemaChange>,
}

/// The kind of change applied to a table in a schema change event.
///
/// The enum is fieldless, so equality is total; `Eq` is derived alongside
/// `PartialEq` so the type can be used wherever full equality is required.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TableChangeType {
    /// The change type is unknown or was not recognized.
    Unspecified,
    /// An existing table was altered.
    Alter,
    /// A table was created.
    Create,
    /// A table was dropped.
    Drop,
}

impl TableChangeType {
    /// Maps the protobuf change type onto this enum, variant for variant.
    // Not called anywhere in this module, hence the lint suppression.
    #[allow(dead_code)]
    pub fn from_proto(value: PbTableChangeType) -> Self {
        match value {
            PbTableChangeType::Unspecified => Self::Unspecified,
            PbTableChangeType::Alter => Self::Alter,
            PbTableChangeType::Create => Self::Create,
            PbTableChangeType::Drop => Self::Drop,
        }
    }

    /// Maps this enum onto the protobuf change type, variant for variant.
    pub fn to_proto(self) -> PbTableChangeType {
        match self {
            Self::Unspecified => PbTableChangeType::Unspecified,
            Self::Alter => PbTableChangeType::Alter,
            Self::Create => PbTableChangeType::Create,
            Self::Drop => PbTableChangeType::Drop,
        }
    }
}

impl From<&str> for TableChangeType {
fn from(value: &str) -> Self {
match value {
"ALTER" => TableChangeType::Alter,
"CREATE" => TableChangeType::Create,
"DROP" => TableChangeType::Drop,
_ => TableChangeType::Unspecified,
}
}
}

/// A schema change of a single CDC table.
#[derive(Debug)]
pub struct TableSchemaChange {
/// Name of the CDC table the change applies to.
pub(crate) cdc_table_name: String,
/// Column catalogs attached to the change.
// NOTE(review): presumably the table's columns after the change is applied; confirm with the producer.
pub(crate) columns: Vec<ColumnCatalog>,
/// The kind of change (alter / create / drop / unspecified).
pub(crate) change_type: TableChangeType,
}

impl SchemaChangeEnvelope {
    /// Builds the protobuf representation of this envelope.
    ///
    /// Each table change is converted in order: its change type goes through
    /// `to_proto`, the table name is cloned, and every column is converted
    /// with its own `to_protobuf`.
    pub fn to_protobuf(&self) -> PbSchemaChangeEnvelope {
        let mut table_changes = Vec::with_capacity(self.table_changes.len());

        for change in &self.table_changes {
            let columns = change.columns.iter().map(|col| col.to_protobuf()).collect();
            table_changes.push(PbTableSchemaChange {
                change_type: change.change_type.to_proto() as _,
                cdc_table_name: change.cdc_table_name.clone(),
                columns,
            });
        }

        PbSchemaChangeEnvelope { table_changes }
    }
}
7 changes: 7 additions & 0 deletions src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ impl DebeziumJsonAccessBuilder {
json_parse_options: JsonParseOptions::new_for_debezium(timestamptz_handling),
})
}

/// Creates an access builder for schema events.
///
/// It starts with no value set and uses `JsonParseOptions::default()`.
/// The visible body always returns `Ok`.
pub fn new_for_schema_event() -> ConnectorResult<Self> {
    let builder = Self {
        json_parse_options: JsonParseOptions::default(),
        value: None,
    };
    Ok(builder)
}
}

impl AccessBuilder for DebeziumJsonAccessBuilder {
Expand Down
8 changes: 8 additions & 0 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ mod unified;
mod upsert_parser;
mod util;

use debezium::schema_change::SchemaChangeEnvelope;
pub use debezium::DEBEZIUM_IGNORE_KEY;
use risingwave_common::bitmap::BitmapBuilder;
pub use unified::{AccessError, AccessResult};
Expand Down Expand Up @@ -579,6 +580,9 @@ pub enum ParseResult {
Rows,
/// A transaction control message is parsed.
TransactionControl(TransactionControl),

/// A schema change message is parsed.
SchemaChange(SchemaChangeEnvelope),
}

#[derive(Clone, Copy, Debug, PartialEq)]
Expand Down Expand Up @@ -829,6 +833,10 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
}
}
},

Ok(ParseResult::SchemaChange(_)) => {
// TODO
}
}
}

Expand Down
Loading

0 comments on commit c09d264

Please sign in to comment.