Skip to content

Commit

Permalink
Fix dual-read potential risk by using ThreadPool (#945)
Browse files Browse the repository at this point in the history
  • Loading branch information
brycezhongqing authored Nov 21, 2023
1 parent a33a1e1 commit a45e661
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 28 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ When updating the changelog, remember to be very clear about what behavior has c
and what APIs have changed, if applicable.

## [Unreleased]
## [29.48.0] - 2023-11-13
- Fix potential dual-read risk that newLb may impact oldLb, by running newLb reads on a configurable executor

## [29.47.0] - 2023-11-13
- Use Node instead of D2Node and D2URIMap instead of NodeMap for xDS flow
Expand Down Expand Up @@ -5560,7 +5562,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.47.0...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.48.0...master
[29.48.0]: https://github.com/linkedin/rest.li/compare/v29.47.0...v29.48.0
[29.47.0]: https://github.com/linkedin/rest.li/compare/v29.46.9...v29.47.0
[29.46.9]: https://github.com/linkedin/rest.li/compare/v29.46.8...v29.46.9
[29.46.8]: https://github.com/linkedin/rest.li/compare/v29.46.7...v29.46.8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public D2Client build()
_config.serviceDiscoveryEventEmitter,
_config.dualReadStateManager,
_config.xdsExecutorService,
_config.xdsStreamReadyTimeout
_config.xdsStreamReadyTimeout,
_config.dualReadNewLbExecutor
);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
Expand Down Expand Up @@ -643,6 +644,11 @@ public D2ClientBuilder setDualReadStateManager(DualReadStateManager dualReadStat
return this;
}

/**
 * Stores the given executor in the client config as {@code dualReadNewLbExecutor}.
 *
 * @param dualReadNewLbExecutor executor to record on the config; may be {@code null}
 * @return this builder, so calls can be chained
 */
public D2ClientBuilder setDualReadNewLbExecutor(ExecutorService dualReadNewLbExecutor)
{
  _config.dualReadNewLbExecutor = dualReadNewLbExecutor;
  return this;
}

/**
* Single-threaded executor service for xDS communication.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -127,6 +128,7 @@ public class D2ClientConfig

public ScheduledExecutorService xdsExecutorService = null;
public Long xdsStreamReadyTimeout = null;
public ExecutorService dualReadNewLbExecutor = null;

public D2ClientConfig()
{
Expand Down Expand Up @@ -195,7 +197,8 @@ public D2ClientConfig()
ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter,
DualReadStateManager dualReadStateManager,
ScheduledExecutorService xdsExecutorService,
Long xdsStreamReadyTimeout)
Long xdsStreamReadyTimeout,
ExecutorService dualReadNewLbExecutor)
{
this.zkHosts = zkHosts;
this.xdsServer = xdsServer;
Expand Down Expand Up @@ -261,5 +264,6 @@ public D2ClientConfig()
this.dualReadStateManager = dualReadStateManager;
this.xdsExecutorService = xdsExecutorService;
this.xdsStreamReadyTimeout = xdsStreamReadyTimeout;
this.dualReadNewLbExecutor = dualReadNewLbExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linkedin.d2.balancer.dualread;

import com.google.common.util.concurrent.MoreExecutors;
import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.Callbacks;
import com.linkedin.common.util.None;
Expand All @@ -34,6 +35,7 @@
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.transport.common.TransportClientFactory;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -59,16 +61,34 @@ public class DualReadLoadBalancer implements LoadBalancerWithFacilities
private final LoadBalancerWithFacilities _oldLb;
private final LoadBalancerWithFacilities _newLb;
private final DualReadStateManager _dualReadStateManager;

private ExecutorService _newLbExecutor;
private boolean _isNewLbReady;

/**
 * Creates a dual-read load balancer without an explicit executor for the new load balancer.
 * Delegates to the four-argument constructor with a {@code null} executor.
 *
 * @param oldLb load balancer that serves the caller's callback in dual-read mode
 * @param newLb load balancer that is additionally read from in dual-read mode
 * @param dualReadStateManager state manager used by this load balancer
 * @deprecated use
 *     {@link #DualReadLoadBalancer(LoadBalancerWithFacilities, LoadBalancerWithFacilities, DualReadStateManager, ExecutorService)}
 *     and supply the executor explicitly.
 */
