Skip to content

Commit

Permalink
DBZ-2852 Add static gRPC authentication to Vitess Connector (debezium#12
Browse files Browse the repository at this point in the history
)
  • Loading branch information
keweishang authored Dec 15, 2020
1 parent 96a917b commit 4da3e3e
Show file tree
Hide file tree
Showing 25 changed files with 199 additions and 206 deletions.
175 changes: 3 additions & 172 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,16 @@ E.g. the format of emitted messages may change, specific features may not be imp

## Supported Features

- Constantly get data-changes of from all shards (or from a specific shard) of the keyspace from VTGate (via gRPC, a.k.a VStream).
- One connector instance can subscribe to all shards of a given keyspace.
- One connector instance can also choose to subscribe to only 1 shard. However, the vitess `Reshard` operation would be manual in this case.
- Supoort vitess `Reshard` operation when subscribing to all shards of a given keyspace, no data loss, no data duplication.
- Support vitess `MoveTables` operation, no data loss, no data duplication.
- Each transaction has multiple events. All events in the transaction have the same VGTID.
- Support basic MySQL type -> Kafka Connect type mapping.
- Support AVRO/JSON connect converters.
- Support extracting only the `after` struct of the message, by using `ExtractNewRecordState` single message transformation (SMT).
- If for any reason, a message in a transaction fails the task, we can restart from the offset where it has left off.
- If no previous offset exists, start from the current vgtid.
- Have at-least-once delivery guarantee. In most cases, data is delivered once. In case of immediate shutdown of the server, data could be delivered twice.
- Mapping vitess table to kafka topics (e.g. Mapping vitess table `product` in `commerce` keyspace to kafka topic `connect_vitess_customer_sharded.customer.customer`, `connect_vitess_customer_sharded` is the `database.server.name` connector config, the first `customer` is the keyspace, the second `customer` is the table's name)
- Use async grpc stub for vstream service so that `ChangeEventSourceCoordinator` can be stopped gracefully
- Some configurations supported out-of-the-box from Debezium Development Framework.
- database.server.name ✅
- table.include.list ✅
- table.exclude.list ✅
- column.include.list ✅
- message.key.columns ✅
- tombstones.on.delete ✅ (was limited by missing primary key in VStream API, so need to be used with `message.key.columns`, but now they've added support for this, so it's in the TODOs)
- max.queue.size ✅
- max.batch.size ✅
- poll.interval.ms ✅
- event.processing.failure.handling.mode ✅
- converters ✅
- Use async grpc stub for vstream service so that `ChangeEventSourceCoordinator` can be stopped gracefully.
- Vitess Sequence tables also generate events, users can optionally filter them out by `table.exclude.list`.

## Future Improvements

- Add primary key to message. A workaround at the moment is using `message.key.columns` configuration.
- Support for initial database snapshot
- Support `decimal.handling.mode` configuration. (It was limited by VStream API, but now Vitess has added support for this.)
- Support `time.precision.mode` configuration.
- Support nullable columns. At the moment, all columns are nullable.
- Add authentication to VTGate gRPC

## Building The Connector

Please see the [README.md](https://github.com/debezium/debezium#building-debezium) in the main repository for general instructions on building Debezium from source (prerequisites, usage of Docker etc).
Expand All @@ -80,10 +53,10 @@ would have 5 seconds to consume all expected `SourceRecord` before fail the test
Each connector instance captures data-changes from all shards in a keyspace, or from a specific shard in a keyspace.

If you subscribe to all shards, no VTCtld is needed to get the initial vgtid.
![VitessAllShards](./documentation/assets/VitessAllShards.png)
![VitessAllShards](./documentation/assets/AllShards.png)

If you subscribe to a specific shard, connectors communicate with the VTCtld to get the initial (a.k.a. the current) vgtid position of the keyspace/shard.
![ConnectorShardMapping](./documentation/assets/ConnectorShardMapping.png)
![ConnectorShardMapping](./documentation/assets/SingleShard.png)

Internally, each connector constantly polls data-changes from VStream gRPC and send them to an in-memory queue.
Connector task polls data-changes from the queue. See below:
Expand All @@ -92,148 +65,6 @@ Connector task polls data-changes from the queue. See below:
The following is the vitess architecture within a specific cell (a.k.a. data center or AZ in AWS term):
![VitessArchitecture](./documentation/assets/VitessArchitecture.png)

## Supported Streaming Metrics

The MBean is `debezium.vitess:type=connector-metrics,context=binlog,server=<database.server.name>`.

| MBean Attribute Name | Type | Description |
| ----------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Connected | boolean | Flag that denotes whether the connector is currently connected to the database server. |
| NumberOfCommittedTransactions | long | The number of processed transactions that were committed. |
| QueueTotalCapacity | int | The length the queue used to pass events between the streamer and the main Kafka Connect loop. |
| QueueRemainingCapacity | int | The free capacity of the queue used to pass events between the streamer and the main Kafka Connect loop. |
| TotalNumberOfEventsSeen | long | The total number of events that this connector has seen since last started or reset, including filtered, error, normal row-change events. Not including DDL events. |
| NumberOfEventsFiltered | long | The number of events that have been filtered by include/exclude list filtering rules configured on the connector. |
| MilliSecondsBehindSource | long | The number of milliseconds between the last change event’s timestamp (a.k.a. commit time) and the connector processing it. |
| MilliSecondsSinceLastEvent | long | The number of milliseconds since the connector has read and processed the most recent event. |

## Data Types

- **Vitess gRPC column type** : column type returned by Vitess gRPC
- **literal type** : how the value is represented using Kafka Connect schema types
- **semantic type** : how the Kafka Connect schema captures the meaning of the field (schema name)

| MySQL type | Vitess gRPC column type | Literal type | Semantic type |
| ---------- | ----------------------- | ------------ | ------------- |
| BOOLEAN | INT8 | INT16 | n/a |
| TINYINT | INT8 | INT16 | n/a |
| SMALLINT | INT16 | INT16 | n/a |
| MEDIUMINT | INT24 | INT32 | n/a |
| INT | INT32 | INT32 | n/a |
| BIGINT | INT64 | INT64 | n/a |
| REAL | | | n/a |
| FLOAT | FLOAT32 | FLOAT64 | n/a |
| DOUBLE | FLOAT64 | FLOAT64 | n/a |
| BIT | | | |
| CHAR | CHAR | STRING | n/a |
| VARCHAR | VARCHAR | STRING | n/a |
| BINARY | BINARY | STRING | n/a |
| VARBINARY | VARBINARY | STRING | n/a |
| TINYBLOB | | | |
| TINYTEXT | | | |
| BLOB | | | |
| TEXT | TEXT | STRING | n/a |
| MEDIUMBLOB | | | |
| MEDIUMTEXT | TEXT | STRING | n/a |
| LONGBLOB | | | |
| LONGTEXT | TEXT | STRING | n/a |
| JSON | JSON | | |
| ENUM | ENUM | STRING | ordinals |
| SET | SET | STRING | ordinals |
| YEAR | YEAR | STRING | |
| TIMESTAMP | TIMESTAMP | STRING | |
| DATETIME | DATETIME | STRING | |
| DATE | DATE | STRING | |
| TIME | TIME | STRING | |
| DECIMAL | DECIMAL | STRING | |

Inserting a row in the following table

```bash
CREATE TABLE `type_table` (
`type_id` bigint(20) NOT NULL,
`tinyint1_col` tinyint(1) DEFAULT '1',
`smallint_col` smallint(5) DEFAULT '1',
`mediumint_col` mediumint(8) DEFAULT '1',
`int_col` int(10) DEFAULT '1',
`float_col` float DEFAULT '1.2',
`double_col` double DEFAULT '1.2',
`decimal_col` decimal(10,4) NOT NULL DEFAULT '1.0000',
`time_col` time NOT NULL DEFAULT '00:00:00',
`date_col` date NOT NULL DEFAULT '2020-02-12',
`datetime_col` datetime NOT NULL DEFAULT '2020-02-12 00:00:00',
`created` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`char_col` char(9) NOT NULL DEFAULT 'my_char',
`varchar_col` varchar(16) NOT NULL DEFAULT 'my_varchar',
`text_col` text,
`mediumtext_col` mediumtext,
`longtext_col` longtext NOT NULL,
`binary_col` binary(255) DEFAULT NULL,
`varbinary_col` varbinary(12) DEFAULT NULL,
`enum_col` enum('small','medium','large') NOT NULL DEFAULT 'medium',
`set_col` set('a','b','c','d') NOT NULL DEFAULT 'b',
PRIMARY KEY (`type_id`)
)
```

would send the following record to the `connect_vitess_customer_sharded.customer.type_table` topic,
where `connect_vitess_customer_sharded` is the `database.server.name`, `customer` is the keyspace, `type_table` is the table's name:

```json
{
"before" : null,
"after" : {
"type_id" : 1,
"tinyint1_col" : 1,
"smallint_col" : 1,
"mediumint_col" : 1,
"int_col" : 1,
"float_col" : 1.2000000476837158,
"double_col" : 1.2,
"decimal_col" : "1.0000",
"time_col" : "00:00:00",
"date_col" : "2020-02-12",
"datetime_col" : "2020-02-12 00:00:00",
"created" : "2020-10-08 09:41:05",
"char_col" : "my_char",
"varchar_col" : "my_varchar",
"text_col" : null,
"mediumtext_col" : null,
"longtext_col" : "my_longtext",
"binary_col" : null,
"varbinary_col" : null,
"enum_col" : "2",
"set_col" : "2"
},
"source" : {
"version" : "1.4.0-SNAPSHOT",
"connector" : "vitess",
"name" : "connect_vitess_customer_sharded",
"ts_ms" : 1602150065000,
"snapshot" : "false",
"db" : "connect_vitess_customer_sharded",
"schema" : "customer",
"table" : "type_table",
"vgtid" : "[{\"keyspace\":\"customer\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"customer\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-45\"}]"
},
"op" : "c",
"ts_ms" : 1602150065082,
"transaction" : null
}
```

## Schema Migration

| Entity | Description | Rows from older binlog | Rows from newer binlog |
| ------ | --------------------------------------------------------------- | ------------------------------------------ | --------------------------------------------------------- |
| Table | Rename table | [ok] rows have old table name | [ok] rows have new table name |
| Table | Drop column | [ok] include the dropped column | [ok] does not include the dropped column |
| Table | Add column to the end | [ok] does not include the added column | [ok] include the new column |
| Table | Add column in the middle | [ok] does not include the added column | [ok] include the new column, old columns are not replaced |
| Column | Reame column | [ok] column is not renamed | [ok] include the column with new name |
| Column | Change column type (e.g. int to bigint) | [ok] column type is not changed | [ok] column type is nullable `long` |
| Column | Change column default (e.g. bigint, keep nullable, add default) | [ok] no nullable or default value metadata | [ok] no nullable or default value metadata |

## Managing Offset

Offsets are stored in the `_connect_offsets` topic.
Expand Down
Binary file added documentation/assets/AllShards.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file removed documentation/assets/ConnectorShardMapping.png
Binary file not shown.
Binary file modified documentation/assets/HowTaskWorks.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added documentation/assets/SingleShard.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file removed documentation/assets/VitessAllShards.png
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
"database.server.name": "connect_vitess_test_sharded_keyspace",
"database.hostname": "host.docker.internal",
"database.port": "15991",
"database.user": "vitess",
"database.password": "vitess_password",
"vitess.keyspace": "test_sharded_keyspace",
"vitess.vtctld.host": "host.docker.internal",
"vitess.vtctld.port": "15999",
"vitess.vtctld.user": "vitess",
"vitess.vtctld.password": "vitess_password",
"vitess.tablet.type": "MASTER",
"max.queue.size": "24576",
"max.batch.size": "6144",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
"database.server.name": "connect_vitess_test_unsharded_keyspace",
"database.hostname": "host.docker.internal",
"database.port": "15991",
"database.user": "vitess",
"database.password": "vitess_password",
"vitess.keyspace": "test_unsharded_keyspace",
"vitess.shard": "0",
"vitess.vtctld.host": "host.docker.internal",
"vitess.vtctld.port": "15999",
"vitess.vtctld.user": "vitess",
"vitess.vtctld.password": "vitess_password",
"vitess.tablet.type": "MASTER",
"max.queue.size": "24576",
"max.batch.size": "6144",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
.withValidation(Field::isInteger)
.withDescription("Port of the Vitess VTGate gRPC server.");

public static final Field VTGATE_USER = Field.create(DATABASE_CONFIG_PREFIX + "user")
.withDisplayName("User")
.withType(Type.STRING)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Name of the user to be used when connecting the Vitess VTGate gRPC server.");

public static final Field VTGATE_PASSWORD = Field.create(DATABASE_CONFIG_PREFIX + "password")
.withDisplayName("Password")
.withType(Type.PASSWORD)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Password of the user to be used when connecting the Vitess VTGate gRPC server.");

public static final Field KEYSPACE = Field.create(VITESS_CONFIG_GROUP_PREFIX + "keyspace")
.withDisplayName("Keyspace")
.withType(Type.STRING)
Expand Down Expand Up @@ -86,6 +100,20 @@ public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
.withValidation(Field::isInteger)
.withDescription("VTCtld gRPC server port. E.p. \"15999\".");

public static final Field VTCTLD_USER = Field.create(VITESS_CONFIG_GROUP_PREFIX + "vtctld.user")
.withDisplayName("VTCtld User")
.withType(Type.STRING)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Name of the user to be used when connecting the Vitess VTCtld gRPC server.");

public static final Field VTCTLD_PASSWORD = Field.create(VITESS_CONFIG_GROUP_PREFIX + "vtctld.password")
.withDisplayName("VTCtld Password")
.withType(Type.PASSWORD)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Password of the user to be used when connecting the Vitess VTCtld gRPC server.");

public static final Field TABLET_TYPE = Field.create(VITESS_CONFIG_GROUP_PREFIX + "tablet.type")
.withDisplayName("Tablet type to get data-changes")
.withType(Type.STRING)
Expand All @@ -110,7 +138,18 @@ public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
protected static final ConfigDefinition CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION
.edit()
.name("Vitess")
.type(KEYSPACE, SHARD, VTGATE_HOST, VTGATE_PORT, VTCTLD_HOST, VTCTLD_PORT, TABLET_TYPE)
.type(
KEYSPACE,
SHARD,
VTGATE_HOST,
VTGATE_PORT,
VTGATE_USER,
VTGATE_PASSWORD,
VTCTLD_HOST,
VTCTLD_PORT,
VTCTLD_USER,
VTCTLD_PASSWORD,
TABLET_TYPE)
.events(INCLUDE_UNKNOWN_DATATYPES)
.excluding(SCHEMA_EXCLUDE_LIST, SCHEMA_INCLUDE_LIST)
.create();
Expand Down Expand Up @@ -174,6 +213,14 @@ public int getVtgatePort() {
return getConfig().getInteger(VTGATE_PORT);
}

public String getVtgateUsername() {
return getConfig().getString(VTGATE_USER);
}

public String getVtgatePassword() {
return getConfig().getString(VTGATE_PASSWORD);
}

public String getVtctldHost() {
return getConfig().getString(VTCTLD_HOST);
}
Expand All @@ -182,11 +229,20 @@ public int getVtctldPort() {
return getConfig().getInteger(VTCTLD_PORT);
}

public String getVtctldUsername() {
return getConfig().getString(VTCTLD_USER);
}

public String getVtctldPassword() {
return getConfig().getString(VTCTLD_PASSWORD);
}

public String getTabletType() {
return getConfig().getString(TABLET_TYPE);
}

public boolean includeUnknownDatatypes() {
return getConfig().getBoolean(INCLUDE_UNKNOWN_DATATYPES);
}

}
Loading

0 comments on commit 4da3e3e

Please sign in to comment.