[Feature][Doc] Add RocketMq connector (apache#5361)
* [Feature][doc][Connector-V2][RocketMq] Add RocketMq connector

* [Feature][doc][Connector-V2][RocketMq] Add RocketMq connector

* [Feature][doc][Connector-V2][RocketMq] Add RocketMq connector

* fix

* add case
zhilinli123 authored and gnehil committed Oct 12, 2023
1 parent 81aa7b6 commit c8f7b07
Showing 2 changed files with 333 additions and 135 deletions.
197 changes: 159 additions & 38 deletions docs/en/connector-v2/sink/RocketMQ.md
@@ -1,44 +1,42 @@
# RocketMQ

> RocketMQ sink connector
>
## Description
Write rows to an Apache RocketMQ topic.
## Support Apache RocketMQ Version

## Key features
- 4.9.0 (Or a newer version, for reference)

- [x] [exactly-once](../../concept/connector-v2-features.md)
## Support These Engines

By default, we will use 2pc to guarantee the message is sent to RocketMQ exactly once.
> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## Options

| name | type | required | default value |
|----------------------|---------|----------|--------------------------|
| topic | string | yes | - |
| name.srv.addr | string | yes | - |
| acl.enabled | Boolean | no | false |
| access.key | String | no | |
| secret.key | String | no | |
| producer.group | String | no | SeaTunnel-producer-Group |
| semantic | string | no | NON |
| partition.key.fields | array | no | - |
| format | String | no | json |
| field.delimiter | String | no | , |
| common-options | config | no | - |

### topic [string]
## Key features

`RocketMQ topic` name.
- [x] [exactly-once](../../concept/connector-v2-features.md)

### name.srv.addr [string]
By default, we will use 2pc to guarantee the message is sent to RocketMQ exactly once.

`RocketMQ` name server cluster address.
## Description

### semantic [string]
Write rows to an Apache RocketMQ topic.

Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.
## Sink Options

| Name | Type | Required | Default | Description |
|----------------------|---------|----------|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| topic | string | yes | - | `RocketMQ topic` name. |
| name.srv.addr | string | yes | - | `RocketMQ` name server cluster address. |
| acl.enabled | Boolean | no | false | Whether ACL authentication is enabled. When true, `access.key` and `secret.key` must be set. |
| access.key | String | no | | When `acl.enabled` is true, the access key cannot be empty. |
| secret.key | String | no | | When `acl.enabled` is true, the secret key cannot be empty. |
| producer.group | String | no | SeaTunnel-producer-Group | Producer group name. |
| partition.key.fields | array | no | - | Fields whose hash value determines the target partition; see `partition.key.fields` below. |
| format | String | no | json | Data format. The default is json; text is also supported. The default field separator is ",". To use a different delimiter, set the `field.delimiter` option. |
| field.delimiter | String | no | , | Customize the field delimiter for data format. |
| producer.send.sync | Boolean | no | false | If true, the message will be sync sent. |
| common-options | config | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. |
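
As a rough sketch of how the ACL and text-format options in the table might be combined in one sink block: the name server address, topic, credentials, and the `|` delimiter below are placeholder assumptions, not values taken from this commit.

```hocon
sink {
  Rocketmq {
    # Placeholder address and topic (assumptions for illustration)
    name.srv.addr = "localhost:9876"
    topic = "test_topic"
    # With acl.enabled = true, access.key and secret.key must not be empty
    acl.enabled = true
    access.key = "your_access_key"
    secret.key = "your_secret_key"
    # Write delimited text instead of the default json
    format = "text"
    field.delimiter = "|"
  }
}
```

If `format` is left at its default of json, `field.delimiter` can presumably be omitted, since the table describes it only as the delimiter for the data format.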

### partition.key.fields [array]

@@ -55,27 +53,150 @@ Upstream data is the following:

If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.
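
A minimal sketch of a sink block that uses `name` as the key, assuming the upstream rows carry a `name` column; the address and topic are placeholders:

```hocon
sink {
  Rocketmq {
    # Placeholder address and topic (assumptions for illustration)
    name.srv.addr = "localhost:9876"
    topic = "test_topic"
    # The hash of the name column selects the partition for each row
    partition.key.fields = ["name"]
  }
}
```

Since the partition is derived from the hash of the key, rows with the same `name` value should land in the same partition as long as the number of partitions does not change.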

### format
## Task Example

### Fake to Rocketmq Simple

> The data is randomly generated and asynchronously sent to the test topic
```hocon
env {
execution.parallelism = 1
}
source {
FakeSource {
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
Rocketmq {
name.srv.addr = "localhost:9876"
topic = "test_topic"
}
}
```

Data format. The default format is json. Optional text format. The default field separator is ",".
If you customize the delimiter, add the "field_delimiter" option.
### Rocketmq To Rocketmq Simple

### field_delimiter
> Consume from RocketMQ and write to another topic. The hash of the `c_int` field determines which partition each message is written to. Messages are sent asynchronously, which is the default.
Customize the field delimiter for data format.
```hocon
env {
execution.parallelism = 1
}
### common options [config]
source {
Rocketmq {
name.srv.addr = "localhost:9876"
topics = "test_topic"
result_table_name = "rocketmq_table"
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
sink {
Rocketmq {
name.srv.addr = "localhost:9876"
topic = "test_topic_sink"
partition.key.fields = ["c_int"]
}
}
```

### Timestamp consumption write Simple

## Examples
> This is a streaming job that starts consuming from a specified position. When new partitions are added, the program periodically discovers and consumes them, and writes the data to another topic.
```hocon
env {
execution.parallelism = 1
job.mode = "STREAMING"
}
source {
Rocketmq {
name.srv.addr = "localhost:9876"
topics = "test_topic"
result_table_name = "rocketmq_table"
start.mode = "CONSUME_FROM_FIRST_OFFSET"
batch.size = "400"
consumer.group = "test_topic_group"
format = "json"
format = json
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
Rocketmq {
name.srv.addr = "localhost:9876"
topic = "test-topic-003"
partition.key.fields = ["name"]
topic = "test_topic"
partition.key.fields = ["c_int"]
producer.send.sync = true
}
}
```