diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java index 1edde5d572c..57792a3b0c8 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java @@ -83,6 +83,7 @@ import org.apache.zookeeper.server.util.AuthUtil; import org.apache.zookeeper.server.util.RequestPathMetricsCollector; import org.apache.zookeeper.txn.ErrorTxn; +import org.apache.zookeeper.txn.TxnHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,8 +109,27 @@ public FinalRequestProcessor(ZooKeeperServer zks) { this.requestPathMetricsCollector = zks.getRequestPathMetricsCollector(); } + private ProcessTxnResult processTxn(Request request) { + // Guard against error txn from leader with skip error txn disabled and old version quorum peer + // in rolling upgrade. + // + // Concurrency safety: this is the only write request processor to data tree. + if (request.isErrorTxn() && request.zxid <= zks.getZKDatabase().getDataTreeLastProcessedZxid()) { + ProcessTxnResult rc = new ProcessTxnResult(); + TxnHeader hdr = request.getHdr(); + rc.clientId = hdr.getClientId(); + rc.cxid = hdr.getCxid(); + rc.zxid = hdr.getZxid(); + rc.type = hdr.getType(); + rc.err = ((ErrorTxn) request.getTxn()).getErr(); + LOG.debug("Skip process error txn: {}", request); + return rc; + } + return zks.processTxn(request); + } + private ProcessTxnResult applyRequest(Request request) { - ProcessTxnResult rc = zks.processTxn(request); + ProcessTxnResult rc = processTxn(request); // ZOOKEEPER-558: // In some cases the server does not close the connection (e.g., closeconn buffer diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index 28d1cc97b55..39a063f6b6f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -772,43 +772,43 @@ private void pRequestHelper(Request request) { case OpCode.create: case OpCode.create2: CreateRequest create2Request = request.readRequestRecord(CreateRequest::new); - pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request); + pRequest2Txn(request.type, zks.peekNextZxid(), request, create2Request); break; case OpCode.createTTL: CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new); - pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest); + pRequest2Txn(request.type, zks.peekNextZxid(), request, createTtlRequest); break; case OpCode.deleteContainer: DeleteContainerRequest deleteContainerRequest = request.readRequestRecord(DeleteContainerRequest::new); - pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest); + pRequest2Txn(request.type, zks.peekNextZxid(), request, deleteContainerRequest); break; case OpCode.delete: DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new); - pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest); + pRequest2Txn(request.type, zks.peekNextZxid(), request, deleteRequest); break; case OpCode.setData: SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new); - pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest); + pRequest2Txn(request.type, zks.peekNextZxid(), request, setDataRequest); break; case OpCode.reconfig: ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new); - pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest); + pRequest2Txn(request.type, zks.peekNextZxid(), request, reconfigRequest); break; case OpCode.setACL: SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new); - pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest); + pRequest2Txn(request.type, zks.peekNextZxid(), request, setAclRequest); break; case OpCode.multi: + // Each op in a multi-op must have the same zxid! + long zxid = zks.peekNextZxid(); MultiOperationRecord multiRequest; try { multiRequest = request.readRequestRecord(MultiOperationRecord::new); } catch (IOException e) { - request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi)); + request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), OpCode.multi)); throw e; } List txns = new ArrayList<>(); - //Each op in a multi-op must have the same zxid! - long zxid = zks.getNextZxid(); KeeperException ke = null; //Store off current pending change records in case we need to rollback @@ -876,7 +876,7 @@ private void pRequestHelper(Request request) { case OpCode.createSession: case OpCode.closeSession: if (!request.isLocalSession()) { - pRequest2Txn(request.type, zks.getNextZxid(), request, null); + pRequest2Txn(request.type, zks.peekNextZxid(), request, null); } break; @@ -924,11 +924,20 @@ private void pRequestHelper(Request request) { String digest = request.requestDigest(); LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest); if (request.getHdr() == null) { - request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), Time.currentWallTime(), request.type)); + // This should only happen for quorum txn in request deserialization, so peek next zxid anyway. + request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.peekNextZxid(), Time.currentWallTime(), request.type)); } request.getHdr().setType(OpCode.error); request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue())); + request.setException(KeeperException.create(Code.MARSHALLINGERROR)); + } finally { + TxnHeader hdr = request.getHdr(); + if (request.isErrorTxn() && ZooKeeperServer.skipErrorTxn()) { + hdr.setZxid(hdr.getZxid() - 1); + } else if (hdr != null && hdr.getZxid() > zks.getZxid()) { + zks.commitNextZxid(hdr.getZxid()); + } } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java index 10111c8a64f..b0135f260c2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java @@ -35,6 +35,7 @@ import org.apache.zookeeper.server.quorum.LearnerHandler; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.AuthUtil; +import org.apache.zookeeper.txn.ErrorTxn; import org.apache.zookeeper.txn.TxnDigest; import org.apache.zookeeper.txn.TxnHeader; import org.slf4j.Logger; @@ -340,6 +341,13 @@ static boolean isValid(int type) { } } + public boolean isErrorTxn() { + return hdr != null + && hdr.getType() == OpCode.error + && txn instanceof ErrorTxn + && ((ErrorTxn) txn).getErr() != 0; + } + public boolean isQuorum() { switch (this.type) { case OpCode.exists: diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java index e3476ff6dd7..538d8a76cbe 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java @@ -88,6 +88,7 @@ private ServerMetrics(MetricsProvider metricsProvider) { DIFF_COUNT = metricsContext.getCounter("diff_count"); SNAP_COUNT = metricsContext.getCounter("snap_count"); COMMIT_COUNT = metricsContext.getCounter("commit_count"); + SKIP_COUNT = metricsContext.getCounter("skip_count"); CONNECTION_REQUEST_COUNT = metricsContext.getCounter("connection_request_count"); CONNECTION_TOKEN_DEFICIT = metricsContext.getSummary("connection_token_deficit", DetailLevel.BASIC); CONNECTION_REJECTED = metricsContext.getCounter("connection_rejected"); @@ -210,8 +211,10 @@ private ServerMetrics(MetricsProvider metricsProvider) { PROPOSAL_LATENCY = metricsContext.getSummary("proposal_latency", DetailLevel.ADVANCED); PROPOSAL_ACK_CREATION_LATENCY = metricsContext.getSummary("proposal_ack_creation_latency", DetailLevel.ADVANCED); COMMIT_PROPAGATION_LATENCY = metricsContext.getSummary("commit_propagation_latency", DetailLevel.ADVANCED); + SKIP_PROPAGATION_LATENCY = metricsContext.getSummary("skip_propagation_latency", DetailLevel.ADVANCED); LEARNER_PROPOSAL_RECEIVED_COUNT = metricsContext.getCounter("learner_proposal_received_count"); LEARNER_COMMIT_RECEIVED_COUNT = metricsContext.getCounter("learner_commit_received_count"); + LEARNER_SKIP_RECEIVED_COUNT = metricsContext.getCounter("learner_skip_received_count"); /** * Learner handler quorum packet metrics. @@ -339,6 +342,7 @@ private ServerMetrics(MetricsProvider metricsProvider) { public final Counter DIFF_COUNT; public final Counter SNAP_COUNT; public final Counter COMMIT_COUNT; + public final Counter SKIP_COUNT; public final Counter CONNECTION_REQUEST_COUNT; public final Counter REVALIDATE_COUNT; @@ -376,8 +380,10 @@ private ServerMetrics(MetricsProvider metricsProvider) { public final Summary PROPOSAL_LATENCY; public final Summary PROPOSAL_ACK_CREATION_LATENCY; public final Summary COMMIT_PROPAGATION_LATENCY; + public final Summary SKIP_PROPAGATION_LATENCY; public final Counter LEARNER_PROPOSAL_RECEIVED_COUNT; public final Counter LEARNER_COMMIT_RECEIVED_COUNT; + public final Counter LEARNER_SKIP_RECEIVED_COUNT; public final Summary STARTUP_TXNS_LOADED; public final Summary STARTUP_TXNS_LOAD_TIME; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java index 352eb81da90..d792948bead 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java @@ -47,4 +47,10 @@ public TxnHeader getHeader() { public TxnDigest getDigest() { return digest; } + + public Request toRequest() { + Request request = new Request(header.getClientId(), header.getCxid(), header.getType(), header, txn, header.getZxid()); + request.setTxnDigest(digest); + return request; + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index f460b2d5bb4..a9c12ef7713 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -29,6 +29,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.ConcurrentModificationException; import java.util.Deque; import java.util.HashMap; import java.util.List; @@ -112,6 +113,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck"; public static final String SKIP_ACL = "zookeeper.skipACL"; public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota"; + public static final String SKIP_ERROR_TXN = "zookeeper.skipErrorTxn"; // When enabled, will check ACL constraints appertained to the requests first, // before sending the requests to the quorum. @@ -121,6 +123,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public static final boolean enforceQuota; + private static final boolean skipErrorTxn; + public static final String SASL_SUPER_USER = "zookeeper.superUser"; public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients"; @@ -158,6 +162,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota); } + skipErrorTxn = Boolean.parseBoolean(System.getProperty(SKIP_ERROR_TXN, "true")); + LOG.info("{} = {}", SKIP_ERROR_TXN, skipErrorTxn); + digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true")); LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); @@ -169,6 +176,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"))); } + public static boolean skipErrorTxn() { + return skipErrorTxn; + } + // @VisibleForTesting public static boolean isEnableEagerACLCheck() { return enableEagerACLCheck; @@ -691,8 +702,16 @@ public SessionTracker getSessionTracker() { return sessionTracker; } - long getNextZxid() { - return hzxid.incrementAndGet(); + long peekNextZxid() { + return hzxid.get() + 1; + } + + void commitNextZxid(long nextZxid) { + long prevZxid = nextZxid - 1; + if (!hzxid.compareAndSet(prevZxid, nextZxid)) { + String msg = String.format("expect current zxid %d, got %d", prevZxid, hzxid.get()); + throw new ConcurrentModificationException(msg); + } } public void setZxid(long zxid) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java index 0eff9d24837..baa850fe0ec 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java @@ -159,6 +159,22 @@ protected void processPacket(QuorumPacket qp) throws Exception { case Leader.PING: ping(qp); break; + case Leader.SKIP: + // It is sad that we have no breadcrumbs to route packet back to origin. Basically, we have choices: + // 1. Attach breadcrumbs to outstanding request. + // This request protocol changes and protocol version bumping. + // 2. Cache all outstanding requests just like how we handle SYNC. + // This increase memory pressure on learner master. + // 3. Send SKIP blindly to all learners including self. + // This thunder learners and waste network traffics. + // + // Given that it is optional feature caching sounds overkill. Fallback to blindly send for a moment. + // Let's evaluate protocol change later. + zk.skip(qp); + if (om != null) { + om.proposalSkipped(qp); + } + break; case Leader.PROPOSAL: ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1); TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData()); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index 3b9c827c340..716772117ae 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -33,7 +33,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -50,6 +49,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import javax.security.sasl.SaslException; @@ -61,6 +61,7 @@ import org.apache.zookeeper.server.FinalRequestProcessor; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperCriticalThread; @@ -90,6 +91,7 @@ public static class Proposal extends SyncedLearnerTracker { public QuorumPacket packet; public Request request; + public List pendingRequests = new ArrayList<>(); @Override public String toString() { @@ -215,11 +217,12 @@ public void resetObserverConnectionStats() { } } - // Pending sync requests. Must access under 'this' lock. - private final Map> pendingSyncs = new HashMap<>(); + public int getNumPendingSyncs() { + return pendingSyncs.get(); + } - public synchronized int getNumPendingSyncs() { - return pendingSyncs.size(); + public int getNumPendingSkips() { + return pendingSkips.get(); } //Follower counter @@ -433,10 +436,18 @@ Optional createServerSocket(InetSocketAddress address, boolean por */ static final int INFORMANDACTIVATE = 19; + /** + * This message type is sent by a leader to request owner to abort that mutation request. + */ + static final int SKIP = 20; + final ConcurrentMap outstandingProposals = new ConcurrentHashMap<>(); private final ConcurrentLinkedQueue toBeApplied = new ConcurrentLinkedQueue<>(); + private final AtomicInteger pendingSyncs = new AtomicInteger(0); + private final AtomicInteger pendingSkips = new AtomicInteger(0); + // VisibleForTesting protected final Proposal newLeaderProposal = new Proposal(); @@ -980,11 +991,7 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol inform(p); } zk.commitProcessor.commit(p.request); - if (pendingSyncs.containsKey(zxid)) { - for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) { - sendSync(r); - } - } + sendPendings(p); return true; } @@ -1283,17 +1290,49 @@ public Proposal propose(Request request) throws XidRolloverException { return p; } + /** + * Abort mutation request without a proposal. + * + *

This method is supposed to be called from {@link ProposalRequestProcessor}, so there is no + * concurrent proposing and aborting. But {@link #processAck(long, long, SocketAddress)} is competing + * with us, so we have to lock most of this method still to order response, just like {@link #propose(Request)}. + */ + public void skip(Request request) { + synchronized (this) { + Proposal proposal = outstandingProposals.get(request.zxid); + if (proposal == null) { + sendSkip(request); + } else { + proposal.pendingRequests.add(request); + pendingSkips.incrementAndGet(); + } + } + ServerMetrics.getMetrics().SKIP_COUNT.add(1); + } + + private void sendSkip(Request request) { + Object owner = request.getOwner(); + if (owner instanceof LearnerHandler) { + byte[] data = request.getSerializeData(); + QuorumPacket pp = new QuorumPacket(Leader.SKIP, request.zxid, data, null); + ((LearnerHandler) owner).queuePacket(pp); + } else if (owner == ServerCnxn.me) { + zk.commitProcessor.commit(request); + } + } + /** * Process sync requests * * @param r the request */ - public synchronized void processSync(LearnerSyncRequest r) { - if (outstandingProposals.isEmpty()) { + Proposal proposal = outstandingProposals.get(lastProposed); + if (proposal == null) { sendSync(r); } else { - pendingSyncs.computeIfAbsent(lastProposed, k -> new ArrayList<>()).add(r); + proposal.pendingRequests.add(r); + pendingSyncs.incrementAndGet(); } } @@ -1305,6 +1344,18 @@ public void sendSync(LearnerSyncRequest r) { r.fh.queuePacket(qp); } + public void sendPendings(Proposal proposal) { + for (Request request : proposal.pendingRequests) { + if (request instanceof LearnerSyncRequest) { + sendSync((LearnerSyncRequest) request); + pendingSyncs.decrementAndGet(); + } else if (request.isErrorTxn()) { + sendSkip(request); + pendingSkips.decrementAndGet(); + } + } + } + /** * lets the leader know that a follower is capable of following and is done * syncing diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index 2de080bd380..3884c53b48a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -113,6 +113,7 @@ protected void registerMetrics() { rootContext.registerGauge("pending_syncs", gaugeWithLeader( (leader) -> leader.getNumPendingSyncs() )); + rootContext.registerGauge("pending_skips", gaugeWithLeader(Leader::getNumPendingSkips)); rootContext.registerGauge("leader_uptime", gaugeWithLeader( (leader) -> leader.getUptime() )); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java index efd2e376208..64715cafb6a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java @@ -23,11 +23,15 @@ import java.util.Map; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.DataTreeBean; +import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.SyncRequestProcessor; +import org.apache.zookeeper.server.TxnLogEntry; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServerBean; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.util.SerializeUtils; /** * Parent class for all ZooKeeperServers for Learners @@ -52,6 +56,20 @@ public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int minSe */ public abstract Learner getLearner(); + /** + * Skip an outstanding request. + * + * @param qp quorum packet + * @throws IOException deserialization exception + */ + void skip(QuorumPacket qp) throws IOException { + ServerMetrics.getMetrics().LEARNER_SKIP_RECEIVED_COUNT.add(1); + TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData()); + Request request = logEntry.toRequest(); + request.logLatency(ServerMetrics.getMetrics().SKIP_PROPAGATION_LATENCY); + commitProcessor.commit(request); + } + /** * Returns the current state of the session tracker. This is only currently * used by a Learner to build a ping response packet. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java index d3aa41b5fc7..d59067fa477 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java @@ -199,6 +199,9 @@ protected void processPacket(QuorumPacket qp) throws Exception { case Leader.SYNC: ((ObserverZooKeeperServer) zk).sync(); break; + case Leader.SKIP: + zk.skip(qp); + break; case Leader.INFORM: ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1); logEntry = SerializeUtils.deserializeTxn(qp.getData()); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java index 2369eabe36c..fbc62ef72e5 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java @@ -404,6 +404,10 @@ private synchronized void sendPacket(final QuorumPacket pkt) { lastProposedZxid = pkt.getZxid(); } + synchronized void proposalSkipped(QuorumPacket qp) { + sendPacket(qp); + } + synchronized void proposalCommitted(long zxid) { QuorumPacket pkt = removeProposedPacket(zxid); if (pkt == null) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java index c1e2fe16e43..431548bf39d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java @@ -22,6 +22,7 @@ import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.SyncRequestProcessor; +import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.quorum.Leader.XidRolloverException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +66,7 @@ public void initialize() { syncProcessor.start(); } + @Override public void processRequest(Request request) throws RequestProcessorException { /* In the following IF-THEN-ELSE block, we process syncs on the leader. * If the sync is coming from a follower, then the follower @@ -80,6 +82,10 @@ public void processRequest(Request request) throws RequestProcessorException { nextProcessor.processRequest(request); } if (request.getHdr() != null) { + if (request.isErrorTxn() && ZooKeeperServer.skipErrorTxn()) { + zks.getLeader().skip(request); + return; + } // We need to sync and get consensus on any transactions try { zks.getLeader().propose(request); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SkipErrorTxnDisabledTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SkipErrorTxnDisabledTest.java new file mode 100644 index 00000000000..416b78f4db6 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SkipErrorTxnDisabledTest.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +public class SkipErrorTxnDisabledTest extends SkipErrorTxnTest { + @Override + protected boolean skipErrorTxn() { + return false; + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SkipErrorTxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SkipErrorTxnTest.java new file mode 100644 index 00000000000..b47eb4038b3 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SkipErrorTxnTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.number.OrderingComparison.greaterThan; +import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.concurrent.CompletableFuture; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class SkipErrorTxnTest extends QuorumBase { + protected boolean skipErrorTxn() { + return true; + } + + @BeforeEach + @Override + public void setUp() throws Exception { + // TODO: setup an follower as observer master. + System.setProperty(ZooKeeperServer.SKIP_ERROR_TXN, String.valueOf(skipErrorTxn())); + setUp(true, false); + } + + void syncClient(ZooKeeper zk) { + CompletableFuture synced = new CompletableFuture<>(); + zk.sync("/", (rc, path, ctx) -> { + if (rc == 0) { + synced.complete(null); + } else { + synced.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); + } + }, null); + synced.join(); + } + + private void testWriteError(String hp) throws Exception { + ZooKeeper zk = createClient(hp); + syncClient(zk); + QuorumPeer leader = getLeaderQuorumPeer(); + long lastZxid = leader.getLastLoggedZxid(); + + // Issue an asynchronous request first, so we can test response order. + final CompletableFuture future = new CompletableFuture<>(); + zk.create("/a1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (int rc, String path, Object ctx, String name) -> { + if (rc == 0) { + future.complete(null); + } else { + future.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)); + } + }, null); + + zk.sync("/", null, null); + + assertThrows(KeeperException.NoNodeException.class, () -> { + zk.setData("/a2", null, -1); + }); + future.join(); + + syncClient(zk); + long expectedZxid = lastZxid + 1; + if (skipErrorTxn()) { + assertThat(leader.getLastLoggedZxid(), is(expectedZxid)); + } else { + assertThat(leader.getLastLoggedZxid(), greaterThan(expectedZxid)); + } + } + + @Test + public void testLeaderWriteError() throws Exception { + testWriteError(getPeersMatching(QuorumPeer.ServerState.LEADING)); + } + + @Test + public void testFollowerWriteError() throws Exception { + testWriteError(getPeersMatching(QuorumPeer.ServerState.FOLLOWING)); + } + + @Test + public void testObserverWriteError() throws Exception { + testWriteError(getPeersMatching(QuorumPeer.ServerState.OBSERVING)); + } +}