Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dual-read potential risk by using ThreadPool #945

Merged
merged 10 commits into from
Nov 21, 2023
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.46.10] - 2023-11-13
brycezhongqing marked this conversation as resolved.
Show resolved Hide resolved
- Fix potential dual-read risk where newLb may impact oldLb

## [29.46.9] - 2023-11-02
- Update FieldDef so that it will lazily cache the hashCode.

Expand Down Expand Up @@ -5557,7 +5560,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.9...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.10...master
[29.46.10]: https://github.com/linkedin/rest.li/compare/v29.46.9...v29.46.10
[29.46.9]: https://github.com/linkedin/rest.li/compare/v29.46.8...v29.46.9
[29.46.8]: https://github.com/linkedin/rest.li/compare/v29.46.7...v29.46.8
[29.46.7]: https://github.com/linkedin/rest.li/compare/v29.46.6...v29.46.7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public D2Client build()
_config.serviceDiscoveryEventEmitter,
_config.dualReadStateManager,
_config.xdsExecutorService,
_config.xdsStreamReadyTimeout
_config.xdsStreamReadyTimeout,
_config.loadBalancerExecutor
brycezhongqing marked this conversation as resolved.
Show resolved Hide resolved
);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
Expand Down Expand Up @@ -643,6 +644,11 @@ public D2ClientBuilder setDualReadStateManager(DualReadStateManager dualReadStat
return this;
}

/**
 * Records the given executor on the client config as {@code loadBalancerExecutor}.
 *
 * @param loadBalancerExecutor executor to store on the config; stored as-is, no null check is done here
 * @return this builder, so that calls can be chained
 */
public D2ClientBuilder setLoadBalancerExecutor(ExecutorService loadBalancerExecutor)
{
  _config.loadBalancerExecutor = loadBalancerExecutor;
  return this;
}

brycezhongqing marked this conversation as resolved.
Show resolved Hide resolved
/**
* Single-threaded executor service for xDS communication.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -127,6 +128,7 @@ public class D2ClientConfig

public ScheduledExecutorService xdsExecutorService = null;
public Long xdsStreamReadyTimeout = null;
public ExecutorService loadBalancerExecutor = null;

public D2ClientConfig()
{
Expand Down Expand Up @@ -195,7 +197,8 @@ public D2ClientConfig()
ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter,
DualReadStateManager dualReadStateManager,
ScheduledExecutorService xdsExecutorService,
Long xdsStreamReadyTimeout)
Long xdsStreamReadyTimeout,
ExecutorService loadBalancerExecutor)
{
this.zkHosts = zkHosts;
this.xdsServer = xdsServer;
Expand Down Expand Up @@ -261,5 +264,6 @@ public D2ClientConfig()
this.dualReadStateManager = dualReadStateManager;
this.xdsExecutorService = xdsExecutorService;
this.xdsStreamReadyTimeout = xdsStreamReadyTimeout;
this.loadBalancerExecutor = loadBalancerExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linkedin.d2.balancer.dualread;

import com.google.common.util.concurrent.MoreExecutors;
import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.Callbacks;
import com.linkedin.common.util.None;
Expand All @@ -34,6 +35,7 @@
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.transport.common.TransportClientFactory;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -59,16 +61,35 @@ public class DualReadLoadBalancer implements LoadBalancerWithFacilities
private final LoadBalancerWithFacilities _oldLb;
private final LoadBalancerWithFacilities _newLb;
private final DualReadStateManager _dualReadStateManager;

private ExecutorService _executor;
brycezhongqing marked this conversation as resolved.
Show resolved Hide resolved
private boolean _isNewLbReady;

/**
 * Creates a dual-read load balancer without an explicitly supplied executor.
 *
 * <p>Delegates to the four-argument constructor, passing {@code null} as the executor,
 * and then logs a warning that the deprecated constructor was used.
 *
 * @param oldLb the old load balancer
 * @param newLb the new load balancer
 * @param dualReadStateManager the dual-read state manager; must not be null
 * @deprecated use the constructor that additionally takes an {@link ExecutorService}.
 */
@Deprecated
public DualReadLoadBalancer(
    LoadBalancerWithFacilities oldLb,
    LoadBalancerWithFacilities newLb,
    @Nonnull DualReadStateManager dualReadStateManager)
{
  this(oldLb, newLb, dualReadStateManager, null);
  LOG.warn("Deprecated DualReadLoadBalancer constructor used without a threadpool");
}

