Skip to content

Commit

Permalink
Fixed race conditions and bugs in xds client (#941)
Browse files Browse the repository at this point in the history
  • Loading branch information
shivamgupta1 authored Oct 10, 2023
1 parent 6c314b1 commit 5e63519
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 50 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.46.7] - 2023-10-10
- fix xDS client bugs and race conditions

## [29.46.6] - 2023-10-04
- simplify symlink subscription in xds flow

Expand Down Expand Up @@ -5548,7 +5551,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.6...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.7...master
[29.46.7]: https://github.com/linkedin/rest.li/compare/v29.46.6...v29.46.7
[29.46.6]: https://github.com/linkedin/rest.li/compare/v29.46.5...v29.46.6
[29.46.5]: https://github.com/linkedin/rest.li/compare/v29.46.4...v29.46.5
[29.46.4]: https://github.com/linkedin/rest.li/compare/v29.46.3...v29.46.4
Expand Down
17 changes: 16 additions & 1 deletion d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ public D2Client build()
_config.failoutConfigProviderFactory,
_config.failoutRedirectStrategy,
_config.serviceDiscoveryEventEmitter,
_config.dualReadStateManager
_config.dualReadStateManager,
_config.xdsExecutorService,
_config.xdsStreamReadyTimeout
);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
Expand Down Expand Up @@ -641,6 +643,19 @@ public D2ClientBuilder setDualReadStateManager(DualReadStateManager dualReadStat
return this;
}

/**
 * Sets the executor service used for xDS communication. The executor is expected to be single-threaded.
 *
 * @param xdsExecutorService executor service to store in the client config
 * @return this builder, to allow call chaining
 */
public D2ClientBuilder setXdsExecutorService(ScheduledExecutorService xdsExecutorService) {
_config.xdsExecutorService = xdsExecutorService;
return this;
}

/**
 * Sets the xDS stream ready timeout stored in the client config.
 *
 * @param xdsStreamReadyTimeout timeout value to store in the client config
 *                              (NOTE(review): presumably milliseconds to wait for the xDS stream to become
 *                              ready; confirm against the code that consumes this config value)
 * @return this builder, to allow call chaining
 */
public D2ClientBuilder setXdsStreamReadyTimeout(long xdsStreamReadyTimeout) {
_config.xdsStreamReadyTimeout = xdsStreamReadyTimeout;
return this;
}

private Map<String, TransportClientFactory> createDefaultTransportClientFactories()
{
final Map<String, TransportClientFactory> clientFactories = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ public class D2ClientConfig
public ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter = new LogOnlyServiceDiscoveryEventEmitter(); // default to use log-only emitter
public DualReadStateManager dualReadStateManager = null;

public ScheduledExecutorService xdsExecutorService = null;
public Long xdsStreamReadyTimeout = null;

public D2ClientConfig()
{
}
Expand Down Expand Up @@ -190,7 +193,9 @@ public D2ClientConfig()
FailoutConfigProviderFactory failoutConfigProviderFactory,
FailoutRedirectStrategy failoutRedirectStrategy,
ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter,
DualReadStateManager dualReadStateManager)
DualReadStateManager dualReadStateManager,
ScheduledExecutorService xdsExecutorService,
Long xdsStreamReadyTimeout)
{
this.zkHosts = zkHosts;
this.xdsServer = xdsServer;
Expand Down Expand Up @@ -254,5 +259,7 @@ public D2ClientConfig()
this.failoutRedirectStrategy = failoutRedirectStrategy;
this.serviceDiscoveryEventEmitter = serviceDiscoveryEventEmitter;
this.dualReadStateManager = dualReadStateManager;
this.xdsExecutorService = xdsExecutorService;
this.xdsStreamReadyTimeout = xdsStreamReadyTimeout;
}
}
151 changes: 113 additions & 38 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.grpc.Status;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -39,6 +41,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -49,6 +52,7 @@
public class XdsClientImpl extends XdsClient
{
private static final Logger _log = LoggerFactory.getLogger(XdsClientImpl.class);
public static final long DEFAULT_READY_TIMEOUT_MILLIS = 2000L;

private final Map<String, ResourceSubscriber> _d2NodeSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> _d2SymlinkNodeSubscribers = new HashMap<>();
Expand All @@ -62,9 +66,19 @@ public class XdsClientImpl extends XdsClient
private BackoffPolicy _retryBackoffPolicy;
private AdsStream _adsStream;
private boolean _shutdown;
private ScheduledFuture<?> _retryRpcStreamFuture;
private ScheduledFuture<?> _readyTimeoutFuture;
private final long _readyTimeoutMillis;

/**
 * Creates a client that uses {@link #DEFAULT_READY_TIMEOUT_MILLIS} as the stream ready timeout.
 *
 * @param node node passed through to the four-argument constructor
 * @param managedChannel channel passed through to the four-argument constructor
 * @param executorService executor passed through to the four-argument constructor
 * @deprecated use the constructor that also takes {@code readyTimeoutMillis}.
 */
@Deprecated
public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService)
{
this(node, managedChannel, executorService, DEFAULT_READY_TIMEOUT_MILLIS);
}

