-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][Redis]Redis scan command supports versions 5, 6, 7 #7666
Changes from 7 commits
6508049
997d793
0b3ce6b
4725c57
2761246
f038afd
d96edb8
d94b171
a825e4d
15269cc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,11 +19,13 @@ | |
|
||
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; | ||
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; | ||
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisVersion; | ||
|
||
import redis.clients.jedis.Jedis; | ||
import redis.clients.jedis.params.ScanParams; | ||
import redis.clients.jedis.resps.ScanResult; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
@@ -32,6 +34,8 @@ public abstract class RedisClient extends Jedis { | |
|
||
protected final RedisParameters redisParameters; | ||
|
||
private final RedisVersion redisVersion; | ||
|
||
protected final int batchSize; | ||
|
||
protected final Jedis jedis; | ||
|
@@ -40,14 +44,42 @@ protected RedisClient(RedisParameters redisParameters, Jedis jedis) { | |
this.redisParameters = redisParameters; | ||
this.batchSize = redisParameters.getBatchSize(); | ||
this.jedis = jedis; | ||
this.redisVersion = redisParameters.getRedisVersion(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we get the version by use redis client api instead of get from users? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok,good idea. We can get the redis information through jedis.info(), so we can remove the configuration option for redis_version. private int extractRedisVersion(Jedis jedis) {
log.info("Try to get redis version information from the jedis.info() method");
// # Server
// redis_version:5.0.14
// redis_git_sha1:00000000
// redis_git_dirty:0
String info = jedis.info();
try {
for (String line : info.split("\n")) {
if (line.startsWith("redis_version:")) {
// 5.0.14
String versionInfo = line.split(":")[1].trim();
log.info("The version of Redis is :{}", versionInfo);
String[] parts = versionInfo.split("\\.");
return Integer.parseInt(parts[0]);
}
}
} catch (Exception e) {
throw new RedisConnectorException(
GET_REDIS_VERSION_INFO_FAILED,
GET_REDIS_VERSION_INFO_FAILED.getErrorMessage(),
e);
}
throw new RedisConnectorException(
GET_REDIS_VERSION_INFO_FAILED,
"Did not get the expected redis_version from the jedis.info() method");
} |
||
} | ||
|
||
public ScanResult<String> scanKeys( | ||
String cursor, int batchSize, String keysPattern, RedisDataType type) { | ||
ScanParams scanParams = new ScanParams(); | ||
scanParams.match(keysPattern); | ||
scanParams.count(batchSize); | ||
return jedis.scan(cursor, scanParams, type.name()); | ||
return scanByRedisVersion(cursor, scanParams, type, redisVersion); | ||
} | ||
|
||
private ScanResult<String> scanByRedisVersion( | ||
String cursor, ScanParams scanParams, RedisDataType type, RedisVersion redisVersion) { | ||
if (RedisVersion.Redis3.equals(redisVersion) | ||
|| RedisVersion.Redis4.equals(redisVersion) | ||
|| RedisVersion.Redis5.equals(redisVersion)) { | ||
return scanOnRedis5(cursor, scanParams, type); | ||
} else { | ||
return jedis.scan(cursor, scanParams, type.name()); | ||
} | ||
} | ||
|
||
// When the version is earlier than redis5, scan command does not support type | ||
private ScanResult<String> scanOnRedis5( | ||
String cursor, ScanParams scanParams, RedisDataType type) { | ||
ScanResult<String> scanResult = jedis.scan(cursor, scanParams); | ||
String resultCursor = scanResult.getCursor(); | ||
List<String> keys = scanResult.getResult(); | ||
List<String> typeKeys = new ArrayList<>(keys.size()); | ||
for (String key : keys) { | ||
String keyType = jedis.type(key); | ||
if (type.name().equalsIgnoreCase(keyType)) { | ||
typeKeys.add(key); | ||
} | ||
} | ||
return new ScanResult<>(resultCursor, typeKeys); | ||
} | ||
|
||
public abstract List<String> batchGetString(List<String> keys); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,12 @@ public enum HashKeyParseMode { | |
KV; | ||
} | ||
|
||
public static final Option<RedisVersion> REDIS_VERSION = | ||
Options.key("redis_version") | ||
.enumType(RedisVersion.class) | ||
.noDefaultValue() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please set default version to version 7. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the next commit, this configuration is removed because we get the version through the client. |
||
.withDescription("the version of redis,minimum supported Redis3"); | ||
|
||
public static final Option<String> HOST = | ||
Options.key("host") | ||
.stringType() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.seatunnel.connectors.seatunnel.redis.config; | ||
|
||
public enum RedisVersion { | ||
Redis3, | ||
Redis4, | ||
Redis5, | ||
Redis6, | ||
Redis7; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.seatunnel.e2e.connector.redis; | ||
|
||
import java.util.Collections; | ||
import java.util.List; | ||
|
||
public class Redis5IT extends RedisTestCaseTemplateIT { | ||
|
||
@Override | ||
public RedisContainerInfo getRedisContainerInfo() { | ||
return new RedisContainerInfo("redis-e2e", 6379, "SeaTunnel", "redis:5"); | ||
} | ||
|
||
@Override | ||
public List<String> getVariables() { | ||
return Collections.singletonList("redisVersion=Redis5"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was careless. I deleted it