Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][Redis]Redis scan command supports versions 5, 6, 7 #7666

Open
wants to merge 10 commits into
base: dev
Choose a base branch
from
Open
57 changes: 39 additions & 18 deletions docs/en/connector-v2/source/Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ Used to read data from Redis.

## Options

| name | type | required | default value |
|---------------------|--------|-----------------------|---------------|
| name | type | required | default value |
| ------------------- | ------ | --------------------- | ------------- |
| host | string | yes | - |
| port | int | yes | - |
| keys | string | yes | - |
Expand Down Expand Up @@ -67,7 +67,6 @@ for example, if the value of hash key is the following shown:
if hash_key_parse_mode is `all` and schema config as the following shown, it will generate the following data:

```hocon

schema {
fields {
001 {
Expand All @@ -83,14 +82,13 @@ schema {

```

| 001 | 002 |
|---------------------------------|---------------------------|
| 001 | 002 |
| ------------------------------- | ------------------------- |
| Row(name=tyrantlucifer, age=26) | Row(name=Zongwen, age=26) |

if hash_key_parse_mode is `kv` and schema config as the following shown, it will generate the following data:

```hocon

schema {
fields {
hash_key = string
Expand All @@ -101,10 +99,10 @@ schema {

```

| hash_key | name | age |
|----------|---------------|-----|
| 001 | tyrantlucifer | 26 |
| 002 | Zongwen | 26 |
| hash_key | name | age |
| -------- | ------------- | ---- |
| 001 | tyrantlucifer | 26 |
| 002 | Zongwen | 26 |

each kv that in hash key it will be treated as a row and send it to upstream.

Expand Down Expand Up @@ -180,15 +178,13 @@ when you assign format is `json`, you should also assign schema option, for exam
upstream data is the following:

```json

{"code": 200, "data": "get success", "success": true}

```

you should assign schema as the following:

```hocon

schema {
fields {
code = int
Expand All @@ -201,24 +197,23 @@ schema {

connector will generate data as the following:

| code | data | success |
|------|-------------|---------|
| code | data | success |
| ---- | ----------- | ------- |
| 200 | get success | true |

when you assign format is `text`, connector will do nothing for upstream data, for example:

upstream data is the following:

```json

{"code": 200, "data": "get success", "success": true}

```

connector will generate data as the following:

| content |
|----------------------------------------------------------|
| content |
| -------------------------------------------------------- |
| {"code": 200, "data": "get success", "success": true} |

### schema [config]
Expand Down Expand Up @@ -261,6 +256,32 @@ Redis {
}
```

read string type keys write append to list

