Skip to content

Commit

Permalink
ZOOKEEPER-3594: Don't propose error transactions
Browse files Browse the repository at this point in the history
As discussed in ZOOKEEPER-3418 and ZOOKEEPER-3594, there is no reason
for us to propose error transactions. Doing so wastes both network
traffic/latency and disk storage.

This PR introduces a new quorum packet type `SKIP`. The leader uses it to
signal the request owner to abort the pending request.

Since `SKIP` is a new packet type, `skipErrorTxn` must not be turned
on during a rolling upgrade.

JIRA: ZOOKEEPER-3594
  • Loading branch information
kezhuw committed Sep 27, 2023
1 parent f42c01d commit abb2747
Show file tree
Hide file tree
Showing 15 changed files with 327 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.zookeeper.server.util.AuthUtil;
import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
import org.apache.zookeeper.txn.ErrorTxn;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -108,8 +109,27 @@ public FinalRequestProcessor(ZooKeeperServer zks) {
this.requestPathMetricsCollector = zks.getRequestPathMetricsCollector();
}

/**
 * Applies the request's transaction through {@code zks.processTxn}, unless it is an error
 * transaction whose zxid is not beyond the data tree's last processed zxid.
 *
 * <p>In that case the data tree is left untouched and a result is assembled directly from
 * the transaction header and the error code carried by the {@link ErrorTxn}.
 *
 * @param request the request whose transaction should be applied
 * @return the result of applying the transaction, or the synthesized error result
 */
private ProcessTxnResult processTxn(Request request) {
    // Guards against an error txn sent by a leader that has skip-error-txn disabled, or by an
    // old-version quorum peer, during a rolling upgrade.
    //
    // Concurrency safety: this is the only request processor that writes to the data tree.
    boolean staleErrorTxn = request.isErrorTxn()
        && request.zxid <= zks.getZKDatabase().getDataTreeLastProcessedZxid();
    if (!staleErrorTxn) {
        return zks.processTxn(request);
    }

    // Build the result from the header alone; nothing is applied to the data tree.
    ProcessTxnResult result = new ProcessTxnResult();
    TxnHeader header = request.getHdr();
    result.clientId = header.getClientId();
    result.cxid = header.getCxid();
    result.zxid = header.getZxid();
    result.type = header.getType();
    result.err = ((ErrorTxn) request.getTxn()).getErr();
    LOG.debug("Skip process error txn: {}", request);
    return result;
}

private ProcessTxnResult applyRequest(Request request) {
ProcessTxnResult rc = zks.processTxn(request);
ProcessTxnResult rc = processTxn(request);

// ZOOKEEPER-558:
// In some cases the server does not close the connection (e.g., closeconn buffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,43 +772,43 @@ private void pRequestHelper(Request request) {
case OpCode.create:
case OpCode.create2:
CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
pRequest2Txn(request.type, zks.peekNextZxid(), request, create2Request);
break;
case OpCode.createTTL:
CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
pRequest2Txn(request.type, zks.peekNextZxid(), request, createTtlRequest);
break;
case OpCode.deleteContainer:
DeleteContainerRequest deleteContainerRequest = request.readRequestRecord(DeleteContainerRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
pRequest2Txn(request.type, zks.peekNextZxid(), request, deleteContainerRequest);
break;
case OpCode.delete:
DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
pRequest2Txn(request.type, zks.peekNextZxid(), request, deleteRequest);
break;
case OpCode.setData:
SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
pRequest2Txn(request.type, zks.peekNextZxid(), request, setDataRequest);
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
pRequest2Txn(request.type, zks.peekNextZxid(), request, reconfigRequest);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
pRequest2Txn(request.type, zks.peekNextZxid(), request, setAclRequest);
break;
case OpCode.multi:
// Each op in a multi-op must have the same zxid!
long zxid = zks.peekNextZxid();
MultiOperationRecord multiRequest;
try {
multiRequest = request.readRequestRecord(MultiOperationRecord::new);
} catch (IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), OpCode.multi));
throw e;
}
List<Txn> txns = new ArrayList<>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null;

//Store off current pending change records in case we need to rollback
Expand Down Expand Up @@ -876,7 +876,7 @@ private void pRequestHelper(Request request) {
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request, null);
pRequest2Txn(request.type, zks.peekNextZxid(), request, null);
}
break;

