@@ -18,12 +18,14 @@ import (
18
18
"container/list"
19
19
"fmt"
20
20
"sync"
21
+ "time"
21
22
22
23
log "github.com/sirupsen/logrus"
23
24
24
25
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
25
26
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
26
27
"github.com/projectcalico/calico/libcalico-go/lib/set"
28
+ "github.com/projectcalico/calico/typha/pkg/syncclient"
27
29
)
28
30
29
31
// DedupeBuffer buffer implements the syncer callbacks API on its
@@ -50,7 +52,9 @@ type DedupeBuffer struct {
50
52
51
53
// liveResourceKeys Contains an entry for every key that we have sent to
52
54
// the consumer and that we have not subsequently sent a deletion for.
53
- liveResourceKeys set.Set [string ]
55
+ liveResourceKeys set.Set [string ]
56
+ liveKeysNotSeenSinceReconnect set.Set [string ]
57
+ resyncStart time.Time
54
58
// pendingUpdates is the queue of updates that we want to send to the
55
59
// consumer. We use a linked list so that we can remove items from
56
60
// the middle if they are deleted before making it off the queue.
@@ -69,11 +73,39 @@ func New() *DedupeBuffer {
69
73
return d
70
74
}
71
75
76
+ func (d * DedupeBuffer ) OnTyphaConnectionRestarted () {
77
+ d .lock .Lock ()
78
+ defer d .lock .Unlock ()
79
+
80
+ // We're about to be sent a complete new snapshot of the data. Clear
81
+ // our in-flight state and make a transient copy of the keys that we have
82
+ // already sent so that we can figure out if any KVs were deleted while
83
+ // we were disconnected.
84
+ log .Info ("Typha connection restarted, clearing pending update queue." )
85
+ clear (d .keyToPendingUpdate )
86
+ d .pendingUpdates = list.List {}
87
+ if d .liveKeysNotSeenSinceReconnect == nil {
88
+ // Not already doing a resync.
89
+ d .resyncStart = time .Now ()
90
+ }
91
+ d .liveKeysNotSeenSinceReconnect = d .liveResourceKeys .Copy ()
92
+ }
93
+
72
94
// OnStatusUpdated queues a status update to be sent to the sink.
73
95
func (d * DedupeBuffer ) OnStatusUpdated (status api.SyncStatus ) {
74
96
d .lock .Lock ()
75
97
defer d .lock .Unlock ()
76
98
99
+ // Check if queue is empty before onInSyncAfterReconnection() since that
100
+ // call may push things onto the queue.
101
+ queueWasEmpty := d .pendingUpdates .Len () == 0
102
+
103
+ if status == api .InSync && d .liveKeysNotSeenSinceReconnect != nil {
104
+ // We were processing a reconnection and now we're in sync. See if we
105
+ // need to clean anything up.
106
+ d .onInSyncAfterReconnection ()
107
+ }
108
+
77
109
// Statuses are idempotent so skip sending if the latest one in the queue
78
110
// was the same.
79
111
if d .mostRecentStatusReceived == status {
@@ -94,7 +126,6 @@ func (d *DedupeBuffer) OnStatusUpdated(status api.SyncStatus) {
94
126
}
95
127
96
128
// Add the status to the queue.
97
- queueWasEmpty := d .pendingUpdates .Len () == 0
98
129
d .pendingUpdates .PushBack (status )
99
130
if queueWasEmpty {
100
131
// Only need to signal when the first item goes on the queue.
@@ -146,40 +177,11 @@ func (d *DedupeBuffer) OnUpdatesKeysKnown(updates []api.Update, keys []string) {
146
177
continue
147
178
}
148
179
}
149
-
150
- if element , ok := d .keyToPendingUpdate [key ]; ok {
151
- // Already got an in-flight update for this key.
152
- if u .Value == nil && ! d .liveResourceKeys .Contains (key ) {
153
- // This is a deletion, but the key in question never made it
154
- // off the queue, remove it entirely.
155
- if debug {
156
- log .WithField ("key" , key ).Debug ("Key deleted before being sent." )
157
- }
158
- delete (d .keyToPendingUpdate , key )
159
- d .pendingUpdates .Remove (element )
160
- } else {
161
- // Update to a key that's already on the queue, swap in the
162
- // most recent value.
163
- if debug {
164
- log .WithField ("key" , key ).Debug ("Key updated before being sent." )
165
- }
166
- usk := element .Value .(updateWithStringKey )
167
- usk .update = u
168
- element .Value = usk
169
- }
170
- } else {
171
- // No in-flight entry for this key. Add to queue and record that
172
- // it's in flight.
173
- if debug {
174
- log .WithField ("key" , key ).Debug ("No in flight value for key, adding to queue." )
175
- }
176
- element = d .pendingUpdates .PushBack (updateWithStringKey {
177
- key : key ,
178
- update : u ,
179
- })
180
- d .keyToPendingUpdate [key ] = element
181
- d .peakPendingUpdatesLen = max (len (d .keyToPendingUpdate ), d .peakPendingUpdatesLen )
180
+ if d .liveKeysNotSeenSinceReconnect != nil {
181
+ d .liveKeysNotSeenSinceReconnect .Discard (key )
182
182
}
183
+
184
+ d .queueUpdate (key , u )
183
185
}
184
186
queueNowEmpty := d .pendingUpdates .Len () == 0
185
187
if queueWasEmpty && ! queueNowEmpty {
@@ -191,6 +193,57 @@ func (d *DedupeBuffer) OnUpdatesKeysKnown(updates []api.Update, keys []string) {
191
193
}
192
194
}
193
195
196
+ func (d * DedupeBuffer ) queueUpdate (key string , u api.Update ) {
197
+ debug := log .IsLevelEnabled (log .DebugLevel )
198
+
199
+ if u .Value != nil {
200
+ // A new KV or an update. Since we dedupe sequences of updates for the
201
+ // same key, we need to recalculate the update type to make sense to the
202
+ // downstream receiver. We do this even if the update is not on the
203
+ // queue in order to handle resyncs with Typha.
204
+ if d .liveResourceKeys .Contains (key ) {
205
+ u .UpdateType = api .UpdateTypeKVUpdated
206
+ } else {
207
+ u .UpdateType = api .UpdateTypeKVNew
208
+ }
209
+ }
210
+
211
+ if element , ok := d .keyToPendingUpdate [key ]; ok {
212
+ // Already got an in-flight update for this key.
213
+ if u .Value == nil && ! d .liveResourceKeys .Contains (key ) {
214
+ // This is a deletion, but the key in question never made it
215
+ // off the queue, remove it entirely.
216
+ if debug {
217
+ log .WithField ("key" , key ).Debug ("Key deleted before being sent." )
218
+ }
219
+ delete (d .keyToPendingUpdate , key )
220
+ d .pendingUpdates .Remove (element )
221
+ } else {
222
+ // Update to a key that's already on the queue, swap in the
223
+ // most recent value.
224
+ if debug {
225
+ log .WithField ("key" , key ).Debug ("Key updated before being sent." )
226
+ }
227
+
228
+ usk := element .Value .(updateWithStringKey )
229
+ usk .update = u
230
+ element .Value = usk
231
+ }
232
+ } else {
233
+ // No in-flight entry for this key. Add to queue and record that
234
+ // it's in flight.
235
+ if debug {
236
+ log .WithField ("key" , key ).Debug ("No in flight value for key, adding to queue." )
237
+ }
238
+ element = d .pendingUpdates .PushBack (updateWithStringKey {
239
+ key : key ,
240
+ update : u ,
241
+ })
242
+ d .keyToPendingUpdate [key ] = element
243
+ d .peakPendingUpdatesLen = max (len (d .keyToPendingUpdate ), d .peakPendingUpdatesLen )
244
+ }
245
+ }
246
+
194
247
func (d * DedupeBuffer ) SendToSinkForever (sink api.SyncerCallbacks ) {
195
248
d .lock .Lock ()
196
249
defer d .lock .Unlock ()
@@ -314,4 +367,37 @@ func (d *DedupeBuffer) dropLockAndSendBatch(sink api.SyncerCallbacks, buf []any)
314
367
}
315
368
}
316
369
370
+ func (d * DedupeBuffer ) onInSyncAfterReconnection () {
371
+ defer func () {
372
+ log .Infof ("Resync with Typha complete; dropping resync-tracking state. Resync took %v." ,
373
+ time .Since (d .resyncStart ).Round (time .Millisecond ))
374
+ d .liveKeysNotSeenSinceReconnect = nil
375
+ }()
376
+
377
+ if d .liveKeysNotSeenSinceReconnect .Len () == 0 {
378
+ return
379
+ }
380
+
381
+ log .Infof ("In sync with Typha, synthesizing deletions for %d " +
382
+ "resources not seen during the resync." ,
383
+ d .liveKeysNotSeenSinceReconnect .Len ())
384
+ d .liveKeysNotSeenSinceReconnect .Iter (func (key string ) error {
385
+ parsedKey := model .KeyFromDefaultPath (key )
386
+ if parsedKey == nil {
387
+ // Not clear how this could happen since these keys came from the
388
+ // set that we'd already parsed and passed downstream!
389
+ log .WithField ("key" , key ).Panic ("Failed to parse key during reconnection to Typha." )
390
+ }
391
+ d .queueUpdate (key , api.Update {
392
+ KVPair : model.KVPair {
393
+ Key : parsedKey ,
394
+ Value : nil ,
395
+ },
396
+ UpdateType : api .UpdateTypeKVDeleted ,
397
+ })
398
+ return nil
399
+ })
400
+ }
401
+
317
402
var _ api.SyncerCallbacks = (* DedupeBuffer )(nil )
403
+ var _ syncclient.RestartAwareCallbacks = (* DedupeBuffer )(nil )
0 commit comments