public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService,
long readyTimeoutMillis) {
_readyTimeoutMillis = readyTimeoutMillis;
_node = node;
_managedChannel = managedChannel;
_executorService = executorService;
Expand All @@ -83,11 +97,13 @@ void watchXdsResource(String resourceName, ResourceType type, ResourceWatcher wa
subscriber = new ResourceSubscriber(type, resourceName);
resourceSubscriberMap.put(resourceName, subscriber);

if (_adsStream == null)
if (_adsStream == null && !isInBackoff())
{
startRpcStream();
startRpcStreamLocal();
}
if (_adsStream != null) {
_adsStream.sendDiscoveryRequest(type, Collections.singletonList(resourceName));
}
_adsStream.sendDiscoveryRequest(type, Collections.singletonList(resourceName));
}
subscriber.addWatcher(watcher);
});
Expand All @@ -96,9 +112,32 @@ void watchXdsResource(String resourceName, ResourceType type, ResourceWatcher wa
/**
 * Submits a task to the executor that starts the RPC stream, unless the client is in backoff
 * when the task runs.
 */
@Override
public void startRpcStream()
{
  final Runnable startUnlessBackedOff = () ->
  {
    if (isInBackoff())
    {
      // A retry is already scheduled; leave the stream start to it.
      return;
    }
    startRpcStreamLocal();
  };
  _executorService.execute(startUnlessBackedOff);
}

/**
 * Starts the ADS RPC stream. Must be called from the executor, and only if we're not backed off.
 * <p>
 * Does nothing (other than logging a warning) if the client has been shut down or a stream already exists.
 * Otherwise creates the stream, schedules a ready-timeout task that reports {@code DEADLINE_EXCEEDED} to
 * subscribers if it runs before being cancelled, and then starts the stream.
 */
private void startRpcStreamLocal()
{
  if (_shutdown)
  {
    _log.warn("RPC stream cannot be started after shutdown!");
    return;
  }
  // Check rpc stream is null to ensure duplicate RPC retry tasks are no-op
  if (_adsStream != null)
  {
    _log.warn("Tried to create duplicate RPC stream, ignoring!");
    return;
  }
  // Cancel any ready-timeout left over from a previous stream so that it cannot report DEADLINE_EXCEEDED
  // against the stream created below. Cancelling a task that has already completed is a no-op.
  if (_readyTimeoutFuture != null)
  {
    _readyTimeoutFuture.cancel(false);
  }
  AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
      AggregatedDiscoveryServiceGrpc.newStub(_managedChannel);
  _adsStream = new AdsStream(stub);
  _readyTimeoutFuture = _executorService.schedule(() ->
  {
    _log.warn("ADS stream not ready within {} milliseconds", _readyTimeoutMillis);
    // notify subscribers about the error and wait for the stream to be ready by keeping it open.
    notifyStreamError(Status.DEADLINE_EXCEEDED);
  }, _readyTimeoutMillis, TimeUnit.MILLISECONDS);
  _adsStream.start();
  _log.info("ADS stream started, connected to server: {}", _managedChannel.authority());
}
Expand All @@ -122,6 +161,37 @@ String getXdsServerAuthority()
return _managedChannel.authority();
}

/**
 * Tells whether the client is currently waiting to re-establish the RPC stream after a failure, i.e. there is
 * no stream and a scheduled retry task has not finished yet.
 * NOTE: Must be called from the executor.
 * @return {@code true} if the client is in backoff
 */
private boolean isInBackoff()
{
  if (_adsStream != null)
  {
    // A stream exists, so no retry is being awaited.
    return false;
  }
  final ScheduledFuture<?> pendingRetry = _retryRpcStreamFuture;
  return pendingRetry != null && !pendingRetry.isDone();
}

