diff --git a/server/src/test/java/com/github/dtprj/dongting/raft/server/ConfigChangeTest.java b/server/src/test/java/com/github/dtprj/dongting/raft/server/ConfigChangeTest.java new file mode 100644 index 00000000..fb9e5b8b --- /dev/null +++ b/server/src/test/java/com/github/dtprj/dongting/raft/server/ConfigChangeTest.java @@ -0,0 +1,72 @@ +/* + * Copyright The Dongting Project + * + * The Dongting Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.github.dtprj.dongting.raft.server; + +import com.github.dtprj.dongting.net.HostPort; +import com.github.dtprj.dongting.raft.RaftNode; +import com.github.dtprj.dongting.raft.test.TestUtil; +import org.junit.jupiter.api.Test; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * @author huangli + */ +public class ConfigChangeTest extends ServerTestBase { + + @Test + void test() throws Exception { + String servers = "2,127.0.0.1:4002;3,127.0.0.1:4003"; + String members = "2,3"; + ServerInfo s2 = createServer(2, servers, members, ""); + ServerInfo s3 = createServer(3, servers, members, ""); + waitStart(s2); + waitStart(s3); + + int leaderId = waitLeaderElectAndGetLeaderId(s2, s3); + + RaftNode n4 = new RaftNode(4, new HostPort("127.0.0.1", 4004)); + s2.raftServer.addNode(n4, 1000); + s3.raftServer.addNode(n4, 1000); + + ServerInfo leader = leaderId == 2 ? s2 : s3; + ServerInfo follower = leaderId == 2 ? s3 : s2; + CompletableFuture f = leader.group.leaderPrepareJointConsensus(Set.of(2, 3, 4), new HashSet<>()); + long prepareIndex = f.get(5, TimeUnit.SECONDS); + + put(leader, "k1", "v1"); // make follower apply the prepare operation as soon as possible + // commit will check follower apply to prepareIndex + TestUtil.waitUtil(() -> follower.group.getGroupComponents().getRaftStatus() + .getShareStatus().lastApplied >= prepareIndex); + + f = leader.group.leaderCommitJointConsensus(prepareIndex); + f.get(5, TimeUnit.SECONDS); + + ServerInfo s4 = createServer(4, "2,127.0.0.1:4002;3,127.0.0.1:4003;4,127.0.0.1:4004", "2,3,4", ""); + waitStart(s4); + + assertTrue(() -> s4.group.getGroupComponents().getRaftStatus().getShareStatus().lastApplied >= prepareIndex); + + waitStop(s2); + waitStop(s3); + waitStop(s4); + } +} diff --git a/server/src/test/java/com/github/dtprj/dongting/raft/server/ServerTestBase.java b/server/src/test/java/com/github/dtprj/dongting/raft/server/ServerTestBase.java index de4afeba..3d9464b7 100644 --- a/server/src/test/java/com/github/dtprj/dongting/raft/server/ServerTestBase.java +++ b/server/src/test/java/com/github/dtprj/dongting/raft/server/ServerTestBase.java @@ -16,6 +16,7 @@ package com.github.dtprj.dongting.raft.server; import com.github.dtprj.dongting.buf.DefaultPoolFactory; +import com.github.dtprj.dongting.common.ByteArray; import com.github.dtprj.dongting.common.DtTime; import com.github.dtprj.dongting.dtkv.server.DtKV; import com.github.dtprj.dongting.dtkv.server.KvConfig; @@ -33,6 +34,7 @@ import com.github.dtprj.dongting.raft.store.StoreAccessor; import com.github.dtprj.dongting.raft.store.TestDir; import com.github.dtprj.dongting.raft.test.MockExecutors; +import com.github.dtprj.dongting.raft.test.TestUtil; import java.io.File; import java.io.RandomAccessFile; @@ -40,8 +42,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.CRC32C; import static com.github.dtprj.dongting.util.Tick.tick; @@ -166,4 +170,44 @@ protected void waitStart(ServerInfo si) throws Exception { protected void waitStop(ServerInfo si) { si.raftServer.stop(new DtTime(5, TimeUnit.SECONDS)); } + + protected int waitLeaderElectAndGetLeaderId(ServerInfo... servers) { + AtomicInteger leaderId = new AtomicInteger(); + TestUtil.waitUtil(() -> { + int leader = 0; + for (ServerInfo server : servers) { + if (server.raftServer.getRaftGroup(1).isLeader()) { + leader++; + leaderId.set(server.nodeId); + } + } + if (leader > 1) { + throw new RuntimeException("more than one leader"); + } + return leader == 1; + }); + return leaderId.get(); + } + + protected long put(ServerInfo leader, String key, String value) { + RaftInput ri = new RaftInput(DtKV.BIZ_TYPE_PUT, new ByteArray(key.getBytes()), + new ByteArray(value.getBytes()), new DtTime(3, TimeUnit.SECONDS), false); + CompletableFuture f = new CompletableFuture<>(); + leader.group.submitLinearTask(ri, new RaftCallback() { + @Override + public void success(long raftIndex, Object result) { + f.complete(raftIndex); + } + + @Override + public void fail(Throwable ex) { + f.completeExceptionally(ex); + } + }); + try { + return f.get(5, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/server/src/test/java/com/github/dtprj/dongting/raft/server/VoteTest.java b/server/src/test/java/com/github/dtprj/dongting/raft/server/VoteTest.java index 30c5f28e..2fcf1ffc 100644 --- a/server/src/test/java/com/github/dtprj/dongting/raft/server/VoteTest.java +++ b/server/src/test/java/com/github/dtprj/dongting/raft/server/VoteTest.java @@ -15,11 +15,8 @@ */ package com.github.dtprj.dongting.raft.server; -import com.github.dtprj.dongting.raft.test.TestUtil; import org.junit.jupiter.api.Test; -import java.util.concurrent.atomic.AtomicInteger; - import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -27,21 +24,6 @@ */ public class VoteTest extends ServerTestBase { - private int waitLeaderElectAndGetLeaderId(ServerInfo... servers) { - AtomicInteger leaderId = new AtomicInteger(); - TestUtil.waitUtil(() -> { - int leader = 0; - for (ServerInfo server : servers) { - if (server.raftServer.getRaftGroup(1).isLeader()) { - leader++; - leaderId.set(server.nodeId); - } - } - return leader == 1; - }); - return leaderId.get(); - } - @Test void test() throws Exception { String servers = "1,127.0.0.1:4001;2,127.0.0.1:4002;3,127.0.0.1:4003;4,127.0.0.1:4004";