Expand Down Expand Up @@ -924,11 +924,20 @@ private void pRequestHelper(Request request) {
String digest = request.requestDigest();
LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest);
if (request.getHdr() == null) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), Time.currentWallTime(), request.type));
// This should only happen for quorum txn in request deserialization, so peek next zxid anyway.
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.peekNextZxid(), Time.currentWallTime(), request.type));
}

request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
request.setException(KeeperException.create(Code.MARSHALLINGERROR));
} finally {
TxnHeader hdr = request.getHdr();
if (request.isErrorTxn() && ZooKeeperServer.skipErrorTxn()) {
hdr.setZxid(hdr.getZxid() - 1);
} else if (hdr != null && hdr.getZxid() > zks.getZxid()) {
zks.commitNextZxid(hdr.getZxid());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.zookeeper.server.quorum.LearnerHandler;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.AuthUtil;
import org.apache.zookeeper.txn.ErrorTxn;
import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
Expand Down Expand Up @@ -340,6 +341,13 @@ static boolean isValid(int type) {
}
}

/**
 * Tells whether this request carries an error transaction.
 *
 * @return {@code true} only when a header is present, its type is {@code OpCode.error},
 *         the txn is an {@link ErrorTxn}, and that txn's error code is non-zero
 */
public boolean isErrorTxn() {
    if (hdr == null || hdr.getType() != OpCode.error) {
        return false;
    }
    if (!(txn instanceof ErrorTxn)) {
        return false;
    }
    // An ErrorTxn with a zero error code is not counted as an error transaction.
    return ((ErrorTxn) txn).getErr() != 0;
}

public boolean isQuorum() {
switch (this.type) {
case OpCode.exists:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {
DIFF_COUNT = metricsContext.getCounter("diff_count");
SNAP_COUNT = metricsContext.getCounter("snap_count");
COMMIT_COUNT = metricsContext.getCounter("commit_count");
SKIP_COUNT = metricsContext.getCounter("skip_count");
CONNECTION_REQUEST_COUNT = metricsContext.getCounter("connection_request_count");
CONNECTION_TOKEN_DEFICIT = metricsContext.getSummary("connection_token_deficit", DetailLevel.BASIC);
CONNECTION_REJECTED = metricsContext.getCounter("connection_rejected");
Expand Down Expand Up @@ -210,8 +211,10 @@ private ServerMetrics(MetricsProvider metricsProvider) {
PROPOSAL_LATENCY = metricsContext.getSummary("proposal_latency", DetailLevel.ADVANCED);
PROPOSAL_ACK_CREATION_LATENCY = metricsContext.getSummary("proposal_ack_creation_latency", DetailLevel.ADVANCED);
COMMIT_PROPAGATION_LATENCY = metricsContext.getSummary("commit_propagation_latency", DetailLevel.ADVANCED);
SKIP_PROPAGATION_LATENCY = metricsContext.getSummary("skip_propagation_latency", DetailLevel.ADVANCED);
LEARNER_PROPOSAL_RECEIVED_COUNT = metricsContext.getCounter("learner_proposal_received_count");
LEARNER_COMMIT_RECEIVED_COUNT = metricsContext.getCounter("learner_commit_received_count");
LEARNER_SKIP_RECEIVED_COUNT = metricsContext.getCounter("learner_skip_received_count");

/**
* Learner handler quorum packet metrics.
Expand Down Expand Up @@ -339,6 +342,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {
public final Counter DIFF_COUNT;
public final Counter SNAP_COUNT;
public final Counter COMMIT_COUNT;
public final Counter SKIP_COUNT;
public final Counter CONNECTION_REQUEST_COUNT;

public final Counter REVALIDATE_COUNT;
Expand Down Expand Up @@ -376,8 +380,10 @@ private ServerMetrics(MetricsProvider metricsProvider) {
public final Summary PROPOSAL_LATENCY;
public final Summary PROPOSAL_ACK_CREATION_LATENCY;
public final Summary COMMIT_PROPAGATION_LATENCY;
public final Summary SKIP_PROPAGATION_LATENCY;
public final Counter LEARNER_PROPOSAL_RECEIVED_COUNT;
public final Counter LEARNER_COMMIT_RECEIVED_COUNT;
public final Counter LEARNER_SKIP_RECEIVED_COUNT;

public final Summary STARTUP_TXNS_LOADED;
public final Summary STARTUP_TXNS_LOAD_TIME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,10 @@ public TxnHeader getHeader() {
public TxnDigest getDigest() {
return digest;
}

/**
 * Builds a {@link Request} from this log entry.
 *
 * <p>Session id, cxid, type and zxid are taken from the entry's header; the header and txn
 * themselves are passed along, and the entry's digest is attached to the new request.
 *
 * @return a new request populated from this entry's header, txn and digest
 */
public Request toRequest() {
    final long sessionId = header.getClientId();
    final int cxid = header.getCxid();
    final int opType = header.getType();
    final long zxid = header.getZxid();
    Request converted = new Request(sessionId, cxid, opType, header, txn, zxid);
    converted.setTxnDigest(digest);
    return converted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -112,6 +113,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck";
public static final String SKIP_ACL = "zookeeper.skipACL";
public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota";
public static final String SKIP_ERROR_TXN = "zookeeper.skipErrorTxn";

// When enabled, will check ACL constraints appertained to the requests first,
// before sending the requests to the quorum.
Expand All @@ -121,6 +123,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {

public static final boolean enforceQuota;

private static final boolean skipErrorTxn;

public static final String SASL_SUPER_USER = "zookeeper.superUser";

public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients";
Expand Down Expand Up @@ -158,6 +162,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota);
}

skipErrorTxn = Boolean.parseBoolean(System.getProperty(SKIP_ERROR_TXN, "true"));
LOG.info("{} = {}", SKIP_ERROR_TXN, skipErrorTxn);

digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true"));
LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);

Expand All @@ -169,6 +176,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true")));
}

/**
 * Returns the value of the {@code skipErrorTxn} flag.
 *
 * <p>The flag is assigned once, during static initialization of this class, by parsing the
 * system property named by {@link #SKIP_ERROR_TXN} ({@code zookeeper.skipErrorTxn}); when
 * the property is not set, the string {@code "true"} is parsed instead.
 *
 * @return the parsed value of the {@code zookeeper.skipErrorTxn} system property
 */
public static boolean skipErrorTxn() {
return skipErrorTxn;
}

// @VisibleForTesting
public static boolean isEnableEagerACLCheck() {
return enableEagerACLCheck;
Expand Down Expand Up @@ -691,8 +702,16 @@ public SessionTracker getSessionTracker() {
return sessionTracker;
}

long getNextZxid() {
return hzxid.incrementAndGet();
/**
 * Computes the zxid that follows the current value of {@code hzxid} without changing it.
 *
 * @return the current {@code hzxid} value plus one
 */
long peekNextZxid() {
    final long current = hzxid.get();
    return current + 1;
}

/**
 * Advances {@code hzxid} to {@code nextZxid}, but only if it currently holds
 * {@code nextZxid - 1}.
 *
 * @param nextZxid the zxid to install; must be exactly one greater than the current value
 * @throws ConcurrentModificationException if {@code hzxid} does not hold {@code nextZxid - 1}
 */
void commitNextZxid(long nextZxid) {
    final long expected = nextZxid - 1;
    if (hzxid.compareAndSet(expected, nextZxid)) {
        return;
    }
    // The compare-and-set failed: report the expected value and the value read now.
    throw new ConcurrentModificationException(
        String.format("expect current zxid %d, got %d", expected, hzxid.get()));
}

public void setZxid(long zxid) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,22 @@ protected void processPacket(QuorumPacket qp) throws Exception {
case Leader.PING:
ping(qp);
break;
case Leader.SKIP:
// Unfortunately, we have no breadcrumbs to route the packet back to its origin. Basically, we have these choices:
// 1. Attach breadcrumbs to the outstanding request.
// This requires protocol changes and a protocol version bump.
// 2. Cache all outstanding requests, just like how we handle SYNC.
// This increases memory pressure on the learner master.
// 3. Send SKIP blindly to all learners, including self.
// This causes a thundering herd on learners and wastes network traffic.
//
// Given that this is an optional feature, caching sounds like overkill. Fall back to sending blindly for now.
// Let's evaluate a protocol change later.
zk.skip(qp);
if (om != null) {
om.proposalSkipped(qp);
}
break;
case Leader.PROPOSAL:
ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
Expand Down
Loading

0 comments on commit abb2747

Please sign in to comment.