/**
 * Reacts to a ready callback from the RPC stream. Must be called from the executor.
 * <p>
 * When the stream is confirmed ready and the ready-timeout future is still set, the timeout task is cancelled,
 * the future is cleared, and subscribers are notified of the reconnect. While the future stays cleared,
 * further ready callbacks send no notification.
 */
private void readyHandler()
{
  _log.debug("Received ready callback from the ADS stream");
  final boolean noUsableStream = _adsStream == null || isInBackoff();
  if (noUsableStream)
  {
    _log.warn("Unexpected state, ready called on null or backed off ADS stream!");
    return;
  }
  // Re-check the ready state to ignore spurious callbacks; another callback arrives whenever it is ready again.
  if (!_adsStream.isReady())
  {
    return;
  }
  // A null ready-timeout future means the reconnect notification has already been sent.
  final ScheduledFuture<?> readyTimeout = _readyTimeoutFuture;
  if (readyTimeout == null)
  {
    return;
  }
  // The timeout task is cancelled only if it hasn't already executed.
  final boolean cancelledTimeout = readyTimeout.cancel(false);
  _log.info("ADS stream ready, cancelled timeout task: {}", cancelledTimeout);
  // Clear the future so that subscribers are not notified repeatedly.
  _readyTimeoutFuture = null;
  notifyStreamReconnect();
}

private void handleD2NodeResponse(DiscoveryResponseData data)
{
Map<String, D2NodeUpdate> updates = new HashMap<>();
Expand Down Expand Up @@ -213,7 +283,7 @@ private void handleResourceUpdate(Map<String, ? extends ResourceUpdate> updates,
}
}

private void handleStreamClosed(Status error) {
private void notifyStreamError(Status error) {
for (ResourceSubscriber subscriber : _d2NodeSubscribers.values()) {
subscriber.onError(error);
}
Expand All @@ -222,7 +292,7 @@ private void handleStreamClosed(Status error) {
}
}

private void handleStreamRestarted() {
private void notifyStreamReconnect() {
for (ResourceSubscriber subscriber : _d2NodeSubscribers.values()) {
subscriber.onReconnect();
}
Expand Down Expand Up @@ -328,10 +398,7 @@ private void onReconnect()
final class RpcRetryTask implements Runnable {
@Override
public void run() {
if (_shutdown) {
return;
}
startRpcStream();
startRpcStreamLocal();
for (ResourceType type : ResourceType.values()) {
if (type == ResourceType.UNKNOWN) {
continue;
Expand All @@ -342,7 +409,6 @@ public void run() {
_adsStream.sendDiscoveryRequest(type, resources);
}
}
handleStreamRestarted();
}
}

Expand Down Expand Up @@ -477,37 +543,45 @@ private AdsStream(@Nonnull AggregatedDiscoveryServiceGrpc.AggregatedDiscoverySer
_responseReceived = false;
}

/**
 * Reports whether the request writer exists and, viewed as a {@link ClientCallStreamObserver}, is ready.
 *
 * @return {@code false} if there is no request writer, otherwise the writer's own ready state
 */
public boolean isReady()
{
  if (_requestWriter == null)
  {
    return false;
  }
  final ClientCallStreamObserver<?> callObserver = (ClientCallStreamObserver<?>) _requestWriter;
  return callObserver.isReady();
}

private void start()
{
StreamObserver<DeltaDiscoveryResponse> responseReader = new StreamObserver<DeltaDiscoveryResponse>()
{
@Override
public void onNext(DeltaDiscoveryResponse response)
{
_executorService.execute(() ->
{
_log.debug("Received {} response:\n{}", ResourceType.fromTypeUrl(response.getTypeUrl()), response);
DiscoveryResponseData responseData = DiscoveryResponseData.fromEnvoyProto(response);
handleResponse(responseData);
});
}

@Override
public void onError(Throwable t)
{
_executorService.execute(() -> handleRpcError(t));
}

@Override
public void onCompleted()
{
_executorService.execute(() -> handleRpcCompleted());
}
};
StreamObserver<DeltaDiscoveryResponse> responseReader =
new ClientResponseObserver<DeltaDiscoveryRequest, DeltaDiscoveryResponse>() {
@Override
public void beforeStart(ClientCallStreamObserver<DeltaDiscoveryRequest> requestStream) {
requestStream.setOnReadyHandler(() -> _executorService.execute(XdsClientImpl.this::readyHandler));
}

@Override
public void onNext(DeltaDiscoveryResponse response)
{
_executorService.execute(() ->
{
_log.debug("Received {} response:\n{}", ResourceType.fromTypeUrl(response.getTypeUrl()), response);
DiscoveryResponseData responseData = DiscoveryResponseData.fromEnvoyProto(response);
handleResponse(responseData);
});
}

@Override
public void onError(Throwable t)
{
_executorService.execute(() -> handleRpcError(t));
}

@Override
public void onCompleted()
{
_executorService.execute(() -> handleRpcCompleted());
}
};
_requestWriter = _stub.withWaitForReady().deltaAggregatedResources(responseReader);
}


/**
* Sends a client-initiated discovery request.
*/
Expand Down Expand Up @@ -576,6 +650,7 @@ private void handleRpcCompleted()
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription("ADS stream closed by server"));
}

// Must be called from the executor.
private void handleRpcStreamClosed(Status error)
{
if (_closed)
Expand All @@ -585,7 +660,7 @@ private void handleRpcStreamClosed(Status error)
_log.error("ADS stream closed with status {}: {}. Cause: {}", error.getCode(), error.getDescription(),
error.getCause());
_closed = true;
handleStreamClosed(error);
notifyStreamError(error);
cleanUp();
if (_responseReceived || _retryBackoffPolicy == null) {
// Reset the backoff sequence if had received a response, or backoff sequence
Expand All @@ -597,7 +672,7 @@ private void handleRpcStreamClosed(Status error)
delayNanos = _retryBackoffPolicy.nextBackoffNanos();
}
_log.info("Retry ADS stream in {} ns", delayNanos);
_executorService.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS);
_retryRpcStreamFuture = _executorService.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS);
}

