Skip to content

Commit

Permalink
Fix the issues of XDS flow control
Browse files Browse the repository at this point in the history
Signed-off-by: hanbingleixue <[email protected]>
  • Loading branch information
hanbingleixue committed Feb 8, 2025
1 parent 91e61f3 commit edfa5ef
Show file tree
Hide file tree
Showing 23 changed files with 294 additions and 107 deletions.
2 changes: 1 addition & 1 deletion .github/actions/scenarios/mq-grayscale/rocketmq/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ runs:
with:
processor-keyword: grayscale-rocketmq
- name: if failure then upload error log
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
if: ${{ failure() || cancelled() }}
with:
name: (test-for-grayscale-rocketmq)-(${{ matrix.test-model }}})-logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.StringUtils;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -139,16 +140,17 @@ private static XdsInstanceCircuitBreakers parseInstanceCircuitBreakers(Cluster c
xdsInstanceCircuitBreakers.setConsecutiveGatewayFailure(outlierDetection.getConsecutiveGatewayFailure()
.getValue());
xdsInstanceCircuitBreakers.setConsecutive5xxFailure(outlierDetection.getConsecutive5Xx().getValue());
long interval = java.time.Duration.ofSeconds(outlierDetection.getInterval().getSeconds()).toMillis();
xdsInstanceCircuitBreakers.setInterval(interval);
long ejectionTime = java.time.Duration.ofSeconds(outlierDetection.getBaseEjectionTime().getSeconds())
.toMillis();
xdsInstanceCircuitBreakers.setBaseEjectionTime(ejectionTime);
xdsInstanceCircuitBreakers.setInterval(getDurationInMillis(outlierDetection.getInterval()));
xdsInstanceCircuitBreakers.setBaseEjectionTime(getDurationInMillis(outlierDetection.getBaseEjectionTime()));
xdsInstanceCircuitBreakers.setMaxEjectionPercent(outlierDetection.getMaxEjectionPercent().getValue());
xdsInstanceCircuitBreakers.setFailurePercentageMinimumHosts(outlierDetection.getFailurePercentageMinimumHosts()
.getValue());
xdsInstanceCircuitBreakers.setMinHealthPercent(cluster.getCommonLbConfig().
getHealthyPanicThreshold().getValue());
return xdsInstanceCircuitBreakers;
}

private static long getDurationInMillis(com.google.protobuf.Duration duration) {
return Duration.ofSeconds(duration.getSeconds()).toMillis() + Duration.ofNanos(duration.getNanos()).toMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,7 @@ private static XdsRetryPolicy parseRetryPolicy(RetryPolicy retryPolicy) {
xdsRetryPolicy.setRetryConditions(Arrays.asList(retryPolicy.getRetryOn().split(CommonConstant.COMMA)));
}
xdsRetryPolicy.setMaxAttempts(retryPolicy.getNumRetries().getValue());
long perTryTimeout = Duration.ofSeconds(retryPolicy.getPerTryTimeout().getSeconds()).toMillis();
xdsRetryPolicy.setPerTryTimeout(perTryTimeout);
xdsRetryPolicy.setPerTryTimeout(getDurationInMillis(retryPolicy.getPerTryTimeout()));
return xdsRetryPolicy;
}

