Skip to content

Commit

Permalink
Merge pull request #55 from TraceNature/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
pingxingshikong authored Jul 26, 2021
2 parents 15820bf + 58aa935 commit d7900bd
Show file tree
Hide file tree
Showing 43 changed files with 411 additions and 243 deletions.
73 changes: 41 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ RedisSyncer是一个redis多任务同步工具集,应用于redis单实例及
* redissycner 客户端
[redissyncer-cli](https://github.com/TraceNature/redissyncer-cli)

* dashboard web控制面板
[redissycner-dashboard](https://github.com/TraceNature/dashboard_release)

* redis 数据校验工具
[redissycner-compare](https://github.com/TraceNature/rediscompare)

Expand All @@ -32,6 +35,7 @@ RedisSyncer是一个redis多任务同步工具集,应用于redis单实例及
* rdb跨版本支持,支持高版本至低版本
* ttl校准
* 命令订阅(目标端支持kafka)

## Quick start

请参阅[Quick Start Guide](docs/quickstart.md),文档包括构建及部署方法及基本使用方法
Expand All @@ -45,27 +49,31 @@ RedisSyncer是一个redis多任务同步工具集,应用于redis单实例及

## 编译环境

| **环境条件** | **版本号** |
| **环境条件** | **版本号** |
| :----: | :----: |
| \[Maven\] | \[3.0+ \] |
| \[Maven\] | \[3.0+ \] |
| \[JDK\] | \[1.8 \] |

## 运行环境

| **环境条件** | **版本号** |
| **环境条件** | **版本号** |
| :----: | :----: |
| \[JDK\] | \[1.8+ \] |
| \[JDK\] | \[1.8+ \] |

## 支持Redis版本

| **环境条件** |**版本号** |
| **环境条件** |**版本号** |
| :----:| :----: |
| \[Redis\] | \[2.8-6.2\] |
| \[Redis\] | \[2.8-6.2\] |

## [dashboard](https://github.com/TraceNature/dashboard_release)

![](docs/images/dashboard/dashborad1.png)


## 支持的命令(写命令)
| system | String | List | Hash | Set | ZSet | Transactions | GEO | Stream | HyperLogLog |
|------------|-------------|--------------|------------|-------------|----------|--------------|------------------|------------------|------------------|
|------------|-------------|--------------|------------|-------------|----------|--------------|------------------|------------------|------------------|
| SElECT | BITFIELD | BLMOVE | HDEL | SADD | BZPOPMAX | EXEC | GEOADD | XSETID | PFMERGE |
| FLUSHALL | APPEND | BLPOP | HINCRBY | SDIFFSTORE | BZPOPMIN | MULTI | GEOSEARCHSTORE | XACK | PFADD |
| FLUSHDB | BITOP | BRPOP | HMSET | SINTERSTORE | ZADD | DISCARD | | XADD | PFCOUNT |
Expand All @@ -90,57 +98,58 @@ RedisSyncer是一个redis多任务同步工具集,应用于redis单实例及

| **源数据类型** | **说明** |
| :----:| :----: |
| \[Redis\] | \[存量数据同步\] |
| \[Redis\] | \[增量实时同步\] |
| \[Redis\] | \[存量+增量实时同步\] |
| \[Redis\] | \[存量数据同步\] |
| \[Redis\] | \[增量实时同步\] |
| \[Redis\] | \[存量+增量实时同步\] |
| \[Redis\] | \[生成实时增量AOF文件\] |
| \[本地RDB\] | \[本地RDB文件导入\] |
| \[本地RDB\] | \[本地RDB文件导入\] |
| \[在线RDB\] | \[文件url导入\] |
| \[本地AOF\] | \[本地AOF文件导入\] |
| \[本地AOF\] | \[本地AOF文件导入\] |
| \[在线AOF\] | \[文件url导入\] |
| \[本地混合文件\] | \[本地混合文件导入\] |
| \[在线混合文件\] | \[文件url导入\] |
| \[本地混合文件\] | \[本地混合文件导入\] |
| \[在线混合文件\] | \[文件url导入\] |


[comment]: <> (##支持命令)
[comment]: <> "##支持命令"

[comment]: <> (| 命令 | 命令 | 命令 | 命令 |)
[comment]: <> "| 命令 | 命令 | 命令 | 命令 |"

[comment]: <> (| :----:| :----: | :----: | :----: |)
[comment]: <> "| :----:| :----: | :----: | :----: |"

[comment]: <> (| APPEND | BLPOP | SADD |)
[comment]: <> "| APPEND | BLPOP | SADD |"

[comment]: <> (| SET | BRPOP | SCARD |)
[comment]: <> "| SET | BRPOP | SCARD |"

[comment]: <> (| SETEX | BRPOPLPUSH | SDIFFSTORE |)
[comment]: <> "| SETEX | BRPOPLPUSH | SDIFFSTORE |"

[comment]: <> (| SETNX | LINSERT | SINTERSTORE |)
[comment]: <> "| SETNX | LINSERT | SINTERSTORE |"

[comment]: <> (| GETSET | LPOP | SMOVE |)
[comment]: <> "| GETSET | LPOP | SMOVE |"

[comment]: <> (| SETBIT | LPUSH | SPOP |)
[comment]: <> "| SETBIT | LPUSH | SPOP |"

[comment]: <> (| SETRANGE | LPUSHX | SREM |)
[comment]: <> "| SETRANGE | LPUSHX | SREM |"

[comment]: <> (| MSET | LREM | SUNIONSTORE |)
[comment]: <> "| MSET | LREM | SUNIONSTORE |"

[comment]: <> (| MSETNX | LSET | 单元格 |)
[comment]: <> "| MSETNX | LSET | 单元格 |"

[comment]: <> (| PSETEX | LTRIM | 单元格 |)
[comment]: <> "| PSETEX | LTRIM | 单元格 |"

[comment]: <> (| INCR | RPOP | 单元格 |)
[comment]: <> "| INCR | RPOP | 单元格 |"

[comment]: <> (| INCRBY | RPOPLPUSH | 单元格 |)
[comment]: <> "| INCRBY | RPOPLPUSH | 单元格 |"

[comment]: <> (|INCRBYFLOAT | RPUSH | 单元格 |)
[comment]: <> "|INCRBYFLOAT | RPUSH | 单元格 |"

[comment]: <> (| DECR | RPUSHX | 单元格 |)
[comment]: <> "| DECR | RPUSHX | 单元格 |"

[comment]: <> (| DECRBY | 单元格 | 单元格 |)
[comment]: <> "| DECRBY | 单元格 | 单元格 |"

## 致谢

### [Jedis](https://github.com/redis/jedis)

### [replicatior](https://github.com/leonchen83/redis-replicator)

### [IntelliJ IDEA](https://www.jetbrains.com/?from=redis-replicator)
Binary file added docs/images/dashboard/dashborad1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 10 additions & 0 deletions docs/requirement.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,13 @@
repl-ping-slave-period要小于readTimeout(redissyncer默认60000ms)
源节点内存不够无法进行bgsave
offset刷过


### Broken状态
请查询反馈导致异常的问题,常见问题如下:

| 异常信息 | 原因 |
| :----------------------------------------------------------: | :---------------------------------: |
| WRONGTYPE Operation against a key holding the wrong kind of value | 目标中已存在key和源节点中type不一致 |
| | |
| | |
24 changes: 14 additions & 10 deletions docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,24 @@
- [x] key过滤
- [x] 命令过滤
- [x] 限制任务数,根据内存容量限制创建任务
- [ ] 实现增量续传2.0,通过redis事务命令,尽最大可能保证数据一致性
- [ ] 支持源Redis主从故障转移以及支持sentinel模式
- [x] 目标连接retry机制
- [ ] 实现 rewrite
- [ ] source.type target.type
- [ ] 内存级别双向同步
- [x] 实现增量续传2.0,通过redis事务命令,尽最大可能保证数据一致性
- [x] 目标连接retry机制
- [x] source.type target.type
- [x] 目标为kafka,实现命令订阅
- [x] 集成log4j2,日志可通过application.yml或启动参数配置,默认输出位置 ./log
- [ ] 数据校验,由goclient集成
- [x] incr 、incrby等命令幂等操作
- [x] swagger 补充api说明
- [ ] goclient 适应v2 api
- [ ] goclient 实现交互模式类似redis-cli
- [x] goclient 适应v2 api
- [x] goclient 实现交互模式类似redis-cli
- [ ] 源端节点scan模式,使用scan命令实现不支持sync命令云的Redis的全量数据拉取
- [ ] 数据校验,由goclient集成
- [ ] 支持目标Redis sentinel模式
- [ ] 支持源Redis主从故障转移以及支持sentinel模式
- [ ] 实现 rewrite
- [ ] 内存级别双向同步
- [ ] 目标为rediscluster 实现pipeline写入


* testcase完善,形成完整回归测试案例
- [x] single2single
- [ ] single2single with dbmap
Expand All @@ -60,7 +64,7 @@


* 4.X
- [ ] 任务元数据改为ETCD存储
- [x] 任务元数据改为ETCD存储
- [ ] 实现portal及任务调度
- [ ] 实现任务在集群某节点不可用的情况下自动迁移至其他节点并续传任务
- [ ] 兼容redis协议的其他kv产品例如Tides,TiKV
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ public enum ResultCodeAndMessage {
TASK_MSG_TASK_TARGET_TOPIC_NAME_NULL("4031","kafka命令订阅模式topicName不能为空","kafka命令订阅模式topicName不能为空"),
TASK_MSG_TASK_TARGET_KAFKA_ADDRESS_NULL("4032","kafka命令订阅模式targetKafkaAddress不能为空","kafka命令订阅模式targetKafkaAddress不能为空"),

TASK_MSG_RDB_VERSION_MSG_ERROR("4033","rdb版本获取失败,暂不支持目标Redis版本","rdb version错误"),

TASK_MSG_TASK_TARGET_REDIS_ADDRESS_NULL("100","目标redis地址不能为空","目标redis地址不能为空")



;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ public class ReplicConfig {
*/
private HostnameVerifier hostnameVerifier;


/**
* 用于进度计算
*/
private long readFileSize;
/**
* 文件总大小
*/
private long fileSize;

public void setReplOffset(long offset){
replOffset.set(offset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@


import syncer.replica.util.bytes.ByteArray;

import java.io.IOException;
import java.io.InputStream;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class RedisInputStream extends InputStream {
protected final InputStream in;
protected List<TaskRawByteListener> rawByteListenerList;


public RedisInputStream(final InputStream in) {
this(in, 8192);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public long parse() throws IOException, IncrementException {
}
long start = offset;
offset += in.unmark();
replication.getConfig().setReadFileSize(offset);
if (Objects.isNull(event)) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class AbstractReplication extends AbstractReplicationListener implements
private final AtomicBoolean manual = new AtomicBoolean(false);
protected AtomicBoolean handStop =new AtomicBoolean(false);
protected String brokenMSg="";

/**
* 状态
*/
Expand Down Expand Up @@ -165,6 +166,8 @@ public void submitEvent(Event event, Tuple2<Long, Long> offsets) {
}




/**
* 设置offset
* @param event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,14 @@ public AofReplication(String filePath, ReplicConfig config ,boolean online) {
try {
if(online){
NetStream netStream= NetStream.builder().build();
in=netStream.getInputStreamByOnlineFile(filePath);
in=netStream.getInputStreamByOnlineFile(filePath,config);
}else {
in = new FileInputStream(filePath);
try {
config.setFileSize(in.available());
}catch (Exception e){
log.error("获取在本地数据文件大小失败...");
}
}
} catch (FileNotFoundException e) {
connected.set(TaskStatus.BROKEN);
Expand Down Expand Up @@ -170,7 +175,6 @@ protected void doOpen() throws IOException, IncrementException {
while (getStatus() == TaskStatus.COMMANDRUNNING) {
Object obj = replyParser.parse(len -> offset[0] = len);
if (obj instanceof Object[]) {

Object[] raw = (Object[]) obj;
CommandName name = CommandName.name(Strings.toString(raw[0]));
final CommandParser<? extends Command> parser;
Expand All @@ -187,10 +191,16 @@ protected void doOpen() throws IOException, IncrementException {
log.warn("unexpected redis reply:{}", obj);
}
config.addOffset(offset[0]);
try {
config.setReadFileSize(config.getReplOffset());
}catch (Exception e){

}
offset[0] = 0L;
}
} catch (EOFException ignore) {
submitEvent(new PostCommandSyncEvent());
config.setReadFileSize(config.getFileSize());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,14 @@ public MixedReplication(String filePath, ReplicConfig config ,boolean online) {
try {
if(online){
NetStream netStream= NetStream.builder().build();
in=netStream.getInputStreamByOnlineFile(filePath);
in=netStream.getInputStreamByOnlineFile(filePath,config);
}else {
in = new FileInputStream(filePath);
try {
config.setFileSize(in.available());
}catch (Exception e){
log.error("获取在本地数据文件大小失败...");
}
}
} catch (FileNotFoundException e) {
connected.set(TaskStatus.BROKEN);
Expand Down Expand Up @@ -207,8 +212,8 @@ protected void doOpen() throws IOException, IncrementException {
while (getStatus() == TaskStatus.COMMANDRUNNING) {
Object obj = replyParser.parse(len -> offset[0] = len);
if (obj instanceof Object[]) {

Object[] raw = (Object[]) obj;

CommandName name = CommandName.name(Strings.toString(raw[0]));
final CommandParser<? extends Command> parser;
if ((parser = commands.get(name)) == null) {
Expand All @@ -224,10 +229,16 @@ protected void doOpen() throws IOException, IncrementException {
log.warn("unexpected redis reply:{}", obj);
}
config.addOffset(offset[0]);
try {
config.setReadFileSize(config.getReplOffset());
}catch (Exception e){

}
offset[0] = 0L;
}
} catch (EOFException ignore) {
submitEvent(new PostCommandSyncEvent());
config.setReadFileSize(config.getFileSize());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ public RdbReplication(String filePath, ReplicConfig config,boolean online) throw
try {
if(online){
NetStream netStream= NetStream.builder().build();

in=netStream.getInputStreamByOnlineFile(filePath);

in=netStream.getInputStreamByOnlineFile(filePath,config);
}else {
in = new FileInputStream(filePath);
try {
config.setFileSize(in.available());
}catch (Exception e){
log.error("获取在本地数据文件大小失败...");
}
}
} catch (FileNotFoundException e) {
connected.set(TaskStatus.BROKEN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ public class SocketReplication extends AbstractReplication{
private Heartbeat heartbeat;
private RedisOutputStream outputStream;
protected RedisSocketFactory socketFactory;

protected DefaultSyncRedisProtocol syncRedisProtocol;

private SocketReplicationRetrier socketReplicationRetrier;


/**
* sentinel failover 结束
* * 是否属于哨兵模式-》哨兵模式不会进入break 而是 failover
Expand Down
Loading

0 comments on commit d7900bd

Please sign in to comment.