diff --git a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md index 002bd0c3bec..e0751a24927 100644 --- a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md +++ b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md @@ -19,7 +19,6 @@ source { MySQL-CDC { result_table_name = "table1" - hostname = localhost base-url="jdbc:mysql://localhost:3306/test" "startup.mode"=INITIAL catalog { diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md index caeeca06283..dbf0c1dbc96 100644 --- a/docs/en/connector-v2/source/MySQL-CDC.md +++ b/docs/en/connector-v2/source/MySQL-CDC.md @@ -2,10 +2,9 @@ > MySQL CDC source connector -## Description +## Support Those Engines -The MySQL CDC connector allows for reading snapshot data and incremental data from MySQL database. This document -describes how to set up the MySQL CDC connector to run SQL queries against MySQL databases. +> SeaTunnel Zeta
## Key features @@ -16,207 +15,206 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL - [x] [parallelism](../../concept/connector-v2-features.md) - [x] [support user-defined split](../../concept/connector-v2-features.md) -## Options - -| name | type | required | default value | -|------------------------------------------------|----------|----------|---------------| -| username | String | Yes | - | -| password | String | Yes | - | -| database-names | List | No | - | -| table-names | List | Yes | - | -| base-url | String | Yes | - | -| startup.mode | Enum | No | INITIAL | -| startup.timestamp | Long | No | - | -| startup.specific-offset.file | String | No | - | -| startup.specific-offset.pos | Long | No | - | -| stop.mode | Enum | No | NEVER | -| stop.timestamp | Long | No | - | -| stop.specific-offset.file | String | No | - | -| stop.specific-offset.pos | Long | No | - | -| incremental.parallelism | Integer | No | 1 | -| snapshot.split.size | Integer | No | 8096 | -| snapshot.fetch.size | Integer | No | 1024 | -| server-id | String | No | - | -| server-time-zone | String | No | UTC | -| connect.timeout.ms | Duration | No | 30000 | -| connect.max-retries | Integer | No | 3 | -| connection.pool.size | Integer | No | 20 | -| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | -| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | -| sample-sharding.threshold | int | No | 1000 | -| inverse-sampling.rate | int | No | 1000 | -| exactly_once | Boolean | No | true | -| debezium.* | config | No | - | -| format | Enum | No | DEFAULT | -| common-options | | no | - | - -### username [String] - -Name of the database to use when connecting to the database server. - -### password [String] - -Password to use when connecting to the database server. - -### database-names [List] - -Database name of the database to monitor. - -### table-names [List] - -Table name of the database to monitor. The table name needs to include the database name, for example: database_name.table_name - -### base-url [String] - -URL has to be with database, like "jdbc:mysql://localhost:5432/db" or "jdbc:mysql://localhost:5432/db?useSSL=true". - -### startup.mode [Enum] - -Optional startup mode for MySQL CDC consumer, valid enumerations are "initial", "earliest", "latest" and "specific". - -### startup.timestamp [Long] - -Start from the specified epoch timestamp (in milliseconds). - -**Note, This option is required when the "startup.mode" option used `'timestamp'`.** - -### startup.specific-offset.file [String] - -Start from the specified binlog file name. - -**Note, This option is required when the "startup.mode" option used `'specific'`.** - -### startup.specific-offset.pos [Long] - -Start from the specified binlog file position. - -**Note, This option is required when the "startup.mode" option used `'specific'`.** - -### stop.mode [Enum] - -Optional stop mode for MySQL CDC consumer, valid enumerations are "never". - -### stop.timestamp [Long] - -Stop from the specified epoch timestamp (in milliseconds). - -**Note, This option is required when the "stop.mode" option used `'timestamp'`.** - -### stop.specific-offset.file [String] - -Stop from the specified binlog file name. - -**Note, This option is required when the "stop.mode" option used `'specific'`.** - -### stop.specific-offset.pos [Long] - -Stop from the specified binlog file position. - -**Note, This option is required when the "stop.mode" option used `'specific'`.** - -### incremental.parallelism [Integer] - -The number of parallel readers in the incremental phase. - -### snapshot.split.size [Integer] - -The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshot -of table. - -### snapshot.fetch.size [Integer] - -The maximum fetch size for per poll when read table snapshot. - -### chunk-key.even-distribution.factor.upper-bound [Double] - -The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0. - -### chunk-key.even-distribution.factor.lower-bound [Double] +## Description -The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05. +The MySQL CDC connector allows for reading snapshot data and incremental data from MySQL database. This document +describes how to set up the MySQL CDC connector to run SQL queries against MySQL databases. -### sample-sharding.threshold [Integer] +## Supported DataSource Info -This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. +| Datasource | Supported versions | Driver | Url | Maven | +|------------|--------------------------------------------------------------------------------------------------------------------------------------|--------------------------|----------------------------------|----------------------------------------------------------------------| +| MySQL |
  • [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x
  • [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 | -### inverse-sampling.rate [Integer] +## Database Dependency -The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. +### Install Jdbc Driver -### server-id [String] +Please download and put mysql driver in `${SEATUNNEL_HOME}/lib/` dir. For example: cp mysql-connector-java-xxx.jar `$SEATNUNNEL_HOME/lib/` -A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like '5400', the numeric ID range -syntax is like '5400-5408'. +### Creating MySQL user -Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the -MySQL cluster as another server (with this unique ID) so it can read the binlog. +You have to define a MySQL user with appropriate permissions on all databases that the Debezium MySQL connector monitors. -By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value. +1. Create the MySQL user: -### server-time-zone [String] +```sql +mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password'; +``` -The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone. +2. Grant the required permissions to the user: -### connect.timeout.ms [long] +```sql +mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password'; +``` -The maximum time that the connector should wait after trying to connect to the database server before timing out. +3. Finalize the user’s permissions: -### connect.max-retries [Integer] +```sql +mysql> FLUSH PRIVILEGES; +``` -The max retry times that the connector should retry to build database server connection. +### Enabling the MySQL binlog -### connection.pool.size [Integer] +You must enable binary logging for MySQL replication. The binary logs record transaction updates for replication tools to propagate changes. -The connection pool size. +1. Check whether the `log-bin` option is already on: -### exactly_once [Boolean] +```sql +mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency'); ++--------------------------+----------------+ +| Variable_name | Value | ++--------------------------+----------------+ +| binlog_format | ROW | +| binlog_row_image | FULL | +| enforce_gtid_consistency | ON | +| gtid_mode | ON | +| log_bin | ON | ++--------------------------+----------------+ +5 rows in set (0.00 sec) +``` -Enable exactly once semantic. +2. If inconsistent with the above results, configure your MySQL server configuration file(`$MYSQL_HOME/mysql.cnf`) with the following properties, which are described in the table below: -### debezium [Config] +``` +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# Server ID is required, but this will vary on production systems +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 10 +binlog_format = row +binlog_row_image = FULL + +# enable gtid mode +gtid_mode = on +enforce_gtid_consistency = on +``` -Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from MySQL server. +3. Restart MySQL Server -See more about -the [Debezium's MySQL Connector properties](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-connector-properties) +```shell +/etc/inint.d/mysqld restart +``` -### format [Enum] +4. Confirm your changes by checking the binlog status once more: + +```sql +mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency'); ++--------------------------+----------------+ +| Variable_name | Value | ++--------------------------+----------------+ +| binlog_format | ROW | +| binlog_row_image | FULL | +| enforce_gtid_consistency | ON | +| gtid_mode | ON | +| log_bin | ON | ++--------------------------+----------------+ +5 rows in set (0.00 sec) +``` -Optional output format for MySQL CDC, valid enumerations are "DEFAULT"、"COMPATIBLE_DEBEZIUM_JSON". +### Notes + +#### Setting up MySQL session timeouts + +When an initial consistent snapshot is made for large databases, your established connection could timeout while the tables are being read. You can prevent this behavior by configuring interactive_timeout and wait_timeout in your MySQL configuration file. +- `interactive_timeout`: The number of seconds the server waits for activity on an interactive connection before closing it. See [MySQL’s documentation](https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_interactive_timeout) for more details. +- `wait_timeout`: The number of seconds the server waits for activity on a non-interactive connection before closing it. See [MySQL’s documentation](https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_wait_timeout) for more details. + +*For more database settings see [Debezium MySQL Connector](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#setting-up-mysql)* + +## Data Type Mapping + +### todo 要重构类型映射要值转换 + +| Mysql Data type | SeaTunnel Data type | +|------------------------------------------------------------------------------------------|---------------------| +| BIT(1) | BOOLEAN | +| TINYINT | TINYINT | +| TINYINT UNSIGNED
    SMALLINT | SMALLINT | +| SMALLINT UNSIGNED
    MEDIUMINT
    MEDIUMINT UNSIGNED
    INT
    INTEGER
    YEAR | INT | +| INT UNSIGNED
    INTEGER UNSIGNED
    BIGINT | BIGINT | +| BIGINT UNSIGNED | DECIMAL(20,0) | +| DECIMAL(p, s)
    DECIMAL(p, s) UNSIGNED
    NUMERIC(p, s)
    NUMERIC(p, s) UNSIGNED | DECIMAL(p,s) | +| FLOAT
    FLOAT UNSIGNED | FLOAT | +| DOUBLE
    DOUBLE UNSIGNED
    REAL
    REAL UNSIGNED | DOUBLE | +| CHAR
    VARCHAR
    TINYTEXT
    MEDIUMTEXT
    TEXT
    LONGTEXT
    ENUM
    JSON | STRING | +| DATE | DATE | +| TIME | TIME | +| DATETIME
    TIMESTAMP | TIMESTAMP | +| BINARY
    VARBINAR
    BIT(p)
    TINYBLOB
    MEDIUMBLOB
    BLOB
    LONGBLOB | BYTES | + +## Source Options + +### todo 要确保配置完整含义正确 + +| Name | Type | Required | Default | Description | +|------------------------------------------------|----------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| base-url | String | Yes | - | The URL of the JDBC connection. Refer to a case: `jdbc:mysql://localhost:3306:3306/test`. | +| username | String | Yes | - | Name of the database to use when connecting to the database server. | +| password | String | Yes | - | Password to use when connecting to the database server. | +| database-names | List | No | - | Database name of the database to monitor. | +| table-names | List | Yes | - | Table name of the database to monitor. The table name needs to include the database name, for example: `database_name.table_name` | +| startup.mode | Enum | No | INITIAL | Optional startup mode for MySQL CDC consumer, valid enumerations are `initial`, `earliest`, `latest` and `specific`.
    `initial`: Synchronize historical data at startup, and then synchronize incremental data.
    `earliest`: Startup from the earliest offset possible.
    `latest`: Startup from the latest offset.
    `specific`: Startup from user-supplied specific offsets. | +| startup.specific-offset.file | String | No | - | Start from the specified binlog file name. **Note, This option is required when the `startup.mode` option used `specific`.** | +| startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position. **Note, This option is required when the `startup.mode` option used `specific`.** | +| stop.mode | Enum | No | NEVER | Optional stop mode for MySQL CDC consumer, valid enumerations are `never`, `latest` or `specific`.
    `never`: Real-time job don't stop the source.
    `latest`: Stop from the latest offset.
    `specific`: Stop from user-supplied specific offset. | +| stop.specific-offset.file | String | No | - | Stop from the specified binlog file name. **Note, This option is required when the `stop.mode` option used `specific`.** | +| stop.specific-offset.pos | Long | No | - | Stop from the specified binlog file position. **Note, This option is required when the `stop.mode` option used `specific`.** | +| snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshot of table. | +| snapshot.fetch.size | Integer | No | 1024 | The maximum fetch size for per poll when read table snapshot. | +| server-id | String | No | - | A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like `5400`, the numeric ID range syntax is like '5400-5408'.
    Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the
    MySQL cluster as another server (with this unique ID) so it can read the binlog.
    By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value. | +| server-time-zone | String | No | UTC | The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone. | +| connect.timeout.ms | Duration | No | 30000 | The maximum time that the connector should wait after trying to connect to the database server before timing out. | +| connect.max-retries | Integer | No | 3 | The max retry times that the connector should retry to build database server connection. | +| connection.pool.size | Integer | No | 20 | The jdbc connection pool size. | +| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0. | +| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05. | +| sample-sharding.threshold | Integer | No | 1000 | This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. | +| inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. | +| exactly_once | Boolean | No | true | Enable exactly once semantic. | +| format | Enum | No | DEFAULT | Optional output format for MySQL CDC, valid enumerations are `DEFAULT`、`COMPATIBLE_DEBEZIUM_JSON`. | +| debezium | Config | No | - | Pass-through [Debezium's properties](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-connector-properties) to Debezium Embedded Engine which is used to capture data changes from MySQL server. | +| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | + +## Task Example + +### Simple + +> Support multi-table reading -#### example +``` +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 10000 +} -```conf source { MySQL-CDC { - debezium { - snapshot.mode = "never" - decimal.handling.mode = "double" + catalog = { + factory = MySQL } + base-url = "jdbc:mysql://localhost:3306/testdb" + username = "root" + password = "root@123" + table-names = ["testdb.table1", "testdb.table2"] + + startup.mode = "initial" } } -``` - -### common options - -Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. - -## Example -```Jdbc { -source { - MySQL-CDC { - result_table_name = "fake" - parallelism = 1 - server-id = 5656 - username = "mysqluser" - password = "mysqlpw" - table-names = ["inventory_vwyw0n.products"] - base-url = "jdbc:mysql://localhost:56725/inventory_vwyw0n" +sink { + Console { } } ``` +### Support debezium-compatible format send to kafka + +> Must be used with kafka connector sink, see [compatible debezium format](../formats/cdc-compatible-debezium-json.md) for details + ## Changelog - Add MySQL CDC Source Connector diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java index a84eb79be3e..6429fa4b529 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java @@ -71,6 +71,10 @@ public OptionRule optionRule() { JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD, JdbcSourceOptions.INVERSE_SAMPLING_RATE) .optional(MySqlSourceOptions.STARTUP_MODE, MySqlSourceOptions.STOP_MODE) + .conditional( + MySqlSourceOptions.STARTUP_MODE, + StartupMode.INITIAL, + SourceOptions.EXACTLY_ONCE) .conditional( MySqlSourceOptions.STARTUP_MODE, StartupMode.SPECIFIC, @@ -81,18 +85,6 @@ public OptionRule optionRule() { StopMode.SPECIFIC, SourceOptions.STOP_SPECIFIC_OFFSET_FILE, SourceOptions.STOP_SPECIFIC_OFFSET_POS) - .conditional( - MySqlSourceOptions.STARTUP_MODE, - StartupMode.TIMESTAMP, - SourceOptions.STARTUP_TIMESTAMP) - .conditional( - MySqlSourceOptions.STOP_MODE, - StopMode.TIMESTAMP, - SourceOptions.STOP_TIMESTAMP) - .conditional( - MySqlSourceOptions.STARTUP_MODE, - StartupMode.INITIAL, - SourceOptions.EXACTLY_ONCE) .build(); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java index 43f3f4c70cc..bc59fd0f5c1 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java @@ -34,18 +34,22 @@ public class MySqlSourceOptions { Arrays.asList( StartupMode.INITIAL, StartupMode.EARLIEST, - StartupMode.LATEST)) + StartupMode.LATEST, + StartupMode.SPECIFIC)) .defaultValue(StartupMode.INITIAL) .withDescription( "Optional startup mode for CDC source, valid enumerations are " - + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\""); + + "\"initial\", \"earliest\", \"latest\" or \"specific\""); public static final SingleChoiceOption STOP_MODE = (SingleChoiceOption) Options.key(SourceOptions.STOP_MODE_KEY) - .singleChoice(StopMode.class, Arrays.asList(StopMode.NEVER)) + .singleChoice( + StopMode.class, + Arrays.asList( + StopMode.LATEST, StopMode.SPECIFIC, StopMode.NEVER)) .defaultValue(StopMode.NEVER) .withDescription( "Optional stop mode for CDC source, valid enumerations are " - + "\"never\", \"latest\", \"timestamp\"\n or \"specific\""); + + "\"never\", \"latest\" or \"specific\""); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java index 267476b3ffe..053e96b86ba 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java @@ -50,10 +50,14 @@ public class MySqlTypeUtils { private static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED"; private static final String MYSQL_DECIMAL = "DECIMAL"; private static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED"; + private static final String MYSQL_NUMERIC = "NUMERIC"; + private static final String MYSQL_NUMERIC_UNSIGNED = "NUMERIC UNSIGNED"; private static final String MYSQL_FLOAT = "FLOAT"; private static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED"; private static final String MYSQL_DOUBLE = "DOUBLE"; private static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED"; + private static final String MYSQL_REAL = "REAL"; + private static final String MYSQL_REAL_UNSIGNED = "REAL UNSIGNED"; // -------------------------string---------------------------- private static final String MYSQL_CHAR = "CHAR"; @@ -63,6 +67,7 @@ public class MySqlTypeUtils { private static final String MYSQL_TEXT = "TEXT"; private static final String MYSQL_LONGTEXT = "LONGTEXT"; private static final String MYSQL_JSON = "JSON"; + private static final String MYSQL_ENUM = "ENUM"; // ------------------------------time------------------------- private static final String MYSQL_DATE = "DATE"; @@ -87,9 +92,10 @@ public static SeaTunnelDataType convertFromColumn(Column column) { case MYSQL_BIT: return BasicType.BOOLEAN_TYPE; case MYSQL_TINYINT: - return column.length() == 1 ? BasicType.BOOLEAN_TYPE : BasicType.INT_TYPE; + return BasicType.BYTE_TYPE; case MYSQL_TINYINT_UNSIGNED: case MYSQL_SMALLINT: + return BasicType.SHORT_TYPE; case MYSQL_SMALLINT_UNSIGNED: case MYSQL_MEDIUMINT: case MYSQL_MEDIUMINT_UNSIGNED: @@ -104,6 +110,9 @@ public static SeaTunnelDataType convertFromColumn(Column column) { case MYSQL_BIGINT_UNSIGNED: return new DecimalType(20, 0); case MYSQL_DECIMAL: + case MYSQL_DECIMAL_UNSIGNED: + case MYSQL_NUMERIC: + case MYSQL_NUMERIC_UNSIGNED: return new DecimalType(column.length(), column.scale().orElse(0)); case MYSQL_FLOAT: return BasicType.FLOAT_TYPE; @@ -111,8 +120,10 @@ public static SeaTunnelDataType convertFromColumn(Column column) { log.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED); return BasicType.FLOAT_TYPE; case MYSQL_DOUBLE: + case MYSQL_REAL: return BasicType.DOUBLE_TYPE; case MYSQL_DOUBLE_UNSIGNED: + case MYSQL_REAL_UNSIGNED: log.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED); return BasicType.DOUBLE_TYPE; case MYSQL_CHAR: @@ -121,6 +132,7 @@ public static SeaTunnelDataType convertFromColumn(Column column) { case MYSQL_TEXT: case MYSQL_VARCHAR: case MYSQL_JSON: + case MYSQL_ENUM: return BasicType.STRING_TYPE; case MYSQL_LONGTEXT: log.warn(