Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-3594: Don't propose error transactions #2070

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.zookeeper.server.util.AuthUtil;
import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
import org.apache.zookeeper.txn.ErrorTxn;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -108,8 +109,27 @@ public FinalRequestProcessor(ZooKeeperServer zks) {
this.requestPathMetricsCollector = zks.getRequestPathMetricsCollector();
}

private ProcessTxnResult processTxn(Request request) {
// Guard against error txn from leader with skip error txn disabled and old version quorum peer
// in rolling upgrade.
//
// Concurrency safety: this is the only write request processor to data tree.
if (request.isErrorTxn() && request.zxid <= zks.getZKDatabase().getDataTreeLastProcessedZxid()) {
ProcessTxnResult rc = new ProcessTxnResult();
TxnHeader hdr = request.getHdr();
rc.clientId = hdr.getClientId();
rc.cxid = hdr.getCxid();
rc.zxid = hdr.getZxid();
rc.type = hdr.getType();
rc.err = ((ErrorTxn) request.getTxn()).getErr();
LOG.debug("Skip process error txn: {}", request);
return rc;
}
return zks.processTxn(request);
}

private ProcessTxnResult applyRequest(Request request) {
ProcessTxnResult rc = zks.processTxn(request);
ProcessTxnResult rc = processTxn(request);

// ZOOKEEPER-558:
// In some cases the server does not close the connection (e.g., closeconn buffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,43 +772,43 @@ private void pRequestHelper(Request request) {
case OpCode.create:
case OpCode.create2:
CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
pRequest2Txn(request.type, zks.peekNextZxid(), request, create2Request);
break;
case OpCode.createTTL:
CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
pRequest2Txn(request.type, zks.peekNextZxid(), request, createTtlRequest);
break;
case OpCode.deleteContainer:
DeleteContainerRequest deleteContainerRequest = request.readRequestRecord(DeleteContainerRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
pRequest2Txn(request.type, zks.peekNextZxid(), request, deleteContainerRequest);
break;
case OpCode.delete:
DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
pRequest2Txn(request.type, zks.peekNextZxid(), request, deleteRequest);
break;
case OpCode.setData:
SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
pRequest2Txn(request.type, zks.peekNextZxid(), request, setDataRequest);
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
pRequest2Txn(request.type, zks.peekNextZxid(), request, reconfigRequest);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
pRequest2Txn(request.type, zks.peekNextZxid(), request, setAclRequest);
break;
case OpCode.multi:
// Each op in a multi-op must have the same zxid!
long zxid = zks.peekNextZxid();
MultiOperationRecord multiRequest;
try {
multiRequest = request.readRequestRecord(MultiOperationRecord::new);
} catch (IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), OpCode.multi));
throw e;
}
List<Txn> txns = new ArrayList<>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null;

//Store off current pending change records in case we need to rollback
Expand Down Expand Up @@ -876,7 +876,7 @@ private void pRequestHelper(Request request) {
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request, null);
pRequest2Txn(request.type, zks.peekNextZxid(), request, null);
}
break;

Expand Down Expand Up @@ -924,11 +924,20 @@ private void pRequestHelper(Request request) {
String digest = request.requestDigest();
LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest);
if (request.getHdr() == null) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), Time.currentWallTime(), request.type));
// This should only happen for quorum txn in request deserialization, so peek next zxid anyway.
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.peekNextZxid(), Time.currentWallTime(), request.type));
}

