Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dual-read potential risk by using ThreadPool #945

Merged
merged 10 commits into from
Nov 21, 2023
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ When updating the changelog, remember to be very clear about what behavior has c
and what APIs have changed, if applicable.

## [Unreleased]
## [29.48.0] - 2023-11-13
- Fix dual-read potential risk that newLb may impact oldLb

## [29.47.0] - 2023-11-13
- Use Node instead of D2Node and D2URIMap instead of NodeMap for xDS flow
Expand Down Expand Up @@ -5560,7 +5562,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.47.0...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.48.0...master
[29.48.0]: https://github.com/linkedin/rest.li/compare/v29.47.0...v29.48.0
[29.47.0]: https://github.com/linkedin/rest.li/compare/v29.46.9...v29.47.0
[29.46.9]: https://github.com/linkedin/rest.li/compare/v29.46.8...v29.46.9
[29.46.8]: https://github.com/linkedin/rest.li/compare/v29.46.7...v29.46.8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public D2Client build()
_config.serviceDiscoveryEventEmitter,
_config.dualReadStateManager,
_config.xdsExecutorService,
_config.xdsStreamReadyTimeout
_config.xdsStreamReadyTimeout,
_config.dualReadNewLbExecutor
);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
Expand Down Expand Up @@ -643,6 +644,11 @@ public D2ClientBuilder setDualReadStateManager(DualReadStateManager dualReadStat
return this;
}

public D2ClientBuilder setDualReadNewLbExecutor(ExecutorService dualReadNewLbExecutor) {
_config.dualReadNewLbExecutor = dualReadNewLbExecutor;
return this;
}

/**
* Single-threaded executor service for xDS communication.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -127,6 +128,7 @@ public class D2ClientConfig

public ScheduledExecutorService xdsExecutorService = null;
public Long xdsStreamReadyTimeout = null;
public ExecutorService dualReadNewLbExecutor = null;

public D2ClientConfig()
{
Expand Down Expand Up @@ -195,7 +197,8 @@ public D2ClientConfig()
ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter,
DualReadStateManager dualReadStateManager,
ScheduledExecutorService xdsExecutorService,
Long xdsStreamReadyTimeout)
Long xdsStreamReadyTimeout,
ExecutorService dualReadNewLbExecutor)
{
this.zkHosts = zkHosts;
this.xdsServer = xdsServer;
Expand Down Expand Up @@ -261,5 +264,6 @@ public D2ClientConfig()
this.dualReadStateManager = dualReadStateManager;
this.xdsExecutorService = xdsExecutorService;
this.xdsStreamReadyTimeout = xdsStreamReadyTimeout;
this.dualReadNewLbExecutor = dualReadNewLbExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linkedin.d2.balancer.dualread;

import com.google.common.util.concurrent.MoreExecutors;
import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.Callbacks;
import com.linkedin.common.util.None;
Expand All @@ -34,6 +35,7 @@
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.transport.common.TransportClientFactory;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -59,16 +61,34 @@ public class DualReadLoadBalancer implements LoadBalancerWithFacilities
private final LoadBalancerWithFacilities _oldLb;
private final LoadBalancerWithFacilities _newLb;
private final DualReadStateManager _dualReadStateManager;

private ExecutorService _newLbExecutor;
private boolean _isNewLbReady;

@Deprecated
public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb,
@Nonnull DualReadStateManager dualReadStateManager)
{
this(oldLb, newLb, dualReadStateManager, null);
}

public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb,
@Nonnull DualReadStateManager dualReadStateManager, ExecutorService newLbExecutor)
{
_oldLb = oldLb;
_newLb = newLb;
_dualReadStateManager = dualReadStateManager;
_isNewLbReady = false;
if (newLbExecutor == null)
{
// Using a direct executor here means the code is executed directly,
// blocking the caller. This means the old behavior is preserved.
_newLbExecutor = MoreExecutors.newDirectExecutorService();
LOG.warn("The newLbExecutor is null, will use a direct executor instead.");
}
else
{
_newLbExecutor = newLbExecutor;
}
}

@Override
Expand Down Expand Up @@ -109,35 +129,36 @@ public void getClient(Request request, RequestContext requestContext, Callback<T
_newLb.getClient(request, requestContext, clientCallback);
break;
case DUAL_READ:
_newLb.getLoadBalancedServiceProperties(serviceName, new Callback<ServiceProperties>()
{
@Override
public void onError(Throwable e)
{
LOG.error("Double read failure. Unable to read service properties from: " + serviceName, e);
}

@Override
public void onSuccess(ServiceProperties result)
{
String clusterName = result.getClusterName();
_dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ);
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, new Callback<Pair<ClusterProperties, UriProperties>>()
_newLbExecutor.execute(
() -> _newLb.getLoadBalancedServiceProperties(serviceName, new Callback<ServiceProperties>()
{
@Override
public void onError(Throwable e)
{
LOG.error("Dual read failure. Unable to read cluster properties from: " + clusterName, e);
LOG.error("Dual read failure. Unable to read service properties from: {}", serviceName, e);
}

@Override
public void onSuccess(Pair<ClusterProperties, UriProperties> result)
public void onSuccess(ServiceProperties result)
{
LOG.debug("Dual read is successful. Get cluster and uri properties: " + result);
String clusterName = result.getClusterName();
_dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ);
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, new Callback<Pair<ClusterProperties, UriProperties>>()
{
@Override
public void onError(Throwable e)
{
LOG.error("Dual read failure. Unable to read cluster properties from: {}", clusterName, e);
}

@Override
public void onSuccess(Pair<ClusterProperties, UriProperties> result)
{
LOG.debug("Dual read is successful. Get cluster and uri properties: {}", result);
}
});
}
});
}
});
}));
_oldLb.getClient(request, requestContext, clientCallback);
break;
case OLD_LB_ONLY:
Expand All @@ -155,7 +176,7 @@ public void getLoadBalancedServiceProperties(String serviceName, Callback<Servic
_newLb.getLoadBalancedServiceProperties(serviceName, clientCallback);
break;
case DUAL_READ:
_newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty());
_newLbExecutor.execute(() -> _newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty()));
_oldLb.getLoadBalancedServiceProperties(serviceName, clientCallback);
break;
case OLD_LB_ONLY:
Expand All @@ -174,7 +195,7 @@ public void getLoadBalancedClusterAndUriProperties(String clusterName,
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, callback);
break;
case DUAL_READ:
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty());
_newLbExecutor.execute(() -> _newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty()));
_oldLb.getLoadBalancedClusterAndUriProperties(clusterName, callback);
break;
case OLD_LB_ONLY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ public DualReadZkAndXdsLoadBalancerFactory(@Nonnull DualReadStateManager dualRea
@Override
public LoadBalancerWithFacilities create(D2ClientConfig config)
{
return new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager);
return new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager, config.dualReadNewLbExecutor);
}
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.47.0
version=29.48.0
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down
Loading