Skip to content

Commit

Permalink
repairing the support of select command from single mode to cluster mode
Browse files Browse the repository at this point in the history
add and display data file synchronization progress
  • Loading branch information
pingxingshikong committed Jul 26, 2021
1 parent 41a5961 commit 58aa935
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 198 deletions.
73 changes: 41 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ RedisSyncer是一个redis多任务同步工具集,应用于redis单实例及
* redissycner 客户端
[redissyncer-cli](https://github.com/TraceNature/redissyncer-cli)

* dashboard web控制面板
[redissycner-dashboard](https://github.com/TraceNature/dashboard_release)

* redis 数据校验工具
[redissycner-compare](https://github.com/TraceNature/rediscompare)

Expand All @@ -32,6 +35,7 @@ RedisSyncer是一个redis多任务同步工具集,应用于redis单实例及
* rdb跨版本支持,支持高版本至低版本
* ttl校准
* 命令订阅(目标端支持kafka)

## Quick start

请参阅[Quick Start Guide](docs/quickstart.md),文档包括构建及部署方法及基本使用方法
Expand All @@ -45,27 +49,31 @@ RedisSyncer是一个redis多任务同步工具集,应用于redis单实例及

## 编译环境

| **环境条件** | **版本号** |
| **环境条件** | **版本号** |
| :----: | :----: |
| \[Maven\] | \[3.0+ \] |
| \[Maven\] | \[3.0+ \] |
| \[JDK\] | \[1.8 \] |

## 运行环境

| **环境条件** | **版本号** |
| **环境条件** | **版本号** |
| :----: | :----: |
| \[JDK\] | \[1.8+ \] |
| \[JDK\] | \[1.8+ \] |

## 支持Redis版本

| **环境条件** |**版本号** |
| **环境条件** |**版本号** |
| :----:| :----: |
| \[Redis\] | \[2.8-6.2\] |
| \[Redis\] | \[2.8-6.2\] |

## [dashboard](https://github.com/TraceNature/dashboard_release)

![](docs/images/dashboard/dashborad1.png)


## 支持的命令(写命令)
| system | String | List | Hash | Set | ZSet | Transactions | GEO | Stream | HyperLogLog |
|------------|-------------|--------------|------------|-------------|----------|--------------|------------------|------------------|------------------|
|------------|-------------|--------------|------------|-------------|----------|--------------|------------------|------------------|------------------|
| SElECT | BITFIELD | BLMOVE | HDEL | SADD | BZPOPMAX | EXEC | GEOADD | XSETID | PFMERGE |
| FLUSHALL | APPEND | BLPOP | HINCRBY | SDIFFSTORE | BZPOPMIN | MULTI | GEOSEARCHSTORE | XACK | PFADD |
| FLUSHDB | BITOP | BRPOP | HMSET | SINTERSTORE | ZADD | DISCARD | | XADD | PFCOUNT |
Expand All @@ -90,57 +98,58 @@ RedisSyncer是一个redis多任务同步工具集,应用于redis单实例及

| **源数据类型** | **说明** |
| :----:| :----: |
| \[Redis\] | \[存量数据同步\] |
| \[Redis\] | \[增量实时同步\] |
| \[Redis\] | \[存量+增量实时同步\] |
| \[Redis\] | \[存量数据同步\] |
| \[Redis\] | \[增量实时同步\] |
| \[Redis\] | \[存量+增量实时同步\] |
| \[Redis\] | \[生成实时增量AOF文件\] |
| \[本地RDB\] | \[本地RDB文件导入\] |
| \[本地RDB\] | \[本地RDB文件导入\] |
| \[在线RDB\] | \[文件url导入\] |
| \[本地AOF\] | \[本地AOF文件导入\] |
| \[本地AOF\] | \[本地AOF文件导入\] |
| \[在线AOF\] | \[文件url导入\] |
| \[本地混合文件\] | \[本地混合文件导入\] |
| \[在线混合文件\] | \[文件url导入\] |
| \[本地混合文件\] | \[本地混合文件导入\] |
| \[在线混合文件\] | \[文件url导入\] |


[comment]: <> (##支持命令)
[comment]: <> "##支持命令"

[comment]: <> (| 命令 | 命令 | 命令 | 命令 |)
[comment]: <> "| 命令 | 命令 | 命令 | 命令 |"

[comment]: <> (| :----:| :----: | :----: | :----: |)
[comment]: <> "| :----:| :----: | :----: | :----: |"

[comment]: <> (| APPEND | BLPOP | SADD |)
[comment]: <> "| APPEND | BLPOP | SADD |"

[comment]: <> (| SET | BRPOP | SCARD |)
[comment]: <> "| SET | BRPOP | SCARD |"

[comment]: <> (| SETEX | BRPOPLPUSH | SDIFFSTORE |)
[comment]: <> "| SETEX | BRPOPLPUSH | SDIFFSTORE |"

[comment]: <> (| SETNX | LINSERT | SINTERSTORE |)
[comment]: <> "| SETNX | LINSERT | SINTERSTORE |"

[comment]: <> (| GETSET | LPOP | SMOVE |)
[comment]: <> "| GETSET | LPOP | SMOVE |"

[comment]: <> (| SETBIT | LPUSH | SPOP |)
[comment]: <> "| SETBIT | LPUSH | SPOP |"

[comment]: <> (| SETRANGE | LPUSHX | SREM |)
[comment]: <> "| SETRANGE | LPUSHX | SREM |"

[comment]: <> (| MSET | LREM | SUNIONSTORE |)
[comment]: <> "| MSET | LREM | SUNIONSTORE |"

[comment]: <> (| MSETNX | LSET | 单元格 |)
[comment]: <> "| MSETNX | LSET | 单元格 |"

[comment]: <> (| PSETEX | LTRIM | 单元格 |)
[comment]: <> "| PSETEX | LTRIM | 单元格 |"

[comment]: <> (| INCR | RPOP | 单元格 |)
[comment]: <> "| INCR | RPOP | 单元格 |"

[comment]: <> (| INCRBY | RPOPLPUSH | 单元格 |)
[comment]: <> "| INCRBY | RPOPLPUSH | 单元格 |"

[comment]: <> (|INCRBYFLOAT | RPUSH | 单元格 |)
[comment]: <> "|INCRBYFLOAT | RPUSH | 单元格 |"

[comment]: <> (| DECR | RPUSHX | 单元格 |)
[comment]: <> "| DECR | RPUSHX | 单元格 |"

[comment]: <> (| DECRBY | 单元格 | 单元格 |)
[comment]: <> "| DECRBY | 单元格 | 单元格 |"

## 致谢

### [Jedis](https://github.com/redis/jedis)

### [replicatior](https://github.com/leonchen83/redis-replicator)

### [IntelliJ IDEA](https://www.jetbrains.com/?from=redis-replicator)
Binary file added docs/images/dashboard/dashborad1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
24 changes: 14 additions & 10 deletions docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,24 @@
- [x] key过滤
- [x] 命令过滤
- [x] 限制任务数,根据内存容量限制创建任务
- [ ] 实现增量续传2.0,通过redis事务命令,尽最大可能保证数据一致性
- [ ] 支持源Redis主从故障转移以及支持sentinel模式
- [x] 目标连接retry机制
- [ ] 实现 rewrite
- [ ] source.type target.type
- [ ] 内存级别双向同步
- [x] 实现增量续传2.0,通过redis事务命令,尽最大可能保证数据一致性
- [x] 目标连接retry机制
- [x] source.type target.type
- [x] 目标为kafka,实现命令订阅
- [x] 集成log4j2,日志可通过application.yml或启动参数配置,默认输出位置 ./log
- [ ] 数据校验,由goclient集成
- [x] incr 、incrby等命令幂等操作
- [x] swagger 补充api说明
- [ ] goclient 适应v2 api
- [ ] goclient 实现交互模式类似redis-cli
- [x] goclient 适应v2 api
- [x] goclient 实现交互模式类似redis-cli
- [ ] 源端节点scan模式,使用scan命令实现不支持sync命令云的Redis的全量数据拉取
- [ ] 数据校验,由goclient集成
- [ ] 支持目标Redis sentinel模式
- [ ] 支持源Redis主从故障转移以及支持sentinel模式
- [ ] 实现 rewrite
- [ ] 内存级别双向同步
- [ ] 目标为rediscluster 实现pipeline写入


* testcase完善,形成完整回归测试案例
- [x] single2single
- [ ] single2single with dbmap
Expand All @@ -60,7 +64,7 @@


* 4.X
- [ ] 任务元数据改为ETCD存储
- [x] 任务元数据改为ETCD存储
- [ ] 实现portal及任务调度
- [ ] 实现任务在集群某节点不可用的情况下自动迁移至其他节点并续传任务
- [ ] 兼容redis协议的其他kv产品例如Tides,TiKV
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ public class SocketReplication extends AbstractReplication{
private Heartbeat heartbeat;
private RedisOutputStream outputStream;
protected RedisSocketFactory socketFactory;

protected DefaultSyncRedisProtocol syncRedisProtocol;

private SocketReplicationRetrier socketReplicationRetrier;


/**
* sentinel failover 结束
* * 是否属于哨兵模式-》哨兵模式不会进入break 而是 failover
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
@Slf4j
public class SingleTaskServiceImpl implements ISingleTaskService {
RedisReplIdCheck redisReplIdCheck = new RedisReplIdCheck();

/**
* 运行之前已经创建过任务
*
Expand Down Expand Up @@ -167,7 +166,6 @@ public String runSyncerCommandDumpUpTask(TaskModel taskModel) throws Exception {
SingleTaskDataManagerUtils.changeThreadStatus(taskModel.getId(), -1L, TaskStatus.CREATING);

try {

//校验
TaskCheckStrategyGroupSelecter.select(RedisTaskStrategyGroupType.NODISTINCT, null, taskModel).run(null, taskModel);

Expand All @@ -176,7 +174,6 @@ public String runSyncerCommandDumpUpTask(TaskModel taskModel) throws Exception {
throw e;
}


//创建完成
SingleTaskDataManagerUtils.changeThreadStatus(taskModel.getId(), -1L, TaskStatus.CREATED);

Expand All @@ -186,7 +183,6 @@ public String runSyncerCommandDumpUpTask(TaskModel taskModel) throws Exception {
SingleTaskDataManagerUtils.brokenStatusAndLog(e, this.getClass(), taskModel.getId());
}


return taskModel.getId();
}

Expand All @@ -204,7 +200,6 @@ public void run() {
TaskDataEntity data = SingleTaskDataManagerUtils.getAliveThreadHashMap().get(taskModel.getId());
try {
SingleTaskDataManagerUtils.changeThreadStatus(taskModel.getId(), data.getOffSetEntity().getReplOffset().get(), TaskStatus.STOP);

try {
data.getReplication().close();
} catch (IOException e) {
Expand Down Expand Up @@ -247,7 +242,6 @@ public void run() {
}
}
}

@Override
public String lockName() {
return "startRunLock" + taskModel.getTaskId();
Expand All @@ -263,7 +257,6 @@ public int grant() {
} catch (Exception e) {
e.printStackTrace();
}

return result;
}

Expand Down Expand Up @@ -305,7 +298,6 @@ public void run() {
result.setMsg("The task does not exist. Please create the task first");
}
}

} catch (Exception e) {
result.setCode("1000");
result.setTaskId(taskId);
Expand All @@ -331,7 +323,6 @@ public int grant() {
@Override
public StartTaskEntity startTaskByTaskId(String taskId, boolean afresh) {
final StartTaskEntity result = StartTaskEntity.builder().build();

TaskRunUtils.getTaskLock(taskId, new EtcdLockCommandRunner() {
@Override
public void run() {
Expand All @@ -343,7 +334,6 @@ public void run() {
return ;
}
TaskModel taskModel = SqlOPUtils.findTaskById(taskId);

if (Objects.isNull(taskModel)) {
result.setCode("1002");
result.setTaskId(taskId);
Expand All @@ -359,8 +349,6 @@ public void run() {
* todo offset更新
*/
taskModel.setAfresh(afresh);


SqlOPUtils.updateAfreshsetById(taskId, afresh);
String id = null;
if (!SingleTaskDataManagerUtils.isTaskClose(taskId)) {
Expand All @@ -375,15 +363,11 @@ public void run() {
} else {
id = runSyncerTask(taskModel);
}

result.setCode("2000");
result.setTaskId(id);
result.setMsg("OK");
return;


} catch (Exception e) {

result.setCode("1000");
result.setTaskId(taskId);
result.setMsg("Error_" + e.getMessage());
Expand Down Expand Up @@ -424,7 +408,6 @@ public void run() {
.msg("task is running,please stop the task first")
.build();
result.add(startTaskEntity);

} else {
try {
SqlOPUtils.deleteTaskById(taskModel.getId());
Expand Down Expand Up @@ -476,7 +459,6 @@ public StartTaskEntity removeTaskByTaskId(String taskId) throws Exception {
.build();
return result;
}

try {
if (SqlOPUtils.findTaskById(taskId) != null) {
SqlOPUtils.deleteTaskById(taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,7 @@ public int grant() {
return 30;
}
});

}

return resultList;
}

Expand Down
Loading

0 comments on commit 58aa935

Please sign in to comment.