request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
request.setException(KeeperException.create(Code.MARSHALLINGERROR));
} finally {
TxnHeader hdr = request.getHdr();
if (request.isErrorTxn() && ZooKeeperServer.skipErrorTxn()) {
hdr.setZxid(hdr.getZxid() - 1);
} else if (hdr != null && hdr.getZxid() > zks.getZxid()) {
zks.commitNextZxid(hdr.getZxid());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.zookeeper.server.quorum.LearnerHandler;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.AuthUtil;
import org.apache.zookeeper.txn.ErrorTxn;
import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
Expand Down Expand Up @@ -340,6 +341,13 @@ static boolean isValid(int type) {
}
}

public boolean isErrorTxn() {
return hdr != null
&& hdr.getType() == OpCode.error
&& txn instanceof ErrorTxn
&& ((ErrorTxn) txn).getErr() != 0;
}

public boolean isQuorum() {
switch (this.type) {
case OpCode.exists:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {
DIFF_COUNT = metricsContext.getCounter("diff_count");
SNAP_COUNT = metricsContext.getCounter("snap_count");
COMMIT_COUNT = metricsContext.getCounter("commit_count");
SKIP_COUNT = metricsContext.getCounter("skip_count");
CONNECTION_REQUEST_COUNT = metricsContext.getCounter("connection_request_count");
CONNECTION_TOKEN_DEFICIT = metricsContext.getSummary("connection_token_deficit", DetailLevel.BASIC);
CONNECTION_REJECTED = metricsContext.getCounter("connection_rejected");
Expand Down Expand Up @@ -210,8 +211,10 @@ private ServerMetrics(MetricsProvider metricsProvider) {
PROPOSAL_LATENCY = metricsContext.getSummary("proposal_latency", DetailLevel.ADVANCED);
PROPOSAL_ACK_CREATION_LATENCY = metricsContext.getSummary("proposal_ack_creation_latency", DetailLevel.ADVANCED);
COMMIT_PROPAGATION_LATENCY = metricsContext.getSummary("commit_propagation_latency", DetailLevel.ADVANCED);
SKIP_PROPAGATION_LATENCY = metricsContext.getSummary("skip_propagation_latency", DetailLevel.ADVANCED);
LEARNER_PROPOSAL_RECEIVED_COUNT = metricsContext.getCounter("learner_proposal_received_count");
LEARNER_COMMIT_RECEIVED_COUNT = metricsContext.getCounter("learner_commit_received_count");
LEARNER_SKIP_RECEIVED_COUNT = metricsContext.getCounter("learner_skip_received_count");

/**
* Learner handler quorum packet metrics.
Expand Down Expand Up @@ -339,6 +342,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {
public final Counter DIFF_COUNT;
public final Counter SNAP_COUNT;
public final Counter COMMIT_COUNT;
public final Counter SKIP_COUNT;
public final Counter CONNECTION_REQUEST_COUNT;

public final Counter REVALIDATE_COUNT;
Expand Down Expand Up @@ -376,8 +380,10 @@ private ServerMetrics(MetricsProvider metricsProvider) {
public final Summary PROPOSAL_LATENCY;
public final Summary PROPOSAL_ACK_CREATION_LATENCY;
public final Summary COMMIT_PROPAGATION_LATENCY;
public final Summary SKIP_PROPAGATION_LATENCY;
public final Counter LEARNER_PROPOSAL_RECEIVED_COUNT;
public final Counter LEARNER_COMMIT_RECEIVED_COUNT;
public final Counter LEARNER_SKIP_RECEIVED_COUNT;

public final Summary STARTUP_TXNS_LOADED;
public final Summary STARTUP_TXNS_LOAD_TIME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,10 @@ public TxnHeader getHeader() {
public TxnDigest getDigest() {
return digest;
}

public Request toRequest() {
Request request = new Request(header.getClientId(), header.getCxid(), header.getType(), header, txn, header.getZxid());
request.setTxnDigest(digest);
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -112,6 +113,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck";
public static final String SKIP_ACL = "zookeeper.skipACL";
public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota";
public static final String SKIP_ERROR_TXN = "zookeeper.skipErrorTxn";

// When enabled, will check ACL constraints appertained to the requests first,
// before sending the requests to the quorum.
Expand All @@ -121,6 +123,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {

public static final boolean enforceQuota;

private static final boolean skipErrorTxn;

public static final String SASL_SUPER_USER = "zookeeper.superUser";

public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients";
Expand Down Expand Up @@ -158,6 +162,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota);
}

skipErrorTxn = Boolean.parseBoolean(System.getProperty(SKIP_ERROR_TXN, "false"));
LOG.info("{} = {}", SKIP_ERROR_TXN, skipErrorTxn);

digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true"));
LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);

Expand All @@ -169,6 +176,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true")));
}

public static boolean skipErrorTxn() {
return skipErrorTxn;
}

// @VisibleForTesting
public static boolean isEnableEagerACLCheck() {
return enableEagerACLCheck;
Expand Down Expand Up @@ -691,8 +702,16 @@ public SessionTracker getSessionTracker() {
return sessionTracker;
}

long getNextZxid() {
return hzxid.incrementAndGet();
long peekNextZxid() {
return hzxid.get() + 1;
}

void commitNextZxid(long nextZxid) {
long prevZxid = nextZxid - 1;
if (!hzxid.compareAndSet(prevZxid, nextZxid)) {
String msg = String.format("expect current zxid %d, got %d", prevZxid, hzxid.get());
throw new ConcurrentModificationException(msg);
}
}

public void setZxid(long zxid) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,22 @@ protected void processPacket(QuorumPacket qp) throws Exception {
case Leader.PING:
ping(qp);
break;
case Leader.SKIP:
// It is sad that we have no breadcrumbs to route packet back to origin. Basically, we have choices:
// 1. Attach breadcrumbs to outstanding request.
// This request protocol changes and protocol version bumping.
Copy link
Member Author

@kezhuw kezhuw Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could also cover #2069 if we want to go this way. I prefer this actually. We will inevitably introduce protocol changes in long term future. If so, why now ? But it still could be a separate issue before 3.10.0.

// 2. Cache all outstanding requests just like how we handle SYNC.
// This increase memory pressure on learner master.
// 3. Send SKIP blindly to all learners including self.
// This thunder learners and waste network traffics.
//
// Given that it is optional feature caching sounds overkill. Fallback to blindly send for a moment.
// Let's evaluate protocol change later.
zk.skip(qp);
if (om != null) {
om.proposalSkipped(qp);
}
break;
case Leader.PROPOSAL:
ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
Expand Down
Loading
Loading