Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: update watcher API as per gRFC A88 #7977

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/xds/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (sc *ServerConfig) ServerFeatures() []string {
//
// This feature controls the behavior of the xDS client when the server deletes
// a previously sent Listener or Cluster resource. If set, the xDS client will
// not invoke the watchers' OnResourceDoesNotExist() method when a resource is
// not invoke the watchers' ResourceError() method when a resource is
// deleted, nor will it remove the existing resource value from its cache.
func (sc *ServerConfig) ServerFeaturesIgnoreResourceDeletion() bool {
for _, sf := range sc.serverFeatures {
Expand Down
38 changes: 19 additions & 19 deletions xds/csds/csds_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,49 +71,49 @@ func Test(t *testing.T) {

type nopListenerWatcher struct{}

func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
onDone()
}
func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) ResourceError(_ error, onDone func()) {
onDone()
}
func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) AmbientError(_ error, onDone func()) {
onDone()
}

type nopRouteConfigWatcher struct{}

func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) ResourceChanged(_ *xdsresource.RouteConfigResourceData, onDone func()) {
onDone()
}
func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) ResourceError(_ error, onDone func()) {
onDone()
}
func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) AmbientError(_ error, onDone func()) {
onDone()
}

type nopClusterWatcher struct{}

func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) ResourceChanged(_ *xdsresource.ClusterResourceData, onDone func()) {
onDone()
}
func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) ResourceError(_ error, onDone func()) {
onDone()
}
func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) AmbientError(_ error, onDone func()) {
onDone()
}

type nopEndpointsWatcher struct{}

func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) ResourceChanged(_ *xdsresource.EndpointsResourceData, onDone func()) {
onDone()
}
func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) ResourceError(_ error, onDone func()) {
onDone()
}
func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) AmbientError(_ error, onDone func()) {
onDone()
}

Expand All @@ -127,31 +127,31 @@ func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc)
// for ADS stream level flow control), and was causing CSDS to not receive any
// updates from the xDS client.
type blockingListenerWatcher struct {
testCtxDone <-chan struct{} // Closed when the test is done.
onDoneCh chan xdsresource.OnDoneFunc // Channel to write the onDone callback to.
testCtxDone <-chan struct{} // Closed when the test is done.
onDoneCh chan func() // Channel to write the onDone callback to.
}

func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWatcher {
return &blockingListenerWatcher{
testCtxDone: testCtxDone,
onDoneCh: make(chan xdsresource.OnDoneFunc, 1),
onDoneCh: make(chan func(), 1),
}
}

func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) ResourceError(_ error, onDone func()) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) AmbientError(_ error, onDone func()) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}

// writeOnDone attempts to write the onDone callback on the onDone channel. It
// returns when it can successfully write to the channel or when the test is
// done, which is signalled by testCtxDone being closed.
func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.OnDoneFunc, onDone xdsresource.OnDoneFunc) {
func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan func(), onDone func()) {
select {
case <-testCtxDone:
case onDoneCh <- onDone:
Expand Down
61 changes: 33 additions & 28 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,11 @@
if b.lbCfg != nil {
root = b.lbCfg.ClusterName
}
b.onClusterError(root, err)
if b.childLB != nil {
b.onClusterAmbientError(root, err)
return
}
b.onClusterResourceError(root, err)
})
}

Expand Down Expand Up @@ -474,20 +478,32 @@
// If the security config is invalid, for example, if the provider
// instance is not found in the bootstrap config, we need to put the
// channel in transient failure.
b.onClusterError(name, b.annotateErrorWithNodeID(fmt.Errorf("received Cluster resource contains invalid security config: %v", err)))
if b.childLB != nil {
b.onClusterAmbientError(name, b.annotateErrorWithNodeID(fmt.Errorf("received Cluster resource contains invalid security config: %v", err)))

Check warning on line 482 in xds/internal/balancer/cdsbalancer/cdsbalancer.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/cdsbalancer/cdsbalancer.go#L482

Added line #L482 was not covered by tests
} else {
b.onClusterResourceError(name, b.annotateErrorWithNodeID(fmt.Errorf("received Cluster resource contains invalid security config: %v", err)))
}
return
}
}