@Deprecated
public DualReadLoadBalancer(
    LoadBalancerWithFacilities oldLb,
    LoadBalancerWithFacilities newLb,
    @Nonnull DualReadStateManager dualReadStateManager)
{
  this(oldLb, newLb, dualReadStateManager, null);
}

/**
 * Creates a dual-read load balancer wrapping an old and a new load balancer.
 *
 * @param oldLb load balancer that serves the caller's callback in dual-read mode
 * @param newLb load balancer that is additionally read from in dual-read mode
 * @param dualReadStateManager state manager used by this load balancer
 * @param newLbExecutor executor on which dual-read calls to {@code newLb} are submitted; when
 *     {@code null}, a direct executor is substituted and a warning is logged
 */
public DualReadLoadBalancer(
    LoadBalancerWithFacilities oldLb,
    LoadBalancerWithFacilities newLb,
    @Nonnull DualReadStateManager dualReadStateManager,
    ExecutorService newLbExecutor)
{
  _oldLb = oldLb;
  _newLb = newLb;
  _dualReadStateManager = dualReadStateManager;
  _isNewLbReady = false;

  if (newLbExecutor != null)
  {
    _newLbExecutor = newLbExecutor;
  }
  else
  {
    // No executor supplied: fall back to a direct executor, which runs each task
    // on the calling thread (blocking the caller) and so keeps the old behavior.
    _newLbExecutor = MoreExecutors.newDirectExecutorService();
    LOG.warn("The newLbExecutor is null, will use a direct executor instead.");
  }
}

@Override
Expand Down Expand Up @@ -109,35 +129,36 @@ public void getClient(Request request, RequestContext requestContext, Callback<T
_newLb.getClient(request, requestContext, clientCallback);
break;
case DUAL_READ:
_newLb.getLoadBalancedServiceProperties(serviceName, new Callback<ServiceProperties>()
{
@Override
public void onError(Throwable e)
{
LOG.error("Double read failure. Unable to read service properties from: " + serviceName, e);
}

@Override
public void onSuccess(ServiceProperties result)
{
String clusterName = result.getClusterName();
_dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ);
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, new Callback<Pair<ClusterProperties, UriProperties>>()
_newLbExecutor.execute(
() -> _newLb.getLoadBalancedServiceProperties(serviceName, new Callback<ServiceProperties>()
{
@Override
public void onError(Throwable e)
{
LOG.error("Dual read failure. Unable to read cluster properties from: " + clusterName, e);
LOG.error("Dual read failure. Unable to read service properties from: {}", serviceName, e);
}

@Override
public void onSuccess(Pair<ClusterProperties, UriProperties> result)
public void onSuccess(ServiceProperties result)
{
LOG.debug("Dual read is successful. Get cluster and uri properties: " + result);
String clusterName = result.getClusterName();
_dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ);
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, new Callback<Pair<ClusterProperties, UriProperties>>()
{
@Override
public void onError(Throwable e)
{
LOG.error("Dual read failure. Unable to read cluster properties from: {}", clusterName, e);
}

@Override
public void onSuccess(Pair<ClusterProperties, UriProperties> result)
{
LOG.debug("Dual read is successful. Get cluster and uri properties: {}", result);
}
});
}
});
}
});
}));
_oldLb.getClient(request, requestContext, clientCallback);
break;
case OLD_LB_ONLY:
Expand All @@ -155,7 +176,7 @@ public void getLoadBalancedServiceProperties(String serviceName, Callback<Servic
_newLb.getLoadBalancedServiceProperties(serviceName, clientCallback);
break;
case DUAL_READ:
_newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty());
_newLbExecutor.execute(() -> _newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty()));
_oldLb.getLoadBalancedServiceProperties(serviceName, clientCallback);
break;
case OLD_LB_ONLY:
Expand All @@ -174,7 +195,7 @@ public void getLoadBalancedClusterAndUriProperties(String clusterName,
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, callback);
break;
case DUAL_READ:
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty());
_newLbExecutor.execute(() -> _newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty()));
_oldLb.getLoadBalancedClusterAndUriProperties(clusterName, callback);
break;
case OLD_LB_ONLY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ public DualReadZkAndXdsLoadBalancerFactory(@Nonnull DualReadStateManager dualRea
@Override
public LoadBalancerWithFacilities create(D2ClientConfig config)
{
return new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager);
return new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager, config.dualReadNewLbExecutor);
}
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.47.0
version=29.48.0
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit a45e661

Please sign in to comment.