From 9364951ccb389315a5238a2fa37364a9ea2a243f Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Wed, 10 May 2023 15:38:35 +0800 Subject: [PATCH] ZOOKEEPER-1675: Make sync a quorum operation Previously, `sync` + `read` could not guarantee up-to-date data as `sync` will not touch quorum in case of no outstanding proposals. Though, `create`/`setData` could be used as an rescue, but it is apparently ugly and error-prone. `sync` fits the semantics naturally. This pr reverts ZOOKEEPER-2137 which using `setData` to circumvent no quorum `sync`. Since `sync` is a public API, so feature gate `quorumSync` is encouraged to be off in rolling upgrade and turn on after rolling upgrade. Refs: ZOOKEEPER-1675, ZOOKEEPER-2136, ZOOKEEPER-3600 --- .../main/resources/markdown/zookeeperAdmin.md | 6 + .../resources/markdown/zookeeperInternals.md | 2 +- .../server/quorum/CommitProcessor.java | 13 +- .../quorum/FollowerZooKeeperServer.java | 7 +- .../zookeeper/server/quorum/Leader.java | 189 ++++++++++++++---- .../server/quorum/LeaderZooKeeperServer.java | 2 +- .../zookeeper/server/quorum/Learner.java | 6 + .../server/quorum/LearnerHandler.java | 15 +- .../server/quorum/LearnerMaster.java | 2 + .../server/quorum/LearnerSyncRequest.java | 35 ---- .../server/quorum/ObserverMaster.java | 4 + .../quorum/ObserverZooKeeperServer.java | 2 +- .../quorum/ProposalRequestProcessor.java | 17 +- .../CommitProcessorConcurrencyTest.java | 2 +- .../quorum/CommitProcessorMetricsTest.java | 2 +- .../server/quorum/CommitProcessorTest.java | 2 +- .../server/quorum/RaceConditionTest.java | 2 +- .../apache/zookeeper/test/QuorumSyncTest.java | 92 +++++++++ .../apache/zookeeper/test/ReconfigTest.java | 27 +-- 19 files changed, 301 insertions(+), 126 deletions(-) delete mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumSyncTest.java diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index 7a99cd346c6..2821e4299a9 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -1209,6 +1209,12 @@ property, when available, is noted below. leader. The default value is false. +* *quorumSync* + (Java system property: **zookeeper.quorumSync**) + **New in 3.10.0** + When this property is set, `sync` will be a quorum operation. + + The default value is false. * *serializeLastProcessedZxid.enabled* (Jave system property: **zookeeper.serializeLastProcessedZxid.enabled**) diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md b/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md index 171f33ddb4c..d27ba0c9cfc 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md @@ -275,7 +275,7 @@ The [consistency](https://jepsen.io/consistency) guarantees of ZooKeeper lie bet Write operations in ZooKeeper are *linearizable*. In other words, each `write` will appear to take effect atomically at some point between when the client issues the request and receives the corresponding response. This means that the writes performed by all the clients in ZooKeeper can be totally ordered in such a way that respects the real-time ordering of these writes. However, merely stating that write operations are linearizable is meaningless unless we also talk about read operations. -Read operations in ZooKeeper are *not linearizable* since they can return potentially stale data. This is because a `read` in ZooKeeper is not a quorum operation and a server will respond immediately to a client that is performing a `read`. ZooKeeper does this because it prioritizes performance over consistency for the read use case. However, reads in ZooKeeper are *sequentially consistent*, because `read` operations will appear to take effect in some sequential order that furthermore respects the order of each client's operations. A common pattern to work around this is to issue a `sync` before issuing a `read`. This too does **not** strictly guarantee up-to-date data because `sync` is [not currently a quorum operation](https://issues.apache.org/jira/browse/ZOOKEEPER-1675). To illustrate, consider a scenario where two servers simultaneously think they are the leader, something that could occur if the TCP connection timeout is smaller than `syncLimit * tickTime`. Note that this is [unlikely](https://www.amazon.com/ZooKeeper-Distributed-Coordination-Flavio-Junqueira/dp/1449361307) to occur in practice, but should be kept in mind nevertheless when discussing strict theoretical guarantees. Under this scenario, it is possible that the `sync` is served by the “leader” with stale data, thereby allowing the following `read` to be stale as well. The stronger guarantee of linearizability is provided if an actual quorum operation (e.g., a `write`) is performed before a `read`. +Read operations in ZooKeeper are *not linearizable* since they can return potentially stale data. This is because a `read` in ZooKeeper is not a quorum operation and a server will respond immediately to a client that is performing a `read`. ZooKeeper does this because it prioritizes performance over consistency for the read use case. However, reads in ZooKeeper are *sequentially consistent*, because `read` operations will appear to take effect in some sequential order that furthermore respects the order of each client's operations. A common pattern to work around this is to issue a `sync` before issuing a `read`. This does **not work prior 3.10.0** because `sync` is [not a quorum operation before](https://issues.apache.org/jira/browse/ZOOKEEPER-1675). To illustrate, consider a scenario where two servers simultaneously think they are the leader, something that could occur if the TCP connection timeout is smaller than `syncLimit * tickTime`. Note that this is [unlikely](https://www.amazon.com/ZooKeeper-Distributed-Coordination-Flavio-Junqueira/dp/1449361307) to occur in practice, but should be kept in mind nevertheless when discussing strict theoretical guarantees. Under this scenario, it is possible that the `sync` is served by the “leader” with stale data, thereby allowing the following `read` to be stale as well. Prior to 3.10.0, the stronger guarantee of linearizability is provided if an actual quorum operation (e.g., a `write`) is performed before a `read`. Overall, the consistency guarantees of ZooKeeper are formally captured by the notion of [ordered sequential consistency](http://webee.technion.ac.il/people/idish/ftp/OSC-IPL17.pdf) or `OSC(U)` to be exact, which lies between sequential consistency and linearizability. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java index 0445bbdefff..5dc77d845a0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java @@ -149,17 +149,9 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements RequestP */ private static volatile int maxCommitBatchSize; - /** - * This flag indicates whether we need to wait for a response to come back from the - * leader or we just let the sync operation flow through like a read. The flag will - * be false if the CommitProcessor is in a Leader pipeline. - */ - boolean matchSyncs; - - public CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs, ZooKeeperServerListener listener) { + public CommitProcessor(RequestProcessor nextProcessor, String id, ZooKeeperServerListener listener) { super("CommitProcessor:" + id, listener); this.nextProcessor = nextProcessor; - this.matchSyncs = matchSyncs; } private boolean isProcessingRequest() { @@ -181,9 +173,8 @@ protected boolean needCommit(Request request) { case OpCode.reconfig: case OpCode.multi: case OpCode.setACL: - return true; case OpCode.sync: - return matchSyncs; + return true; case OpCode.createSession: case OpCode.closeSession: return !request.isLocalSession(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index 82e56c390d8..b194c91db60 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -69,7 +69,7 @@ public Follower getFollower() { @Override protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); - commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); + commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new FollowerRequestProcessor(this, commitProcessor); ((FollowerRequestProcessor) firstProcessor).start(); @@ -117,9 +117,8 @@ public synchronized void sync() { } Request r = pendingSyncs.remove(); - if (r instanceof LearnerSyncRequest) { - LearnerSyncRequest lsr = (LearnerSyncRequest) r; - lsr.fh.queuePacket(new QuorumPacket(Leader.SYNC, 0, null, null)); + if (r.getOwner() instanceof LearnerHandler) { + ((LearnerHandler) r.getOwner()).queuePacket(new QuorumPacket(Leader.SYNC, 0, null, null)); } commitProcessor.commit(r); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index 3b9c827c340..2e0eefa5203 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -33,7 +33,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -50,6 +49,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import javax.security.sasl.SaslException; @@ -86,14 +86,22 @@ public class Leader extends LearnerMaster { LOG.info("TCP NoDelay set to: {}", nodelay); } + // This must be an invalid session id. + static final long PING_SESSION_END = 0; + + private static final int PING_PAYLOAD_TYPE_QUORUM_SYNC = 1; + public static class Proposal extends SyncedLearnerTracker { public QuorumPacket packet; public Request request; + public long pingXid = -1; + public final List pendingSyncs = new ArrayList<>(); @Override public String toString() { - return packet.getType() + ", " + packet.getZxid() + ", " + request; + return packet.getType() + ", 0x" + Long.toHexString(packet.getZxid()) + ", " + request + + ", " + pingXid + ", " + pendingSyncs; } } @@ -115,6 +123,14 @@ public static int getAckLoggingFrequency() { return ackLoggingFrequency; } + public static final String QUORUM_SYNC = "zookeeper.quorumSync"; + private static final boolean quorumSync; + + static { + quorumSync = Boolean.parseBoolean(System.getProperty(QUORUM_SYNC)); + LOG.info("{} = {}", QUORUM_SYNC, quorumSync); + } + final LeaderZooKeeperServer zk; final QuorumPeer self; @@ -215,11 +231,11 @@ public void resetObserverConnectionStats() { } } - // Pending sync requests. Must access under 'this' lock. - private final Map> pendingSyncs = new HashMap<>(); + long quorumPingXid = 0; + private final AtomicInteger pendingSyncs = new AtomicInteger(); public synchronized int getNumPendingSyncs() { - return pendingSyncs.size(); + return pendingSyncs.get(); } //Follower counter @@ -652,10 +668,7 @@ void lead() throws IOException, InterruptedException { } } - newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier()); - if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) { - newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier()); - } + setupQuorumTracker(newLeaderProposal); // We have to get at least a majority of servers in sync with // us. We do this by waiting for the NEWLEADER packet to get @@ -743,11 +756,7 @@ void lead() throws IOException, InterruptedException { // track synced learners to make sure we still have a // quorum of current (and potentially next pending) view. SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker(); - syncedAckSet.addQuorumVerifier(self.getQuorumVerifier()); - if (self.getLastSeenQuorumVerifier() != null - && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) { - syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier()); - } + setupQuorumTracker(syncedAckSet); syncedAckSet.addAck(self.getMyId()); @@ -915,6 +924,13 @@ private long getDesignatedLeader(Proposal reconfigProposal, long zxid) { * @return True if committed, otherwise false. **/ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) { + // in order to be committed, a proposal must be accepted by a quorum. + // + // getting a quorum from all necessary configurations. + if (!p.hasAllQuorums()) { + return false; + } + // make sure that ops are committed in order. With reconfigurations it is now possible // that different operations wait for different sets of acks, and we still want to enforce // that they are committed in order. Currently we only permit one outstanding reconfiguration @@ -922,15 +938,14 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol // pending all wait for a quorum of old and new config, so it's not possible to get enough acks // for an operation without getting enough acks for preceding ops. But in the future if multiple // concurrent reconfigs are allowed, this can happen. - if (outstandingProposals.containsKey(zxid - 1)) { - return false; - } - - // in order to be committed, a proposal must be accepted by a quorum. - // - // getting a quorum from all necessary configurations. - if (!p.hasAllQuorums()) { - return false; + Proposal previous = outstandingProposals.get(zxid - 1); + if (previous != null) { + if (previous.pingXid < 0) { + return false; + } + // Quorum sync leader is leading old version servers, it probably will never get enough acks. + outstandingProposals.remove(zxid - 1); + commitQuorumSync(previous); } // commit proposals in order @@ -980,11 +995,7 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol inform(p); } zk.commitProcessor.commit(p.request); - if (pendingSyncs.containsKey(zxid)) { - for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) { - sendSync(r); - } - } + commitPendingSyncs(p); return true; } @@ -1068,6 +1079,64 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA } } + void setupQuorumTracker(SyncedLearnerTracker tracker) { + QuorumVerifier verifier = self.getQuorumVerifier(); + QuorumVerifier lastSeen = self.getLastSeenQuorumVerifier(); + tracker.addQuorumVerifier(verifier); + if (lastSeen != null && lastSeen.getVersion() > verifier.getVersion()) { + tracker.addQuorumVerifier(lastSeen); + } + } + + private synchronized void commitQuorumSync(Proposal p) { + sendSync(p.request); + pendingSyncs.decrementAndGet(); + commitPendingSyncs(p); + } + + private synchronized void processQuorumSyncAck(long sid, long zxid, long xid) { + Proposal p = outstandingProposals.get(zxid); + if (p == null) { + LOG.debug("Receive dated quorum sync zxid 0x{}, xid {} with no ping", Long.toHexString(zxid), xid); + return; + } + if (p.pingXid < 0) { + // This is not possible as proposal will get new zxid. + LOG.error("Receive quorum sync zxid 0x{}, xid {}, proposal {}", Long.toHexString(zxid), xid, p); + return; + } else if (xid < p.pingXid) { + // It is possible to issue two quorum sync with no write in between. + LOG.debug("Receive dated quorum sync zxid 0x{}, xid {}, ping {}", Long.toHexString(zxid), xid, p); + return; + } else if (xid > p.pingXid) { + // It is not possible as new syncs are either collapsed into old one or + // started after old one completed. + LOG.error("Receive quorum sync zxid 0x{}, xid {}, dated ping {}", Long.toHexString(zxid), xid, p); + return; + } + p.addAck(sid); + if (LOG.isDebugEnabled()) { + LOG.debug("Receive quorum sync zxid 0x{}, xid {}, ping {}, acks {}", Long.toHexString(zxid), xid, p, p.ackSetsToString()); + } + if (p.hasAllQuorums()) { + outstandingProposals.remove(zxid); + commitQuorumSync(p); + } + } + + @Override + void processPing(long sid, long zxid, byte[] payload) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(payload); + DataInputStream dis = new DataInputStream(bis); + int type = dis.readInt(); + if (type != PING_PAYLOAD_TYPE_QUORUM_SYNC) { + String msg = String.format("invalid ping payload type: %d", type); + throw new IOException(msg); + } + long xid = dis.readLong(); + processQuorumSyncAck(sid, zxid, xid); + } + static class ToBeAppliedRequestProcessor implements RequestProcessor { private final RequestProcessor next; @@ -1263,16 +1332,11 @@ public Proposal propose(Request request) throws XidRolloverException { p.request = request; synchronized (this) { - p.addQuorumVerifier(self.getQuorumVerifier()); - if (request.getHdr().getType() == OpCode.reconfig) { self.setLastSeenQuorumVerifier(request.qv, true); } - if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) { - p.addQuorumVerifier(self.getLastSeenQuorumVerifier()); - } - + setupQuorumTracker(p); LOG.debug("Proposing:: {}", request); lastProposed = p.packet.getZxid(); @@ -1283,26 +1347,69 @@ public Proposal propose(Request request) throws XidRolloverException { return p; } + private static byte[] createQuorumSyncPingPayload(long xid) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(stream); + try { + output.writeLong(PING_SESSION_END); + output.writeInt(PING_PAYLOAD_TYPE_QUORUM_SYNC); + output.writeLong(xid); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + return stream.toByteArray(); + } + + private Proposal createQuorumSyncProposal(Request r) { + quorumPingXid += 1; + byte[] payload = createQuorumSyncPingPayload(quorumPingXid); + QuorumPacket packet = new QuorumPacket(Leader.PING, lastProposed, payload, null); + Proposal p = new Proposal(); + p.request = r; + p.packet = packet; + p.pingXid = quorumPingXid; + setupQuorumTracker(p); + p.addAck(self.getMyId()); + return p; + } + /** * Process sync requests * * @param r the request */ - - public synchronized void processSync(LearnerSyncRequest r) { - if (outstandingProposals.isEmpty()) { + public synchronized void processSync(Request r) { + Proposal p = outstandingProposals.get(lastProposed); + if (p != null) { + p.pendingSyncs.add(r); + pendingSyncs.incrementAndGet(); + } else if (!quorumSync) { sendSync(r); } else { - pendingSyncs.computeIfAbsent(lastProposed, k -> new ArrayList<>()).add(r); + p = createQuorumSyncProposal(r); + outstandingProposals.put(lastProposed, p); + pendingSyncs.incrementAndGet(); + sendPacket(p.packet); + } + } + + private synchronized void commitPendingSyncs(Proposal p) { + for (Request r : p.pendingSyncs) { + pendingSyncs.decrementAndGet(); + sendSync(r); } } /** * Sends a sync message to the appropriate server */ - public void sendSync(LearnerSyncRequest r) { - QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null); - r.fh.queuePacket(qp); + public void sendSync(Request r) { + if (r.getOwner() instanceof LearnerHandler) { + QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null); + ((LearnerHandler) r.getOwner()).queuePacket(qp); + } else { + zk.commitProcessor.commit(r); + } } /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index 2de080bd380..7a6f43e0ddf 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -66,7 +66,7 @@ public Leader getLeader() { protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); - commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); + commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), getZooKeeperServerListener()); commitProcessor.start(); ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); proposalProcessor.initialize(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index d534b8f45e9..96290821073 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -842,6 +842,12 @@ protected void ping(QuorumPacket qp) throws IOException { dos.writeLong(entry.getKey()); dos.writeInt(entry.getValue()); } + // Piggyback whatever leader/master sent + byte[] data = qp.getData(); + if (data != null && data.length != 0) { + dos.write(data); + } + QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo()); writePacket(pingReply, true); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index e9d5cd4e5e8..5dbfb963230 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -40,7 +40,6 @@ import javax.security.sasl.SaslException; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; -import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestRecord; @@ -687,6 +686,13 @@ public void run() { DataInputStream dis = new DataInputStream(bis); while (dis.available() > 0) { long sess = dis.readLong(); + if (sess == Leader.PING_SESSION_END) { + int n = dis.available(); + byte[] payload = new byte[n]; + dis.read(payload); + learnerMaster.processPing(this.sid, qp.getZxid(), payload); + break; + } int to = dis.readInt(); learnerMaster.touch(sess, to); } @@ -701,12 +707,7 @@ public void run() { cxid = bb.getInt(); type = bb.getInt(); bb = bb.slice(); - Request si; - if (type == OpCode.sync) { - si = new LearnerSyncRequest(this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo()); - } else { - si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo()); - } + Request si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo()); si.setOwner(this); learnerMaster.submitLearnerRequest(si); requestsReceived.incrementAndGet(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java index 9bf6032af68..8ac77f9f027 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java @@ -188,6 +188,8 @@ public LearnerSyncThrottler getLearnerDiffSyncThrottler() { */ abstract void processAck(long sid, long zxid, SocketAddress localSocketAddress); + abstract void processPing(long sid, long zxid, byte[] payload) throws IOException; + /** * mark session as alive * @param sess session id diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java deleted file mode 100644 index 6892d3dd8a7..00000000000 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server.quorum; - -import java.util.List; -import org.apache.zookeeper.data.Id; -import org.apache.zookeeper.server.Request; -import org.apache.zookeeper.server.RequestRecord; - -public class LearnerSyncRequest extends Request { - - LearnerHandler fh; - public LearnerSyncRequest( - LearnerHandler fh, long sessionId, int xid, int type, RequestRecord request, List authInfo) { - super(null, sessionId, xid, type, request, authInfo); - this.fh = fh; - } - -} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java index 2369eabe36c..710f282746a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java @@ -232,6 +232,10 @@ public void processAck(long sid, long zxid, SocketAddress localSocketAddress) { throw new RuntimeException("Observers shouldn't send ACKS ack = " + Long.toHexString(zxid)); } + @Override + public void processPing(long sid, long zxid, byte[] payload) throws IOException { + } + @Override public void touch(long sess, int to) { zks.getSessionTracker().touchSession(sess, to); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java index 30f759fb897..6453bdc3920 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java @@ -90,7 +90,7 @@ protected void setupRequestProcessors() { // Observers to, for example, remove the disk sync requirements. // Currently, they behave almost exactly the same as followers. RequestProcessor finalProcessor = new FinalRequestProcessor(this); - commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); + commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new ObserverRequestProcessor(this, commitProcessor); ((ObserverRequestProcessor) firstProcessor).start(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java index c1e2fe16e43..af47829ca8d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java @@ -18,6 +18,7 @@ package org.apache.zookeeper.server.quorum; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.ServerMetrics; @@ -66,15 +67,13 @@ public void initialize() { } public void processRequest(Request request) throws RequestProcessorException { - /* In the following IF-THEN-ELSE block, we process syncs on the leader. - * If the sync is coming from a follower, then the follower - * handler adds it to syncHandler. Otherwise, if it is a client of - * the leader that issued the sync command, then syncHandler won't - * contain the handler. In this case, we add it to syncHandler, and - * call processRequest on the next processor. - */ - if (request instanceof LearnerSyncRequest) { - zks.getLeader().processSync((LearnerSyncRequest) request); + if (request.type == ZooDefs.OpCode.sync) { + if (!request.isFromLearner()) { + // Submit to commit processor first since no-quorum-sync could + // commit sync immediately without a consensus. + nextProcessor.processRequest(request); + } + zks.getLeader().processSync(request); } else { if (shouldForwardToNextProcessor(request)) { nextProcessor.processRequest(request); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java index 21970f3ea4d..4e958a82a42 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java @@ -109,7 +109,7 @@ public void processRequest(Request request) throws RequestProcessorException { public void shutdown() { } - }, "0", false, new ZooKeeperServerListener() { + }, "0", new ZooKeeperServerListener() { @Override public void notifyStopping(String threadName, int errorCode) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java index 4a45983555a..45d5d6cf40c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java @@ -79,7 +79,7 @@ private class TestCommitProcessor extends CommitProcessor { int numWorkerThreads; public TestCommitProcessor(RequestProcessor finalProcessor, int numWorkerThreads) { - super(finalProcessor, "1", true, null); + super(finalProcessor, "1", null); this.numWorkerThreads = numWorkerThreads; } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java index 2de0987afed..607fa8f2806 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java @@ -304,7 +304,7 @@ protected void setupRequestProcessors() { // ValidateProcessor is set up in a similar fashion to ToBeApplied // processor, so it can do pre/post validating of requests ValidateProcessor validateProcessor = new ValidateProcessor(finalProcessor); - commitProcessor = new CommitProcessor(validateProcessor, "1", true, null); + commitProcessor = new CommitProcessor(validateProcessor, "1", null); validateProcessor.setCommitProcessor(commitProcessor); commitProcessor.start(); MockProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(commitProcessor); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java index 5216eb70324..f6160b8a88d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java @@ -194,7 +194,7 @@ protected void setupRequestProcessors() { */ RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); - commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); + commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), getZooKeeperServerListener()); commitProcessor.start(); ProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(this, commitProcessor); proposalProcessor.initialize(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumSyncTest.java new file mode 100644 index 00000000000..de9a5a3f8f8 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumSyncTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import java.util.concurrent.CompletableFuture; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.quorum.Leader; +import org.apache.zookeeper.server.quorum.LearnerHandler; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class QuorumSyncTest extends QuorumBase { + @BeforeEach + @Override + public void setUp() throws Exception { + System.setProperty("zookeeper.quorumSync", "true"); + super.setUp(); + } + + @Test + public void testReadAfterSync() throws Exception { + int leaderPort = getLeaderClientPort(); + + ZooKeeper leaderReader = createClient("127.0.0.1:" + leaderPort); + ZooKeeper followerWriter = createClient(getPeersMatching(QuorumPeer.ServerState.FOLLOWING)); + + followerWriter.create("/test", "test0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // given: dying leader + Leader leader = getLeaderQuorumPeer().leader; + for (LearnerHandler f : leader.getForwardingFollowers()) { + f.getSocket().shutdownInput(); + } + + // and: write succeed in new epoch + while (true) { + try { + followerWriter.setData("/test", "test1".getBytes(), -1); + break; + } catch (KeeperException.ConnectionLossException ignored) { + ignored.printStackTrace(); + } + } + + while (true) { + try { + // when: sync succeed + syncClient(leaderReader); + + // then: read up-to-date data + byte[] test1 = leaderReader.getData("/test", null, null); + assertArrayEquals("test1".getBytes(), test1); + break; + } catch (Exception ignored) { + ignored.printStackTrace(); + } + } + } + + void syncClient(ZooKeeper zk) { + CompletableFuture synced = new CompletableFuture<>(); + zk.sync("/", (rc, path, ctx) -> { + if (rc == 0) { + synced.complete(null); + } else { + synced.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); + } + }, null); + synced.join(); + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java index 20ec82d0989..b425b7f0fb8 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.DummyWatcher; @@ -74,6 +75,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback { @BeforeEach public void setup() { + System.setProperty("zookeeper.quorumSync", "true"); System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/); QuorumPeerConfig.setReconfigEnabled(true); } @@ -86,6 +88,17 @@ public void tearDown() throws Exception { } } + private static void syncClient(ZooKeeper zk) throws KeeperException { + CompletableFuture future = new CompletableFuture<>(); + zk.sync("/", (int rc, String path, Object ctx) -> { + future.complete(KeeperException.Code.get(rc)); + }, null); + KeeperException.Code rc = future.join(); + if (rc != KeeperException.Code.OK) { + throw KeeperException.create(rc); + } + } + public static String reconfig( ZooKeeperAdmin zkAdmin, List joiningServers, @@ -140,17 +153,10 @@ public static String testServerHasConfig( ZooKeeper zk, List joiningServers, List leavingServers) throws KeeperException, InterruptedException { - boolean testNodeExists = false; byte[] config = null; for (int j = 0; j < 30; j++) { try { - if (!testNodeExists) { - createZNode(zk, "/dummy", "dummy"); - testNodeExists = true; - } - // Use setData instead of sync API to force a view update. - // Check ZOOKEEPER-2137 for details. - zk.setData("/dummy", "dummy".getBytes(), -1); + syncClient(zk); config = zk.getConfig(false, new Stat()); break; } catch (KeeperException.ConnectionLossException e) { @@ -189,15 +195,12 @@ public static void testNormalOperation(ZooKeeper writer, ZooKeeper reader, boole try { if (createNodes) { createZNode(writer, "/test", "test"); - createZNode(reader, "/dummy", "dummy"); createNodes = false; } String data = "test" + j; writer.setData("/test", data.getBytes(), -1); - // Use setData instead of sync API to force a view update. - // Check ZOOKEEPER-2137 for details. - reader.setData("/dummy", "dummy".getBytes(), -1); + syncClient(reader); byte[] res = reader.getData("/test", null, new Stat()); assertEquals(data, new String(res)); break;