private void close(Exception error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class XdsToD2PropertiesAdaptor
private static final String D2_URI_NODE_PREFIX = "/d2/uris/";
private static final char SYMLINK_NODE_IDENTIFIER = '$';
private static final char PATH_SEPARATOR = '/';
private static final String NON_EXISTENT_CLUSTER = "NonExistentCluster";

private final XdsClient _xdsClient;
private final List<XdsConnectionListener> _xdsConnectionListeners;
Expand All @@ -82,7 +83,7 @@ public class XdsToD2PropertiesAdaptor
private final Object _symlinkAndActualNodeLock = new Object();
private final ServiceDiscoveryEventEmitter _eventEmitter;

private boolean _isAvailable;
private Boolean _isAvailable;
private PropertyEventBus<UriProperties> _uriEventBus;
private PropertyEventBus<ServiceProperties> _serviceEventBus;
private PropertyEventBus<ClusterProperties> _clusterEventBus;
Expand All @@ -97,7 +98,8 @@ public XdsToD2PropertiesAdaptor(XdsClient xdsClient, DualReadStateManager dualRe
_clusterPropertiesJsonSerializer = new ClusterPropertiesJsonSerializer();
_uriPropertiesJsonSerializer = new UriPropertiesJsonSerializer();
_uriPropertiesMerger = new UriPropertiesMerger();
_isAvailable = false;
// set to null so that the first notification on connection establishment success/failure is always sent
_isAvailable = null;
_watchedClusterResources = new ConcurrentHashMap<>();
_watchedSymlinkResources = new ConcurrentHashMap<>();
_watchedServiceResources = new ConcurrentHashMap<>();
Expand All @@ -108,7 +110,10 @@ public XdsToD2PropertiesAdaptor(XdsClient xdsClient, DualReadStateManager dualRe
public void start()
{
_xdsClient.startRpcStream();
notifyAvailabilityChanges(true);
// Watch any resource to get notified of xds connection updates, including initial connection establishment.
// TODO: Note, this is a workaround since the xDS client implementation currently integrates connection
// error/success notifications along with the resource updates. This can be improved in a future refactor.
listenToCluster(NON_EXISTENT_CLUSTER);
}

public void shutdown()
Expand Down Expand Up @@ -361,7 +366,7 @@ private void notifyAvailabilityChanges(boolean isAvailable)
{
synchronized (_xdsConnectionListeners)
{
if (_isAvailable != isAvailable)
if (_isAvailable == null || _isAvailable != isAvailable)
{
_isAvailable = isAvailable;

Expand Down
Loading

0 comments on commit 5e63519

Please sign in to comment.