Skip to content

Commit

Permalink
Fix the issues of XDS flow control
Browse files Browse the repository at this point in the history
Signed-off-by: hanbingleixue <[email protected]>
  • Loading branch information
hanbingleixue committed Feb 8, 2025
1 parent 91e61f3 commit cf554f5
Show file tree
Hide file tree
Showing 20 changed files with 258 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.StringUtils;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -139,16 +140,17 @@ private static XdsInstanceCircuitBreakers parseInstanceCircuitBreakers(Cluster c
xdsInstanceCircuitBreakers.setConsecutiveGatewayFailure(outlierDetection.getConsecutiveGatewayFailure()
.getValue());
xdsInstanceCircuitBreakers.setConsecutive5xxFailure(outlierDetection.getConsecutive5Xx().getValue());
long interval = java.time.Duration.ofSeconds(outlierDetection.getInterval().getSeconds()).toMillis();
xdsInstanceCircuitBreakers.setInterval(interval);
long ejectionTime = java.time.Duration.ofSeconds(outlierDetection.getBaseEjectionTime().getSeconds())
.toMillis();
xdsInstanceCircuitBreakers.setBaseEjectionTime(ejectionTime);
xdsInstanceCircuitBreakers.setInterval(getDurationInMillis(outlierDetection.getInterval()));
xdsInstanceCircuitBreakers.setBaseEjectionTime(getDurationInMillis(outlierDetection.getBaseEjectionTime()));
xdsInstanceCircuitBreakers.setMaxEjectionPercent(outlierDetection.getMaxEjectionPercent().getValue());
xdsInstanceCircuitBreakers.setFailurePercentageMinimumHosts(outlierDetection.getFailurePercentageMinimumHosts()
.getValue());
xdsInstanceCircuitBreakers.setMinHealthPercent(cluster.getCommonLbConfig().
getHealthyPanicThreshold().getValue());
return xdsInstanceCircuitBreakers;
}

private static long getDurationInMillis(com.google.protobuf.Duration duration) {
return Duration.ofSeconds(duration.getSeconds()).toMillis() + Duration.ofNanos(duration.getNanos()).toMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,7 @@ private static XdsRetryPolicy parseRetryPolicy(RetryPolicy retryPolicy) {
xdsRetryPolicy.setRetryConditions(Arrays.asList(retryPolicy.getRetryOn().split(CommonConstant.COMMA)));
}
xdsRetryPolicy.setMaxAttempts(retryPolicy.getNumRetries().getValue());
long perTryTimeout = Duration.ofSeconds(retryPolicy.getPerTryTimeout().getSeconds()).toMillis();
xdsRetryPolicy.setPerTryTimeout(perTryTimeout);
xdsRetryPolicy.setPerTryTimeout(getDurationInMillis(retryPolicy.getPerTryTimeout()));
return xdsRetryPolicy;
}