clustersSeen := make(map[string]bool)
dms, ok, err := b.generateDMsForCluster(b.lbCfg.ClusterName, 0, nil, clustersSeen)
if err != nil {
b.onClusterError(b.lbCfg.ClusterName, b.annotateErrorWithNodeID(fmt.Errorf("failed to generate discovery mechanisms: %v", err)))
if b.childLB != nil {
b.onClusterAmbientError(b.lbCfg.ClusterName, b.annotateErrorWithNodeID(fmt.Errorf("failed to generate discovery mechanisms: %v", err)))

Check warning on line 494 in xds/internal/balancer/cdsbalancer/cdsbalancer.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/cdsbalancer/cdsbalancer.go#L494

Added line #L494 was not covered by tests
} else {
b.onClusterResourceError(b.lbCfg.ClusterName, b.annotateErrorWithNodeID(fmt.Errorf("failed to generate discovery mechanisms: %v", err)))
}
return
}
if ok {
if len(dms) == 0 {
b.onClusterError(b.lbCfg.ClusterName, b.annotateErrorWithNodeID(fmt.Errorf("aggregate cluster graph has no leaf clusters")))
if b.childLB != nil {
b.onClusterAmbientError(b.lbCfg.ClusterName, b.annotateErrorWithNodeID(fmt.Errorf("aggregate cluster graph has no leaf clusters")))

Check warning on line 503 in xds/internal/balancer/cdsbalancer/cdsbalancer.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/cdsbalancer/cdsbalancer.go#L503

Added line #L503 was not covered by tests
} else {
b.onClusterResourceError(b.lbCfg.ClusterName, b.annotateErrorWithNodeID(fmt.Errorf("aggregate cluster graph has no leaf clusters")))
}
return
}
// Child policy is built the first time we resolve the cluster graph.
Expand Down Expand Up @@ -542,38 +558,27 @@
}
}

// Handles an error Cluster update from the xDS client. Propagates the error
// down to the child policy if one exists, or puts the channel in
// TRANSIENT_FAILURE.
// Handles an ambient error Cluster update from the xDS client to not stop
// using the previously seen resource.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterError(name string, err error) {
b.logger.Warningf("Cluster resource %q received error update: %v", name, err)
func (b *cdsBalancer) onClusterAmbientError(name string, err error) {
b.logger.Warningf("Cluster resource %q received ambient error update: %v", name, err)

if b.childLB != nil {
if xdsresource.ErrType(err) != xdsresource.ErrorTypeConnection {
// Connection errors will be sent to the child balancers directly.
// There's no need to forward them.
b.childLB.ResolverError(err)
}
} else {
// If child balancer was never created, fail the RPCs with
// errors.
b.ccw.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(fmt.Errorf("%q: %v", name, err)),
})
if xdsresource.ErrType(err) != xdsresource.ErrorTypeConnection && b.childLB != nil {
// Connection errors will be sent to the child balancers directly.
// There's no need to forward them.
b.childLB.ResolverError(err)
}
}

