Skip to content

Commit

Permalink
Enable ordered responses for delta watches
Browse files Browse the repository at this point in the history
Signed-off-by: huabing zhao <[email protected]>
  • Loading branch information
zhaohuabing committed Aug 14, 2023
1 parent ca30412 commit 5c21349
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 39 deletions.
114 changes: 76 additions & 38 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,54 +233,93 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
info.mu.Lock()
defer info.mu.Unlock()

// responder callback for SOTW watches
respond := func(watch ResponseWatch, id int64) error {
version := snapshot.GetVersion(watch.Request.TypeUrl)
if version != watch.Request.VersionInfo {
cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version)
resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
if err != nil {
return err
}
// discard the watch
delete(info.watches, id)
// Respond to SOTW watches for the node.
if err := cache.respondSOTWWatches(ctx, info, snapshot); err != nil {
return err
}

// Respond to delta watches for the node.
return cache.respondDeltaWatches(ctx, info, snapshot)
}

return nil
}

func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error {
// responder callback for SOTW watches
respond := func(watch ResponseWatch, id int64) error {
version := snapshot.GetVersion(watch.Request.TypeUrl)
if version != watch.Request.VersionInfo {
cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version)
resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
if err != nil {
return err
}
return nil
// discard the watch
delete(info.watches, id)
}
return nil
}

// If ADS is enabled we need to order response watches so we guarantee
// sending them in the correct order. Go's default implementation
// of maps are randomized order when ranged over.
if cache.ads {
info.orderResponseWatches()
for _, key := range info.orderedWatches {
err := respond(info.watches[key.ID], key.ID)
if err != nil {
return err
}
// If ADS is enabled we need to order response watches so we guarantee
// sending them in the correct order. Go's default implementation
// of maps are randomized order when ranged over.
if cache.ads {
info.orderResponseWatches()
for _, key := range info.orderedWatches {
err := respond(info.watches[key.ID], key.ID)
if err != nil {
return err
}
} else {
for id, watch := range info.watches {
err := respond(watch, id)
if err != nil {
return err
}
}
} else {
for id, watch := range info.watches {
err := respond(watch, id)
if err != nil {
return err
}
}
}

return nil
}

func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error {
// We only calculate version hashes when using delta. We don't
// want to do this when using SOTW so we can avoid unnecessary
// computational cost if not using delta.
if len(info.deltaWatches) > 0 {
err := snapshot.ConstructVersionMap()
if err != nil {
return err
}
}

// We only calculate version hashes when using delta. We don't
// want to do this when using SOTW so we can avoid unnecessary
// computational cost if not using delta.
if len(info.deltaWatches) > 0 {
err := snapshot.ConstructVersionMap()
// If ADS is enabled we need to order response delta watches so we guarantee
// sending them in the correct order. Go's default implementation
// of maps are randomized order when ranged over.
if cache.ads {
info.orderResponseDeltaWatches()
for _, key := range info.orderedDeltaWatches {
watch := info.deltaWatches[key.ID]
res, err := cache.respondDelta(
ctx,
snapshot,
watch.Request,
watch.Response,
watch.StreamState,
)
if err != nil {
return err
}
// If we detect a nil response here, that means there has been no state change
// so we don't want to respond or remove any existing resource watches
if res != nil {
delete(info.deltaWatches, key.ID)
}
}

// this won't run if there are no delta watches
// to process.
} else {
for id, watch := range info.deltaWatches {
res, err := cache.respondDelta(
ctx,
Expand All @@ -299,7 +338,6 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
}
}
}

return nil
}

Expand Down
22 changes: 21 additions & 1 deletion pkg/cache/v3/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type statusInfo struct {
orderedWatches keys

// deltaWatches are indexed channels for the delta response watches and the original requests
deltaWatches map[int64]DeltaResponseWatch
deltaWatches map[int64]DeltaResponseWatch
orderedDeltaWatches keys

// the timestamp of the last watch request
lastWatchRequestTime time.Time
Expand Down Expand Up @@ -177,3 +178,22 @@ func (info *statusInfo) orderResponseWatches() {
// This is only run when we enable ADS on the cache.
sort.Sort(info.orderedWatches)
}

// orderResponseDeltaWatches will track a list of delta watch keys and order them if
// true is passed.
func (info *statusInfo) orderResponseDeltaWatches() {
info.orderedDeltaWatches = make(keys, len(info.deltaWatches))

var index int
for id, deltaWatch := range info.deltaWatches {
info.orderedDeltaWatches[index] = key{
ID: id,
TypeURL: deltaWatch.Request.TypeUrl,
}
index++
}

// Sort our list which we can use in the SetSnapshot functions.
// This is only run when we enable ADS on the cache.
sort.Sort(info.orderedDeltaWatches)
}

0 comments on commit 5c21349

Please sign in to comment.