Expand Down Expand Up @@ -307,8 +306,7 @@ private static XdsAbort parseAbort(FaultAbort faultAbort) {

private static XdsDelay parseDelay(FaultDelay faultDelay) {
XdsDelay xdsDelay = new XdsDelay();
long fixedDelay = Duration.ofSeconds(faultDelay.getFixedDelay().getSeconds()).toMillis();
xdsDelay.setFixedDelay(fixedDelay);
xdsDelay.setFixedDelay(getDurationInMillis(faultDelay.getFixedDelay()));
io.sermant.core.service.xds.entity.FractionalPercent fractionalPercent =
new io.sermant.core.service.xds.entity.FractionalPercent();
fractionalPercent.setNumerator(faultDelay.getPercentage().getNumerator());
Expand Down Expand Up @@ -448,4 +446,8 @@ private static Optional<XdsTokenBucket> parseTokenBucket(Struct tokenBucketStruc
xdsTokenBucket.setTokensPerFill((int) tokensPerFill);
return Optional.of(xdsTokenBucket);
}

private static long getDurationInMillis(com.google.protobuf.Duration duration) {
return Duration.ofSeconds(duration.getSeconds()).toMillis() + Duration.ofNanos(duration.getNanos()).toMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,16 @@ public class CommonConst {
*/
public static final String DEFAULT_CONTENT_TYPE = "text/plain";

/**
* Minimum response code for a successful request
*/
public static final int MIN_SUCCESS_STATUS_CODE = 200;

/**
* Maximum response code for a successful request
*/
public static final int MAX_SUCCESS_STATUS_CODE = 399;

private CommonConst() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.service.xds.entity.XdsRetryPolicy;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.common.config.CommonConst;
import io.sermant.flowcontrol.common.config.FlowControlConfig;
import io.sermant.flowcontrol.common.support.ReflectMethodCacheSupport;
import io.sermant.flowcontrol.common.xds.retry.RetryCondition;
Expand Down Expand Up @@ -89,15 +91,16 @@ public boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy) {
return false;
}
String statusCode = statusCodeOptional.get();
if (conditions.contains(statusCode)) {
return true;
if (isSuccess(statusCode)) {
return false;
}
for (String conditionName : conditions) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
Optional<RetryCondition> retryConditionOptional = RetryConditionType
.getRetryConditionWithResultByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
if (retryConditionOptional.get().needRetry(this, null, statusCode, result)) {
if (retryConditionOptional.get().isNeedRetry(this, null, statusCode, result)) {
return true;
}
}
Expand All @@ -110,17 +113,32 @@ public boolean isNeedRetry(Throwable ex, XdsRetryPolicy retryPolicy) {
return false;
}
for (String conditionName : retryPolicy.getRetryConditions()) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
Optional<RetryCondition> retryConditionOptional = RetryConditionType
.getRetryConditionWithExceptionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
if (retryConditionOptional.get().needRetry(this, ex, null, null)) {
if (retryConditionOptional.get().isNeedRetry(this, ex, null, null)) {
return true;
}
}
return false;
}

/**
* Determine if the request is successful
*
* @param statusCode status code
* @return if the request is successful,true : success false: failure
*/
public static boolean isSuccess(String statusCode) {
if (StringUtils.isEmpty(statusCode)) {
return false;
}
int code = Integer.parseInt(statusCode);
return code >= CommonConst.MIN_SUCCESS_STATUS_CODE && code <= CommonConst.MAX_SUCCESS_STATUS_CODE;
}

/**
* implemented by subclasses, if subclass implement {@link #isNeedRetry(Set, Object)}, no need to implement the get
* code method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ public interface RetryCondition {
* @param statusCode response status code
* @return The result of the decision
*/
boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result);
boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@

package io.sermant.flowcontrol.common.xds.retry;

import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.common.xds.retry.condition.ClientErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ConnectErrorRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.GatewayErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetBeforeRequestErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ServerErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.SpecificHeaderNameErrorRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.SpecificStatusCodeErrorRetryCondition;

import io.sermant.flowcontrol.common.xds.retry.condition.ConnectFailureRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.GatewayErrorRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetBeforeRequestRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.Retriable4xxRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.RetriableHeadersRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.RetriableStatusCodesRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ServerErrorRetryCondition;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
Expand All @@ -38,42 +39,45 @@ public enum RetryConditionType {
/**
* The type of conditional judgment for server errors
*/
SERVER_ERROR("5xx", new ServerErrorCondition()),
FIVE_XX("5xx", new ServerErrorRetryCondition(), 2),

/**
* The type of conditional judgment for client errors
*/
CLIENT_ERROR("retriable-4xx", new ClientErrorCondition()),
RETRIABLE_4XX("retriable-4xx", new Retriable4xxRetryCondition(), 0),

/**
* The type of conditional judgment for gateway errors
*/
GATEWAY_ERROR("gateway-error", new GatewayErrorCondition()),
GATEWAY_ERROR("gateway-error", new GatewayErrorRetryCondition(), 0),

/**
* The type of conditional judgment for reset errors
*/
RESET_ERROR("reset", new ResetErrorCondition()),
RESET("reset", new ResetRetryCondition(), 1),

/**
* The type of conditional judgment for resetting errors before request
*/
RESET_BEFORE_REQUEST_ERROR("reset-before-request", new ResetBeforeRequestErrorCondition()),
RESET_BEFORE_REQUEST("reset-before-request", new ResetBeforeRequestRetryCondition(), 1),

/**
* The type of conditional judgment for connect errors
*/
CONNECT_ERROR("connect-failure", new ConnectErrorRetryCondition()),
CONNECT_FAILURE("connect-failure", new ConnectFailureRetryCondition(), 1),

/**
* The type of conditional judgment for Specify response code
*/
SPECIFIC_STATUS_CODE_ERROR("retriable-status-codes", new SpecificStatusCodeErrorRetryCondition()),
RETRIABLE_STATUS_CODES("retriable-status-codes", new RetriableStatusCodesRetryCondition(), 0),

/**
* The type of conditional judgment for Specify response headers
*/
SPECIFIC_HEADER_NAME_ERROR("retriable-headers", new SpecificHeaderNameErrorRetryCondition());
RETRIABLE_HEADERS("retriable-headers", new RetriableHeadersRetryCondition(), 0);

private static final Map<String, RetryConditionType> RETRY_CONDITION_TYPE_ENUM_MAP
= new HashMap<>();

/**
* the name of retry condition
Expand All @@ -85,9 +89,24 @@ public enum RetryConditionType {
*/
private final RetryCondition retryCondition;

RetryConditionType(String conditionName, RetryCondition retryCondition) {
/**
* condition Type,
* 0: Retry condition based on the result.
* 1: Retry condition based on the exception.
* 2: Retry condition based on both the status code and the exception.
*/
private final int type;

static {
for (RetryConditionType retryConditionType : RetryConditionType.values()) {
RETRY_CONDITION_TYPE_ENUM_MAP.put(retryConditionType.conditionName, retryConditionType);
}
}

RetryConditionType(String conditionName, RetryCondition retryCondition, int type) {
this.conditionName = conditionName;
this.retryCondition = retryCondition;
this.type = type;
}

public String getConditionName() {
Expand All @@ -99,16 +118,43 @@ public RetryCondition getRetryCondition() {
}

/**
* get the instance of implements class by condition name
* get the instance of Retry condition by condition name
*
* @param conditionName condition name
* @return instance of implements class for retry condition
*/
public static Optional<RetryCondition> getRetryConditionByName(String conditionName) {
for (RetryConditionType retryConditionType : RetryConditionType.values()) {
if (StringUtils.equals(retryConditionType.getConditionName(), conditionName)) {
return Optional.of(retryConditionType.getRetryCondition());
}
RetryConditionType retryConditionType = RETRY_CONDITION_TYPE_ENUM_MAP.get(conditionName);
if (retryConditionType != null) {
return Optional.of(retryConditionType.getRetryCondition());
}
return Optional.empty();
}

/**
* get the instance of Retry condition based on the result by condition name
*
* @param conditionName condition name
* @return instance of implements class for retry condition
*/
public static Optional<RetryCondition> getRetryConditionWithResultByName(String conditionName) {
RetryConditionType retryConditionType = RETRY_CONDITION_TYPE_ENUM_MAP.get(conditionName);
if (retryConditionType != null && retryConditionType.type != 1) {
return Optional.of(retryConditionType.getRetryCondition());
}
return Optional.empty();
}

/**
* get the instance of Retry condition based on the result by condition name
*
* @param conditionName condition name
* @return instance of implements class for retry condition
*/
public static Optional<RetryCondition> getRetryConditionWithExceptionByName(String conditionName) {
RetryConditionType retryConditionType = RETRY_CONDITION_TYPE_ENUM_MAP.get(conditionName);
if (retryConditionType != null && retryConditionType.type != 0) {
return Optional.of(retryConditionType.getRetryCondition());
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.sermant.flowcontrol.common.util.StringUtils;
import io.sermant.flowcontrol.common.xds.retry.RetryCondition;

import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
Expand All @@ -34,9 +33,9 @@
* @author zhp
* @since 2024-11-29
*/
public class ConnectErrorRetryCondition implements RetryCondition {
public class ConnectFailureRetryCondition implements RetryCondition {
@Override
public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result) {
public boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result) {
if (ex == null) {
return false;
}
Expand All @@ -51,10 +50,15 @@ public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object re
}

private boolean isConnectErrorException(Throwable ex) {
if (ex instanceof InterruptedIOException && StringUtils.contains(ex.getMessage(), "timeout")) {
return true;
if (isRequestTimeoutException(ex)) {
return false;
}
return ex instanceof SocketTimeoutException || ex instanceof ConnectException || ex instanceof TimeoutException
|| ex instanceof NoRouteToHostException;
}

private boolean isRequestTimeoutException(Throwable ex) {
return (ex instanceof SocketTimeoutException || ex instanceof TimeoutException)
&& !StringUtils.isEmpty(ex.getMessage()) && ex.getMessage().contains("Read timed out");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
* @author zhp
* @since 2024-11-29
*/
public class GatewayErrorCondition implements RetryCondition {
public class GatewayErrorRetryCondition implements RetryCondition {
private static final Set<String> GATE_WAY_FAILURE_CODE = new HashSet<>(Arrays.asList("502", "503", "504"));

@Override
public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result) {
public boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result) {
return !StringUtils.isEmpty(statusCode) && GATE_WAY_FAILURE_CODE.contains(statusCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
* @author zhp
* @since 2024-11-29
*/
public class ResetBeforeRequestErrorCondition extends ResetErrorCondition {
public class ResetBeforeRequestRetryCondition extends ResetRetryCondition {
@Override
public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result) {
return XdsThreadLocalUtil.getSendByteFlag() && super.needRetry(retry, ex, statusCode, result);
public boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result) {
return XdsThreadLocalUtil.getSendByteFlag() && super.isNeedRetry(retry, ex, statusCode, result);
}
}
Loading

0 comments on commit cf554f5

Please sign in to comment.