Skip to content

Commit 560c2cd

Browse files
committed
Fix inconsistent update types from dedupe buffer.
If the dedupe buffer sees "create", "delete", "create", and it elides the "delete" then it could send duplicate "new" events for the same key resulting in miscounting downstream.
1 parent 4c26af2 commit 560c2cd

File tree

4 files changed

+332
-17
lines changed

4 files changed

+332
-17
lines changed

felix/daemon/daemon.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ configRetry:
592592
}
593593
healthAggregator.Report(healthName, &health.HealthReport{Live: true, Ready: true})
594594
// Up-to-data Typha client will refuse to connect unless Typha signals
595-
// that it supports nore resource updates.
595+
// that it supports node resource updates.
596596
configParams.SetUseNodeResourceUpdates(true)
597597

598598
go func() {

libcalico-go/lib/backend/api/api.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,21 @@ const (
191191
UpdateTypeKVDeleted
192192
)
193193

194+
func (u UpdateType) String() string {
195+
switch u {
196+
case UpdateTypeKVUnknown:
197+
return "unknown"
198+
case UpdateTypeKVNew:
199+
return "new"
200+
case UpdateTypeKVUpdated:
201+
return "updated"
202+
case UpdateTypeKVDeleted:
203+
return "deleted"
204+
default:
205+
return fmt.Sprintf("Unknown<%v>", uint8(u))
206+
}
207+
}
208+
194209
// Interface can be implemented by anything that knows how to watch and report changes.
195210
type WatchInterface interface {
196211
// Stops watching. Will close the channel returned by ResultChan(). Releases

libcalico-go/lib/backend/syncersv1/dedupebuffer/dedupe_buffer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,19 @@ func (d *DedupeBuffer) OnUpdatesKeysKnown(updates []api.Update, keys []string) {
192192

193193
func (d *DedupeBuffer) queueUpdate(key string, u api.Update) {
194194
debug := log.IsLevelEnabled(log.DebugLevel)
195+
196+
if u.Value != nil {
197+
// A new KV or an update. Since we dedupe sequences of updates for the
198+
// same key, we need to recalculate the update type to make sense to the
199+
// downstream receiver. We do this even if the update is not on the
200+
// queue in order to handle resyncs with Typha.
201+
if d.liveResourceKeys.Contains(key) {
202+
u.UpdateType = api.UpdateTypeKVUpdated
203+
} else {
204+
u.UpdateType = api.UpdateTypeKVNew
205+
}
206+
}
207+
195208
if element, ok := d.keyToPendingUpdate[key]; ok {
196209
// Already got an in-flight update for this key.
197210
if u.Value == nil && !d.liveResourceKeys.Contains(key) {
@@ -208,6 +221,7 @@ func (d *DedupeBuffer) queueUpdate(key string, u api.Update) {
208221
if debug {
209222
log.WithField("key", key).Debug("Key updated before being sent.")
210223
}
224+
211225
usk := element.Value.(updateWithStringKey)
212226
usk.update = u
213227
element.Value = usk

0 commit comments

Comments
 (0)