Expand Down Expand Up @@ -307,8 +306,7 @@ private static XdsAbort parseAbort(FaultAbort faultAbort) {

private static XdsDelay parseDelay(FaultDelay faultDelay) {
XdsDelay xdsDelay = new XdsDelay();
long fixedDelay = Duration.ofSeconds(faultDelay.getFixedDelay().getSeconds()).toMillis();
xdsDelay.setFixedDelay(fixedDelay);
xdsDelay.setFixedDelay(getDurationInMillis(faultDelay.getFixedDelay()));
io.sermant.core.service.xds.entity.FractionalPercent fractionalPercent =
new io.sermant.core.service.xds.entity.FractionalPercent();
fractionalPercent.setNumerator(faultDelay.getPercentage().getNumerator());
Expand Down Expand Up @@ -448,4 +446,8 @@ private static Optional<XdsTokenBucket> parseTokenBucket(Struct tokenBucketStruc
xdsTokenBucket.setTokensPerFill((int) tokensPerFill);
return Optional.of(xdsTokenBucket);
}

private static long getDurationInMillis(com.google.protobuf.Duration duration) {
return Duration.ofSeconds(duration.getSeconds()).toMillis() + Duration.ofNanos(duration.getNanos()).toMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,16 @@ public class CommonConst {
*/
public static final String DEFAULT_CONTENT_TYPE = "text/plain";

/**
* Minimum response code for a successful request
*/
public static final int MIN_SUCCESS_STATUS_CODE = 200;

/**
* Maximum response code for a successful request
*/
public static final int MAX_SUCCESS_STATUS_CODE = 399;

private CommonConst() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.service.xds.entity.XdsRetryPolicy;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.common.config.CommonConst;
import io.sermant.flowcontrol.common.config.FlowControlConfig;
import io.sermant.flowcontrol.common.support.ReflectMethodCacheSupport;
import io.sermant.flowcontrol.common.xds.retry.RetryCondition;
Expand Down Expand Up @@ -89,15 +91,16 @@ public boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy) {
return false;
}
String statusCode = statusCodeOptional.get();
if (conditions.contains(statusCode)) {
return true;
if (isSuccess(statusCode)) {
return false;
}
for (String conditionName : conditions) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
Optional<RetryCondition> retryConditionOptional = RetryConditionType
.getRetryConditionWithResultByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
if (retryConditionOptional.get().needRetry(this, null, statusCode, result)) {
if (retryConditionOptional.get().isNeedRetry(this, null, statusCode, result)) {
return true;
}
}
Expand All @@ -110,17 +113,32 @@ public boolean isNeedRetry(Throwable ex, XdsRetryPolicy retryPolicy) {
return false;
}
for (String conditionName : retryPolicy.getRetryConditions()) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
Optional<RetryCondition> retryConditionOptional = RetryConditionType
.getRetryConditionWithExceptionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
if (retryConditionOptional.get().needRetry(this, ex, null, null)) {
if (retryConditionOptional.get().isNeedRetry(this, ex, null, null)) {
return true;
}
}
return false;
}

/**
* Determine if the request is successful
*
* @param statusCode status code
* @return if the request is successful,true : success false: failure
*/
public static boolean isSuccess(String statusCode) {
if (StringUtils.isEmpty(statusCode)) {
return false;
}
int code = Integer.parseInt(statusCode);
return code >= CommonConst.MIN_SUCCESS_STATUS_CODE && code <= CommonConst.MAX_SUCCESS_STATUS_CODE;
}

/**
* implemented by subclasses, if subclass implement {@link #isNeedRetry(Set, Object)}, no need to implement the get
* code method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ public interface RetryCondition {
* @param statusCode response status code
* @return The result of the decision
*/
boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result);
boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@

package io.sermant.flowcontrol.common.xds.retry;

import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.common.xds.retry.condition.ClientErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ConnectErrorRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.GatewayErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetBeforeRequestErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ServerErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.SpecificHeaderNameErrorRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.SpecificStatusCodeErrorRetryCondition;

import io.sermant.flowcontrol.common.xds.retry.condition.ConnectFailureRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.GatewayErrorRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetBeforeRequestRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.Retriable4xxRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.RetriableHeadersRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.RetriableStatusCodesRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ServerErrorRetryCondition;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
Expand All @@ -38,42 +39,45 @@ public enum RetryConditionType {
/**
* The type of conditional judgment for server errors
*/
SERVER_ERROR("5xx", new ServerErrorCondition()),
FIVE_XX("5xx", new ServerErrorRetryCondition(), 2),

/**
* The type of conditional judgment for client errors
*/
CLIENT_ERROR("retriable-4xx", new ClientErrorCondition()),
RETRIABLE_4XX("retriable-4xx", new Retriable4xxRetryCondition(), 0),

/**
* The type of conditional judgment for gateway errors
*/
GATEWAY_ERROR("gateway-error", new GatewayErrorCondition()),
GATEWAY_ERROR("gateway-error", new GatewayErrorRetryCondition(), 0),

/**
* The type of conditional judgment for reset errors
*/
RESET_ERROR("reset", new ResetErrorCondition()),
RESET("reset", new ResetRetryCondition(), 1),

/**
* The type of conditional judgment for resetting errors before request
*/
RESET_BEFORE_REQUEST_ERROR("reset-before-request", new ResetBeforeRequestErrorCondition()),
RESET_BEFORE_REQUEST("reset-before-request", new ResetBeforeRequestRetryCondition(), 1),

/**
* The type of conditional judgment for connect errors
*/
CONNECT_ERROR("connect-failure", new ConnectErrorRetryCondition()),
CONNECT_FAILURE("connect-failure", new ConnectFailureRetryCondition(), 1),

/**
* The type of conditional judgment for Specify response code
*/
SPECIFIC_STATUS_CODE_ERROR("retriable-status-codes", new SpecificStatusCodeErrorRetryCondition()),
RETRIABLE_STATUS_CODES("retriable-status-codes", new RetriableStatusCodesRetryCondition(), 0),

/**
* The type of conditional judgment for Specify response headers
*/
SPECIFIC_HEADER_NAME_ERROR("retriable-headers", new SpecificHeaderNameErrorRetryCondition());
RETRIABLE_HEADERS("retriable-headers", new RetriableHeadersRetryCondition(), 0);

private static final Map<String, RetryConditionType> RETRY_CONDITION_TYPE_ENUM_MAP
= new HashMap<>();

/**
* the name of retry condition
Expand All @@ -85,9 +89,24 @@ public enum RetryConditionType {
*/
private final RetryCondition retryCondition;

RetryConditionType(String conditionName, RetryCondition retryCondition) {
/**
* condition Type,
* 0: Retry condition based on the result.
* 1: Retry condition based on the exception.
* 2: Retry condition based on both the status code and the exception.
*/
private final int type;

static {
for (RetryConditionType retryConditionType : RetryConditionType.values()) {
RETRY_CONDITION_TYPE_ENUM_MAP.put(retryConditionType.conditionName, retryConditionType);
}
}

RetryConditionType(String conditionName, RetryCondition retryCondition, int type) {
this.conditionName = conditionName;
this.retryCondition = retryCondition;
this.type = type;
}

public String getConditionName() {
Expand All @@ -99,16 +118,43 @@ public RetryCondition getRetryCondition() {
}

/**
* get the instance of implements class by condition name
* get the instance of Retry condition by condition name
*
* @param conditionName condition name
* @return instance of implements class for retry condition
*/
public static Optional<RetryCondition> getRetryConditionByName(String conditionName) {
for (RetryConditionType retryConditionType : RetryConditionType.values()) {
if (StringUtils.equals(retryConditionType.getConditionName(), conditionName)) {
return Optional.of(retryConditionType.getRetryCondition());
}
RetryConditionType retryConditionType = RETRY_CONDITION_TYPE_ENUM_MAP.get(conditionName);
if (retryConditionType != null) {
return Optional.of(retryConditionType.getRetryCondition());
}
return Optional.empty();
}

/**
* get the instance of Retry condition based on the result by condition name
*
* @param conditionName condition name
* @return instance of implements class for retry condition
*/
public static Optional<RetryCondition> getRetryConditionWithResultByName(String conditionName) {
RetryConditionType retryConditionType = RETRY_CONDITION_TYPE_ENUM_MAP.get(conditionName);
if (retryConditionType != null && retryConditionType.type != 1) {
return Optional.of(retryConditionType.getRetryCondition());
}
return Optional.empty();
}

/**
* get the instance of Retry condition based on the result by condition name
*
* @param conditionName condition name
* @return instance of implements class for retry condition
*/
public static Optional<RetryCondition> getRetryConditionWithExceptionByName(String conditionName) {
RetryConditionType retryConditionType = RETRY_CONDITION_TYPE_ENUM_MAP.get(conditionName);
if (retryConditionType != null && retryConditionType.type != 0) {
return Optional.of(retryConditionType.getRetryCondition());
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.sermant.flowcontrol.common.util.StringUtils;
import io.sermant.flowcontrol.common.xds.retry.RetryCondition;

import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
Expand All @@ -34,9 +33,9 @@
* @author zhp
* @since 2024-11-29
*/
public class ConnectErrorRetryCondition implements RetryCondition {
public class ConnectFailureRetryCondition implements RetryCondition {
@Override
public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result) {
public boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result) {
if (ex == null) {
return false;
}
Expand All @@ -51,10 +50,15 @@ public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object re
}

private boolean isConnectErrorException(Throwable ex) {
if (ex instanceof InterruptedIOException && StringUtils.contains(ex.getMessage(), "timeout")) {
return true;
if (isRequestTimeoutException(ex)) {
return false;
}
return ex instanceof SocketTimeoutException || ex instanceof ConnectException || ex instanceof TimeoutException
|| ex instanceof NoRouteToHostException;
}

private boolean isRequestTimeoutException(Throwable ex) {
return (ex instanceof SocketTimeoutException || ex instanceof TimeoutException)
&& !StringUtils.isEmpty(ex.getMessage()) && ex.getMessage().contains("Read timed out");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
* @author zhp
* @since 2024-11-29
*/
public class GatewayErrorCondition implements RetryCondition {
public class GatewayErrorRetryCondition implements RetryCondition {
private static final Set<String> GATE_WAY_FAILURE_CODE = new HashSet<>(Arrays.asList("502", "503", "504"));

@Override
public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result) {
public boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result) {
return !StringUtils.isEmpty(statusCode) && GATE_WAY_FAILURE_CODE.contains(statusCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
* @author zhp
* @since 2024-11-29
*/
public class ResetBeforeRequestErrorCondition extends ResetErrorCondition {
public class ResetBeforeRequestRetryCondition extends ResetRetryCondition {
@Override
public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result) {
return XdsThreadLocalUtil.getSendByteFlag() && super.needRetry(retry, ex, statusCode, result);
public boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result) {
return XdsThreadLocalUtil.getSendByteFlag() && super.isNeedRetry(retry, ex, statusCode, result);
}
}
Loading

0 comments on commit edfa5ef

Please sign in to comment.