From 12c03605fa0e6c14a47b75f860474d0b8b178b03 Mon Sep 17 00:00:00 2001 From: zhanenqiang <1604030622@qq.com> Date: Thu, 8 Aug 2024 16:44:53 +0800 Subject: [PATCH] fixed:fix the error of invalid username password pair when password # exists --- .../main/java/syncer/jedis/BinaryClient.java | 62 +++++++++---------- .../java/syncer/replica/config/RedisURI.java | 18 +++++- .../replica/replication/AofReplication.java | 6 +- .../replica/replication/MixedReplication.java | 4 +- .../replica/replication/RedisReplication.java | 2 +- .../retry/SocketReplicationRetrier.java | 4 +- ...mmandProcessingRdbCommandSendStrategy.java | 1 - ...ProcessingRdbMultiCommandSendStrategy.java | 2 - .../RedisDataCommandUpTransmissionTask.java | 4 +- .../task/RedisDataSyncTransmissionTask.java | 4 -- ...ingRingByAuxiliaryKeyTransmissionTask.java | 1 - .../util/redis/KeyCountUtils.java | 3 +- .../util/redis/RedisUrlCheck.java | 2 - syncer-webapp/pom.xml | 2 +- 14 files changed, 60 insertions(+), 55 deletions(-) diff --git a/syncer-jedis/src/main/java/syncer/jedis/BinaryClient.java b/syncer-jedis/src/main/java/syncer/jedis/BinaryClient.java index 319edbb2..91d3d5d5 100644 --- a/syncer-jedis/src/main/java/syncer/jedis/BinaryClient.java +++ b/syncer-jedis/src/main/java/syncer/jedis/BinaryClient.java @@ -1286,17 +1286,17 @@ public void bitfield(final byte[] key, final byte[]... value) { public void hstrlen(final byte[] key, final byte[] field) { sendCommand(HSTRLEN, key, field); } - + public void xadd(final byte[] key, final byte[] id, final Map hash, long maxLen, boolean approximateLength) { int maxLexArgs = 0; if(maxLen < Long.MAX_VALUE) { // optional arguments if(approximateLength) { - maxLexArgs = 3; // e.g. MAXLEN ~ 1000 + maxLexArgs = 3; // e.g. MAXLEN ~ 1000 } else { maxLexArgs = 2; // e.g. MAXLEN 1000 } } - + final byte[][] params = new byte[2 + maxLexArgs + hash.size() * 2][]; int index = 0; params[index++] = key; @@ -1307,7 +1307,7 @@ public void xadd(final byte[] key, final byte[] id, final Map ha } params[index++] = toByteArray(maxLen); } - + params[index++] = id; for (final Entry entry : hash.entrySet()) { params[index++] = entry.getKey(); @@ -1315,15 +1315,15 @@ public void xadd(final byte[] key, final byte[] id, final Map ha } sendCommand(XADD, params); } - + public void xlen(final byte[] key) { sendCommand(XLEN, key); } - - public void xrange(final byte[] key, final byte[] start, final byte[] end, final long count) { + + public void xrange(final byte[] key, final byte[] start, final byte[] end, final long count) { sendCommand(XRANGE, key, start, end, Keyword.COUNT.raw, toByteArray(count)); } - + public void xrevrange(final byte[] key, final byte[] end, final byte[] start, final int count) { sendCommand(XREVRANGE, key, end, start, Keyword.COUNT.raw, toByteArray(count)); } @@ -1338,7 +1338,7 @@ public void xread(final int count, final long block, final Map s params[streamsIndex++] = Keyword.BLOCK.raw; params[streamsIndex++] = toByteArray(block); } - + params[streamsIndex++] = Keyword.STREAMS.raw; int idsIndex = streamsIndex + streams.size(); @@ -1346,10 +1346,10 @@ public void xread(final int count, final long block, final Map s params[streamsIndex++] = entry.getKey(); params[idsIndex++] = entry.getValue(); } - + sendCommand(XREAD, params); } - + public void xack(final byte[] key, final byte[] group, final byte[]... ids) { final byte[][] params = new byte[2 + ids.length][]; int index = 0; @@ -1360,27 +1360,27 @@ public void xack(final byte[] key, final byte[] group, final byte[]... ids) { } sendCommand(XACK, params); } - + public void xgroupCreate(final byte[] key, final byte[] groupname, final byte[] id, boolean makeStream) { if(makeStream) { - sendCommand(XGROUP, Keyword.CREATE.raw, key, groupname, id, Keyword.MKSTREAM.raw); + sendCommand(XGROUP, Keyword.CREATE.raw, key, groupname, id, Keyword.MKSTREAM.raw); } else { - sendCommand(XGROUP, Keyword.CREATE.raw, key, groupname, id); + sendCommand(XGROUP, Keyword.CREATE.raw, key, groupname, id); } } public void xgroupSetID(final byte[] key, final byte[] groupname, final byte[] id) { - sendCommand(XGROUP, Keyword.SETID.raw, key, groupname, id); + sendCommand(XGROUP, Keyword.SETID.raw, key, groupname, id); } public void xgroupDestroy(final byte[] key, final byte[] groupname) { - sendCommand(XGROUP, Keyword.DESTROY.raw, key, groupname); + sendCommand(XGROUP, Keyword.DESTROY.raw, key, groupname); } public void xgroupDelConsumer(final byte[] key, final byte[] groupname, final byte[] consumerName) { - sendCommand(XGROUP, Keyword.DELCONSUMER.raw, key, groupname, consumerName); + sendCommand(XGROUP, Keyword.DELCONSUMER.raw, key, groupname, consumerName); } - + public void xdel(final byte[] key, final byte[]... ids) { final byte[][] params = new byte[1 + ids.length][]; int index = 0; @@ -1390,7 +1390,7 @@ public void xdel(final byte[] key, final byte[]... ids) { } sendCommand(XDEL, params); } - + public void xtrim(byte[] key, long maxLen, boolean approximateLength) { if(approximateLength) { sendCommand(XTRIM, key, Keyword.MAXLEN.raw, Protocol.BYTES_TILDE ,toByteArray(maxLen)); @@ -1398,9 +1398,9 @@ public void xtrim(byte[] key, long maxLen, boolean approximateLength) { sendCommand(XTRIM, key, Keyword.MAXLEN.raw, toByteArray(maxLen)); } } - + public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck, Map streams) { - + int optional = 0; if(count>0) { optional += 2; @@ -1411,8 +1411,8 @@ public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, if(noAck) { optional += 1; } - - + + final byte[][] params = new byte[4 + optional + streams.size() * 2][]; int streamsIndex = 0; @@ -1431,17 +1431,17 @@ public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, params[streamsIndex++] = Keyword.NOACK.raw; } params[streamsIndex++] = Keyword.STREAMS.raw; - + int idsIndex = streamsIndex + streams.size(); for (final Entry entry : streams.entrySet()) { params[streamsIndex++] = entry.getKey(); params[idsIndex++] = entry.getValue(); } - + sendCommand(XREADGROUP, params); } - + public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername) { if(consumername == null) { sendCommand(XPENDING, key, groupname, start, end, toByteArray(count)); @@ -1451,16 +1451,16 @@ public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int } public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids) { - + ArrayList arguments = new ArrayList<>(10 + ids.length); arguments.add(key); arguments.add(groupname); arguments.add(consumername); arguments.add(toByteArray(minIdleTime)); - + for(byte[] id : ids) { - arguments.add(id); + arguments.add(id); } if(newIdleTime > 0) { arguments.add(Keyword.IDLE.raw); @@ -1468,10 +1468,10 @@ public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minId } if(retries > 0) { arguments.add(Keyword.RETRYCOUNT.raw); - arguments.add(toByteArray(retries)); + arguments.add(toByteArray(retries)); } if(force) { - arguments.add(Keyword.FORCE.raw); + arguments.add(Keyword.FORCE.raw); } sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][])); } diff --git a/syncer-replica/src/main/java/syncer/replica/config/RedisURI.java b/syncer-replica/src/main/java/syncer/replica/config/RedisURI.java index 3118b9fc..5876205f 100644 --- a/syncer-replica/src/main/java/syncer/replica/config/RedisURI.java +++ b/syncer-replica/src/main/java/syncer/replica/config/RedisURI.java @@ -5,7 +5,6 @@ import syncer.replica.type.FileType; import syncer.replica.util.strings.Strings; - import java.io.*; import java.net.MalformedURLException; import java.net.URI; @@ -157,15 +156,25 @@ private void parse(String uri) throws URISyntaxException { } } + + String authPassword=""; + if(uri.contains("authPassword=")){ + authPassword=uri.substring(uri.indexOf("authPassword=")+13); + if(authPassword.contains("&")){ + authPassword=authPassword.substring(0,authPassword.indexOf("&")); + } + } if (this.userInfo != null) { int idx = this.userInfo.indexOf(':'); if (idx < 0) { this.user = this.userInfo; } else if (idx == 0) { - this.password = this.userInfo.substring(idx + 1); +// this.password = this.userInfo.substring(idx + 1); + this.password =authPassword; } else /*(idx > 0)*/{ this.user = this.userInfo.substring(0, idx); - String password = this.userInfo.substring(idx + 1); +// String password = this.userInfo.substring(idx + 1); + String password =authPassword; if (password != null && password.length() != 0) { this.password = password; } @@ -201,6 +210,9 @@ private void parse(String uri) throws URISyntaxException { if (key.length() > 0 && value.length() > 0) { parameters.put(decode(key.toString()), decode(value.toString())); } + if(!"".equals(authPassword)) { + parameters.put("authPassword", authPassword); + } } private static String decode(String s) { diff --git a/syncer-replica/src/main/java/syncer/replica/replication/AofReplication.java b/syncer-replica/src/main/java/syncer/replica/replication/AofReplication.java index ab1094ce..efc7df8c 100644 --- a/syncer-replica/src/main/java/syncer/replica/replication/AofReplication.java +++ b/syncer-replica/src/main/java/syncer/replica/replication/AofReplication.java @@ -175,7 +175,9 @@ protected void doOpen() throws IOException, IncrementException { CommandName name = CommandName.name(Strings.toString(raw[0])); final CommandParser parser; if ((parser = commands.get(name)) == null) { - log.warn("command [{}] not register. raw command:{}", name, Strings.format(raw)); + if(!"opinfo".equalsIgnoreCase(name.name)){ + log.warn("command [{}] not register. raw command:{}", name, Strings.format(raw)); + } config.addOffset(offset[0]); offset[0] = 0L; continue; @@ -195,4 +197,4 @@ protected void doOpen() throws IOException, IncrementException { } -} \ No newline at end of file +} diff --git a/syncer-replica/src/main/java/syncer/replica/replication/MixedReplication.java b/syncer-replica/src/main/java/syncer/replica/replication/MixedReplication.java index e65c3019..72816c96 100644 --- a/syncer-replica/src/main/java/syncer/replica/replication/MixedReplication.java +++ b/syncer-replica/src/main/java/syncer/replica/replication/MixedReplication.java @@ -212,7 +212,9 @@ protected void doOpen() throws IOException, IncrementException { CommandName name = CommandName.name(Strings.toString(raw[0])); final CommandParser parser; if ((parser = commands.get(name)) == null) { - log.warn("command [{}] not register. raw command:{}", name, Strings.format(raw)); + if(!"opinfo".equalsIgnoreCase(name.name)){ + log.warn("command [{}] not register. raw command:{}", name, Strings.format(raw)); + } config.addOffset(offset[0]); offset[0] = 0L; continue; diff --git a/syncer-replica/src/main/java/syncer/replica/replication/RedisReplication.java b/syncer-replica/src/main/java/syncer/replica/replication/RedisReplication.java index 3eb81ed1..c59aa62b 100644 --- a/syncer-replica/src/main/java/syncer/replica/replication/RedisReplication.java +++ b/syncer-replica/src/main/java/syncer/replica/replication/RedisReplication.java @@ -260,4 +260,4 @@ public void close() throws IOException { public void broken(String reason) throws IOException { replication.broken(reason); } -} \ No newline at end of file +} diff --git a/syncer-replica/src/main/java/syncer/replica/retry/SocketReplicationRetrier.java b/syncer-replica/src/main/java/syncer/replica/retry/SocketReplicationRetrier.java index fe9a33f3..d5fadd2f 100644 --- a/syncer-replica/src/main/java/syncer/replica/retry/SocketReplicationRetrier.java +++ b/syncer-replica/src/main/java/syncer/replica/retry/SocketReplicationRetrier.java @@ -109,7 +109,9 @@ public boolean open() throws IOException, IncrementException, RedisAuthErrorExce CommandName name = CommandName.name(Strings.toString(raws[0])); final CommandParser parser; if(Objects.isNull(parser=socketReplication.getCommandParser(name))){ - log.warn("[TASKID {}] command [{}] not register. raw command:{}",config.getTaskId(),name,Strings.format(raws)); + if(!"opinfo".equalsIgnoreCase(name.name)){ + log.warn("[TASKID {}] command [{}] not register. raw command:{}",config.getTaskId(),name,Strings.format(raws)); + } config.addOffset(offset[0]); offset[0] = 0L; continue; diff --git a/syncer-transmission/src/main/java/syncer/transmission/strategy/commandprocessing/impl/CommandProcessingRdbCommandSendStrategy.java b/syncer-transmission/src/main/java/syncer/transmission/strategy/commandprocessing/impl/CommandProcessingRdbCommandSendStrategy.java index a839e411..d29e4f5d 100644 --- a/syncer-transmission/src/main/java/syncer/transmission/strategy/commandprocessing/impl/CommandProcessingRdbCommandSendStrategy.java +++ b/syncer-transmission/src/main/java/syncer/transmission/strategy/commandprocessing/impl/CommandProcessingRdbCommandSendStrategy.java @@ -138,7 +138,6 @@ public void run(Replication replication, KeyValueEventEntity eventEntity, TaskMo if(valueString.getBatch()==0){ String res=client.set(duNum,valueString.getKey(), valueString.getValue()); iSyncerCompensator.set(duNum,valueString.getKey(), valueString.getValue(),res); - log.info("string set key 2 set key: {} ",stringKey); }else { Long res=client.append(duNum,valueString.getKey(), valueString.getValue()); diff --git a/syncer-transmission/src/main/java/syncer/transmission/strategy/commandprocessing/impl/CommandProcessingRdbMultiCommandSendStrategy.java b/syncer-transmission/src/main/java/syncer/transmission/strategy/commandprocessing/impl/CommandProcessingRdbMultiCommandSendStrategy.java index 1aa3d5ae..dc7589d2 100644 --- a/syncer-transmission/src/main/java/syncer/transmission/strategy/commandprocessing/impl/CommandProcessingRdbMultiCommandSendStrategy.java +++ b/syncer-transmission/src/main/java/syncer/transmission/strategy/commandprocessing/impl/CommandProcessingRdbMultiCommandSendStrategy.java @@ -135,7 +135,6 @@ public void run(Replication replication, KeyValueEventEntity eventEntity, TaskMo if(valueString.getBatch()==0){ String res=client.set(duNum,valueString.getKey(), valueString.getValue()); iSyncerCompensator.set(duNum,valueString.getKey(), valueString.getValue(),res); - log.info("string set key 2 set key:{}",stringKey); }else { Long res=client.append(duNum,valueString.getKey(), valueString.getValue()); @@ -152,7 +151,6 @@ public void run(Replication replication, KeyValueEventEntity eventEntity, TaskMo String res=client.set(duNum,valueString.getKey(), valueString.getValue(),ms); iSyncerCompensator.set(duNum,valueString.getKey(), valueString.getValue(),res); // log.info("string set key 2 set {} {}",valueString.getKey(),valueString.getValue()); - log.info("string set key 2 set key: {} ms:{}",stringKey,ms); }else { Long res=client.append(duNum,valueString.getKey(), valueString.getValue()); iSyncerCompensator.append(duNum,valueString.getKey(), valueString.getValue(),res); diff --git a/syncer-transmission/src/main/java/syncer/transmission/task/RedisDataCommandUpTransmissionTask.java b/syncer-transmission/src/main/java/syncer/transmission/task/RedisDataCommandUpTransmissionTask.java index 6253c31a..c036c29a 100644 --- a/syncer-transmission/src/main/java/syncer/transmission/task/RedisDataCommandUpTransmissionTask.java +++ b/syncer-transmission/src/main/java/syncer/transmission/task/RedisDataCommandUpTransmissionTask.java @@ -13,7 +13,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; -import syncer.common.util.TemplateUtils; import syncer.common.util.ThreadPoolUtils; import syncer.common.util.file.FileUtils; import syncer.replica.config.RedisURI; @@ -31,7 +30,6 @@ import syncer.transmission.model.TaskModel; import syncer.transmission.util.manger.DefaultSyncerStatusManger; import syncer.transmission.util.redis.RedisReplIdCheck; -import syncer.transmission.util.redis.RedisUrlCheck; import syncer.transmission.util.taskStatus.SingleTaskDataManagerUtils; import java.io.*; @@ -299,4 +297,4 @@ public void run() { } -} \ No newline at end of file +} diff --git a/syncer-transmission/src/main/java/syncer/transmission/task/RedisDataSyncTransmissionTask.java b/syncer-transmission/src/main/java/syncer/transmission/task/RedisDataSyncTransmissionTask.java index faa01978..bb5e1011 100644 --- a/syncer-transmission/src/main/java/syncer/transmission/task/RedisDataSyncTransmissionTask.java +++ b/syncer-transmission/src/main/java/syncer/transmission/task/RedisDataSyncTransmissionTask.java @@ -11,7 +11,6 @@ package syncer.transmission.task; -import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -28,7 +27,6 @@ import syncer.replica.listener.TaskStatusListener; import syncer.replica.listener.ValueDumpIterableEventListener; import syncer.replica.parser.syncer.ValueDumpIterableRdbParser; -import syncer.replica.parser.syncer.datatype.DumpKeyValuePairEvent; import syncer.replica.register.DefaultCommandRegister; import syncer.replica.replication.RedisReplication; import syncer.replica.replication.Replication; @@ -36,7 +34,6 @@ import syncer.replica.type.SyncType; import syncer.replica.util.SyncTypeUtils; import syncer.replica.util.TaskRunTypeEnum; -import syncer.replica.util.strings.Strings; import syncer.transmission.checkpoint.breakpoint.BreakPoint; import syncer.transmission.client.RedisClient; import syncer.transmission.client.RedisClientFactory; @@ -56,7 +53,6 @@ import syncer.transmission.util.taskStatus.SingleTaskDataManagerUtils; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects; diff --git a/syncer-transmission/src/main/java/syncer/transmission/task/RedisMultiSyncBreakingRingByAuxiliaryKeyTransmissionTask.java b/syncer-transmission/src/main/java/syncer/transmission/task/RedisMultiSyncBreakingRingByAuxiliaryKeyTransmissionTask.java index 4f0e17b4..1c44786c 100644 --- a/syncer-transmission/src/main/java/syncer/transmission/task/RedisMultiSyncBreakingRingByAuxiliaryKeyTransmissionTask.java +++ b/syncer-transmission/src/main/java/syncer/transmission/task/RedisMultiSyncBreakingRingByAuxiliaryKeyTransmissionTask.java @@ -14,7 +14,6 @@ import lombok.extern.slf4j.Slf4j; import syncer.jedis.Protocol; import syncer.replica.config.RedisURI; -import syncer.replica.constant.RedisType; import syncer.replica.datatype.command.DefaultCommand; import syncer.replica.entity.RedisDB; import syncer.replica.event.Event; diff --git a/syncer-transmission/src/main/java/syncer/transmission/util/redis/KeyCountUtils.java b/syncer-transmission/src/main/java/syncer/transmission/util/redis/KeyCountUtils.java index 961b8732..ba31abe3 100644 --- a/syncer-transmission/src/main/java/syncer/transmission/util/redis/KeyCountUtils.java +++ b/syncer-transmission/src/main/java/syncer/transmission/util/redis/KeyCountUtils.java @@ -20,7 +20,6 @@ import syncer.transmission.util.sql.SqlOPUtils; import syncer.transmission.util.taskStatus.SingleTaskDataManagerUtils; -import java.net.URISyntaxException; import java.util.List; import java.util.Objects; @@ -87,4 +86,4 @@ public synchronized static void updateKeyCount(String taskId, RedisURI suri){ } -} \ No newline at end of file +} diff --git a/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisUrlCheck.java b/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisUrlCheck.java index 07c9085d..5336e05c 100644 --- a/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisUrlCheck.java +++ b/syncer-transmission/src/main/java/syncer/transmission/util/redis/RedisUrlCheck.java @@ -14,13 +14,11 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; import syncer.common.exception.TaskMsgException; -import syncer.common.exception.TaskRestoreException; import syncer.jedis.Jedis; import syncer.jedis.exceptions.JedisDataException; import syncer.replica.cmd.CMD; import syncer.replica.config.RedisURI; import syncer.replica.config.ReplicConfig; -import syncer.replica.constant.RedisType; import syncer.transmission.constants.TaskMsgConstant; import syncer.transmission.util.code.CodeUtils; diff --git a/syncer-webapp/pom.xml b/syncer-webapp/pom.xml index c7815066..8450ed10 100644 --- a/syncer-webapp/pom.xml +++ b/syncer-webapp/pom.xml @@ -9,7 +9,7 @@ syncer syncer-webapp - 3.3.3.7 + 3.3.3.8 redissyncer-server Demo project for Spring Boot