Skip to content

Commit

Permalink
Revert "offset"
Browse files Browse the repository at this point in the history
This reverts commit 1728704.
  • Loading branch information
Baoyi Chen committed Dec 9, 2023
1 parent 8f89366 commit e8cd7b7
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package com.moilioncircle.redis.replicator;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -202,7 +202,7 @@ public static Configuration defaultSetting() {
/**
* @since 3.8.1
*/
private Map<String, Object> cookies = new ConcurrentHashMap<>(4);
private Map<String, Object> context = new HashMap<>(4);

public int getConnectionTimeout() {
return connectionTimeout;
Expand Down Expand Up @@ -452,14 +452,12 @@ public Configuration setScanStep(int scanStep) {
return this;
}

@SuppressWarnings("unchecked")
public <T> T getCookie(String key) {
return (T) this.cookies.get(key);
public Map<String, Object> getContext() {
return context;
}

@SuppressWarnings("unchecked")
public <T> T setCookie(String key, Object value) {
return (T) this.cookies.put(key, value);
public void setContext(Map<String, Object> context) {
this.context = context;
}

public Configuration merge(SslConfiguration sslConfiguration) {
Expand Down Expand Up @@ -613,7 +611,7 @@ public String toString() {
", replStreamDB=" + replStreamDB +
", replOffset=" + replOffset +
", replFilters=" + Arrays.toString(replFilters) +
", cookies=" + cookies +
", context=" + context +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -143,8 +145,10 @@ protected SyncMode trySync(final String reply) throws IOException {
this.db = -1;
parseDump(this);
String[] ary = reply.split(" ");
configuration.setCookie(REPL_ID, ary[1]);
configuration.setCookie(REPL_OFFSET, ary[2]);
Map<String, Object> context = new HashMap<>(4);
context.put(REPL_ID, ary[1]);
context.put(REPL_OFFSET, ary[2]);
configuration.setContext(context);
return PSYNC;
} else if (reply.startsWith("CONTINUE")) {
String[] ary = reply.split(" ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static com.moilioncircle.redis.replicator.Constants.REPL_OFFSET;
import static com.moilioncircle.redis.replicator.Constants.REPL_STREAM_DB;
import static java.lang.Integer.parseInt;
import static java.lang.Long.parseLong;

import java.io.IOException;
import java.util.List;
Expand All @@ -52,7 +53,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.moilioncircle.redis.replicator.Configuration;
import com.moilioncircle.redis.replicator.Replicator;
import com.moilioncircle.redis.replicator.event.Event;
import com.moilioncircle.redis.replicator.io.RedisInputStream;
Expand Down Expand Up @@ -162,10 +162,12 @@ public Event applyAux(RedisInputStream in, int version) throws IOException {
if (logger.isInfoEnabled()) {
logger.info("RDB {}: {}", auxKey, auxValue);
}
Configuration configuration = replicator.getConfiguration();
if (auxKey.equals("repl-id")) configuration.setCookie(REPL_ID, auxValue);
if (auxKey.equals("repl-offset")) configuration.setCookie(REPL_OFFSET, auxValue);
if (auxKey.equals("repl-stream-db")) configuration.setCookie(REPL_STREAM_DB, auxValue);
Map<String, Object> context = replicator.getConfiguration().getContext();
if (context != null) {
if (auxKey.equals("repl-id")) context.put(REPL_ID, auxValue);
if (auxKey.equals("repl-offset")) context.put(REPL_OFFSET, auxValue);
if (auxKey.equals("repl-stream-db")) context.put(REPL_STREAM_DB, auxValue);
}
return new AuxField(auxKey, auxValue);
} else {
if (logger.isWarnEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@
import static com.moilioncircle.redis.replicator.Constants.REPL_STREAM_DB;
import static com.moilioncircle.redis.replicator.Status.CONNECTED;
import static com.moilioncircle.redis.replicator.util.Tuples.of;
import static java.lang.Integer.parseInt;
import static java.lang.Long.parseLong;

import java.io.IOException;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -298,16 +297,13 @@ public long parse() throws IOException {
}

// we should set repl_id, repl_offset, repl_stream_db after full sync done to avoid losing data.
Configuration configuration = replicator.getConfiguration();

String replId = configuration.getCookie(REPL_ID);
if (replId != null) configuration.setReplId(replId);

String replOffset = configuration.getCookie(REPL_OFFSET);
if (replOffset != null) configuration.setReplOffset(parseLong(replOffset));

String replStreamDB = configuration.getCookie(REPL_STREAM_DB);
if (replStreamDB != null) configuration.setReplStreamDB(parseInt(replStreamDB));
Map<String, Object> context = replicator.getConfiguration().getContext();
if (context != null) {
Configuration conf = replicator.getConfiguration();
if (context.containsKey(REPL_ID)) conf.setReplId((String) context.get(REPL_ID));
if (context.containsKey(REPL_OFFSET)) conf.setReplOffset(Long.parseLong((String) context.get(REPL_OFFSET)));
if (context.containsKey(REPL_STREAM_DB)) conf.setReplStreamDB(Integer.parseInt((String) context.get(REPL_STREAM_DB)));
}
return offset;
}
}
Expand Down

0 comments on commit e8cd7b7

Please sign in to comment.