```hocon
source {
Redis {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
keys = "string_test*"
data_type = string
batch_size = 33
}
}

sink {
Redis {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
key = "string_test_list"
data_type = list
batch_size = 33
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26
Expand All @@ -270,4 +291,4 @@ Redis {
### next version

- [Improve] Support redis cluster mode connection and user authentication [3188](https://github.com/apache/seatunnel/pull/3188)

- [Bug] Redis scan command supports versions 5, 6, 7 [7666](https://github.com/apache/seatunnel/pull/7666)
3 changes: 2 additions & 1 deletion release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
- [Connector-v2] [Mongodb] Support to convert to double from numeric type that mongodb saved it as numeric internally (#6997)
- [Connector-v2] [Redis] Using scan replace keys operation command,support batchWrite in single mode(#7030,#7085)
- [Connector-V2] [Clickhouse] Add a new optional configuration `clickhouse.config` to the source connector of ClickHouse (#7143)
- [Connector-V2] [ElasticsSource] Source support multiSource (#6730)
- [Connector-V2] [Redis] Redis scan command supports versions 3, 4, 5, 6, 7 (#7666)

### Zeta(ST-Engine)

Expand Down Expand Up @@ -200,6 +200,7 @@
- [Connector-V2] [Assert] Support field type assert and field value equality assert for full data types (#6275)
- [Connector-V2] [Iceberg] Support iceberg sink #6198
- [Connector-V2] [FILE-OBS] Add Huawei Cloud OBS connector #4578
- [Connector-V2] [ElasticsSource] Source support multiSource (#6730)

### Zeta(ST-Engine)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.resps.ScanResult;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -32,22 +33,52 @@ public abstract class RedisClient extends Jedis {

protected final RedisParameters redisParameters;

private final Integer redisVersion;

protected final int batchSize;

protected final Jedis jedis;

protected RedisClient(RedisParameters redisParameters, Jedis jedis) {
private static final int REDIS_5 = 5;

protected RedisClient(RedisParameters redisParameters, Jedis jedis, int redisVersion) {
this.redisParameters = redisParameters;
this.batchSize = redisParameters.getBatchSize();
this.jedis = jedis;
this.redisVersion = redisVersion;
}

public ScanResult<String> scanKeys(
String cursor, int batchSize, String keysPattern, RedisDataType type) {
ScanParams scanParams = new ScanParams();
scanParams.match(keysPattern);
scanParams.count(batchSize);
return jedis.scan(cursor, scanParams, type.name());
return scanByRedisVersion(cursor, scanParams, type, redisVersion);
}

private ScanResult<String> scanByRedisVersion(
String cursor, ScanParams scanParams, RedisDataType type, Integer redisVersion) {
if (redisVersion <= REDIS_5) {
return scanOnRedis5(cursor, scanParams, type);
} else {
return jedis.scan(cursor, scanParams, type.name());
}
}

// When the version is earlier than redis5, scan command does not support type
private ScanResult<String> scanOnRedis5(
String cursor, ScanParams scanParams, RedisDataType type) {
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
String resultCursor = scanResult.getCursor();
List<String> keys = scanResult.getResult();
List<String> typeKeys = new ArrayList<>(keys.size());
for (String key : keys) {
String keyType = jedis.type(key);
if (type.name().equalsIgnoreCase(keyType)) {
typeKeys.add(key);
}
}
return new ScanResult<>(resultCursor, typeKeys);
}

public abstract List<String> batchGetString(List<String> keys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import java.util.Set;

public class RedisClusterClient extends RedisClient {
public RedisClusterClient(RedisParameters redisParameters, Jedis jedis) {
super(redisParameters, jedis);
public RedisClusterClient(RedisParameters redisParameters, Jedis jedis, int redisVersion) {
super(redisParameters, jedis, redisVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
// In standalone mode, pipeline can be used to improve batch read performance
public class RedisSingleClient extends RedisClient {

public RedisSingleClient(RedisParameters redisParameters, Jedis jedis) {
super(redisParameters, jedis);
public RedisSingleClient(RedisParameters redisParameters, Jedis jedis, int redisVersion) {
super(redisParameters, jedis, redisVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.redis.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClusterClient;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisSingleClient;
Expand All @@ -27,6 +27,7 @@
import org.apache.commons.lang3.StringUtils;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.ConnectionPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
Expand All @@ -37,7 +38,11 @@
import java.util.HashSet;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.GET_REDIS_VERSION_INFO_FAILED;
import static org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.INVALID_CONFIG;

@Data
@Slf4j
public class RedisParameters implements Serializable {
private String host;
private int port;
Expand All @@ -53,6 +58,8 @@ public class RedisParameters implements Serializable {
private long expire = RedisConfig.EXPIRE.defaultValue();
private int batchSize = RedisConfig.BATCH_SIZE.defaultValue();

private int redisVersion;

public void buildWithConfig(ReadonlyConfig config) {
// set host
this.host = config.get(RedisConfig.HOST);
Expand Down Expand Up @@ -94,11 +101,40 @@ public void buildWithConfig(ReadonlyConfig config) {

public RedisClient buildRedisClient() {
Jedis jedis = this.buildJedis();
this.redisVersion = extractRedisVersion(jedis);
if (mode.equals(RedisConfig.RedisMode.SINGLE)) {
return new RedisSingleClient(this, jedis);
return new RedisSingleClient(this, jedis, redisVersion);
} else {
return new RedisClusterClient(this, jedis);
return new RedisClusterClient(this, jedis, redisVersion);
}
}

private int extractRedisVersion(Jedis jedis) {
log.info("Try to get redis version information from the jedis.info() method");
// # Server
// redis_version:5.0.14
// redis_git_sha1:00000000
// redis_git_dirty:0
String info = jedis.info();
try {
for (String line : info.split("\n")) {
if (line.startsWith("redis_version:")) {
// 5.0.14
String versionInfo = line.split(":")[1].trim();
log.info("The version of Redis is :{}", versionInfo);
String[] parts = versionInfo.split("\\.");
return Integer.parseInt(parts[0]);
}
}
} catch (Exception e) {
throw new RedisConnectorException(
GET_REDIS_VERSION_INFO_FAILED,
GET_REDIS_VERSION_INFO_FAILED.getErrorMessage(),
e);
}
throw new RedisConnectorException(
GET_REDIS_VERSION_INFO_FAILED,
"Did not get the expected redis_version from the jedis.info() method");
}

public Jedis buildJedis() {
Expand All @@ -122,7 +158,7 @@ public Jedis buildJedis() {
String[] splits = redisNode.split(":");
if (splits.length != 2) {
throw new RedisConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
INVALID_CONFIG,
"Invalid redis node information,"
+ "redis node information must like as the following: [host:port]");
}
Expand Down Expand Up @@ -151,8 +187,7 @@ public Jedis buildJedis() {
default:
// do nothing
throw new RedisConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
"Not support this redis mode");
CommonErrorCode.OPERATION_NOT_SUPPORTED, "Not support this redis mode");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.apache.seatunnel.connectors.seatunnel.redis.exception;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;

public enum RedisErrorCode implements SeaTunnelErrorCode {
GET_REDIS_VERSION_INFO_FAILED("RedisErrorCode-01", "Failed to get the redis version"),
INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config");

private final String code;
private final String description;

RedisErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return code;
}

@Override
public String getDescription() {
return description;
}
}
Loading
Loading