// Handles a resource-not-found error from the xDS client. Propagates the error
// down to the child policy if one exists, or puts the channel in
// TRANSIENT_FAILURE.
// Handles an error Cluster update from the xDS client to stop using the
// previously seen resource. Propagates the error down to the child policy
// if one exists, and puts the channel in TRANSIENT_FAILURE.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterResourceNotFound(name string) {
b.logger.Warningf("CDS watch for resource %q reported resource-does-not-exist error", name)
err := b.annotateErrorWithNodeID(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "cluster %q not found", name))
func (b *cdsBalancer) onClusterResourceError(name string, err error) {
b.logger.Warningf("CDS watch for resource %q reported resource error", name)
b.closeChildPolicyAndReportTF(err)
}

Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) {

// Ensure RPC fails with Unavailable status code and the error message is
// meaningful and contains the xDS node ID.
wantErr := fmt.Sprintf("cluster %q not found", clusterName)
wantErr := fmt.Sprintf("resource %q of type %q has been removed", clusterName, "ClusterResource")
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if err := verifyRPCError(err, codes.Unavailable, wantErr, nodeID); err != nil {
t.Fatal(err)
Expand Down
14 changes: 7 additions & 7 deletions xds/internal/balancer/cdsbalancer/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@ type clusterWatcher struct {
parent *cdsBalancer
}

func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
func (cw *clusterWatcher) ResourceChanged(u *xdsresource.ClusterResourceData, onDone func()) {
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
func (cw *clusterWatcher) ResourceError(err error, onDone func()) {
handleResourceError := func(context.Context) { cw.parent.onClusterResourceError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleResourceError, onDone)
}

func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
func (cw *clusterWatcher) AmbientError(err error, onDone func()) {
handleError := func(context.Context) { cw.parent.onClusterAmbientError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
}

// watcherState groups the state associated with a clusterWatcher.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (s) TestErrorFromParentLB_ResourceNotFound(t *testing.T) {
}

// Ensure that RPCs start to fail with expected error.
wantErr := fmt.Sprintf("cluster %q not found", clusterName)
wantErr := fmt.Sprintf("resource %q of type %q has been removed", clusterName, "ClusterResource")
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
Expand Down
8 changes: 4 additions & 4 deletions xds/internal/balancer/clusterresolver/resource_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type resourceUpdate struct {
priorities []priorityConfig
// To be invoked once the update is completely processed, or is dropped in
// favor of a newer update.
onDone xdsresource.OnDoneFunc
onDone func()
}

// topLevelResolver is used by concrete endpointsResolver implementations for
Expand All @@ -50,7 +50,7 @@ type topLevelResolver interface {
// endpointsResolver implementation. The onDone callback is to be invoked
// once the update is completely processed, or is dropped in favor of a
// newer update.
onUpdate(onDone xdsresource.OnDoneFunc)
onUpdate(onDone func())
}

// endpointsResolver wraps the functionality to resolve a given resource name to
Expand Down Expand Up @@ -282,7 +282,7 @@ func (rr *resourceResolver) stop(closing bool) {
// clusterresolver LB policy.
//
// Caller must hold rr.mu.
func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) {
func (rr *resourceResolver) generateLocked(onDone func()) {
var ret []priorityConfig
for _, rDM := range rr.children {
u, ok := rDM.r.lastUpdate()
Expand Down Expand Up @@ -312,7 +312,7 @@ func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) {
rr.updateChannel <- &resourceUpdate{priorities: ret, onDone: onDone}
}

func (rr *resourceResolver) onUpdate(onDone xdsresource.OnDoneFunc) {
func (rr *resourceResolver) onUpdate(onDone func()) {
handleUpdate := func(context.Context) {
rr.mu.Lock()
rr.generateLocked(onDone)
Expand Down
38 changes: 10 additions & 28 deletions xds/internal/balancer/clusterresolver/resource_resolver_eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
return ret
}

// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
// ResourceChanged is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) ResourceChanged(update *xdsresource.EndpointsResourceData, onDone func()) {
if er.stopped.HasFired() {
onDone()
return
Expand All @@ -89,54 +89,36 @@
er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) ResourceError(err error, onDone func()) {
if er.stopped.HasFired() {
onDone()
return
}

if er.logger.V(2) {
er.logger.Infof("EDS discovery mechanism for resource %q reported error: %v", er.nameToWatch, err)
er.logger.Infof("EDS discovery mechanism for resource %q reported resource error: %v", er.nameToWatch, err)

Check warning on line 99 in xds/internal/balancer/clusterresolver/resource_resolver_eds.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/clusterresolver/resource_resolver_eds.go#L99

Added line #L99 was not covered by tests
}

er.mu.Lock()
if er.update != nil {
// Continue using a previously received good configuration if one
// exists.
er.mu.Unlock()
onDone()
return
}

// Else report an empty update that would result in no priority child being
// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) AmbientError(err error, onDone func()) {
if er.stopped.HasFired() {
onDone()
return
}

er.logger.Warningf("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)

// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
if er.logger.V(2) {
er.logger.Infof("EDS discovery mechanism for resource %q reported ambient error: %v", er.nameToWatch, err)
}

Check warning on line 123 in xds/internal/balancer/clusterresolver/resource_resolver_eds.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/clusterresolver/resource_resolver_eds.go#L122-L123

Added lines #L122 - L123 were not covered by tests
}
4 changes: 2 additions & 2 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ type erroringConfigSelector struct {
err error
}

func newErroringConfigSelector(xdsNodeID string) *erroringConfigSelector {
return &erroringConfigSelector{err: annotateErrorWithNodeID(status.Errorf(codes.Unavailable, "no valid clusters"), xdsNodeID)}
func newErroringConfigSelector(err error, xdsNodeID string) *erroringConfigSelector {
return &erroringConfigSelector{err: annotateErrorWithNodeID(status.Errorf(codes.Unavailable, err.Error()), xdsNodeID)}
}

func (cs *erroringConfigSelector) SelectConfig(iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
Expand Down
Loading