Skip to content

Commit

Permalink
review: quorum sync only if all followers are upgraded
Browse files Browse the repository at this point in the history
  • Loading branch information
kezhuw committed Sep 30, 2023
1 parent 9f790f4 commit 76285b5
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ public static class Proposal extends SyncedLearnerTracker {
public long pingXid = -1;
public final List<Request> pendingSyncs = new ArrayList<>();

boolean isRead() {
return pingXid > 0;
}

boolean isWrite() {
return !isRead();
}

@Override
public String toString() {
return packet.getType() + ", 0x" + Long.toHexString(packet.getZxid()) + ", " + request
Expand Down Expand Up @@ -156,6 +164,14 @@ public List<LearnerHandler> getLearners() {

// list of followers that are ready to follow (i.e synced with the leader)
private final HashSet<LearnerHandler> forwardingFollowers = new HashSet<>();
volatile int followersProtocolVersion = ProtocolVersion.CURRENT;

void recalculateFollowersVersion() {
followersProtocolVersion = forwardingFollowers.stream()
.mapToInt(LearnerHandler::getVersion)
.min()
.orElse(ProtocolVersion.CURRENT);
}

/**
* Returns a copy of the current forwarding follower snapshot
Expand All @@ -181,6 +197,7 @@ public List<LearnerHandler> getNonVotingFollowers() {
void addForwardingFollower(LearnerHandler lh) {
synchronized (forwardingFollowers) {
forwardingFollowers.add(lh);
recalculateFollowersVersion();
/*
* Any changes on forwardiongFollowers could possible affect the need of Oracle.
* */
Expand Down Expand Up @@ -255,6 +272,7 @@ public void addLearnerHandler(LearnerHandler learner) {
public void removeLearnerHandler(LearnerHandler peer) {
synchronized (forwardingFollowers) {
forwardingFollowers.remove(peer);
recalculateFollowersVersion();
}
synchronized (learners) {
learners.remove(peer);
Expand Down Expand Up @@ -932,10 +950,12 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol
// concurrent reconfigs are allowed, this can happen.
Proposal previous = outstandingProposals.get(zxid - 1);
if (previous != null) {
if (previous.pingXid < 0) {
if (previous.isWrite()) {
return false;
}
// Quorum sync leader is leading old version servers, it probably will never get enough acks.
// It is possible in case of downgrading, leader probably will never get enough acks.
//
// These lines are probably should be reverted in new major version.
outstandingProposals.remove(zxid - 1);
commitQuorumSync(previous);
}
Expand Down Expand Up @@ -1009,6 +1029,9 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA
if (LOG.isTraceEnabled()) {
LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
for (Proposal p : outstandingProposals.values()) {
if (p.isRead()) {
continue;
}
long packetZxid = p.packet.getZxid();
LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid));
}
Expand Down Expand Up @@ -1040,6 +1063,9 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA
if (p == null) {
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr);
return;
} else if (p.isRead()) {
// The write proposal already completed.
return;
}

if (ackLoggingFrequency > 0 && (zxid % ackLoggingFrequency == 0)) {
Expand Down Expand Up @@ -1092,8 +1118,8 @@ private synchronized void processQuorumSyncAck(long sid, long zxid, long xid) {
LOG.debug("Receive dated quorum sync zxid 0x{}, xid {} with no ping", Long.toHexString(zxid), xid);
return;
}
if (p.pingXid < 0) {
// This is not possible as proposal will get new zxid.
if (p.isWrite()) {
// This is not possible as write proposal will get new zxid.
LOG.error("Receive quorum sync zxid 0x{}, xid {}, proposal {}", Long.toHexString(zxid), xid, p);
return;
} else if (xid < p.pingXid) {
Expand Down Expand Up @@ -1375,8 +1401,15 @@ public synchronized void processSync(Request r) {
if (p != null) {
p.pendingSyncs.add(r);
pendingSyncs.incrementAndGet();
} else if (followersProtocolVersion < ProtocolVersion.VERSION_3_10_0) {
sendSync(r);
} else {
p = createQuorumSyncProposal(r);
if (p.hasAllQuorums()) {
// single server distributed mode.
sendSync(r);
return;
}
outstandingProposals.put(lastProposed, p);
pendingSyncs.incrementAndGet();
sendPacket(p.packet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public Socket getSocket() {
protected InputArchive leaderIs;
protected OutputArchive leaderOs;
/** the protocol version of the leader */
protected int leaderProtocolVersion = 0x01;
protected int leaderProtocolVersion = ProtocolVersion.VERSION_ANCIENT;

private static final int BUFFERED_MESSAGE_SIZE = 10;
protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
Expand Down Expand Up @@ -491,7 +491,7 @@ protected long registerWithLeader(int pktType) throws IOException {
/*
* Add sid to payload
*/
LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion());
LearnerInfo li = new LearnerInfo(self.getMyId(), ProtocolVersion.CURRENT, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ String getRemoteAddress() {
return sock == null ? "<null>" : sock.getRemoteSocketAddress().toString();
}

protected int version = 0x1;
protected int version = ProtocolVersion.VERSION_ANCIENT;

int getVersion() {
return version;
Expand Down Expand Up @@ -524,15 +524,15 @@ public void run() {
long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

if (this.getVersion() < 0x10000) {
if (this.getVersion() < ProtocolVersion.VERSION_3_4_0) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
learnerMaster.waitForEpochAck(this.getSid(), ss);
} else {
byte[] ver = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
ByteBuffer.wrap(ver).putInt(ProtocolVersion.CURRENT);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
messageTracker.trackSent(Leader.LEADERINFO);
Expand Down Expand Up @@ -596,7 +596,7 @@ public void run() {
// the version of this quorumVerifier will be set by leader.lead() in case
// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
// we got here, so the version was set
if (getVersion() < 0x10000) {
if (getVersion() < ProtocolVersion.VERSION_3_4_0) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.server.quorum;

public class ProtocolVersion {
private ProtocolVersion() {}

/**
* Pre ZAB 1.0.
*/
public static final int VERSION_ANCIENT = 1;

/**
* ZAB 1.0.
*/
public static final int VERSION_3_4_0 = 0x10000;

/**
* Protocol changes:
* * Learner will piggyback whatever data leader attached in {@link Leader#PING} after session data.
* This way, leader is free to enhance {@link Leader#PING} in future without agreement from learner.
*/
public static final int VERSION_3_10_0 = 0x20000;

/**
* Point to the newest coding version.
*/
public static final int CURRENT = VERSION_3_10_0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long
assertEquals(1, l.self.getCurrentEpoch());

/* we test a normal run. everything should work out well. */
LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0);
byte[] liBytes = new byte[20];
ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1, liBytes, null);
Expand All @@ -526,7 +526,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long
readPacketSkippingPing(ia, qp);
assertEquals(Leader.LEADERINFO, qp.getType());
assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid());
assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000);
assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT);
assertEquals(2, l.self.getAcceptedEpoch());
assertEquals(1, l.self.getCurrentEpoch());

Expand Down Expand Up @@ -596,14 +596,14 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f)
assertEquals(qp.getZxid(), 0);
LearnerInfo learnInfo = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
assertEquals(learnInfo.getProtocolVersion(), 0x10000);
assertEquals(learnInfo.getProtocolVersion(), ProtocolVersion.CURRENT);
assertEquals(learnInfo.getServerid(), 0);

// We are simulating an established leader, so the epoch is 1
qp.setType(Leader.LEADERINFO);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
byte[] protoBytes = new byte[4];
ByteBuffer.wrap(protoBytes).putInt(0x10000);
ByteBuffer.wrap(protoBytes).putInt(ProtocolVersion.CURRENT);
qp.setData(protoBytes);
oa.writeRecord(qp, null);

Expand Down Expand Up @@ -730,14 +730,14 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f)
assertEquals(qp.getZxid(), 0);
LearnerInfo learnInfo = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
assertEquals(learnInfo.getProtocolVersion(), 0x10000);
assertEquals(learnInfo.getProtocolVersion(), ProtocolVersion.CURRENT);
assertEquals(learnInfo.getServerid(), 0);

// We are simulating an established leader, so the epoch is 1
qp.setType(Leader.LEADERINFO);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
byte[] protoBytes = new byte[4];
ByteBuffer.wrap(protoBytes).putInt(0x10000);
ByteBuffer.wrap(protoBytes).putInt(ProtocolVersion.CURRENT);
qp.setData(protoBytes);
oa.writeRecord(qp, null);

Expand Down Expand Up @@ -830,7 +830,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro
assertEquals(0, l.self.getCurrentEpoch());

/* we test a normal run. everything should work out well. */
LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0);
byte[] liBytes = new byte[20];
ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
Expand All @@ -839,7 +839,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro
readPacketSkippingPing(ia, qp);
assertEquals(Leader.LEADERINFO, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000);
assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT);
assertEquals(1, l.self.getAcceptedEpoch());
assertEquals(0, l.self.getCurrentEpoch());

Expand Down Expand Up @@ -871,7 +871,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro
assertEquals(0, l.self.getAcceptedEpoch());
assertEquals(0, l.self.getCurrentEpoch());

LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0);
byte[] liBytes = new byte[20];
ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
Expand All @@ -880,7 +880,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro
readPacketSkippingPing(ia, qp);
assertEquals(Leader.LEADERINFO, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000);
assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT);
assertEquals(1, l.self.getAcceptedEpoch());
assertEquals(0, l.self.getCurrentEpoch());

Expand Down Expand Up @@ -962,14 +962,14 @@ public void converseWithObserver(InputArchive ia, OutputArchive oa, Observer o)
assertEquals(qp.getZxid(), 0);
LearnerInfo learnInfo = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
assertEquals(learnInfo.getProtocolVersion(), 0x10000);
assertEquals(learnInfo.getProtocolVersion(), ProtocolVersion.CURRENT);
assertEquals(learnInfo.getServerid(), 0);

// We are simulating an established leader, so the epoch is 1
qp.setType(Leader.LEADERINFO);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
byte[] protoBytes = new byte[4];
ByteBuffer.wrap(protoBytes).putInt(0x10000);
ByteBuffer.wrap(protoBytes).putInt(ProtocolVersion.CURRENT);
qp.setData(protoBytes);
oa.writeRecord(qp, null);

Expand Down Expand Up @@ -1074,7 +1074,7 @@ public void testLeaderBehind() throws Exception {
testLeaderConversation(new LeaderConversation() {
public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException {
/* we test a normal run. everything should work out well. */
LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0);
byte[] liBytes = new byte[20];
ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
/* we are going to say we last acked epoch 20 */
Expand All @@ -1083,7 +1083,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro
readPacketSkippingPing(ia, qp);
assertEquals(Leader.LEADERINFO, qp.getType());
assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000);
assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT);
qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
oa.writeRecord(qp, null);
readPacketSkippingPing(ia, qp);
Expand Down Expand Up @@ -1112,15 +1112,15 @@ public void testAbandonBeforeACKEpoch() throws Exception {
testLeaderConversation(new LeaderConversation() {
public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException {
/* we test a normal run. everything should work out well. */
LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0);
byte[] liBytes = new byte[20];
ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
oa.writeRecord(qp, null);
readPacketSkippingPing(ia, qp);
assertEquals(Leader.LEADERINFO, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000);
assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT);
Thread.sleep(l.self.getInitLimit() * l.self.getTickTime() + 5000);

// The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced
Expand Down

0 comments on commit 76285b5

Please sign in to comment.