Skip to content

Commit

Permalink
fixed:fix the error of invalid username password pair when password #…
Browse files Browse the repository at this point in the history
… exists
  • Loading branch information
pingxingshikong committed Aug 8, 2024
1 parent f7e8fe1 commit 12c0360
Show file tree
Hide file tree
Showing 14 changed files with 60 additions and 55 deletions.
62 changes: 31 additions & 31 deletions syncer-jedis/src/main/java/syncer/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1286,17 +1286,17 @@ public void bitfield(final byte[] key, final byte[]... value) {
public void hstrlen(final byte[] key, final byte[] field) {
sendCommand(HSTRLEN, key, field);
}

public void xadd(final byte[] key, final byte[] id, final Map<byte[], byte[]> hash, long maxLen, boolean approximateLength) {
int maxLexArgs = 0;
if(maxLen < Long.MAX_VALUE) { // optional arguments
if(approximateLength) {
maxLexArgs = 3; // e.g. MAXLEN ~ 1000
maxLexArgs = 3; // e.g. MAXLEN ~ 1000
} else {
maxLexArgs = 2; // e.g. MAXLEN 1000
}
}

final byte[][] params = new byte[2 + maxLexArgs + hash.size() * 2][];
int index = 0;
params[index++] = key;
Expand All @@ -1307,23 +1307,23 @@ public void xadd(final byte[] key, final byte[] id, final Map<byte[], byte[]> ha
}
params[index++] = toByteArray(maxLen);
}

params[index++] = id;
for (final Entry<byte[], byte[]> entry : hash.entrySet()) {
params[index++] = entry.getKey();
params[index++] = entry.getValue();
}
sendCommand(XADD, params);
}

public void xlen(final byte[] key) {
sendCommand(XLEN, key);
}
public void xrange(final byte[] key, final byte[] start, final byte[] end, final long count) {

public void xrange(final byte[] key, final byte[] start, final byte[] end, final long count) {
sendCommand(XRANGE, key, start, end, Keyword.COUNT.raw, toByteArray(count));
}

public void xrevrange(final byte[] key, final byte[] end, final byte[] start, final int count) {
sendCommand(XREVRANGE, key, end, start, Keyword.COUNT.raw, toByteArray(count));
}
Expand All @@ -1338,18 +1338,18 @@ public void xread(final int count, final long block, final Map<byte[], byte[]> s
params[streamsIndex++] = Keyword.BLOCK.raw;
params[streamsIndex++] = toByteArray(block);
}

params[streamsIndex++] = Keyword.STREAMS.raw;
int idsIndex = streamsIndex + streams.size();

for (final Entry<byte[], byte[]> entry : streams.entrySet()) {
params[streamsIndex++] = entry.getKey();
params[idsIndex++] = entry.getValue();
}

sendCommand(XREAD, params);
}

public void xack(final byte[] key, final byte[] group, final byte[]... ids) {
final byte[][] params = new byte[2 + ids.length][];
int index = 0;
Expand All @@ -1360,27 +1360,27 @@ public void xack(final byte[] key, final byte[] group, final byte[]... ids) {
}
sendCommand(XACK, params);
}

public void xgroupCreate(final byte[] key, final byte[] groupname, final byte[] id, boolean makeStream) {
if(makeStream) {
sendCommand(XGROUP, Keyword.CREATE.raw, key, groupname, id, Keyword.MKSTREAM.raw);
sendCommand(XGROUP, Keyword.CREATE.raw, key, groupname, id, Keyword.MKSTREAM.raw);
} else {
sendCommand(XGROUP, Keyword.CREATE.raw, key, groupname, id);
sendCommand(XGROUP, Keyword.CREATE.raw, key, groupname, id);
}
}

public void xgroupSetID(final byte[] key, final byte[] groupname, final byte[] id) {
sendCommand(XGROUP, Keyword.SETID.raw, key, groupname, id);
sendCommand(XGROUP, Keyword.SETID.raw, key, groupname, id);
}

public void xgroupDestroy(final byte[] key, final byte[] groupname) {
sendCommand(XGROUP, Keyword.DESTROY.raw, key, groupname);
sendCommand(XGROUP, Keyword.DESTROY.raw, key, groupname);
}

public void xgroupDelConsumer(final byte[] key, final byte[] groupname, final byte[] consumerName) {
sendCommand(XGROUP, Keyword.DELCONSUMER.raw, key, groupname, consumerName);
sendCommand(XGROUP, Keyword.DELCONSUMER.raw, key, groupname, consumerName);
}

public void xdel(final byte[] key, final byte[]... ids) {
final byte[][] params = new byte[1 + ids.length][];
int index = 0;
Expand All @@ -1390,17 +1390,17 @@ public void xdel(final byte[] key, final byte[]... ids) {
}
sendCommand(XDEL, params);
}

public void xtrim(byte[] key, long maxLen, boolean approximateLength) {
if(approximateLength) {
sendCommand(XTRIM, key, Keyword.MAXLEN.raw, Protocol.BYTES_TILDE ,toByteArray(maxLen));
} else {
sendCommand(XTRIM, key, Keyword.MAXLEN.raw, toByteArray(maxLen));
}
}

public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck, Map<byte[], byte[]> streams) {

int optional = 0;
if(count>0) {
optional += 2;
Expand All @@ -1411,8 +1411,8 @@ public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block,
if(noAck) {
optional += 1;
}


final byte[][] params = new byte[4 + optional + streams.size() * 2][];

int streamsIndex = 0;
Expand All @@ -1431,17 +1431,17 @@ public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block,
params[streamsIndex++] = Keyword.NOACK.raw;
}
params[streamsIndex++] = Keyword.STREAMS.raw;

int idsIndex = streamsIndex + streams.size();
for (final Entry<byte[], byte[]> entry : streams.entrySet()) {
params[streamsIndex++] = entry.getKey();
params[idsIndex++] = entry.getValue();
}

sendCommand(XREADGROUP, params);
}


public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername) {
if(consumername == null) {
sendCommand(XPENDING, key, groupname, start, end, toByteArray(count));
Expand All @@ -1451,27 +1451,27 @@ public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int
}

public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids) {

ArrayList<byte[]> arguments = new ArrayList<>(10 + ids.length);

arguments.add(key);
arguments.add(groupname);
arguments.add(consumername);
arguments.add(toByteArray(minIdleTime));

for(byte[] id : ids) {
arguments.add(id);
arguments.add(id);
}
if(newIdleTime > 0) {
arguments.add(Keyword.IDLE.raw);
arguments.add(toByteArray(newIdleTime));
}
if(retries > 0) {
arguments.add(Keyword.RETRYCOUNT.raw);
arguments.add(toByteArray(retries));
arguments.add(toByteArray(retries));
}
if(force) {
arguments.add(Keyword.FORCE.raw);
arguments.add(Keyword.FORCE.raw);
}
sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][]));
}
Expand Down
18 changes: 15 additions & 3 deletions syncer-replica/src/main/java/syncer/replica/config/RedisURI.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import syncer.replica.type.FileType;
import syncer.replica.util.strings.Strings;


import java.io.*;
import java.net.MalformedURLException;
import java.net.URI;
Expand Down Expand Up @@ -157,15 +156,25 @@ private void parse(String uri) throws URISyntaxException {
}
}


