From 7ba11cecdf7ac70c65ff5df501bbce8b499967c9 Mon Sep 17 00:00:00 2001 From: He Wang Date: Tue, 11 Jul 2023 17:34:02 +0800 Subject: [PATCH] [Feature][Connector-V2][Jdbc] Add oceanbase dialect factory (#4989) --------- Co-authored-by: silenceland Co-authored-by: changhuyan <877018069@qq.com> --- docs/en/connector-v2/sink/Jdbc.md | 6 + docs/en/connector-v2/sink/OceanBase.md | 186 +++++++++++++ docs/en/connector-v2/source/Jdbc.md | 6 + docs/en/connector-v2/source/OceanBase.md | 168 ++++++++++++ .../jdbc/config/JdbcConnectionConfig.java | 13 + .../seatunnel/jdbc/config/JdbcOptions.java | 6 + .../jdbc/config/JdbcSourceConfig.java | 2 + .../internal/dialect/JdbcDialectFactory.java | 10 + .../internal/dialect/JdbcDialectLoader.java | 5 +- .../oceanbase/OceanBaseDialectFactory.java | 49 ++++ .../seatunnel/jdbc/sink/JdbcSink.java | 5 +- .../seatunnel/jdbc/sink/JdbcSinkFactory.java | 9 +- .../seatunnel/jdbc/source/JdbcSource.java | 4 +- .../jdbc/source/JdbcSourceFactory.java | 9 +- .../seatunnel/jdbc/JdbcOceanBaseITBase.java | 147 ++++++++++ .../seatunnel/jdbc/JdbcOceanBaseMysqlIT.java | 256 ++++++++++++++++++ .../seatunnel/jdbc/JdbcOceanBaseOracleIT.java | 161 +++++++++++ .../jdbc_oceanbase_mysql_source_and_sink.conf | 55 ++++ ...jdbc_oceanbase_oracle_source_and_sink.conf | 53 ++++ .../e2e/connector/pulsar/PulsarBatchIT.java | 2 + 20 files changed, 1144 insertions(+), 8 deletions(-) create mode 100644 docs/en/connector-v2/sink/OceanBase.md create mode 100644 docs/en/connector-v2/source/OceanBase.md create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index d472d9a33e5..f128f6b4b21 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -33,6 +33,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it. | user | String | No | - | | password | String | No | - | | query | String | No | - | +| compatible_mode | String | No | - | | database | String | No | - | | table | String | No | - | | primary_keys | Array | No | - | @@ -69,6 +70,10 @@ The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/tes Use this sql write upstream input datas to database. e.g `INSERT ...` +### compatible_mode [string] + +The compatible mode of database, required when the database supports multiple compatible modes. For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'. + ### database [string] Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database. @@ -168,6 +173,7 @@ there are some reference value for params above. | Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | | Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://.snowflakecomputing.com | / | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | | Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | +| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar | ## Example diff --git a/docs/en/connector-v2/sink/OceanBase.md b/docs/en/connector-v2/sink/OceanBase.md new file mode 100644 index 00000000000..ec87ce3d36d --- /dev/null +++ b/docs/en/connector-v2/sink/OceanBase.md @@ -0,0 +1,186 @@ +# OceanBase + +> JDBC OceanBase Sink Connector + +## Support Those Engines + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## Key Features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [cdc](../../concept/connector-v2-features.md) + +## Description + +Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once semantics. + +## Supported DataSource Info + +| Datasource | Supported versions | Driver | Url | Maven | +|------------|--------------------------------|---------------------------|--------------------------------------|-------------------------------------------------------------------------------| +| OceanBase | All OceanBase server versions. | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2883/test | [Download](https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client) | + +## Database Dependency + +> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory
+> For example: cp oceanbase-client-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/ + +## Data Type Mapping + +### Mysql Mode + +| Mysql Data type | SeaTunnel Data type | +|-----------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| BIT(1)
INT UNSIGNED | BOOLEAN | +| TINYINT
TINYINT UNSIGNED
SMALLINT
SMALLINT UNSIGNED
MEDIUMINT
MEDIUMINT UNSIGNED
INT
INTEGER
YEAR | INT | +| INT UNSIGNED
INTEGER UNSIGNED
BIGINT | BIGINT | +| BIGINT UNSIGNED | DECIMAL(20,0) | +| DECIMAL(x,y)(Get the designated column's specified column size.<38) | DECIMAL(x,y) | +| DECIMAL(x,y)(Get the designated column's specified column size.>38) | DECIMAL(38,18) | +| DECIMAL UNSIGNED | DECIMAL((Get the designated column's specified column size)+1,
(Gets the designated column's number of digits to right of the decimal point.))) | +| FLOAT
FLOAT UNSIGNED | FLOAT | +| DOUBLE
DOUBLE UNSIGNED | DOUBLE | +| CHAR
VARCHAR
TINYTEXT
MEDIUMTEXT
TEXT
LONGTEXT
JSON | STRING | +| DATE | DATE | +| TIME | TIME | +| DATETIME
TIMESTAMP | TIMESTAMP | +| TINYBLOB
MEDIUMBLOB
BLOB
LONGBLOB
BINARY
VARBINAR
BIT(n) | BYTES | +| GEOMETRY
UNKNOWN | Not supported yet | + +### Oracle Mode + +| Oracle Data type | SeaTunnel Data type | +|-----------------------------------------------------------|---------------------| +| Number(p), p <= 9 | INT | +| Number(p), p <= 18 | BIGINT | +| Number(p), p > 18 | DECIMAL(38,18) | +| REAL
BINARY_FLOAT | FLOAT | +| BINARY_DOUBLE | DOUBLE | +| CHAR
NCHAR
NVARCHAR2
NCLOB
CLOB
ROWID | STRING | +| DATE | DATE | +| TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE | TIMESTAMP | +| BLOB
RAW
LONG RAW
BFILE | BYTES | +| UNKNOWN | Not supported yet | + +## Sink Options + +| Name | Type | Required | Default | Description | +|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:oceanbase://localhost:2883/test | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source, should be `com.oceanbase.jdbc.Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | +| compatible_mode | String | Yes | - | The compatible mode of OceanBase, can be 'mysql' or 'oracle'. | +| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | +| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | +| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`
, the data will be flushed into the database | +| batch_interval_ms | Int | No | 1000 | For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database | +| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | +| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | +| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | +| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | + +### Tips + +> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks. + +## Task Example + +### Simple: + +> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to JDBC Sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having two fields, name (string type) and age (int type). The final target table is test_table will also be 16 rows of data in the table. Before run this job, you need create database test and table test_table in your mysql. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job. + +``` +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + name = "string" + age = "int" + } + } + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform-v2 +} + +sink { + jdbc { + url = "jdbc:oceanbase://localhost:2883/test" + driver = "com.oceanbase.jdbc.Driver" + user = "root" + password = "123456" + compatible_mode = "mysql" + query = "insert into test_table(name,age) values(?,?)" + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} +``` + +### Generate Sink SQL + +> This example not need to write complex sql statements, you can configure the database name table name to automatically generate add statements for you + +``` +sink { + jdbc { + url = "jdbc:oceanbase://localhost:2883/test" + driver = "com.oceanbase.jdbc.Driver" + user = "root" + password = "123456" + compatible_mode = "mysql" + # Automatically generate sql statements based on database table names + generate_sink_sql = true + database = test + table = test_table + } +} +``` + +### CDC(Change Data Capture) Event + +> CDC change data is also supported by us In this case, you need config database, table and primary_keys. + +``` +sink { + jdbc { + url = "jdbc:oceanbase://localhost:3306/test" + driver = "com.oceanbase.jdbc.Driver" + user = "root" + password = "123456" + compatible_mode = "mysql" + generate_sink_sql = true + # You need to configure both database and table + database = test + table = sink_table + primary_keys = ["id","name"] + } +} +``` + diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 528114754ff..a324316e594 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -35,6 +35,7 @@ supports query SQL and can achieve projection effect. | user | String | No | - | | password | String | No | - | | query | String | Yes | - | +| compatible_mode | String | No | - | | connection_check_timeout_sec | Int | No | 30 | | partition_column | String | No | - | | partition_upper_bound | Long | No | - | @@ -63,6 +64,10 @@ The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/tes Query statement +### compatible_mode [string] + +The compatible mode of database, required when the database supports multiple compatible modes. For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'. + ### connection_check_timeout_sec [int] The time in seconds to wait for the database operation used to validate the connection to complete. @@ -120,6 +125,7 @@ there are some reference value for params above. | Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://.snowflakecomputing.com | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | | Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000 | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | | Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | +| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar | ## Example diff --git a/docs/en/connector-v2/source/OceanBase.md b/docs/en/connector-v2/source/OceanBase.md new file mode 100644 index 00000000000..9625ef4fbb9 --- /dev/null +++ b/docs/en/connector-v2/source/OceanBase.md @@ -0,0 +1,168 @@ +# OceanBase + +> JDBC OceanBase Source Connector + +## Support Those Engines + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## Key Features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [x] [column projection](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) +- [x] [support user-defined split](../../concept/connector-v2-features.md) + +## Description + +Read external data source data through JDBC. + +## Supported DataSource Info + +| Datasource | Supported versions | Driver | Url | Maven | +|------------|--------------------------------|---------------------------|--------------------------------------|-------------------------------------------------------------------------------| +| OceanBase | All OceanBase server versions. | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2883/test | [Download](https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client) | + +## Database Dependency + +> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory
+> For example: cp oceanbase-client-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/ + +## Data Type Mapping + +### Mysql Mode + +| Mysql Data type | SeaTunnel Data type | +|-----------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| BIT(1)
INT UNSIGNED | BOOLEAN | +| TINYINT
TINYINT UNSIGNED
SMALLINT
SMALLINT UNSIGNED
MEDIUMINT
MEDIUMINT UNSIGNED
INT
INTEGER
YEAR | INT | +| INT UNSIGNED
INTEGER UNSIGNED
BIGINT | BIGINT | +| BIGINT UNSIGNED | DECIMAL(20,0) | +| DECIMAL(x,y)(Get the designated column's specified column size.<38) | DECIMAL(x,y) | +| DECIMAL(x,y)(Get the designated column's specified column size.>38) | DECIMAL(38,18) | +| DECIMAL UNSIGNED | DECIMAL((Get the designated column's specified column size)+1,
(Gets the designated column's number of digits to right of the decimal point.))) | +| FLOAT
FLOAT UNSIGNED | FLOAT | +| DOUBLE
DOUBLE UNSIGNED | DOUBLE | +| CHAR
VARCHAR
TINYTEXT
MEDIUMTEXT
TEXT
LONGTEXT
JSON | STRING | +| DATE | DATE | +| TIME | TIME | +| DATETIME
TIMESTAMP | TIMESTAMP | +| TINYBLOB
MEDIUMBLOB
BLOB
LONGBLOB
BINARY
VARBINAR
BIT(n) | BYTES | +| GEOMETRY
UNKNOWN | Not supported yet | + +### Oracle Mode + +| Oracle Data type | SeaTunnel Data type | +|-----------------------------------------------------------|---------------------| +| Number(p), p <= 9 | INT | +| Number(p), p <= 18 | BIGINT | +| Number(p), p > 18 | DECIMAL(38,18) | +| REAL
BINARY_FLOAT | FLOAT | +| BINARY_DOUBLE | DOUBLE | +| CHAR
NCHAR
NVARCHAR2
NCLOB
CLOB
ROWID | STRING | +| DATE | DATE | +| TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE | TIMESTAMP | +| BLOB
RAW
LONG RAW
BFILE | BYTES | +| UNKNOWN | Not supported yet | + +## Source Options + +| Name | Type | Required | Default | Description | +|------------------------------|--------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:oceanbase://localhost:2883/test | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source, should be `com.oceanbase.jdbc.Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| compatible_mode | String | Yes | - | The compatible mode of OceanBase, can be 'mysql' or 'oracle'. | +| query | String | Yes | - | Query statement | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete | +| partition_column | String | No | - | The column name for parallelism's partition, only support numeric type column and string type column. | +| partition_lower_bound | Long | No | - | The partition_column min value for scan, if not set SeaTunnel will query database get min value. | +| partition_upper_bound | Long | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. | +| partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. Default value is job parallelism. | +| fetch_size | Int | No | 0 | For queries that return a large number of objects, you can configure
the row fetch size used in the query to improve performance by
reducing the number database hits required to satisfy the selection criteria.
Zero means use jdbc default value. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | + +### Tips + +> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks. + +## Task Example + +### Simple: + +``` +env { + execution.parallelism = 2 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = "com.oceanbase.jdbc.Driver" + url = "jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + user = "root" + password = "" + compatible_mode = "mysql" + query = "select * from source" + } +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Console {} +} +``` + +### Parallel: + +> Read your query table in parallel with the shard field you configured and the shard data. You can do this if you want to read the whole table + +``` +source { + Jdbc { + driver = "com.oceanbase.jdbc.Driver" + url = "jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + user = "root" + password = "" + compatible_mode = "mysql" + query = "select * from source" + # Parallel sharding reads fields + partition_column = "id" + # Number of fragments + partition_num = 10 + } +} +``` + +### Parallel Boundary: + +> It is more efficient to read your data source according to the upper and lower boundaries you configured + +``` +source { + Jdbc { + driver = "com.oceanbase.jdbc.Driver" + url = "jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + user = "root" + password = "" + compatible_mode = "mysql" + query = "select * from source" + partition_column = "id" + partition_num = 10 + # Read start boundary + partition_lower_bound = 1 + # Read end boundary + partition_upper_bound = 500 + } +} +``` + diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java index afceddc59a0..6e2147c03c8 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java @@ -27,6 +27,7 @@ public class JdbcConnectionConfig implements Serializable { public String url; public String driverName; + public String compatibleMode; public int connectionCheckTimeoutSeconds = JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue(); public int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue(); @@ -48,6 +49,7 @@ public class JdbcConnectionConfig implements Serializable { public static JdbcConnectionConfig of(ReadonlyConfig config) { JdbcConnectionConfig.Builder builder = JdbcConnectionConfig.builder(); builder.url(config.get(JdbcOptions.URL)); + builder.compatibleMode(config.get(JdbcOptions.COMPATIBLE_MODE)); builder.driverName(config.get(JdbcOptions.DRIVER)); builder.autoCommit(config.get(JdbcOptions.AUTO_COMMIT)); builder.maxRetries(config.get(JdbcOptions.MAX_RETRIES)); @@ -74,6 +76,10 @@ public String getDriverName() { return driverName; } + public String getCompatibleMode() { + return compatibleMode; + } + public boolean isAutoCommit() { return autoCommit; } @@ -121,6 +127,7 @@ public static JdbcConnectionConfig.Builder builder() { public static final class Builder { private String url; private String driverName; + private String compatibleMode; private int connectionCheckTimeoutSeconds = JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue(); private int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue(); @@ -146,6 +153,11 @@ public Builder driverName(String driverName) { return this; } + public Builder compatibleMode(String compatibleMode) { + this.compatibleMode = compatibleMode; + return this; + } + public Builder connectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) { this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds; return this; @@ -206,6 +218,7 @@ public JdbcConnectionConfig build() { jdbcConnectionConfig.batchSize = this.batchSize; jdbcConnectionConfig.batchIntervalMs = this.batchIntervalMs; jdbcConnectionConfig.driverName = this.driverName; + jdbcConnectionConfig.compatibleMode = this.compatibleMode; jdbcConnectionConfig.maxRetries = this.maxRetries; jdbcConnectionConfig.password = this.password; jdbcConnectionConfig.connectionCheckTimeoutSeconds = this.connectionCheckTimeoutSeconds; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java index 87b2a7b4657..24ae0580f32 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java @@ -36,6 +36,12 @@ public interface JdbcOptions { .intType() .defaultValue(30) .withDescription("connection check time second"); + Option COMPATIBLE_MODE = + Options.key("compatible_mode") + .stringType() + .noDefaultValue() + .withDescription( + "The compatible mode of database, required when the database supports multiple compatible modes. For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'."); Option MAX_RETRIES = Options.key("max_retries").intType().defaultValue(0).withDescription("max_retired"); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java index 4c6221549bf..00130b32acc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java @@ -33,6 +33,7 @@ public class JdbcSourceConfig implements Serializable { private JdbcConnectionConfig jdbcConnectionConfig; public String query; + public String compatibleMode; private String partitionColumn; private BigDecimal partitionUpperBound; private BigDecimal partitionLowerBound; @@ -44,6 +45,7 @@ public static JdbcSourceConfig of(ReadonlyConfig config) { builder.jdbcConnectionConfig(JdbcConnectionConfig.of(config)); builder.query(config.get(JdbcOptions.QUERY)); builder.fetchSize(config.get(JdbcOptions.FETCH_SIZE)); + config.getOptional(JdbcOptions.COMPATIBLE_MODE).ifPresent(builder::compatibleMode); config.getOptional(JdbcOptions.PARTITION_COLUMN).ifPresent(builder::partitionColumn); config.getOptional(JdbcOptions.PARTITION_UPPER_BOUND) .ifPresent(builder::partitionUpperBound); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java index 5e5ae1b5592..3d66de65909 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java @@ -37,4 +37,14 @@ public interface JdbcDialectFactory { /** @return Creates a new instance of the {@link JdbcDialect}. */ JdbcDialect create(); + + /** + * Create a {@link JdbcDialect} instance based on the driver type and compatible mode. + * + * @param compatibleMode The compatible mode + * @return a new instance of {@link JdbcDialect} + */ + default JdbcDialect create(String compatibleMode) { + return create(); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java index 076a6734b92..b49df35ff3f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java @@ -40,11 +40,12 @@ private JdbcDialectLoader() {} * Loads the unique JDBC Dialect that can handle the given database url. * * @param url A database URL. + * @param compatibleMode The compatible mode. * @throws IllegalStateException if the loader cannot find exactly one dialect that can * unambiguously process the given database URL. * @return The loaded dialect. */ - public static JdbcDialect load(String url) { + public static JdbcDialect load(String url, String compatibleMode) { ClassLoader cl = Thread.currentThread().getContextClassLoader(); List foundFactories = discoverFactories(cl); @@ -89,7 +90,7 @@ public static JdbcDialect load(String url) { .collect(Collectors.joining("\n")))); } - return matchingFactories.get(0).create(); + return matchingFactories.get(0).create(compatibleMode); } private static List discoverFactories(ClassLoader classLoader) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java new file mode 100644 index 00000000000..66df84205ed --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect; + +import com.google.auto.service.AutoService; + +import javax.annotation.Nonnull; + +@AutoService(JdbcDialectFactory.class) +public class OceanBaseDialectFactory implements JdbcDialectFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:oceanbase:"); + } + + @Override + public JdbcDialect create() { + throw new UnsupportedOperationException( + "Can't create JdbcDialect without compatible mode for OceanBase"); + } + + @Override + public JdbcDialect create(@Nonnull String compatibleMode) { + if ("oracle".equalsIgnoreCase(compatibleMode)) { + return new OracleDialect(); + } + return new MysqlDialect(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 4221172b1cc..4666eae1e51 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -104,7 +104,10 @@ public String getPluginName() { public void prepare(Config pluginConfig) throws PrepareFailException { this.config = ReadonlyConfig.fromConfig(pluginConfig); this.jdbcSinkConfig = JdbcSinkConfig.of(config); - this.dialect = JdbcDialectLoader.load(jdbcSinkConfig.getJdbcConnectionConfig().getUrl()); + this.dialect = + JdbcDialectLoader.load( + jdbcSinkConfig.getJdbcConnectionConfig().getUrl(), + jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode()); this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index ae2e49b1eac..a9bb1c15554 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -41,6 +41,7 @@ import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.AUTO_COMMIT; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_INTERVAL_MS; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.COMPATIBLE_MODE; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DRIVER; @@ -82,7 +83,10 @@ public TableSink createSink(TableFactoryContext context) { } final ReadonlyConfig options = config; JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config); - JdbcDialect dialect = JdbcDialectLoader.load(sinkConfig.getJdbcConnectionConfig().getUrl()); + JdbcDialect dialect = + JdbcDialectLoader.load( + sinkConfig.getJdbcConnectionConfig().getUrl(), + sinkConfig.getJdbcConnectionConfig().getCompatibleMode()); return () -> new JdbcSink( options, @@ -106,7 +110,8 @@ public OptionRule optionRule() { GENERATE_SINK_SQL, AUTO_COMMIT, SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST, - PRIMARY_KEYS) + PRIMARY_KEYS, + COMPATIBLE_MODE) .conditional( IS_EXACTLY_ONCE, true, diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java index 39deac1efec..732892b21d6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java @@ -99,7 +99,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException { new SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig()); this.query = jdbcSourceConfig.getQuery(); this.jdbcDialect = - JdbcDialectLoader.load(jdbcSourceConfig.getJdbcConnectionConfig().getUrl()); + JdbcDialectLoader.load( + jdbcSourceConfig.getJdbcConnectionConfig().getUrl(), + jdbcSourceConfig.getJdbcConnectionConfig().getCompatibleMode()); try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) { this.typeInfo = initTableField(connection); this.partitionParameter = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java index 8f9605182ec..43aa1c03d63 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java @@ -54,6 +54,7 @@ import java.util.Map; import java.util.Optional; +import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.COMPATIBLE_MODE; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DRIVER; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.FETCH_SIZE; @@ -83,7 +84,10 @@ TableSource createSource(TableFactoryContext context) { JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig()); final String querySql = config.getQuery(); - JdbcDialect dialect = JdbcDialectLoader.load(config.getJdbcConnectionConfig().getUrl()); + JdbcDialect dialect = + JdbcDialectLoader.load( + config.getJdbcConnectionConfig().getUrl(), + config.getJdbcConnectionConfig().getCompatibleMode()); TableSchema tableSchema = catalogTable.getTableSchema(); SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); Optional partitionParameter = @@ -228,7 +232,8 @@ public OptionRule optionRule() { PARTITION_COLUMN, PARTITION_UPPER_BOUND, PARTITION_LOWER_BOUND, - PARTITION_NUM) + PARTITION_NUM, + COMPATIBLE_MODE) .build(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java new file mode 100644 index 00000000000..b8202e697a1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +import org.apache.commons.lang3.tuple.Pair; + +import org.junit.jupiter.api.Assertions; +import org.testcontainers.shaded.org.apache.commons.io.IOUtils; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public abstract class JdbcOceanBaseITBase extends AbstractJdbcIT { + + private static final String OCEANBASE_DATABASE = "seatunnel"; + private static final String OCEANBASE_SOURCE = "source"; + private static final String OCEANBASE_SINK = "sink"; + + private static final String OCEANBASE_JDBC_TEMPLATE = "jdbc:oceanbase://" + HOST + ":%s"; + private static final String OCEANBASE_DRIVER_CLASS = "com.oceanbase.jdbc.Driver"; + + abstract String imageName(); + + abstract String host(); + + abstract int port(); + + abstract String username(); + + abstract String password(); + + abstract List configFile(); + + abstract String createSqlTemplate(); + + abstract String[] getFieldNames(); + + @Override + JdbcCase getJdbcCase() { + Map containerEnv = new HashMap<>(); + String jdbcUrl = String.format(OCEANBASE_JDBC_TEMPLATE, port()); + Pair> testDataSet = initTestData(); + String[] fieldNames = testDataSet.getKey(); + + String insertSql = insertTable(OCEANBASE_DATABASE, OCEANBASE_SOURCE, fieldNames); + + return JdbcCase.builder() + .dockerImage(imageName()) + .networkAliases(host()) + .containerEnv(containerEnv) + .driverClass(OCEANBASE_DRIVER_CLASS) + .host(HOST) + .port(port()) + .localPort(port()) + .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE) + .jdbcUrl(jdbcUrl) + .userName(username()) + .password(password()) + .database(OCEANBASE_DATABASE) + .sourceTable(OCEANBASE_SOURCE) + .sinkTable(OCEANBASE_SINK) + .createSql(createSqlTemplate()) + .configFile(configFile()) + .insertSql(insertSql) + .testData(testDataSet) + .build(); + } + + @Override + void compareResult() { + String sourceSql = + String.format( + "select * from %s.%s order by 1", OCEANBASE_DATABASE, OCEANBASE_SOURCE); + String sinkSql = + String.format("select * from %s.%s order by 1", OCEANBASE_DATABASE, OCEANBASE_SINK); + try { + Statement sourceStatement = connection.createStatement(); + Statement sinkStatement = connection.createStatement(); + ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); + Assertions.assertEquals( + sourceResultSet.getMetaData().getColumnCount(), + sinkResultSet.getMetaData().getColumnCount()); + while (sourceResultSet.next()) { + if (sinkResultSet.next()) { + for (String column : getFieldNames()) { + Object source = sourceResultSet.getObject(column); + Object sink = sinkResultSet.getObject(column); + if (!Objects.deepEquals(source, sink)) { + InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column); + InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column); + String sourceValue = + IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8); + String sinkValue = + IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); + Assertions.assertEquals(sourceValue, sinkValue); + } + } + } + } + sourceResultSet.last(); + sinkResultSet.last(); + } catch (Exception e) { + throw new RuntimeException("Compare result error", e); + } + } + + @Override + String driverUrl() { + return "https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar"; + } + + @Override + protected void createSchemaIfNeeded() { + String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE; + try { + connection.prepareStatement(sql).executeUpdate(); + } catch (Exception e) { + throw new SeaTunnelRuntimeException( + JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql " + sql, e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java new file mode 100644 index 00000000000..548fecaee66 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.apache.commons.lang3.tuple.Pair; + +import org.junit.jupiter.api.Disabled; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerLoggerFactory; + +import com.google.common.collect.Lists; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +@Disabled("Disabled due to insufficient hardware resources in the CI environment") +public class JdbcOceanBaseMysqlIT extends JdbcOceanBaseITBase { + + @Override + String imageName() { + return "oceanbase/oceanbase-ce:4.0.0.0"; + } + + @Override + String host() { + return "e2e_oceanbase_mysql"; + } + + @Override + int port() { + return 2881; + } + + @Override + String username() { + return "root"; + } + + @Override + String password() { + return ""; + } + + @Override + List configFile() { + return Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf"); + } + + @Override + String createSqlTemplate() { + return "CREATE TABLE IF NOT EXISTS %s\n" + + "(\n" + + " `c_bit_1` bit(1) DEFAULT NULL,\n" + + " `c_bit_8` bit(8) DEFAULT NULL,\n" + + " `c_bit_16` bit(16) DEFAULT NULL,\n" + + " `c_bit_32` bit(32) DEFAULT NULL,\n" + + " `c_bit_64` bit(64) DEFAULT NULL,\n" + + " `c_boolean` tinyint(1) DEFAULT NULL,\n" + + " `c_tinyint` tinyint(4) DEFAULT NULL,\n" + + " `c_tinyint_unsigned` tinyint(3) unsigned DEFAULT NULL,\n" + + " `c_smallint` smallint(6) DEFAULT NULL,\n" + + " `c_smallint_unsigned` smallint(5) unsigned DEFAULT NULL,\n" + + " `c_mediumint` mediumint(9) DEFAULT NULL,\n" + + " `c_mediumint_unsigned` mediumint(8) unsigned DEFAULT NULL,\n" + + " `c_int` int(11) DEFAULT NULL,\n" + + " `c_integer` int(11) DEFAULT NULL,\n" + + " `c_bigint` bigint(20) DEFAULT NULL,\n" + + " `c_bigint_unsigned` bigint(20) unsigned DEFAULT NULL,\n" + + " `c_decimal` decimal(20, 0) DEFAULT NULL,\n" + + " `c_decimal_unsigned` decimal(38, 18) DEFAULT NULL,\n" + + " `c_float` float DEFAULT NULL,\n" + + " `c_float_unsigned` float unsigned DEFAULT NULL,\n" + + " `c_double` double DEFAULT NULL,\n" + + " `c_double_unsigned` double unsigned DEFAULT NULL,\n" + + " `c_char` char(1) DEFAULT NULL,\n" + + " `c_tinytext` tinytext,\n" + + " `c_mediumtext` mediumtext,\n" + + " `c_text` text,\n" + + " `c_varchar` varchar(255) DEFAULT NULL,\n" + + " `c_json` json DEFAULT NULL,\n" + + " `c_longtext` longtext,\n" + + " `c_date` date DEFAULT NULL,\n" + + " `c_datetime` datetime DEFAULT NULL,\n" + + " `c_timestamp` timestamp NULL DEFAULT NULL,\n" + + " `c_tinyblob` tinyblob,\n" + + " `c_mediumblob` mediumblob,\n" + + " `c_blob` blob,\n" + + " `c_longblob` longblob,\n" + + " `c_varbinary` varbinary(255) DEFAULT NULL,\n" + + " `c_binary` binary(1) DEFAULT NULL,\n" + + " `c_year` year(4) DEFAULT NULL,\n" + + " `c_int_unsigned` int(10) unsigned DEFAULT NULL,\n" + + " `c_integer_unsigned` int(10) unsigned DEFAULT NULL,\n" + + " `c_bigint_30` BIGINT(40) unsigned DEFAULT NULL,\n" + + " `c_decimal_unsigned_30` DECIMAL(30) unsigned DEFAULT NULL,\n" + + " `c_decimal_30` DECIMAL(30) DEFAULT NULL\n" + + ");"; + } + + @Override + String[] getFieldNames() { + return new String[] { + "c_bit_1", + "c_bit_8", + "c_bit_16", + "c_bit_32", + "c_bit_64", + "c_boolean", + "c_tinyint", + "c_tinyint_unsigned", + "c_smallint", + "c_smallint_unsigned", + "c_mediumint", + "c_mediumint_unsigned", + "c_int", + "c_integer", + "c_year", + "c_int_unsigned", + "c_integer_unsigned", + "c_bigint", + "c_bigint_unsigned", + "c_decimal", + "c_decimal_unsigned", + "c_float", + "c_float_unsigned", + "c_double", + "c_double_unsigned", + "c_char", + "c_tinytext", + "c_mediumtext", + "c_text", + "c_varchar", + "c_json", + "c_longtext", + "c_date", + "c_datetime", + "c_timestamp", + "c_tinyblob", + "c_mediumblob", + "c_blob", + "c_longblob", + "c_varbinary", + "c_binary", + "c_bigint_30", + "c_decimal_unsigned_30", + "c_decimal_30", + }; + } + + @Override + Pair> initTestData() { + String[] fieldNames = getFieldNames(); + + List rows = new ArrayList<>(); + BigDecimal bigintValue = new BigDecimal("2844674407371055000"); + BigDecimal decimalValue = new BigDecimal("999999999999999999999999999899"); + for (int i = 0; i < 100; i++) { + byte byteArr = Integer.valueOf(i).byteValue(); + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + i % 2 == 0 ? (byte) 1 : (byte) 0, + new byte[] {byteArr}, + new byte[] {byteArr, byteArr}, + new byte[] {byteArr, byteArr, byteArr, byteArr}, + new byte[] { + byteArr, byteArr, byteArr, byteArr, byteArr, byteArr, byteArr, + byteArr + }, + i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE, + i, + i, + i, + i, + i, + i, + i, + i, + i, + Long.parseLong("1"), + Long.parseLong("1"), + Long.parseLong("1"), + BigDecimal.valueOf(i, 0), + BigDecimal.valueOf(i, 18), + BigDecimal.valueOf(i, 18), + Float.parseFloat("1.1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + Double.parseDouble("1.1"), + "f", + String.format("f1_%s", i), + String.format("f1_%s", i), + String.format("f1_%s", i), + String.format("f1_%s", i), + String.format("{\"aa\":\"bb_%s\"}", i), + String.format("f1_%s", i), + Date.valueOf(LocalDate.now()), + Timestamp.valueOf(LocalDateTime.now()), + new Timestamp(System.currentTimeMillis()), + "test".getBytes(), + "test".getBytes(), + "test".getBytes(), + "test".getBytes(), + "test".getBytes(), + "f".getBytes(), + bigintValue.add(BigDecimal.valueOf(i)), + decimalValue.add(BigDecimal.valueOf(i)), + decimalValue.add(BigDecimal.valueOf(i)), + }); + rows.add(row); + } + + return Pair.of(fieldNames, rows); + } + + @Override + GenericContainer initContainer() { + GenericContainer container = + new GenericContainer<>(imageName()) + .withNetwork(NETWORK) + .withNetworkAliases(host()) + .waitingFor(Wait.forLogMessage(".*boot success!.*", 1)) + .withStartupTimeout(Duration.ofMinutes(5)) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName()))); + + container.setPortBindings(Lists.newArrayList(String.format("%s:%s", port(), port()))); + + return container; + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java new file mode 100644 index 00000000000..4c3cca5ddc1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.apache.commons.lang3.tuple.Pair; + +import org.junit.jupiter.api.Disabled; +import org.testcontainers.containers.GenericContainer; + +import com.google.common.collect.Lists; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.given; + +@Disabled("Oracle mode of OceanBase Enterprise Edition does not provide docker environment") +public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase { + + @Override + String imageName() { + return null; + } + + @Override + String host() { + return "e2e_oceanbase_oracle"; + } + + @Override + int port() { + return 2883; + } + + @Override + String username() { + return "root"; + } + + @Override + String password() { + return ""; + } + + @Override + List configFile() { + return Lists.newArrayList("/jdbc_oceanbase_oracle_source_and_sink.conf"); + } + + @Override + GenericContainer initContainer() { + throw new UnsupportedOperationException(); + } + + @Override + public void startUp() { + jdbcCase = getJdbcCase(); + + given().ignoreExceptions() + .await() + .atMost(360, TimeUnit.SECONDS) + .untilAsserted(() -> this.initializeJdbcConnection(jdbcCase.getJdbcUrl())); + + createSchemaIfNeeded(); + createNeededTables(); + insertTestData(); + } + + @Override + public String quoteIdentifier(String field) { + return "\"" + field + "\""; + } + + @Override + String createSqlTemplate() { + return "create table %s\n" + + "(\n" + + " VARCHAR_10_COL varchar2(10),\n" + + " CHAR_10_COL char(10),\n" + + " CLOB_COL clob,\n" + + " NUMBER_3_SF_2_DP number(3, 2),\n" + + " INTEGER_COL integer,\n" + + " FLOAT_COL float(10),\n" + + " REAL_COL real,\n" + + " BINARY_FLOAT_COL binary_float,\n" + + " BINARY_DOUBLE_COL binary_double,\n" + + " DATE_COL date,\n" + + " TIMESTAMP_WITH_3_FRAC_SEC_COL timestamp(3),\n" + + " TIMESTAMP_WITH_LOCAL_TZ timestamp with local time zone\n" + + ")"; + } + + @Override + String[] getFieldNames() { + return new String[] { + "VARCHAR_10_COL", + "CHAR_10_COL", + "CLOB_COL", + "NUMBER_3_SF_2_DP", + "INTEGER_COL", + "FLOAT_COL", + "REAL_COL", + "BINARY_FLOAT_COL", + "BINARY_DOUBLE_COL", + "DATE_COL", + "TIMESTAMP_WITH_3_FRAC_SEC_COL", + "TIMESTAMP_WITH_LOCAL_TZ" + }; + } + + @Override + Pair> initTestData() { + String[] fieldNames = getFieldNames(); + + List rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + String.format("f%s", i), + String.format("f%s", i), + String.format("f%s", i), + BigDecimal.valueOf(1.1), + i, + Float.parseFloat("2.2"), + Float.parseFloat("2.2"), + Float.parseFloat("22.2"), + Double.parseDouble("2.2"), + Date.valueOf(LocalDate.now()), + Timestamp.valueOf(LocalDateTime.now()), + Timestamp.valueOf(LocalDateTime.now()) + }); + rows.add(row); + } + + return Pair.of(fieldNames, rows); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf new file mode 100644 index 00000000000..098d3ffae26 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = com.oceanbase.jdbc.Driver + url = "jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC" + user = root + password = "" + query = "SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned, c_bigint_30, c_decimal_unsigned_30, c_decimal_30 FROM source" + compatible_mode = "mysql" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource +} + +sink { + Jdbc { + driver = com.oceanbase.jdbc.Driver + url = "jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC" + user = root + password = "" + query = "insert into sink(c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);" + compatible_mode = "mysql" + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf new file mode 100644 index 00000000000..bf2b1ccf067 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + jdbc{ + # This is a example source plugin **only for test and demonstrate the feature source plugin** + url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel" + driver = com.oceanbase.jdbc.Driver + user = "root" + password = "" + query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ FROM source" + compatible_mode = "oracle" + } +} + +transform { +} + +sink { + jdbc{ + url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel" + driver = com.oceanbase.jdbc.Driver + user = "root" + password = "" + query = "INSERT INTO sink (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)" + compatible_mode = "oracle" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java index b1ea69efac6..092f37f9bc4 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java @@ -56,6 +56,7 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -113,6 +114,7 @@ public void startUp() throws Exception { new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME)) .withNetwork(NETWORK) .withNetworkAliases(PULSAR_HOST) + .withStartupTimeout(Duration.ofMinutes(3)) .withLogConsumer( new Slf4jLogConsumer( DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME)));