This is the Flink connector for OceanBase. It can be used to sink data to OceanBase through a JDBC driver.
You can get the release packages from the Releases page or Maven Central.
<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase</artifactId>
    <version>${project.version}</version>
</dependency>
If you'd rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.
<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase</artifactId>
    <version>${project.version}</version>
</dependency>
<repositories>
    <repository>
        <id>sonatype-snapshots</id>
        <name>Sonatype Snapshot Repository</name>
        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
You can also manually build it from the source code.
git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests
To use this connector directly through Flink SQL, download the shaded JAR file named flink-sql-connector-oceanbase-${project.version}.jar:
- Release versions: https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase
- Snapshot versions: https://s01.oss.sonatype.org/content/repositories/snapshots/com/oceanbase/flink-sql-connector-oceanbase
This project bundles MySQL driver 8.0.28. Users of OceanBase EE who want to use the OceanBase JDBC driver need to add the following dependency manually:
Dependency Item | Description |
---|---|
com.oceanbase:oceanbase-client:2.4.9 | Used for connecting to OceanBase EE. |
Create the destination table 't_sink' in the 'test' database of an OceanBase MySQL-mode tenant.
USE test;
CREATE TABLE `t_sink` (
    `id` int(10) NOT NULL,
    `username` varchar(20) DEFAULT NULL,
    `score` int(10) DEFAULT NULL,
    PRIMARY KEY (`id`)
);
Put the dependency JAR files in the 'lib' directory of Flink, then create the destination table with Flink SQL through the SQL client.
CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase',
    'url' = 'jdbc:mysql://127.0.0.1:2881/test',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root@test#obcluster',
    'password' = 'pswd',
    'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
    'buffer-flush.interval' = '1s',
    'buffer-flush.buffer-size' = '5000',
    'max-retries' = '3'
);
Insert records by Flink SQL.
INSERT INTO t_sink
VALUES (1, 'Tom', 99),
(2, 'Jerry', 88),
(1, 'Tom', 89);
Once executed, the records should have been written to OceanBase.
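To confirm the write, the destination table can be queried directly in OceanBase. The query below is a minimal sketch that reuses the 'test' database and 't_sink' table from the steps above. Note that the INSERT statement contains two rows with id 1. Assuming the sink treats rows that share a primary key as updates (an assumption, not something stated in this document), only the later score would remain for that id.

```sql
-- Run this in an OceanBase MySQL-mode client, not in the Flink SQL client.
USE test;

-- List the rows written by the Flink job, ordered by primary key.
SELECT id, username, score
FROM t_sink
ORDER BY id;
```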
Users of OceanBase EE need to specify the url and driver-class-name options that correspond to the OceanBase JDBC driver.
CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase',
    'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS',
    'driver-class-name' = 'com.oceanbase.jdbc.Driver',
    'schema-name' = 'SYS',
    'table-name' = 'T_SINK',
    'username' = 'SYS@test#obcluster',
    'password' = 'pswd',
    'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
    'buffer-flush.interval' = '1s',
    'buffer-flush.buffer-size' = '5000',
    'max-retries' = '3'
);
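The Flink table above expects a destination table 'T_SINK' to exist in the 'SYS' schema of the Oracle-mode tenant. This document shows the destination DDL only for MySQL mode, so the following is an assumed Oracle-mode counterpart. The column types are illustrative choices and are not taken from the connector's documentation.

```sql
-- Assumed Oracle-mode counterpart of the MySQL-mode t_sink table.
-- Run this in the Oracle-mode tenant before starting the Flink job.
CREATE TABLE T_SINK (
    ID       NUMBER(10) NOT NULL,
    USERNAME VARCHAR2(20),
    SCORE    NUMBER(10),
    PRIMARY KEY (ID)
);
```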
Option | Required by Table API | Required by DataStream | Default | Type | Description |
---|---|---|---|---|---|
url | Yes | Yes | | String | JDBC url. |
username | Yes | Yes | | String | The username. |
password | Yes | Yes | | String | The password. |
schema-name | Yes | Not supported | | String | The schema name or database name. |
table-name | Yes | Not supported | | String | The table name. |
driver-class-name | No | No | com.mysql.cj.jdbc.Driver | String | The driver class name. If a value other than the default is set, you need to add that driver manually. |
druid-properties | No | No | | String | Druid connection pool properties, multiple values are separated by semicolons. |
sync-write | No | No | false | Boolean | Whether to write data synchronously. The buffer is not used when this is set to 'true'. |
buffer-flush.interval | No | No | 1s | Duration | Buffer flush interval. Set to '0' to disable scheduled flushing. |
buffer-flush.buffer-size | No | No | 1000 | Integer | Buffer size. |
max-retries | No | No | 3 | Integer | Max retry times on failure. |
memstore-check.enabled | No | No | true | Boolean | Whether to enable the memstore check. |
memstore-check.threshold | No | No | 0.9 | Double | Memstore usage threshold ratio relative to the limit value. |
memstore-check.interval | No | No | 30s | Duration | Memstore check interval. |
partition.enabled | No | No | false | Boolean | Whether to enable partition calculation and flush records by partitions. Only works when 'sync-write' and 'direct-load.enabled' are 'false'. |
table.oracle-tenant-case-insensitive | No | No | true | Boolean | By default, under the Oracle tenant, schema names and column names are case-insensitive. |
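As an illustration of how the optional settings combine, the following sketch tunes buffering, retries, and the memstore check for the MySQL-mode table used earlier. The Flink table name 't_sink_tuned' and the specific values are hypothetical and chosen only for demonstration. They are not recommendations.

```sql
-- Hypothetical Flink table name; connection values reuse the MySQL-mode example.
CREATE TABLE t_sink_tuned
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase',
    'url' = 'jdbc:mysql://127.0.0.1:2881/test',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root@test#obcluster',
    'password' = 'pswd',
    'buffer-flush.interval' = '5s',       -- scheduled flush every 5 seconds (default 1s)
    'buffer-flush.buffer-size' = '2000',  -- buffer size (default 1000)
    'max-retries' = '5',                  -- retry up to 5 times on failure (default 3)
    'memstore-check.enabled' = 'true',    -- keep the memstore check on (default)
    'memstore-check.threshold' = '0.8',   -- usage ratio relative to the limit (default 0.9)
    'memstore-check.interval' = '10s'     -- check every 10 seconds (default 30s)
);
```

Leaving 'sync-write' at its default of 'false' keeps the buffer in use, which is what makes the two 'buffer-flush.*' options relevant.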
https://issues.apache.org/jira/browse/FLINK-25569