String authPassword="";
if(uri.contains("authPassword=")){
authPassword=uri.substring(uri.indexOf("authPassword=")+13);
if(authPassword.contains("&")){
authPassword=authPassword.substring(0,authPassword.indexOf("&"));
}
}
if (this.userInfo != null) {
int idx = this.userInfo.indexOf(':');
if (idx < 0) {
this.user = this.userInfo;
} else if (idx == 0) {
this.password = this.userInfo.substring(idx + 1);
// this.password = this.userInfo.substring(idx + 1);
this.password =authPassword;
} else /*(idx > 0)*/{
this.user = this.userInfo.substring(0, idx);
String password = this.userInfo.substring(idx + 1);
// String password = this.userInfo.substring(idx + 1);
String password =authPassword;
if (password != null && password.length() != 0) {
this.password = password;
}
Expand Down Expand Up @@ -201,6 +210,9 @@ private void parse(String uri) throws URISyntaxException {
if (key.length() > 0 && value.length() > 0) {
parameters.put(decode(key.toString()), decode(value.toString()));
}
if(!"".equals(authPassword)) {
parameters.put("authPassword", authPassword);
}
}

private static String decode(String s) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ protected void doOpen() throws IOException, IncrementException {
CommandName name = CommandName.name(Strings.toString(raw[0]));
final CommandParser<? extends Command> parser;
if ((parser = commands.get(name)) == null) {
log.warn("command [{}] not register. raw command:{}", name, Strings.format(raw));
if(!"opinfo".equalsIgnoreCase(name.name)){
log.warn("command [{}] not register. raw command:{}", name, Strings.format(raw));
}
config.addOffset(offset[0]);
offset[0] = 0L;
continue;
Expand All @@ -195,4 +197,4 @@ protected void doOpen() throws IOException, IncrementException {
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ protected void doOpen() throws IOException, IncrementException {
CommandName name = CommandName.name(Strings.toString(raw[0]));
final CommandParser<? extends Command> parser;
if ((parser = commands.get(name)) == null) {
log.warn("command [{}] not register. raw command:{}", name, Strings.format(raw));
if(!"opinfo".equalsIgnoreCase(name.name)){
log.warn("command [{}] not register. raw command:{}", name, Strings.format(raw));
}
config.addOffset(offset[0]);
offset[0] = 0L;
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,4 @@ public void close() throws IOException {
public void broken(String reason) throws IOException {
replication.broken(reason);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ public boolean open() throws IOException, IncrementException, RedisAuthErrorExce
CommandName name = CommandName.name(Strings.toString(raws[0]));
final CommandParser<? extends Command> parser;
if(Objects.isNull(parser=socketReplication.getCommandParser(name))){
log.warn("[TASKID {}] command [{}] not register. raw command:{}",config.getTaskId(),name,Strings.format(raws));
if(!"opinfo".equalsIgnoreCase(name.name)){
log.warn("[TASKID {}] command [{}] not register. raw command:{}",config.getTaskId(),name,Strings.format(raws));
}
config.addOffset(offset[0]);
offset[0] = 0L;
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ public void run(Replication replication, KeyValueEventEntity eventEntity, TaskMo
if(valueString.getBatch()==0){
String res=client.set(duNum,valueString.getKey(), valueString.getValue());
iSyncerCompensator.set(duNum,valueString.getKey(), valueString.getValue(),res);
log.info("string set key 2 set key: {} ",stringKey);

}else {
Long res=client.append(duNum,valueString.getKey(), valueString.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ public void run(Replication replication, KeyValueEventEntity eventEntity, TaskMo
if(valueString.getBatch()==0){
String res=client.set(duNum,valueString.getKey(), valueString.getValue());
iSyncerCompensator.set(duNum,valueString.getKey(), valueString.getValue(),res);
log.info("string set key 2 set key:{}",stringKey);

}else {
Long res=client.append(duNum,valueString.getKey(), valueString.getValue());
Expand All @@ -152,7 +151,6 @@ public void run(Replication replication, KeyValueEventEntity eventEntity, TaskMo
String res=client.set(duNum,valueString.getKey(), valueString.getValue(),ms);
iSyncerCompensator.set(duNum,valueString.getKey(), valueString.getValue(),res);
// log.info("string set key 2 set {} {}",valueString.getKey(),valueString.getValue());
log.info("string set key 2 set key: {} ms:{}",stringKey,ms);
}else {
Long res=client.append(duNum,valueString.getKey(), valueString.getValue());
iSyncerCompensator.append(duNum,valueString.getKey(), valueString.getValue(),res);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import syncer.common.util.TemplateUtils;
import syncer.common.util.ThreadPoolUtils;
import syncer.common.util.file.FileUtils;
import syncer.replica.config.RedisURI;
Expand All @@ -31,7 +30,6 @@
import syncer.transmission.model.TaskModel;
import syncer.transmission.util.manger.DefaultSyncerStatusManger;
import syncer.transmission.util.redis.RedisReplIdCheck;
import syncer.transmission.util.redis.RedisUrlCheck;
import syncer.transmission.util.taskStatus.SingleTaskDataManagerUtils;

import java.io.*;
Expand Down Expand Up @@ -299,4 +297,4 @@ public void run() {
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package syncer.transmission.task;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,15 +27,13 @@
import syncer.replica.listener.TaskStatusListener;
import syncer.replica.listener.ValueDumpIterableEventListener;
import syncer.replica.parser.syncer.ValueDumpIterableRdbParser;
import syncer.replica.parser.syncer.datatype.DumpKeyValuePairEvent;
import syncer.replica.register.DefaultCommandRegister;
import syncer.replica.replication.RedisReplication;
import syncer.replica.replication.Replication;
import syncer.replica.status.TaskStatus;
import syncer.replica.type.SyncType;
import syncer.replica.util.SyncTypeUtils;
import syncer.replica.util.TaskRunTypeEnum;
import syncer.replica.util.strings.Strings;
import syncer.transmission.checkpoint.breakpoint.BreakPoint;
import syncer.transmission.client.RedisClient;
import syncer.transmission.client.RedisClientFactory;
Expand All @@ -56,7 +53,6 @@
import syncer.transmission.util.taskStatus.SingleTaskDataManagerUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import lombok.extern.slf4j.Slf4j;
import syncer.jedis.Protocol;
import syncer.replica.config.RedisURI;
import syncer.replica.constant.RedisType;
import syncer.replica.datatype.command.DefaultCommand;
import syncer.replica.entity.RedisDB;
import syncer.replica.event.Event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import syncer.transmission.util.sql.SqlOPUtils;
import syncer.transmission.util.taskStatus.SingleTaskDataManagerUtils;

import java.net.URISyntaxException;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -87,4 +86,4 @@ public synchronized static void updateKeyCount(String taskId, RedisURI suri){

}

}
}
Loading

0 comments on commit 12c0360

Please sign in to comment.