From eeb4ce7eb9bd2c1dea53238f6778f1c2d78463f5 Mon Sep 17 00:00:00 2001 From: zhonchen Date: Thu, 9 Nov 2023 10:01:46 -0800 Subject: [PATCH 1/9] fix dual_read fallback issue --- .../dualread/DualReadLoadBalancer.java | 39 ++------- .../NewBalanceGetPropertiesTask.java | 85 +++++++++++++++++++ .../NewLoadBalancerTaskthreadPool.java | 29 +++++++ .../DualReadZkAndXdsLoadBalancerFactory.java | 6 +- 4 files changed, 126 insertions(+), 33 deletions(-) create mode 100644 d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewBalanceGetPropertiesTask.java create mode 100644 d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewLoadBalancerTaskthreadPool.java diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java index c32ebbe64b..63a91e9a87 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java @@ -30,6 +30,8 @@ import com.linkedin.d2.balancer.util.hashing.HashRingProvider; import com.linkedin.d2.balancer.util.partitions.PartitionInfoProvider; import com.linkedin.d2.discovery.event.PropertyEventThread; +import com.linkedin.d2.xds.LoadBalanceTaskPool.NewLoadBalancerTaskthreadPool; +import com.linkedin.d2.xds.LoadBalanceTaskPool.NewBalanceGetPropertiesTask; import com.linkedin.r2.message.Request; import com.linkedin.r2.message.RequestContext; import com.linkedin.r2.transport.common.TransportClientFactory; @@ -59,15 +61,15 @@ public class DualReadLoadBalancer implements LoadBalancerWithFacilities private final LoadBalancerWithFacilities _oldLb; private final LoadBalancerWithFacilities _newLb; private final DualReadStateManager _dualReadStateManager; - + private final NewLoadBalancerTaskthreadPool _newNewLoadBalancerTaskThreadPool; private boolean _isNewLbReady; public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb, - @Nonnull DualReadStateManager dualReadStateManager) - { + @Nonnull DualReadStateManager dualReadStateManager, NewLoadBalancerTaskthreadPool newLoadBalancerTaskThreadPool) { _oldLb = oldLb; _newLb = newLb; _dualReadStateManager = dualReadStateManager; + _newNewLoadBalancerTaskThreadPool = newLoadBalancerTaskThreadPool; _isNewLbReady = false; } @@ -109,35 +111,8 @@ public void getClient(Request request, RequestContext requestContext, Callback() - { - @Override - public void onError(Throwable e) - { - LOG.error("Double read failure. Unable to read service properties from: " + serviceName, e); - } - - @Override - public void onSuccess(ServiceProperties result) - { - String clusterName = result.getClusterName(); - _dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ); - _newLb.getLoadBalancedClusterAndUriProperties(clusterName, new Callback>() - { - @Override - public void onError(Throwable e) - { - LOG.error("Dual read failure. Unable to read cluster properties from: " + clusterName, e); - } - - @Override - public void onSuccess(Pair result) - { - LOG.debug("Dual read is successful. Get cluster and uri properties: " + result); - } - }); - } - }); + _newNewLoadBalancerTaskThreadPool.execute( + new NewBalanceGetPropertiesTask(_newLb, _dualReadStateManager, serviceName)); _oldLb.getClient(request, requestContext, clientCallback); break; case OLD_LB_ONLY: diff --git a/d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewBalanceGetPropertiesTask.java b/d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewBalanceGetPropertiesTask.java new file mode 100644 index 0000000000..6df976356f --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewBalanceGetPropertiesTask.java @@ -0,0 +1,85 @@ +package com.linkedin.d2.xds.LoadBalanceTaskPool; + +import com.linkedin.common.callback.Callback; +import com.linkedin.d2.balancer.LoadBalancerWithFacilities; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; +import com.linkedin.d2.balancer.properties.ClusterProperties; +import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class NewBalanceGetPropertiesTask implements Runnable { + private LoadBalancerWithFacilities _newLb; + private DualReadStateManager _dualReadStateManager; + private String serviceName; + private static final Logger LOG = LoggerFactory.getLogger(NewBalanceGetPropertiesTask.class); + + public NewBalanceGetPropertiesTask(LoadBalancerWithFacilities _newLb, DualReadStateManager _dualReadStateManager, + String serviceName) { + this._newLb = _newLb; + this._dualReadStateManager = _dualReadStateManager; + this.serviceName = serviceName; + } + + @Override + public void run() { + _newLb.getLoadBalancedServiceProperties(serviceName, new Callback() { + @Override + public void onError(Throwable e) { + LOG.error("Double read failure. Unable to read service properties from: " + serviceName, e); + } + + @Override + public void onSuccess(ServiceProperties result) { + String clusterName = result.getClusterName(); + _dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ); + _newLb.getLoadBalancedServiceProperties(serviceName, new Callback() { + @Override + public void onError(Throwable e) { + LOG.error("Double read failure. Unable to read service properties from: " + serviceName, e); + } + + @Override + public void onSuccess(ServiceProperties result) { + String clusterName = result.getClusterName(); + _dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ); + _newLb.getLoadBalancedClusterAndUriProperties(clusterName, + new Callback>() { + @Override + public void onError(Throwable e) { + LOG.error("Dual read failure. Unable to read cluster properties from: " + clusterName, e); + } + + @Override + public void onSuccess(Pair result) { + //TODO change back to debug + LOG.info("Dual read is successful. Get cluster and uri properties: " + result); + } + }); + } + }); + } + }); + } + + @Override + public String toString() { + return "NewBalanceGetPropertiesTask{" + "serviceName='" + serviceName + '\'' + '}'; + } +} + +class NewBalanceTaskRejectedPolicy implements RejectedExecutionHandler { + private static final Logger LOG = LoggerFactory.getLogger(NewBalanceTaskRejectedPolicy.class); + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + // TODO log the rejected task info + // TODO emit the rejected task info to a metric + LOG.info(r.toString() + " is rejected"); + } +} + diff --git a/d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewLoadBalancerTaskthreadPool.java b/d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewLoadBalancerTaskthreadPool.java new file mode 100644 index 0000000000..fbac827a5f --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewLoadBalancerTaskthreadPool.java @@ -0,0 +1,29 @@ +package com.linkedin.d2.xds.LoadBalanceTaskPool; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +public class NewLoadBalancerTaskthreadPool { + private ThreadPoolExecutor _threadPoolExecutor; + private int _corePoolSize = 2; + private int _maximumPoolSize = 3; + private long _keepAliveTime = 200; + private int _queueSize = 1000; + + public NewLoadBalancerTaskthreadPool() { + RejectedExecutionHandler rejectedExecutionHandler = new NewBalanceTaskRejectedPolicy(); + _threadPoolExecutor = new ThreadPoolExecutor(_corePoolSize, _maximumPoolSize, _keepAliveTime, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(_queueSize), rejectedExecutionHandler); + } + + public void execute(Runnable task) { + _threadPoolExecutor.execute(task); + } + + public void shutdown() { + _threadPoolExecutor.shutdown(); + } +} \ No newline at end of file diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java index de969095a4..afa518fc1a 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java @@ -20,6 +20,7 @@ import com.linkedin.d2.balancer.dualread.DualReadLoadBalancer; import com.linkedin.d2.balancer.dualread.DualReadModeProvider; import com.linkedin.d2.balancer.dualread.DualReadStateManager; +import com.linkedin.d2.xds.LoadBalanceTaskPool.NewLoadBalancerTaskthreadPool; import javax.annotation.Nonnull; @@ -33,17 +34,20 @@ public class DualReadZkAndXdsLoadBalancerFactory implements LoadBalancerWithFaci private final LoadBalancerWithFacilitiesFactory _zkLbFactory; private final LoadBalancerWithFacilitiesFactory _xdsLbFactory; private final DualReadStateManager _dualReadStateManager; + private final NewLoadBalancerTaskthreadPool _newNewLoadBalancerTaskThreadPool; public DualReadZkAndXdsLoadBalancerFactory(@Nonnull DualReadStateManager dualReadStateManager) { _zkLbFactory = new ZKFSLoadBalancerWithFacilitiesFactory(); _xdsLbFactory = new XdsLoadBalancerWithFacilitiesFactory(); _dualReadStateManager = dualReadStateManager; + _newNewLoadBalancerTaskThreadPool = new NewLoadBalancerTaskthreadPool(); } @Override public LoadBalancerWithFacilities create(D2ClientConfig config) { - return new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager); + return new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager, + _newNewLoadBalancerTaskThreadPool); } } \ No newline at end of file From 3edca15134023dae0345c0f5114d14ff1e151886 Mon Sep 17 00:00:00 2001 From: brycezhongqing Date: Sat, 11 Nov 2023 12:33:34 -0800 Subject: [PATCH 2/9] refactor the code structure and resolve comments --- .../linkedin/d2/balancer/D2ClientBuilder.java | 9 +- .../linkedin/d2/balancer/D2ClientConfig.java | 6 +- .../dualread/DualReadLoadBalancer.java | 65 ++++++++++++-- .../NewBalanceGetPropertiesTask.java | 85 ------------------- .../NewLoadBalancerTaskthreadPool.java | 29 ------- .../DualReadZkAndXdsLoadBalancerFactory.java | 8 +- 6 files changed, 72 insertions(+), 130 deletions(-) delete mode 100644 d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewBalanceGetPropertiesTask.java delete mode 100644 d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewLoadBalancerTaskthreadPool.java diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java index 1aa24f63d5..7c3714831c 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java @@ -65,6 +65,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; import javax.net.ssl.SSLContext; @@ -201,7 +202,8 @@ public D2Client build() _config.serviceDiscoveryEventEmitter, _config.dualReadStateManager, _config.xdsExecutorService, - _config.xdsStreamReadyTimeout + _config.xdsStreamReadyTimeout, + _config.loadBalancerThreadPool ); final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ? @@ -643,6 +645,11 @@ public D2ClientBuilder setDualReadStateManager(DualReadStateManager dualReadStat return this; } + public D2ClientBuilder setLoadBalancerThreadPool(ThreadPoolExecutor loadBalancerThreadPool) { + _config.loadBalancerThreadPool = loadBalancerThreadPool; + return this; + } + /** * Single-threaded executor service for xDS communication. */ diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index 5645451847..2be9613746 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -44,6 +44,7 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; import javax.net.ssl.SSLContext; @@ -127,6 +128,7 @@ public class D2ClientConfig public ScheduledExecutorService xdsExecutorService = null; public Long xdsStreamReadyTimeout = null; + public ThreadPoolExecutor loadBalancerThreadPool = null; public D2ClientConfig() { @@ -195,7 +197,8 @@ public D2ClientConfig() ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter, DualReadStateManager dualReadStateManager, ScheduledExecutorService xdsExecutorService, - Long xdsStreamReadyTimeout) + Long xdsStreamReadyTimeout, + ThreadPoolExecutor loadBalancerThreadPool) { this.zkHosts = zkHosts; this.xdsServer = xdsServer; @@ -261,5 +264,6 @@ public D2ClientConfig() this.dualReadStateManager = dualReadStateManager; this.xdsExecutorService = xdsExecutorService; this.xdsStreamReadyTimeout = xdsStreamReadyTimeout; + this.loadBalancerThreadPool = loadBalancerThreadPool; } } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java index 63a91e9a87..0bd74f3e56 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java @@ -30,12 +30,13 @@ import com.linkedin.d2.balancer.util.hashing.HashRingProvider; import com.linkedin.d2.balancer.util.partitions.PartitionInfoProvider; import com.linkedin.d2.discovery.event.PropertyEventThread; -import com.linkedin.d2.xds.LoadBalanceTaskPool.NewLoadBalancerTaskthreadPool; -import com.linkedin.d2.xds.LoadBalanceTaskPool.NewBalanceGetPropertiesTask; import com.linkedin.r2.message.Request; import com.linkedin.r2.message.RequestContext; import com.linkedin.r2.transport.common.TransportClientFactory; import com.linkedin.r2.transport.common.bridge.client.TransportClient; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; @@ -55,21 +56,20 @@ * In DUAL_READ mode, it reads from both the old and the new load balancer, but relies on the data from old * load balancer only. */ -public class DualReadLoadBalancer implements LoadBalancerWithFacilities -{ +public class DualReadLoadBalancer implements LoadBalancerWithFacilities { private static final Logger LOG = LoggerFactory.getLogger(DualReadLoadBalancer.class); private final LoadBalancerWithFacilities _oldLb; private final LoadBalancerWithFacilities _newLb; private final DualReadStateManager _dualReadStateManager; - private final NewLoadBalancerTaskthreadPool _newNewLoadBalancerTaskThreadPool; + private ThreadPoolExecutor _loadBalancerThreadPool; private boolean _isNewLbReady; public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb, - @Nonnull DualReadStateManager dualReadStateManager, NewLoadBalancerTaskthreadPool newLoadBalancerTaskThreadPool) { + @Nonnull DualReadStateManager dualReadStateManager) + { _oldLb = oldLb; _newLb = newLb; _dualReadStateManager = dualReadStateManager; - _newNewLoadBalancerTaskThreadPool = newLoadBalancerTaskThreadPool; _isNewLbReady = false; } @@ -111,8 +111,31 @@ public void getClient(Request request, RequestContext requestContext, Callback _newLb.getLoadBalancedServiceProperties(serviceName, new Callback() { + @Override + public void onError(Throwable e) { + LOG.error("Double read failure. Unable to read service properties from: " + serviceName, e); + } + + @Override + public void onSuccess(ServiceProperties result) { + String clusterName = result.getClusterName(); + _dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ); + _newLb.getLoadBalancedClusterAndUriProperties(clusterName, + new Callback>() { + @Override + public void onError(Throwable e) { + LOG.error("Dual read failure. Unable to read cluster properties from: " + clusterName, e); + } + + @Override + public void onSuccess(Pair result) { + LOG.debug("Dual read is successful. Get cluster and uri properties: " + result); + } + }); + } + })); _oldLb.getClient(request, requestContext, clientCallback); break; case OLD_LB_ONLY: @@ -257,6 +280,29 @@ private DualReadModeProvider.DualReadMode getDualReadMode(String d2ServiceName) return _dualReadStateManager.getServiceDualReadMode(d2ServiceName); } + + /** + * Get the thread pool for load balancer tasks. If not set, a default thread pool will be used. + */ + public ThreadPoolExecutor getLoadBalancerThreadPool() { + return _loadBalancerThreadPool; + } + + /** + * Set the thread pool for load balancer tasks. If not set, a default thread pool will be used. + */ + public void setLoadBalancerThreadPool(ThreadPoolExecutor loadBalancerThreadPool) { + if (loadBalancerThreadPool != null) { + this._loadBalancerThreadPool = loadBalancerThreadPool; + } else { + LOG.info("LoadBalancerTaskThreadPool is null, using default thread pool"); + this._loadBalancerThreadPool = + new ThreadPoolExecutor(2, 3, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), (r, executor) -> { + LOG.error("LoadBalancerTaskThreadPool rejected execution, isNewReady: " + _isNewLbReady); + }); + } + } + @Override public void shutdown(PropertyEventThread.PropertyEventShutdownCallback callback) { @@ -266,5 +312,6 @@ public void shutdown(PropertyEventThread.PropertyEventShutdownCallback callback) }); _oldLb.shutdown(callback); + _loadBalancerThreadPool.shutdown(); } } diff --git a/d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewBalanceGetPropertiesTask.java b/d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewBalanceGetPropertiesTask.java deleted file mode 100644 index 6df976356f..0000000000 --- a/d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewBalanceGetPropertiesTask.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.linkedin.d2.xds.LoadBalanceTaskPool; - -import com.linkedin.common.callback.Callback; -import com.linkedin.d2.balancer.LoadBalancerWithFacilities; -import com.linkedin.d2.balancer.dualread.DualReadModeProvider; -import com.linkedin.d2.balancer.dualread.DualReadStateManager; -import com.linkedin.d2.balancer.properties.ClusterProperties; -import com.linkedin.d2.balancer.properties.ServiceProperties; -import com.linkedin.d2.balancer.properties.UriProperties; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class NewBalanceGetPropertiesTask implements Runnable { - private LoadBalancerWithFacilities _newLb; - private DualReadStateManager _dualReadStateManager; - private String serviceName; - private static final Logger LOG = LoggerFactory.getLogger(NewBalanceGetPropertiesTask.class); - - public NewBalanceGetPropertiesTask(LoadBalancerWithFacilities _newLb, DualReadStateManager _dualReadStateManager, - String serviceName) { - this._newLb = _newLb; - this._dualReadStateManager = _dualReadStateManager; - this.serviceName = serviceName; - } - - @Override - public void run() { - _newLb.getLoadBalancedServiceProperties(serviceName, new Callback() { - @Override - public void onError(Throwable e) { - LOG.error("Double read failure. Unable to read service properties from: " + serviceName, e); - } - - @Override - public void onSuccess(ServiceProperties result) { - String clusterName = result.getClusterName(); - _dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ); - _newLb.getLoadBalancedServiceProperties(serviceName, new Callback() { - @Override - public void onError(Throwable e) { - LOG.error("Double read failure. Unable to read service properties from: " + serviceName, e); - } - - @Override - public void onSuccess(ServiceProperties result) { - String clusterName = result.getClusterName(); - _dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ); - _newLb.getLoadBalancedClusterAndUriProperties(clusterName, - new Callback>() { - @Override - public void onError(Throwable e) { - LOG.error("Dual read failure. Unable to read cluster properties from: " + clusterName, e); - } - - @Override - public void onSuccess(Pair result) { - //TODO change back to debug - LOG.info("Dual read is successful. Get cluster and uri properties: " + result); - } - }); - } - }); - } - }); - } - - @Override - public String toString() { - return "NewBalanceGetPropertiesTask{" + "serviceName='" + serviceName + '\'' + '}'; - } -} - -class NewBalanceTaskRejectedPolicy implements RejectedExecutionHandler { - private static final Logger LOG = LoggerFactory.getLogger(NewBalanceTaskRejectedPolicy.class); - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { - // TODO log the rejected task info - // TODO emit the rejected task info to a metric - LOG.info(r.toString() + " is rejected"); - } -} - diff --git a/d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewLoadBalancerTaskthreadPool.java b/d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewLoadBalancerTaskthreadPool.java deleted file mode 100644 index fbac827a5f..0000000000 --- a/d2/src/main/java/com/linkedin/d2/xds/LoadBalanceTaskPool/NewLoadBalancerTaskthreadPool.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.linkedin.d2.xds.LoadBalanceTaskPool; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - - -public class NewLoadBalancerTaskthreadPool { - private ThreadPoolExecutor _threadPoolExecutor; - private int _corePoolSize = 2; - private int _maximumPoolSize = 3; - private long _keepAliveTime = 200; - private int _queueSize = 1000; - - public NewLoadBalancerTaskthreadPool() { - RejectedExecutionHandler rejectedExecutionHandler = new NewBalanceTaskRejectedPolicy(); - _threadPoolExecutor = new ThreadPoolExecutor(_corePoolSize, _maximumPoolSize, _keepAliveTime, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue(_queueSize), rejectedExecutionHandler); - } - - public void execute(Runnable task) { - _threadPoolExecutor.execute(task); - } - - public void shutdown() { - _threadPoolExecutor.shutdown(); - } -} \ No newline at end of file diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java index afa518fc1a..0655c71dde 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java @@ -20,7 +20,6 @@ import com.linkedin.d2.balancer.dualread.DualReadLoadBalancer; import com.linkedin.d2.balancer.dualread.DualReadModeProvider; import com.linkedin.d2.balancer.dualread.DualReadStateManager; -import com.linkedin.d2.xds.LoadBalanceTaskPool.NewLoadBalancerTaskthreadPool; import javax.annotation.Nonnull; @@ -34,20 +33,19 @@ public class DualReadZkAndXdsLoadBalancerFactory implements LoadBalancerWithFaci private final LoadBalancerWithFacilitiesFactory _zkLbFactory; private final LoadBalancerWithFacilitiesFactory _xdsLbFactory; private final DualReadStateManager _dualReadStateManager; - private final NewLoadBalancerTaskthreadPool _newNewLoadBalancerTaskThreadPool; public DualReadZkAndXdsLoadBalancerFactory(@Nonnull DualReadStateManager dualReadStateManager) { _zkLbFactory = new ZKFSLoadBalancerWithFacilitiesFactory(); _xdsLbFactory = new XdsLoadBalancerWithFacilitiesFactory(); _dualReadStateManager = dualReadStateManager; - _newNewLoadBalancerTaskThreadPool = new NewLoadBalancerTaskthreadPool(); } @Override public LoadBalancerWithFacilities create(D2ClientConfig config) { - return new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager, - _newNewLoadBalancerTaskThreadPool); + DualReadLoadBalancer loadBalancer = new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager); + loadBalancer.setLoadBalancerThreadPool(config.loadBalancerThreadPool); + return loadBalancer; } } \ No newline at end of file From 5df65c538fc524b01a1b9eb9f31ce1a6dc742cc0 Mon Sep 17 00:00:00 2001 From: brycezhongqing Date: Mon, 13 Nov 2023 10:21:11 -0800 Subject: [PATCH 3/9] Add fallback logic in getLoadBalancedServiceProperties and getLoadBalancedClusterAndUriProperties --- .../linkedin/d2/balancer/dualread/DualReadLoadBalancer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java index 0bd74f3e56..dd3c3cae56 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java @@ -153,7 +153,7 @@ public void getLoadBalancedServiceProperties(String serviceName, Callback _newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty())); _oldLb.getLoadBalancedServiceProperties(serviceName, clientCallback); break; case OLD_LB_ONLY: @@ -172,7 +172,7 @@ public void getLoadBalancedClusterAndUriProperties(String clusterName, _newLb.getLoadBalancedClusterAndUriProperties(clusterName, callback); break; case DUAL_READ: - _newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty()); + getLoadBalancerThreadPool().execute(() -> _newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty())); _oldLb.getLoadBalancedClusterAndUriProperties(clusterName, callback); break; case OLD_LB_ONLY: From 7c0a35892f41f53f52f35f9a1191f164fcbafd1a Mon Sep 17 00:00:00 2001 From: brycezhongqing Date: Mon, 13 Nov 2023 11:39:32 -0800 Subject: [PATCH 4/9] Resolve comments --- .../linkedin/d2/balancer/D2ClientBuilder.java | 7 +-- .../linkedin/d2/balancer/D2ClientConfig.java | 8 +-- .../dualread/DualReadLoadBalancer.java | 60 ++++++++----------- .../DualReadZkAndXdsLoadBalancerFactory.java | 4 +- 4 files changed, 33 insertions(+), 46 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java index 7c3714831c..7d246bbe20 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java @@ -65,7 +65,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; import javax.net.ssl.SSLContext; @@ -203,7 +202,7 @@ public D2Client build() _config.dualReadStateManager, _config.xdsExecutorService, _config.xdsStreamReadyTimeout, - _config.loadBalancerThreadPool + _config.loadBalancerExecutor ); final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ? @@ -645,8 +644,8 @@ public D2ClientBuilder setDualReadStateManager(DualReadStateManager dualReadStat return this; } - public D2ClientBuilder setLoadBalancerThreadPool(ThreadPoolExecutor loadBalancerThreadPool) { - _config.loadBalancerThreadPool = loadBalancerThreadPool; + public D2ClientBuilder setLoadBalancerExecutor(ExecutorService loadBalancerExecutor) { + _config.loadBalancerExecutor = loadBalancerExecutor; return this; } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index 2be9613746..5734c133d0 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -43,8 +43,8 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import java.util.Collections; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; import javax.net.ssl.SSLContext; @@ -128,7 +128,7 @@ public class D2ClientConfig public ScheduledExecutorService xdsExecutorService = null; public Long xdsStreamReadyTimeout = null; - public ThreadPoolExecutor loadBalancerThreadPool = null; + public ExecutorService loadBalancerExecutor = null; public D2ClientConfig() { @@ -198,7 +198,7 @@ public D2ClientConfig() DualReadStateManager dualReadStateManager, ScheduledExecutorService xdsExecutorService, Long xdsStreamReadyTimeout, - ThreadPoolExecutor loadBalancerThreadPool) + ExecutorService loadBalancerExecutor) { this.zkHosts = zkHosts; this.xdsServer = xdsServer; @@ -264,6 +264,6 @@ public D2ClientConfig() this.dualReadStateManager = dualReadStateManager; this.xdsExecutorService = xdsExecutorService; this.xdsStreamReadyTimeout = xdsStreamReadyTimeout; - this.loadBalancerThreadPool = loadBalancerThreadPool; + this.loadBalancerExecutor = loadBalancerExecutor; } } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java index dd3c3cae56..1662ba335c 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java @@ -16,6 +16,7 @@ package com.linkedin.d2.balancer.dualread; +import com.google.common.util.concurrent.MoreExecutors; import com.linkedin.common.callback.Callback; import com.linkedin.common.callback.Callbacks; import com.linkedin.common.util.None; @@ -34,9 +35,7 @@ import com.linkedin.r2.message.RequestContext; import com.linkedin.r2.transport.common.TransportClientFactory; import com.linkedin.r2.transport.common.bridge.client.TransportClient; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; import javax.annotation.Nonnull; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; @@ -61,16 +60,31 @@ public class DualReadLoadBalancer implements LoadBalancerWithFacilities { private final LoadBalancerWithFacilities _oldLb; private final LoadBalancerWithFacilities _newLb; private final DualReadStateManager _dualReadStateManager; - private ThreadPoolExecutor _loadBalancerThreadPool; + private ExecutorService _executor; private boolean _isNewLbReady; + @Deprecated public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb, - @Nonnull DualReadStateManager dualReadStateManager) + @Nonnull DualReadStateManager dualReadStateManager) { + this(oldLb, newLb, dualReadStateManager, null); + LOG.warn("Deprecated DualReadLoadBalancer constructor used without a threadpool"); + } + + public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb, + @Nonnull DualReadStateManager dualReadStateManager, ExecutorService executor) { _oldLb = oldLb; _newLb = newLb; _dualReadStateManager = dualReadStateManager; _isNewLbReady = false; + if(executor == null){ + // Using a direct executor here means the code is executed directly, + // blocking the caller. This means the old behavior is preserved. + _executor = MoreExecutors.newDirectExecutorService(); + LOG.warn("Deprecated DualReadLoadBalancer constructor used without a threadpool executor"); + }else{ + _executor = executor; + } } @Override @@ -111,11 +125,11 @@ public void getClient(Request request, RequestContext requestContext, Callback _newLb.getLoadBalancedServiceProperties(serviceName, new Callback() { @Override public void onError(Throwable e) { - LOG.error("Double read failure. Unable to read service properties from: " + serviceName, e); + LOG.error("Double read failure. Unable to read service properties from: {}", serviceName, e); } @Override @@ -126,12 +140,12 @@ public void onSuccess(ServiceProperties result) { new Callback>() { @Override public void onError(Throwable e) { - LOG.error("Dual read failure. Unable to read cluster properties from: " + clusterName, e); + LOG.error("Dual read failure. Unable to read cluster properties from: {}", clusterName, e); } @Override public void onSuccess(Pair result) { - LOG.debug("Dual read is successful. Get cluster and uri properties: " + result); + LOG.debug("Dual read is successful. Get cluster and uri properties: {}", result); } }); } @@ -153,7 +167,7 @@ public void getLoadBalancedServiceProperties(String serviceName, Callback _newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty())); + _executor.execute(() -> _newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty())); _oldLb.getLoadBalancedServiceProperties(serviceName, clientCallback); break; case OLD_LB_ONLY: @@ -172,7 +186,7 @@ public void getLoadBalancedClusterAndUriProperties(String clusterName, _newLb.getLoadBalancedClusterAndUriProperties(clusterName, callback); break; case DUAL_READ: - getLoadBalancerThreadPool().execute(() -> _newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty())); + _executor.execute(() -> _newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty())); _oldLb.getLoadBalancedClusterAndUriProperties(clusterName, callback); break; case OLD_LB_ONLY: @@ -280,29 +294,6 @@ private DualReadModeProvider.DualReadMode getDualReadMode(String d2ServiceName) return _dualReadStateManager.getServiceDualReadMode(d2ServiceName); } - - /** - * Get the thread pool for load balancer tasks. If not set, a default thread pool will be used. - */ - public ThreadPoolExecutor getLoadBalancerThreadPool() { - return _loadBalancerThreadPool; - } - - /** - * Set the thread pool for load balancer tasks. If not set, a default thread pool will be used. - */ - public void setLoadBalancerThreadPool(ThreadPoolExecutor loadBalancerThreadPool) { - if (loadBalancerThreadPool != null) { - this._loadBalancerThreadPool = loadBalancerThreadPool; - } else { - LOG.info("LoadBalancerTaskThreadPool is null, using default thread pool"); - this._loadBalancerThreadPool = - new ThreadPoolExecutor(2, 3, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), (r, executor) -> { - LOG.error("LoadBalancerTaskThreadPool rejected execution, isNewReady: " + _isNewLbReady); - }); - } - } - @Override public void shutdown(PropertyEventThread.PropertyEventShutdownCallback callback) { @@ -312,6 +303,5 @@ public void shutdown(PropertyEventThread.PropertyEventShutdownCallback callback) }); _oldLb.shutdown(callback); - _loadBalancerThreadPool.shutdown(); } } diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java index 0655c71dde..024e9c02c1 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java @@ -44,8 +44,6 @@ public DualReadZkAndXdsLoadBalancerFactory(@Nonnull DualReadStateManager dualRea @Override public LoadBalancerWithFacilities create(D2ClientConfig config) { - DualReadLoadBalancer loadBalancer = new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager); - loadBalancer.setLoadBalancerThreadPool(config.loadBalancerThreadPool); - return loadBalancer; + return new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager, config.loadBalancerExecutor); } } \ No newline at end of file From 315bafb3ae9ff1c4c62fd2770214384254d7980f Mon Sep 17 00:00:00 2001 From: brycezhongqing Date: Mon, 13 Nov 2023 12:04:45 -0800 Subject: [PATCH 5/9] Reformat the code --- .../dualread/DualReadLoadBalancer.java | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java index 1662ba335c..970e804dbe 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java @@ -55,7 +55,8 @@ * In DUAL_READ mode, it reads from both the old and the new load balancer, but relies on the data from old * load balancer only. */ -public class DualReadLoadBalancer implements LoadBalancerWithFacilities { +public class DualReadLoadBalancer implements LoadBalancerWithFacilities +{ private static final Logger LOG = LoggerFactory.getLogger(DualReadLoadBalancer.class); private final LoadBalancerWithFacilities _oldLb; private final LoadBalancerWithFacilities _newLb; @@ -65,7 +66,8 @@ public class DualReadLoadBalancer implements LoadBalancerWithFacilities { @Deprecated public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb, - @Nonnull DualReadStateManager dualReadStateManager) { + @Nonnull DualReadStateManager dualReadStateManager) + { this(oldLb, newLb, dualReadStateManager, null); LOG.warn("Deprecated DualReadLoadBalancer constructor used without a threadpool"); } @@ -77,12 +79,15 @@ public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFa _newLb = newLb; _dualReadStateManager = dualReadStateManager; _isNewLbReady = false; - if(executor == null){ + if(executor == null) + { // Using a direct executor here means the code is executed directly, // blocking the caller. This means the old behavior is preserved. _executor = MoreExecutors.newDirectExecutorService(); LOG.warn("Deprecated DualReadLoadBalancer constructor used without a threadpool executor"); - }else{ + } + else + { _executor = executor; } } @@ -126,25 +131,30 @@ public void getClient(Request request, RequestContext requestContext, Callback _newLb.getLoadBalancedServiceProperties(serviceName, new Callback() { + () -> _newLb.getLoadBalancedServiceProperties(serviceName, new Callback() + { @Override - public void onError(Throwable e) { + public void onError(Throwable e) + { LOG.error("Double read failure. Unable to read service properties from: {}", serviceName, e); } @Override - public void onSuccess(ServiceProperties result) { + public void onSuccess(ServiceProperties result) + { String clusterName = result.getClusterName(); _dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ); - _newLb.getLoadBalancedClusterAndUriProperties(clusterName, - new Callback>() { + _newLb.getLoadBalancedClusterAndUriProperties(clusterName, new Callback>() + { @Override - public void onError(Throwable e) { + public void onError(Throwable e) + { LOG.error("Dual read failure. Unable to read cluster properties from: {}", clusterName, e); } @Override - public void onSuccess(Pair result) { + public void onSuccess(Pair result) + { LOG.debug("Dual read is successful. Get cluster and uri properties: {}", result); } }); From 7322bc550d0b6d7cfd2e3a953242c217b863c168 Mon Sep 17 00:00:00 2001 From: brycezhongqing Date: Mon, 13 Nov 2023 14:24:39 -0800 Subject: [PATCH 6/9] Add change log --- CHANGELOG.md | 6 +++++- gradle.properties | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21a72fe58b..80d9a9ed07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.46.10] - 2023-11-13 +Fix dual-read potential risk that newLb may impact oldLb + ## [29.46.9] - 2023-11-02 - Update FieldDef so that it will lazily cache the hashCode. @@ -5557,7 +5560,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.9...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.10...master +[29.46.10]: https://github.com/linkedin/rest.li/compare/v29.46.9...v29.46.10 [29.46.9]: https://github.com/linkedin/rest.li/compare/v29.46.8...v29.46.9 [29.46.8]: https://github.com/linkedin/rest.li/compare/v29.46.7...v29.46.8 [29.46.7]: https://github.com/linkedin/rest.li/compare/v29.46.6...v29.46.7 diff --git a/gradle.properties b/gradle.properties index 33ae9d13ae..02c6ef69fd 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.46.9 +version=29.46.10 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true From 1dd9d1b05260a8124587da21b25b89c11d763659 Mon Sep 17 00:00:00 2001 From: brycezhongqing Date: Mon, 13 Nov 2023 17:24:06 -0800 Subject: [PATCH 7/9] Resolve comments --- CHANGELOG.md | 6 +++--- .../linkedin/d2/balancer/D2ClientBuilder.java | 6 +++--- .../linkedin/d2/balancer/D2ClientConfig.java | 6 +++--- .../dualread/DualReadLoadBalancer.java | 20 +++++++++---------- .../DualReadZkAndXdsLoadBalancerFactory.java | 2 +- gradle.properties | 2 +- 6 files changed, 21 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 80d9a9ed07..428fdd9fbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ and what APIs have changed, if applicable. ## [Unreleased] -## [29.46.10] - 2023-11-13 +## [29.47.0] - 2023-11-13 Fix dual-read potential risk that newLb may impact oldLb ## [29.46.9] - 2023-11-02 @@ -5560,8 +5560,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.10...master -[29.46.10]: https://github.com/linkedin/rest.li/compare/v29.46.9...v29.46.10 +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.47.0...master +[29.47.0]: https://github.com/linkedin/rest.li/compare/v29.46.9...v29.47.0 [29.46.9]: https://github.com/linkedin/rest.li/compare/v29.46.8...v29.46.9 [29.46.8]: https://github.com/linkedin/rest.li/compare/v29.46.7...v29.46.8 [29.46.7]: https://github.com/linkedin/rest.li/compare/v29.46.6...v29.46.7 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java index 7d246bbe20..ea4838c15c 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java @@ -202,7 +202,7 @@ public D2Client build() _config.dualReadStateManager, _config.xdsExecutorService, _config.xdsStreamReadyTimeout, - _config.loadBalancerExecutor + _config.dualReadNewLbExecutor ); final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ? @@ -644,8 +644,8 @@ public D2ClientBuilder setDualReadStateManager(DualReadStateManager dualReadStat return this; } - public D2ClientBuilder setLoadBalancerExecutor(ExecutorService loadBalancerExecutor) { - _config.loadBalancerExecutor = loadBalancerExecutor; + public D2ClientBuilder setDualReadNewLbExecutor(ExecutorService dualReadNewLbExecutor) { + _config.dualReadNewLbExecutor = dualReadNewLbExecutor; return this; } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index 5734c133d0..8f7fc44b52 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -128,7 +128,7 @@ public class D2ClientConfig public ScheduledExecutorService xdsExecutorService = null; public Long xdsStreamReadyTimeout = null; - public ExecutorService loadBalancerExecutor = null; + public ExecutorService dualReadNewLbExecutor = null; public D2ClientConfig() { @@ -198,7 +198,7 @@ public D2ClientConfig() DualReadStateManager dualReadStateManager, ScheduledExecutorService xdsExecutorService, Long xdsStreamReadyTimeout, - ExecutorService loadBalancerExecutor) + ExecutorService dualReadNewLbExecutor) { this.zkHosts = zkHosts; this.xdsServer = xdsServer; @@ -264,6 +264,6 @@ public D2ClientConfig() this.dualReadStateManager = dualReadStateManager; this.xdsExecutorService = xdsExecutorService; this.xdsStreamReadyTimeout = xdsStreamReadyTimeout; - this.loadBalancerExecutor = loadBalancerExecutor; + this.dualReadNewLbExecutor = dualReadNewLbExecutor; } } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java index 970e804dbe..bfe387f3bd 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java @@ -61,7 +61,7 @@ public class DualReadLoadBalancer implements LoadBalancerWithFacilities private final LoadBalancerWithFacilities _oldLb; private final LoadBalancerWithFacilities _newLb; private final DualReadStateManager _dualReadStateManager; - private ExecutorService _executor; + private ExecutorService _newLbExecutor; private boolean _isNewLbReady; @Deprecated @@ -73,22 +73,22 @@ public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFa } public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb, - @Nonnull DualReadStateManager dualReadStateManager, ExecutorService executor) + @Nonnull DualReadStateManager dualReadStateManager, ExecutorService newLbExecutor) { _oldLb = oldLb; _newLb = newLb; _dualReadStateManager = dualReadStateManager; _isNewLbReady = false; - if(executor == null) + if(newLbExecutor == null) { // Using a direct executor here means the code is executed directly, // blocking the caller. This means the old behavior is preserved. - _executor = MoreExecutors.newDirectExecutorService(); - LOG.warn("Deprecated DualReadLoadBalancer constructor used without a threadpool executor"); + _newLbExecutor = MoreExecutors.newDirectExecutorService(); + LOG.warn("The newLbExecutor is null, will use a direct executor instead."); } else { - _executor = executor; + _newLbExecutor = newLbExecutor; } } @@ -130,13 +130,13 @@ public void getClient(Request request, RequestContext requestContext, Callback _newLb.getLoadBalancedServiceProperties(serviceName, new Callback() { @Override public void onError(Throwable e) { - LOG.error("Double read failure. Unable to read service properties from: {}", serviceName, e); + LOG.error("Dual read failure. Unable to read service properties from: {}", serviceName, e); } @Override @@ -177,7 +177,7 @@ public void getLoadBalancedServiceProperties(String serviceName, Callback _newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty())); + _newLbExecutor.execute(() -> _newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty())); _oldLb.getLoadBalancedServiceProperties(serviceName, clientCallback); break; case OLD_LB_ONLY: @@ -196,7 +196,7 @@ public void getLoadBalancedClusterAndUriProperties(String clusterName, _newLb.getLoadBalancedClusterAndUriProperties(clusterName, callback); break; case DUAL_READ: - _executor.execute(() -> _newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty())); + _newLbExecutor.execute(() -> _newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty())); _oldLb.getLoadBalancedClusterAndUriProperties(clusterName, callback); break; case OLD_LB_ONLY: diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java index 024e9c02c1..5f96357144 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/DualReadZkAndXdsLoadBalancerFactory.java @@ -44,6 +44,6 @@ public DualReadZkAndXdsLoadBalancerFactory(@Nonnull DualReadStateManager dualRea @Override public LoadBalancerWithFacilities create(D2ClientConfig config) { - return new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager, config.loadBalancerExecutor); + return new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager, config.dualReadNewLbExecutor); } } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 02c6ef69fd..567d162c59 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.46.10 +version=29.47.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true From 9c77acdbd43644ffe1d59a6a59a5d5028dd4dc46 Mon Sep 17 00:00:00 2001 From: brycezhongqing Date: Mon, 20 Nov 2023 08:27:17 -0800 Subject: [PATCH 8/9] resolve comment --- .../com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java index bfe387f3bd..0c505a9d52 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java @@ -69,7 +69,6 @@ public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFa @Nonnull DualReadStateManager dualReadStateManager) { this(oldLb, newLb, dualReadStateManager, null); - LOG.warn("Deprecated DualReadLoadBalancer constructor used without a threadpool"); } public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb, From c098f16018f4438332df5fffa053d6ae73a8a646 Mon Sep 17 00:00:00 2001 From: brycezhongqing Date: Mon, 20 Nov 2023 16:11:48 -0800 Subject: [PATCH 9/9] resolve nit --- .../com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java index 0c505a9d52..0dfbc5e22d 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java @@ -78,7 +78,7 @@ public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFa _newLb = newLb; _dualReadStateManager = dualReadStateManager; _isNewLbReady = false; - if(newLbExecutor == null) + if (newLbExecutor == null) { // Using a direct executor here means the code is executed directly, // blocking the caller. This means the old behavior is preserved.