[improve][Redis]Redis scan command supports versions 5, 6, 7 #7666

Merged: 10 commits, Sep 21, 2024
78 changes: 60 additions & 18 deletions docs/en/connector-v2/source/Redis.md
@@ -17,8 +17,9 @@ Used to read data from Redis.

## Options

| name | type | required | default value |
|---------------------|--------|-----------------------|---------------|
| name | type | required | default value |
| ------------------- | ------ | --------------------- | ------------- |
| redis_version | string | yes | - |
| host | string | yes | - |
| port | int | yes | - |
| keys | string | yes | - |
@@ -34,6 +35,10 @@ Used to read data from Redis.
| format | string | no | json |
| common-options | | no | - |

### redis_version

The Redis server version. Supported values: `Redis3`, `Redis4`, `Redis5`, `Redis6`, `Redis7`.

### host [string]

redis host
@@ -67,7 +72,6 @@ for example, if the value of hash key is the following shown:
If hash_key_parse_mode is `all` and the schema is configured as shown below, the connector generates the following data:

```hocon

schema {
fields {
001 {
@@ -83,14 +87,13 @@ schema {

```

| 001 | 002 |
|---------------------------------|---------------------------|
| 001 | 002 |
| ------------------------------- | ------------------------- |
| Row(name=tyrantlucifer, age=26) | Row(name=Zongwen, age=26) |

If hash_key_parse_mode is `kv` and the schema is configured as shown below, the connector generates the following data:

```hocon

schema {
fields {
hash_key = string
@@ -101,10 +104,10 @@ schema {

```

| hash_key | name | age |
|----------|---------------|-----|
| 001 | tyrantlucifer | 26 |
| 002 | Zongwen | 26 |
| hash_key | name | age |
| -------- | ------------- | ---- |
| 001 | tyrantlucifer | 26 |
| 002 | Zongwen | 26 |

Each key-value pair in the hash is treated as a row and sent downstream.
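
The difference between the two parse modes can be sketched in plain Java. This is only an illustration of the row shapes in the tables above, not the connector's code: the class and method names are hypothetical, and the hash values are assumed to be already parsed into maps rather than stored as JSON strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of the `all` and `kv` row shapes; not part of the connector. */
final class HashKeyParseModeSketch {

    /** `all`: the whole hash becomes a single row with one column per hash field. */
    static List<Map<String, Object>> parseAll(Map<String, Map<String, Object>> hash) {
        Map<String, Object> row = new LinkedHashMap<>(hash);
        return Collections.singletonList(row);
    }

    /** `kv`: every hash field becomes its own row, with the field name in `hash_key`. */
    static List<Map<String, Object>> parseKv(Map<String, Map<String, Object>> hash) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (Map.Entry<String, Map<String, Object>> entry : hash.entrySet()) {
            Map<String, Object> row = new LinkedHashMap<>();
            row.put("hash_key", entry.getKey());
            row.putAll(entry.getValue());
            rows.add(row);
        }
        return rows;
    }

    private static Map<String, Object> person(String name, int age) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("name", name);
        value.put("age", age);
        return value;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> hash = new LinkedHashMap<>();
        hash.put("001", person("tyrantlucifer", 26));
        hash.put("002", person("Zongwen", 26));
        System.out.println(parseAll(hash)); // one row, columns 001 and 002
        System.out.println(parseKv(hash));  // two rows, columns hash_key, name, age
    }
}
```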

@@ -180,15 +183,13 @@ when you assign format is `json`, you should also assign schema option, for exam
upstream data is the following:

```json

{"code": 200, "data": "get success", "success": true}

```

you should assign schema as the following:

```hocon

schema {
fields {
code = int
@@ -201,24 +202,23 @@ schema {

connector will generate data as the following:

| code | data | success |
|------|-------------|---------|
| code | data | success |
| ---- | ----------- | ------- |
| 200 | get success | true |

when you assign format is `text`, connector will do nothing for upstream data, for example:

upstream data is the following:

```json

{"code": 200, "data": "get success", "success": true}

```

connector will generate data as the following:

| content |
|----------------------------------------------------------|
| content |
| -------------------------------------------------------- |
| {"code": 200, "data": "get success", "success": true} |

### schema [config]
@@ -242,6 +242,7 @@ Redis {
keys = "key_test*"
data_type = key
format = text
redis_version = Redis5
}
```

@@ -252,6 +253,7 @@ Redis {
keys = "key_test*"
data_type = key
format = json
redis_version = Redis5
schema {
fields {
name = string
@@ -261,6 +263,46 @@ Redis {
}
```

Read string-type keys and append them to a list:

```hocon
env {
parallelism = 1
job.mode = "BATCH"
shade.identifier = "base64"

#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
# Review comment (Member), suggested change: remove the following lines
#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local

# Reply (Contributor Author): I was careless. I deleted it.

}

source {
Redis {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
keys = "string_test*"
data_type = string
batch_size = 33
redis_version = Redis7
}
}

sink {
Redis {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
key = "string_test_list"
data_type = list
batch_size = 33
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26
@@ -270,4 +312,4 @@ Redis {
### next version

- [Improve] Support redis cluster mode connection and user authentication [3188](https://github.com/apache/seatunnel/pull/3188)

- [Bug] Redis scan command supports versions 3, 4, 5, 6, 7 [7666](https://github.com/apache/seatunnel/pull/7666)
3 changes: 2 additions & 1 deletion release-note.md
@@ -58,7 +58,7 @@
- [Connector-v2] [Mongodb] Support to convert to double from numeric type that mongodb saved it as numeric internally (#6997)
- [Connector-v2] [Redis] Using scan replace keys operation command,support batchWrite in single mode(#7030,#7085)
- [Connector-V2] [Clickhouse] Add a new optional configuration `clickhouse.config` to the source connector of ClickHouse (#7143)
- [Connector-V2] [ElasticsSource] Source support multiSource (#6730)
- [Connector-V2] [Redis] Redis scan command supports versions 3, 4, 5, 6, 7 (#7666)

### Zeta(ST-Engine)

@@ -200,6 +200,7 @@
- [Connector-V2] [Assert] Support field type assert and field value equality assert for full data types (#6275)
- [Connector-V2] [Iceberg] Support iceberg sink #6198
- [Connector-V2] [FILE-OBS] Add Huawei Cloud OBS connector #4578
- [Connector-V2] [ElasticsSource] Source support multiSource (#6730)

### Zeta(ST-Engine)

@@ -19,11 +19,13 @@

import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisVersion;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.resps.ScanResult;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,6 +34,8 @@ public abstract class RedisClient extends Jedis {

protected final RedisParameters redisParameters;

private final RedisVersion redisVersion;

protected final int batchSize;

protected final Jedis jedis;
@@ -40,14 +44,42 @@ protected RedisClient(RedisParameters redisParameters, Jedis jedis) {
this.redisParameters = redisParameters;
this.batchSize = redisParameters.getBatchSize();
this.jedis = jedis;
this.redisVersion = redisParameters.getRedisVersion();
Review comment (Member): Can we get the version through the Redis client API instead of asking users for it?

Reply (Contributor Author): OK, good idea. We can get the Redis information through jedis.info(), so we can remove the redis_version configuration option.

private int extractRedisVersion(Jedis jedis) {
        log.info("Try to get redis version information from the jedis.info() method");
        // # Server
        // redis_version:5.0.14
        // redis_git_sha1:00000000
        // redis_git_dirty:0
        String info = jedis.info();
        try {
            for (String line : info.split("\n")) {
                if (line.startsWith("redis_version:")) {
                    // 5.0.14
                    String versionInfo = line.split(":")[1].trim();
                    log.info("The version of Redis is :{}", versionInfo);
                    String[] parts = versionInfo.split("\\.");
                    return Integer.parseInt(parts[0]);
                }
            }
        } catch (Exception e) {
            throw new RedisConnectorException(
                    GET_REDIS_VERSION_INFO_FAILED,
                    GET_REDIS_VERSION_INFO_FAILED.getErrorMessage(),
                    e);
        }
        throw new RedisConnectorException(
                GET_REDIS_VERSION_INFO_FAILED,
                "Did not get the expected redis_version from the jedis.info() method");
    }
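
The parsing step in the reply above can be exercised without a Redis server by feeding it the INFO text directly. The following is a minimal sketch under that assumption: the class and method names are hypothetical, it returns an `Optional` instead of throwing a connector exception, and `supportsScanType` only mirrors the branching used in this PR (versions 5 and earlier are filtered on the client side).

```java
import java.util.Optional;

/** Hypothetical helper, not part of the PR: parses the major version out of Redis INFO text. */
final class RedisInfoVersionSketch {

    private static final String VERSION_PREFIX = "redis_version:";

    /** Returns the major version, or empty if no well-formed redis_version line is present. */
    static Optional<Integer> majorVersion(String infoText) {
        // INFO replies use CRLF line endings, so split on either form.
        for (String line : infoText.split("\\r?\\n")) {
            if (!line.startsWith(VERSION_PREFIX)) {
                continue;
            }
            // For example "5.0.14"; the part before the first dot is the major version.
            String version = line.substring(VERSION_PREFIX.length()).trim();
            String major = version.split("\\.")[0];
            try {
                return Optional.of(Integer.parseInt(major));
            } catch (NumberFormatException e) {
                return Optional.empty();
            }
        }
        return Optional.empty();
    }

    /** Assumption taken from this PR's branching: only version 6 and later scan with a type filter. */
    static boolean supportsScanType(int majorVersion) {
        return majorVersion >= 6;
    }

    public static void main(String[] args) {
        String info = "# Server\r\nredis_version:5.0.14\r\nredis_git_sha1:00000000\r\n";
        System.out.println(majorVersion(info));
        System.out.println(majorVersion(info).map(RedisInfoVersionSketch::supportsScanType));
    }
}
```

Splitting on `\r?\n` rather than `\n` keeps the carriage return out of the parsed line, so the result does not depend on a later `trim()`.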

}

public ScanResult<String> scanKeys(
String cursor, int batchSize, String keysPattern, RedisDataType type) {
ScanParams scanParams = new ScanParams();
scanParams.match(keysPattern);
scanParams.count(batchSize);
return jedis.scan(cursor, scanParams, type.name());
return scanByRedisVersion(cursor, scanParams, type, redisVersion);
}

private ScanResult<String> scanByRedisVersion(
String cursor, ScanParams scanParams, RedisDataType type, RedisVersion redisVersion) {
if (RedisVersion.Redis3.equals(redisVersion)
|| RedisVersion.Redis4.equals(redisVersion)
|| RedisVersion.Redis5.equals(redisVersion)) {
return scanOnRedis5(cursor, scanParams, type);
} else {
return jedis.scan(cursor, scanParams, type.name());
}
}

// Redis 5 and earlier: the SCAN command does not support the TYPE option, so keys are filtered by type on the client side
private ScanResult<String> scanOnRedis5(
String cursor, ScanParams scanParams, RedisDataType type) {
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
String resultCursor = scanResult.getCursor();
List<String> keys = scanResult.getResult();
List<String> typeKeys = new ArrayList<>(keys.size());
for (String key : keys) {
String keyType = jedis.type(key);
if (type.name().equalsIgnoreCase(keyType)) {
typeKeys.add(key);
}
}
return new ScanResult<>(resultCursor, typeKeys);
}
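
The client-side filtering in `scanOnRedis5` can be sketched without Jedis. In this illustration the `Page` class and the type map are hypothetical stand-ins for `ScanResult<String>` and for the per-key `jedis.type(key)` call. The point it shows is that the cursor is passed through unchanged while the key list may shrink, possibly to nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of filtering one SCAN page by key type on the client side. */
final class ScanTypeFilterSketch {

    /** Stand-in for one SCAN reply: the next cursor plus the keys of this page. */
    static final class Page {
        final String cursor;
        final List<String> keys;

        Page(String cursor, List<String> keys) {
            this.cursor = cursor;
            this.keys = keys;
        }
    }

    /** Keeps only the keys whose type matches; the cursor is returned untouched. */
    static Page filterByType(Page page, Map<String, String> typeOf, String wantedType) {
        List<String> matching = new ArrayList<>(page.keys.size());
        for (String key : page.keys) {
            // In the PR this lookup is one TYPE call per key; a key that has
            // disappeared since the scan has no entry here and is skipped.
            String actualType = typeOf.get(key);
            if (wantedType.equalsIgnoreCase(actualType)) {
                matching.add(key);
            }
        }
        return new Page(page.cursor, matching);
    }

    public static void main(String[] args) {
        Map<String, String> typeOf = new HashMap<>();
        typeOf.put("string_test_1", "string");
        typeOf.put("hash_test_1", "hash");
        Page page = new Page("17", Arrays.asList("string_test_1", "hash_test_1"));
        Page filtered = filterByType(page, typeOf, "STRING");
        System.out.println(filtered.cursor + " " + filtered.keys);
    }
}
```

Because a page can come back empty while the cursor is still non-zero, the caller has to keep scanning until the cursor returns to `"0"` rather than stopping at the first empty page.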

public abstract List<String> batchGetString(List<String> keys);
@@ -36,6 +36,12 @@ public enum HashKeyParseMode {
KV;
}

public static final Option<RedisVersion> REDIS_VERSION =
Options.key("redis_version")
.enumType(RedisVersion.class)
.noDefaultValue()
Review comment (Member): Please set the default version to 7.

Reply (Contributor Author): In the next commit this option is removed, because the version is obtained through the client.

.withDescription("the version of redis,minimum supported Redis3");

public static final Option<String> HOST =
Options.key("host")
.stringType()
@@ -36,6 +36,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;

@Data
public class RedisParameters implements Serializable {
@@ -53,7 +54,12 @@ public class RedisParameters implements Serializable {
private long expire = RedisConfig.EXPIRE.defaultValue();
private int batchSize = RedisConfig.BATCH_SIZE.defaultValue();

private RedisVersion redisVersion;

public void buildWithConfig(ReadonlyConfig config) {
// redis version
Optional<RedisVersion> versionOptional = config.getOptional(RedisConfig.REDIS_VERSION);
versionOptional.ifPresent(version -> this.redisVersion = version);
// set host
this.host = config.get(RedisConfig.HOST);
// set port
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.redis.config;

public enum RedisVersion {
Redis3,
Redis4,
Redis5,
Redis6,
Redis7;
}
@@ -51,7 +51,8 @@ public OptionRule optionRule() {
RedisConfig.HOST,
RedisConfig.PORT,
RedisConfig.KEY_PATTERN,
RedisConfig.DATA_TYPE)
RedisConfig.DATA_TYPE,
RedisConfig.REDIS_VERSION)
.optional(
RedisConfig.MODE,
RedisConfig.HASH_KEY_PARSE_MODE,
@@ -80,9 +80,6 @@ public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
redisClient.scanKeys(cursor, batchSize, keysPattern, redisDataType);
cursor = scanResult.getCursor();
List<String> keys = scanResult.getResult();
if (CollectionUtils.isEmpty(keys)) {
break;
}
pollNext(keys, redisDataType, output);
// when cursor return "0", scan end
if (ScanParams.SCAN_POINTER_START.equals(cursor)) {
@@ -94,6 +91,9 @@ public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {

private void pollNext(List<String> keys, RedisDataType dataType, Collector<SeaTunnelRow> output)
throws IOException {
if (CollectionUtils.isEmpty(keys)) {
return;
}
if (RedisDataType.HASH.equals(dataType)) {
pollHashMapToNext(keys, output);
return;
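
Moving the empty-list check out of the scan loop and into `pollNext` matters because a SCAN page can be empty before the cursor has returned to `"0"`, especially once keys are filtered by type on the client side. The following sketch simulates both behaviours against an in-memory list of pages. The class, the `drain` method, and the convention that page `i` is served for cursor `i` are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical simulation of the reader's scan loop; not the connector's code. */
final class ScanLoopSketch {

    private static final String SCAN_START = "0";

    /**
     * Walks the simulated pages: pages.get(i) is served for cursor i, and the
     * last page hands back cursor "0", which marks the end of the scan.
     */
    static List<String> drain(List<List<String>> pages, boolean stopOnEmptyPage) {
        List<String> collected = new ArrayList<>();
        if (pages.isEmpty()) {
            return collected;
        }
        String cursor = SCAN_START;
        while (true) {
            int index = Integer.parseInt(cursor);
            List<String> keys = pages.get(index);
            cursor = (index + 1 < pages.size()) ? String.valueOf(index + 1) : SCAN_START;
            if (stopOnEmptyPage && keys.isEmpty()) {
                break; // behaviour before this change: an empty page ends the scan early
            }
            collected.addAll(keys); // behaviour after this change: an empty page adds nothing
            if (SCAN_START.equals(cursor)) {
                break; // cursor back at "0": the scan is complete
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        List<List<String>> pages = Arrays.asList(
                Collections.singletonList("k1"),
                Collections.<String>emptyList(),
                Arrays.asList("k2", "k3"));
        System.out.println(drain(pages, true));
        System.out.println(drain(pages, false));
    }
}
```

With the early `break`, the keys that follow the empty page are never read; with the check moved into the per-page handler, the loop ends only when the cursor returns to `"0"`.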
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.e2e.connector.redis;

import java.util.Collections;
import java.util.List;

public class Redis5IT extends RedisTestCaseTemplateIT {

@Override
public RedisContainerInfo getRedisContainerInfo() {
return new RedisContainerInfo("redis-e2e", 6379, "SeaTunnel", "redis:5");
}

@Override
public List<String> getVariables() {
return Collections.singletonList("redisVersion=Redis5");
}
}