/**
 * Creates a dual-read load balancer wrapping an old and a new load balancer.
 *
 * @param oldLb the old load balancer
 * @param newLb the new load balancer
 * @param dualReadStateManager the dual-read state manager; must not be null
 * @param executor executor on which calls to the new load balancer are submitted while in
 *                 dual-read mode; when {@code null}, a direct executor is used instead, so
 *                 those calls run on the calling thread
 */
public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb,
    @Nonnull DualReadStateManager dualReadStateManager, ExecutorService executor)
{
  _oldLb = oldLb;
  _newLb = newLb;
  _dualReadStateManager = dualReadStateManager;
  _isNewLbReady = false;
  if (executor == null)
  {
    // A direct executor runs each task on the submitting thread, blocking the caller.
    // This keeps the behavior that existed before an executor could be supplied.
    // The warning deliberately does not mention the deprecated constructor: this branch
    // is also reached from the non-deprecated path whenever no executor was configured.
    LOG.warn("DualReadLoadBalancer created without an executor; "
        + "falling back to a direct executor that runs on the caller's thread");
    _executor = MoreExecutors.newDirectExecutorService();
  }
  else
  {
    _executor = executor;
  }
}

@Override
Expand Down Expand Up @@ -109,35 +130,36 @@ public void getClient(Request request, RequestContext requestContext, Callback<T
_newLb.getClient(request, requestContext, clientCallback);
break;
case DUAL_READ:
_newLb.getLoadBalancedServiceProperties(serviceName, new Callback<ServiceProperties>()
{
@Override
public void onError(Throwable e)
{
LOG.error("Double read failure. Unable to read service properties from: " + serviceName, e);
}

@Override
public void onSuccess(ServiceProperties result)
{
String clusterName = result.getClusterName();
_dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ);
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, new Callback<Pair<ClusterProperties, UriProperties>>()
_executor.execute(
() -> _newLb.getLoadBalancedServiceProperties(serviceName, new Callback<ServiceProperties>()
{
@Override
public void onError(Throwable e)
{
LOG.error("Dual read failure. Unable to read cluster properties from: " + clusterName, e);
LOG.error("Double read failure. Unable to read service properties from: {}", serviceName, e);
brycezhongqing marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void onSuccess(Pair<ClusterProperties, UriProperties> result)
public void onSuccess(ServiceProperties result)
{
LOG.debug("Dual read is successful. Get cluster and uri properties: " + result);
String clusterName = result.getClusterName();
_dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ);
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, new Callback<Pair<ClusterProperties, UriProperties>>()
{
@Override
public void onError(Throwable e)
{
LOG.error("Dual read failure. Unable to read cluster properties from: {}", clusterName, e);
}

@Override
public void onSuccess(Pair<ClusterProperties, UriProperties> result)
{
LOG.debug("Dual read is successful. Get cluster and uri properties: {}", result);
}
});
}
});
}
});
}));
_oldLb.getClient(request, requestContext, clientCallback);
break;
case OLD_LB_ONLY:
Expand All @@ -155,7 +177,7 @@ public void getLoadBalancedServiceProperties(String serviceName, Callback<Servic
_newLb.getLoadBalancedServiceProperties(serviceName, clientCallback);
break;
case DUAL_READ:
_newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty());
_executor.execute(() -> _newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty()));
_oldLb.getLoadBalancedServiceProperties(serviceName, clientCallback);
break;
case OLD_LB_ONLY:
Expand All @@ -174,7 +196,7 @@ public void getLoadBalancedClusterAndUriProperties(String clusterName,
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, callback);
break;
case DUAL_READ:
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty());
_executor.execute(() -> _newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty()));
_oldLb.getLoadBalancedClusterAndUriProperties(clusterName, callback);
break;
case OLD_LB_ONLY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ public DualReadZkAndXdsLoadBalancerFactory(@Nonnull DualReadStateManager dualRea
/**
 * Builds a {@link DualReadLoadBalancer} from the given client config.
 *
 * <p>The ZK factory's load balancer is passed as the old load balancer and the xDS factory's
 * as the new one. The config's {@code loadBalancerExecutor} is handed through unchanged,
 * so it may be {@code null} if none was configured.
 *
 * @param config client config used to create both underlying load balancers
 * @return the dual-read load balancer wrapping both
 */
@Override
public LoadBalancerWithFacilities create(D2ClientConfig config)
{
  return new DualReadLoadBalancer(
      _zkLbFactory.create(config),
      _xdsLbFactory.create(config),
      _dualReadStateManager,
      config.loadBalancerExecutor);
}
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.46.9
version=29.46.10
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down
Loading