Skip to content

Commit

Permalink
ZOOKEEPER-1675: Make sync a quorum operation
Browse files Browse the repository at this point in the history
Previously, `sync` + `read` could not guarantee up-to-date data as
`sync` will not touch quorum in case of no outstanding proposals.

Though, `create`/`setData` could be used as an rescue, but it is
apparently ugly and error-prone. `sync` fits the semantics naturally.

This pr reverts ZOOKEEPER-2137 which using `setData` to circumvent no
quorum `sync`.

Since `sync` is a public API, so feature gate `quorumSync` is encouraged
to be off in rolling upgrade and turn on after rolling upgrade.

Refs: ZOOKEEPER-1675, ZOOKEEPER-2136, ZOOKEEPER-3600
  • Loading branch information
kezhuw committed Sep 27, 2023
1 parent f42c01d commit 9364951
Show file tree
Hide file tree
Showing 19 changed files with 301 additions and 126 deletions.
6 changes: 6 additions & 0 deletions zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,12 @@ property, when available, is noted below.
leader.

The default value is false.
* *quorumSync*
(Java system property: **zookeeper.quorumSync**)
**New in 3.10.0**
When this property is set, `sync` will be a quorum operation.

The default value is false.

* *serializeLastProcessedZxid.enabled*
(Jave system property: **zookeeper.serializeLastProcessedZxid.enabled**)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ The [consistency](https://jepsen.io/consistency) guarantees of ZooKeeper lie bet

Write operations in ZooKeeper are *linearizable*. In other words, each `write` will appear to take effect atomically at some point between when the client issues the request and receives the corresponding response. This means that the writes performed by all the clients in ZooKeeper can be totally ordered in such a way that respects the real-time ordering of these writes. However, merely stating that write operations are linearizable is meaningless unless we also talk about read operations.

Read operations in ZooKeeper are *not linearizable* since they can return potentially stale data. This is because a `read` in ZooKeeper is not a quorum operation and a server will respond immediately to a client that is performing a `read`. ZooKeeper does this because it prioritizes performance over consistency for the read use case. However, reads in ZooKeeper are *sequentially consistent*, because `read` operations will appear to take effect in some sequential order that furthermore respects the order of each client's operations. A common pattern to work around this is to issue a `sync` before issuing a `read`. This too does **not** strictly guarantee up-to-date data because `sync` is [not currently a quorum operation](https://issues.apache.org/jira/browse/ZOOKEEPER-1675). To illustrate, consider a scenario where two servers simultaneously think they are the leader, something that could occur if the TCP connection timeout is smaller than `syncLimit * tickTime`. Note that this is [unlikely](https://www.amazon.com/ZooKeeper-Distributed-Coordination-Flavio-Junqueira/dp/1449361307) to occur in practice, but should be kept in mind nevertheless when discussing strict theoretical guarantees. Under this scenario, it is possible that the `sync` is served by the “leader” with stale data, thereby allowing the following `read` to be stale as well. The stronger guarantee of linearizability is provided if an actual quorum operation (e.g., a `write`) is performed before a `read`.
Read operations in ZooKeeper are *not linearizable* since they can return potentially stale data. This is because a `read` in ZooKeeper is not a quorum operation and a server will respond immediately to a client that is performing a `read`. ZooKeeper does this because it prioritizes performance over consistency for the read use case. However, reads in ZooKeeper are *sequentially consistent*, because `read` operations will appear to take effect in some sequential order that furthermore respects the order of each client's operations. A common pattern to work around this is to issue a `sync` before issuing a `read`. This does **not work prior 3.10.0** because `sync` is [not a quorum operation before](https://issues.apache.org/jira/browse/ZOOKEEPER-1675). To illustrate, consider a scenario where two servers simultaneously think they are the leader, something that could occur if the TCP connection timeout is smaller than `syncLimit * tickTime`. Note that this is [unlikely](https://www.amazon.com/ZooKeeper-Distributed-Coordination-Flavio-Junqueira/dp/1449361307) to occur in practice, but should be kept in mind nevertheless when discussing strict theoretical guarantees. Under this scenario, it is possible that the `sync` is served by the “leader” with stale data, thereby allowing the following `read` to be stale as well. Prior to 3.10.0, the stronger guarantee of linearizability is provided if an actual quorum operation (e.g., a `write`) is performed before a `read`.

Overall, the consistency guarantees of ZooKeeper are formally captured by the notion of [ordered sequential consistency](http://webee.technion.ac.il/people/idish/ftp/OSC-IPL17.pdf) or `OSC(U)` to be exact, which lies between sequential consistency and linearizability.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,9 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements RequestP
*/
private static volatile int maxCommitBatchSize;

/**
* This flag indicates whether we need to wait for a response to come back from the
* leader or we just let the sync operation flow through like a read. The flag will
* be false if the CommitProcessor is in a Leader pipeline.
*/
boolean matchSyncs;

public CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs, ZooKeeperServerListener listener) {
public CommitProcessor(RequestProcessor nextProcessor, String id, ZooKeeperServerListener listener) {
super("CommitProcessor:" + id, listener);
this.nextProcessor = nextProcessor;
this.matchSyncs = matchSyncs;
}

private boolean isProcessingRequest() {
Expand All @@ -181,9 +173,8 @@ protected boolean needCommit(Request request) {
case OpCode.reconfig:
case OpCode.multi:
case OpCode.setACL:
return true;
case OpCode.sync:
return matchSyncs;
return true;
case OpCode.createSession:
case OpCode.closeSession:
return !request.isLocalSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Follower getFollower() {
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
Expand Down Expand Up @@ -117,9 +117,8 @@ public synchronized void sync() {
}

Request r = pendingSyncs.remove();
if (r instanceof LearnerSyncRequest) {
LearnerSyncRequest lsr = (LearnerSyncRequest) r;
lsr.fh.queuePacket(new QuorumPacket(Leader.SYNC, 0, null, null));
if (r.getOwner() instanceof LearnerHandler) {
((LearnerHandler) r.getOwner()).queuePacket(new QuorumPacket(Leader.SYNC, 0, null, null));
}
commitProcessor.commit(r);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
Expand All @@ -50,6 +49,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.security.sasl.SaslException;
Expand Down Expand Up @@ -86,14 +86,22 @@ public class Leader extends LearnerMaster {
LOG.info("TCP NoDelay set to: {}", nodelay);
}

// This must be an invalid session id.
static final long PING_SESSION_END = 0;

private static final int PING_PAYLOAD_TYPE_QUORUM_SYNC = 1;

public static class Proposal extends SyncedLearnerTracker {

public QuorumPacket packet;
public Request request;
public long pingXid = -1;
public final List<Request> pendingSyncs = new ArrayList<>();

@Override
public String toString() {
return packet.getType() + ", " + packet.getZxid() + ", " + request;
return packet.getType() + ", 0x" + Long.toHexString(packet.getZxid()) + ", " + request
+ ", " + pingXid + ", " + pendingSyncs;
}

}
Expand All @@ -115,6 +123,14 @@ public static int getAckLoggingFrequency() {
return ackLoggingFrequency;
}

public static final String QUORUM_SYNC = "zookeeper.quorumSync";
private static final boolean quorumSync;

static {
quorumSync = Boolean.parseBoolean(System.getProperty(QUORUM_SYNC));
LOG.info("{} = {}", QUORUM_SYNC, quorumSync);
}

final LeaderZooKeeperServer zk;

final QuorumPeer self;
Expand Down Expand Up @@ -215,11 +231,11 @@ public void resetObserverConnectionStats() {
}
}

// Pending sync requests. Must access under 'this' lock.
private final Map<Long, List<LearnerSyncRequest>> pendingSyncs = new HashMap<>();
long quorumPingXid = 0;
private final AtomicInteger pendingSyncs = new AtomicInteger();

public synchronized int getNumPendingSyncs() {
return pendingSyncs.size();
return pendingSyncs.get();
}

//Follower counter
Expand Down Expand Up @@ -652,10 +668,7 @@ void lead() throws IOException, InterruptedException {
}
}

newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
setupQuorumTracker(newLeaderProposal);

// We have to get at least a majority of servers in sync with
// us. We do this by waiting for the NEWLEADER packet to get
Expand Down Expand Up @@ -743,11 +756,7 @@ void lead() throws IOException, InterruptedException {
// track synced learners to make sure we still have a
// quorum of current (and potentially next pending) view.
SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
setupQuorumTracker(syncedAckSet);

syncedAckSet.addAck(self.getMyId());

Expand Down Expand Up @@ -915,22 +924,28 @@ private long getDesignatedLeader(Proposal reconfigProposal, long zxid) {
* @return True if committed, otherwise false.
**/
public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
// in order to be committed, a proposal must be accepted by a quorum.
//
// getting a quorum from all necessary configurations.
if (!p.hasAllQuorums()) {
return false;
}

// make sure that ops are committed in order. With reconfigurations it is now possible
// that different operations wait for different sets of acks, and we still want to enforce
// that they are committed in order. Currently we only permit one outstanding reconfiguration
// such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
// pending all wait for a quorum of old and new config, so it's not possible to get enough acks
// for an operation without getting enough acks for preceding ops. But in the future if multiple
// concurrent reconfigs are allowed, this can happen.
if (outstandingProposals.containsKey(zxid - 1)) {
return false;
}

// in order to be committed, a proposal must be accepted by a quorum.
//
// getting a quorum from all necessary configurations.
if (!p.hasAllQuorums()) {
return false;
Proposal previous = outstandingProposals.get(zxid - 1);
if (previous != null) {
if (previous.pingXid < 0) {
return false;
}
// Quorum sync leader is leading old version servers, it probably will never get enough acks.
outstandingProposals.remove(zxid - 1);
commitQuorumSync(previous);
}

// commit proposals in order
Expand Down Expand Up @@ -980,11 +995,7 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol
inform(p);
}
zk.commitProcessor.commit(p.request);
if (pendingSyncs.containsKey(zxid)) {
for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
commitPendingSyncs(p);

return true;
}
Expand Down Expand Up @@ -1068,6 +1079,64 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA
}
}

void setupQuorumTracker(SyncedLearnerTracker tracker) {
QuorumVerifier verifier = self.getQuorumVerifier();
QuorumVerifier lastSeen = self.getLastSeenQuorumVerifier();
tracker.addQuorumVerifier(verifier);
if (lastSeen != null && lastSeen.getVersion() > verifier.getVersion()) {
tracker.addQuorumVerifier(lastSeen);
}
}

private synchronized void commitQuorumSync(Proposal p) {
sendSync(p.request);
pendingSyncs.decrementAndGet();
commitPendingSyncs(p);
}

private synchronized void processQuorumSyncAck(long sid, long zxid, long xid) {
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
LOG.debug("Receive dated quorum sync zxid 0x{}, xid {} with no ping", Long.toHexString(zxid), xid);
return;
}
if (p.pingXid < 0) {
// This is not possible as proposal will get new zxid.
LOG.error("Receive quorum sync zxid 0x{}, xid {}, proposal {}", Long.toHexString(zxid), xid, p);
return;
} else if (xid < p.pingXid) {
// It is possible to issue two quorum sync with no write in between.
LOG.debug("Receive dated quorum sync zxid 0x{}, xid {}, ping {}", Long.toHexString(zxid), xid, p);
return;
} else if (xid > p.pingXid) {
// It is not possible as new syncs are either collapsed into old one or
// started after old one completed.
LOG.error("Receive quorum sync zxid 0x{}, xid {}, dated ping {}", Long.toHexString(zxid), xid, p);
return;
}
p.addAck(sid);
if (LOG.isDebugEnabled()) {
LOG.debug("Receive quorum sync zxid 0x{}, xid {}, ping {}, acks {}", Long.toHexString(zxid), xid, p, p.ackSetsToString());
}
if (p.hasAllQuorums()) {
outstandingProposals.remove(zxid);
commitQuorumSync(p);
}
}

@Override
void processPing(long sid, long zxid, byte[] payload) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(payload);
DataInputStream dis = new DataInputStream(bis);
int type = dis.readInt();
if (type != PING_PAYLOAD_TYPE_QUORUM_SYNC) {
String msg = String.format("invalid ping payload type: %d", type);
throw new IOException(msg);
}
long xid = dis.readLong();
processQuorumSyncAck(sid, zxid, xid);
}

static class ToBeAppliedRequestProcessor implements RequestProcessor {

private final RequestProcessor next;
Expand Down Expand Up @@ -1263,16 +1332,11 @@ public Proposal propose(Request request) throws XidRolloverException {
p.request = request;

synchronized (this) {
p.addQuorumVerifier(self.getQuorumVerifier());

if (request.getHdr().getType() == OpCode.reconfig) {
self.setLastSeenQuorumVerifier(request.qv, true);
}

if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}

setupQuorumTracker(p);
LOG.debug("Proposing:: {}", request);

lastProposed = p.packet.getZxid();
Expand All @@ -1283,26 +1347,69 @@ public Proposal propose(Request request) throws XidRolloverException {
return p;
}

private static byte[] createQuorumSyncPingPayload(long xid) {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(stream);
try {
output.writeLong(PING_SESSION_END);
output.writeInt(PING_PAYLOAD_TYPE_QUORUM_SYNC);
output.writeLong(xid);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
return stream.toByteArray();
}

private Proposal createQuorumSyncProposal(Request r) {
quorumPingXid += 1;
byte[] payload = createQuorumSyncPingPayload(quorumPingXid);
QuorumPacket packet = new QuorumPacket(Leader.PING, lastProposed, payload, null);
Proposal p = new Proposal();
p.request = r;
p.packet = packet;
p.pingXid = quorumPingXid;
setupQuorumTracker(p);
p.addAck(self.getMyId());
return p;
}

/**
* Process sync requests
*
* @param r the request
*/

public synchronized void processSync(LearnerSyncRequest r) {
if (outstandingProposals.isEmpty()) {
public synchronized void processSync(Request r) {
Proposal p = outstandingProposals.get(lastProposed);
if (p != null) {
p.pendingSyncs.add(r);
pendingSyncs.incrementAndGet();
} else if (!quorumSync) {
sendSync(r);
} else {
pendingSyncs.computeIfAbsent(lastProposed, k -> new ArrayList<>()).add(r);
p = createQuorumSyncProposal(r);
outstandingProposals.put(lastProposed, p);
pendingSyncs.incrementAndGet();
sendPacket(p.packet);
}
}

private synchronized void commitPendingSyncs(Proposal p) {
for (Request r : p.pendingSyncs) {
pendingSyncs.decrementAndGet();
sendSync(r);
}
}

/**
* Sends a sync message to the appropriate server
*/
public void sendSync(LearnerSyncRequest r) {
QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
r.fh.queuePacket(qp);
public void sendSync(Request r) {
if (r.getOwner() instanceof LearnerHandler) {
QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
((LearnerHandler) r.getOwner()).queuePacket(qp);
} else {
zk.commitProcessor.commit(r);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public Leader getLeader() {
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
proposalProcessor.initialize();
Expand Down
Loading

0 comments on commit 9364951

Please sign in to comment.