diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index f0ebc13f4da..3defeb8b85f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -98,6 +98,14 @@ public static class Proposal extends SyncedLearnerTracker { public long pingXid = -1; public final List pendingSyncs = new ArrayList<>(); + boolean isRead() { + return pingXid > 0; + } + + boolean isWrite() { + return !isRead(); + } + @Override public String toString() { return packet.getType() + ", 0x" + Long.toHexString(packet.getZxid()) + ", " + request @@ -156,6 +164,14 @@ public List getLearners() { // list of followers that are ready to follow (i.e synced with the leader) private final HashSet forwardingFollowers = new HashSet<>(); + volatile int followersProtocolVersion = ProtocolVersion.CURRENT; + + void recalculateFollowersVersion() { + followersProtocolVersion = forwardingFollowers.stream() + .mapToInt(LearnerHandler::getVersion) + .min() + .orElse(ProtocolVersion.CURRENT); + } /** * Returns a copy of the current forwarding follower snapshot @@ -181,6 +197,7 @@ public List getNonVotingFollowers() { void addForwardingFollower(LearnerHandler lh) { synchronized (forwardingFollowers) { forwardingFollowers.add(lh); + recalculateFollowersVersion(); /* * Any changes on forwardiongFollowers could possible affect the need of Oracle. * */ @@ -255,6 +272,7 @@ public void addLearnerHandler(LearnerHandler learner) { public void removeLearnerHandler(LearnerHandler peer) { synchronized (forwardingFollowers) { forwardingFollowers.remove(peer); + recalculateFollowersVersion(); } synchronized (learners) { learners.remove(peer); @@ -932,10 +950,12 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol // concurrent reconfigs are allowed, this can happen. 
Proposal previous = outstandingProposals.get(zxid - 1); if (previous != null) { - if (previous.pingXid < 0) { + if (previous.isWrite()) { return false; } - // Quorum sync leader is leading old version servers, it probably will never get enough acks. + // This is possible when downgrading; the leader will probably never get enough acks. + // + // These lines should probably be reverted in a new major version. outstandingProposals.remove(zxid - 1); commitQuorumSync(previous); } @@ -1009,6 +1029,9 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA if (LOG.isTraceEnabled()) { LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid)); for (Proposal p : outstandingProposals.values()) { + if (p.isRead()) { + continue; + } long packetZxid = p.packet.getZxid(); LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid)); } @@ -1040,6 +1063,9 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA if (p == null) { LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr); return; + } else if (p.isRead()) { + // The write proposal already completed. + return; } if (ackLoggingFrequency > 0 && (zxid % ackLoggingFrequency == 0)) { @@ -1092,8 +1118,8 @@ private synchronized void processQuorumSyncAck(long sid, long zxid, long xid) { LOG.debug("Receive dated quorum sync zxid 0x{}, xid {} with no ping", Long.toHexString(zxid), xid); return; } - if (p.pingXid < 0) { - // This is not possible as proposal will get new zxid. + if (p.isWrite()) { + // This is not possible as write proposal will get new zxid. 
LOG.error("Receive quorum sync zxid 0x{}, xid {}, proposal {}", Long.toHexString(zxid), xid, p); return; } else if (xid < p.pingXid) { @@ -1375,8 +1401,15 @@ public synchronized void processSync(Request r) { if (p != null) { p.pendingSyncs.add(r); pendingSyncs.incrementAndGet(); + } else if (followersProtocolVersion < ProtocolVersion.VERSION_3_10_0) { + sendSync(r); } else { p = createQuorumSyncProposal(r); + if (p.hasAllQuorums()) { + // single server distributed mode. + sendSync(r); + return; + } outstandingProposals.put(lastProposed, p); pendingSyncs.incrementAndGet(); sendPacket(p.packet); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 96290821073..f3375bd92b4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -104,7 +104,7 @@ public Socket getSocket() { protected InputArchive leaderIs; protected OutputArchive leaderOs; /** the protocol version of the leader */ - protected int leaderProtocolVersion = 0x01; + protected int leaderProtocolVersion = ProtocolVersion.VERSION_ANCIENT; private static final int BUFFERED_MESSAGE_SIZE = 10; protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE); @@ -491,7 +491,7 @@ protected long registerWithLeader(int pktType) throws IOException { /* * Add sid to payload */ - LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion()); + LearnerInfo li = new LearnerInfo(self.getMyId(), ProtocolVersion.CURRENT, self.getQuorumVerifier().getVersion()); ByteArrayOutputStream bsid = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); boa.writeRecord(li, "LearnerInfo"); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index 5dbfb963230..a201543495e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -104,7 +104,7 @@ String getRemoteAddress() { return sock == null ? "" : sock.getRemoteSocketAddress().toString(); } - protected int version = 0x1; + protected int version = ProtocolVersion.VERSION_ANCIENT; int getVersion() { return version; @@ -524,7 +524,7 @@ public void run() { long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch); long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); - if (this.getVersion() < 0x10000) { + if (this.getVersion() < ProtocolVersion.VERSION_3_4_0) { // we are going to have to extrapolate the epoch information long epoch = ZxidUtils.getEpochFromZxid(zxid); ss = new StateSummary(epoch, zxid); @@ -532,7 +532,7 @@ public void run() { learnerMaster.waitForEpochAck(this.getSid(), ss); } else { byte[] ver = new byte[4]; - ByteBuffer.wrap(ver).putInt(0x10000); + ByteBuffer.wrap(ver).putInt(ProtocolVersion.CURRENT); QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); oa.writeRecord(newEpochPacket, "packet"); messageTracker.trackSent(Leader.LEADERINFO); @@ -596,7 +596,7 @@ public void run() { // the version of this quorumVerifier will be set by leader.lead() in case // the leader is just being established. 
waitForEpochAck makes sure that readyToStart is true if // we got here, so the version was set - if (getVersion() < 0x10000) { + if (getVersion() < ProtocolVersion.VERSION_3_4_0) { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null); oa.writeRecord(newLeaderQP, "packet"); } else { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProtocolVersion.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProtocolVersion.java new file mode 100644 index 00000000000..a852b1ef5eb --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProtocolVersion.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +public class ProtocolVersion { + private ProtocolVersion() {} + + /** + * Pre ZAB 1.0. + */ + public static final int VERSION_ANCIENT = 1; + + /** + * ZAB 1.0. + */ + public static final int VERSION_3_4_0 = 0x10000; + + /** + * Protocol changes: + * * Learner will piggyback whatever data leader attached in {@link Leader#PING} after session data. + * This way, leader is free to enhance {@link Leader#PING} in future without agreement from learner. 
+ */ + public static final int VERSION_3_10_0 = 0x20000; + + /** + * Point to the newest coding version. + */ + public static final int CURRENT = VERSION_3_10_0; +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index dcf7819b12b..d41cd5858d2 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -517,7 +517,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long assertEquals(1, l.self.getCurrentEpoch()); /* we test a normal run. everything should work out well. */ - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0); byte[] liBytes = new byte[20]; ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes)); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1, liBytes, null); @@ -526,7 +526,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT); assertEquals(2, l.self.getAcceptedEpoch()); assertEquals(1, l.self.getCurrentEpoch()); @@ -596,14 +596,14 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) assertEquals(qp.getZxid(), 0); LearnerInfo learnInfo = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); - assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getProtocolVersion(), ProtocolVersion.CURRENT); assertEquals(learnInfo.getServerid(), 0); // We are simulating an established 
leader, so the epoch is 1 qp.setType(Leader.LEADERINFO); qp.setZxid(ZxidUtils.makeZxid(1, 0)); byte[] protoBytes = new byte[4]; - ByteBuffer.wrap(protoBytes).putInt(0x10000); + ByteBuffer.wrap(protoBytes).putInt(ProtocolVersion.CURRENT); qp.setData(protoBytes); oa.writeRecord(qp, null); @@ -730,14 +730,14 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) assertEquals(qp.getZxid(), 0); LearnerInfo learnInfo = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); - assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getProtocolVersion(), ProtocolVersion.CURRENT); assertEquals(learnInfo.getServerid(), 0); // We are simulating an established leader, so the epoch is 1 qp.setType(Leader.LEADERINFO); qp.setZxid(ZxidUtils.makeZxid(1, 0)); byte[] protoBytes = new byte[4]; - ByteBuffer.wrap(protoBytes).putInt(0x10000); + ByteBuffer.wrap(protoBytes).putInt(ProtocolVersion.CURRENT); qp.setData(protoBytes); oa.writeRecord(qp, null); @@ -830,7 +830,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro assertEquals(0, l.self.getCurrentEpoch()); /* we test a normal run. everything should work out well. 
*/ - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0); byte[] liBytes = new byte[20]; ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes)); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); @@ -839,7 +839,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT); assertEquals(1, l.self.getAcceptedEpoch()); assertEquals(0, l.self.getCurrentEpoch()); @@ -871,7 +871,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro assertEquals(0, l.self.getAcceptedEpoch()); assertEquals(0, l.self.getCurrentEpoch()); - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0); byte[] liBytes = new byte[20]; ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes)); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); @@ -880,7 +880,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT); assertEquals(1, l.self.getAcceptedEpoch()); assertEquals(0, l.self.getCurrentEpoch()); @@ -962,14 +962,14 @@ public void converseWithObserver(InputArchive ia, OutputArchive oa, Observer o) assertEquals(qp.getZxid(), 0); LearnerInfo learnInfo = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); - 
assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getProtocolVersion(), ProtocolVersion.CURRENT); assertEquals(learnInfo.getServerid(), 0); // We are simulating an established leader, so the epoch is 1 qp.setType(Leader.LEADERINFO); qp.setZxid(ZxidUtils.makeZxid(1, 0)); byte[] protoBytes = new byte[4]; - ByteBuffer.wrap(protoBytes).putInt(0x10000); + ByteBuffer.wrap(protoBytes).putInt(ProtocolVersion.CURRENT); qp.setData(protoBytes); oa.writeRecord(qp, null); @@ -1074,7 +1074,7 @@ public void testLeaderBehind() throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException { /* we test a normal run. everything should work out well. */ - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0); byte[] liBytes = new byte[20]; ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes)); /* we are going to say we last acked epoch 20 */ @@ -1083,7 +1083,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT); qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); @@ -1112,7 +1112,7 @@ public void testAbandonBeforeACKEpoch() throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException { /* we test a normal run. everything should work out well. 
*/ - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0); byte[] liBytes = new byte[20]; ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes)); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); @@ -1120,7 +1120,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT); Thread.sleep(l.self.getInitLimit() * l.self.getTickTime() + 5000); // The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced