Skip to content

Commit a25c0f5

Browse files
committed
Fix Typha tests.
1 parent 560c2cd commit a25c0f5

File tree

2 files changed

+11
-46
lines changed

2 files changed

+11
-46
lines changed

typha/fv-tests/server_test.go

Lines changed: 4 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ var _ = Describe("With an in-process Server", func() {
192192
// simulates a second connection and check that that also converges to the given state.
193193
expectClientState := func(c *ClientState, status api.SyncStatus, kvs map[string]api.Update) {
194194
// Wait until we reach that state.
195-
Eventually(c.recorder.Status).Should(Equal(status))
196-
Eventually(c.recorder.KVs).Should(Equal(kvs))
195+
EventuallyWithOffset(1, c.recorder.Status).Should(Equal(status), "Unexpected sync status")
196+
EventuallyWithOffset(1, c.recorder.KVs).Should(Equal(kvs), "Unexpected KVs")
197197

198198
// Now, a newly-connecting client should also reach the same state.
199199
log.Info("Starting transient client to read snapshot.")
@@ -303,7 +303,7 @@ var _ = Describe("With an in-process Server", func() {
303303
},
304304
)
305305
// Our updates shouldn't affect the felix syncer.
306-
expectFelixClientState(api.WaitForDatastore, map[string]api.Update{})
306+
expectFelixClientState(api.ResyncInProgress, map[string]api.Update{})
307307
},
308308
Entry("IP pool", ipPool1, "/calico/v1/ipam/v4/pool/10.0.1.0-24"),
309309
Entry("Node conf", nodeBGPConfNode, "/calico/bgp/v1/host/node1/foo"),
@@ -404,7 +404,7 @@ var _ = Describe("With an in-process Server", func() {
404404
})
405405

406406
// Simulate an old client.
407-
Describe("with a client that doesn't support connection restart", func() {
407+
Describe("with a client that doesn't support decoder restart", func() {
408408
BeforeEach(func() {
409409
h.CreateClientNoDecodeRestart("no decoder restart", syncproto.SyncerTypeFelix)
410410
})
@@ -685,49 +685,11 @@ var _ = Describe("With an in-process Server with short ping timeout", func() {
685685
// interval for the check to take place.
686686
time.Sleep(1 * time.Second)
687687

688-
// Check that the client knows it can use node resource updates since the server supports it.
689-
supportsNodeResourceUpdates, err := client.SupportsNodeResourceUpdates(10 * time.Second)
690-
Expect(supportsNodeResourceUpdates).To(BeTrue())
691-
Expect(err).To(BeNil())
692-
693688
// Then send an update.
694689
h.FelixCache.OnStatusUpdated(api.InSync)
695690
Eventually(recorder.Status).Should(Equal(api.InSync))
696691
})
697692

698-
It("should return an error if client does not receive a server hello in time", func() {
699-
// Start a real client, which will respond correctly to pings.
700-
clientCxt, clientCancel := context.WithCancel(context.Background())
701-
recorder := NewRecorder()
702-
703-
client := syncclient.New(
704-
h.Discoverer(),
705-
"test-version",
706-
"test-host",
707-
"test-info",
708-
recorder,
709-
nil,
710-
)
711-
err := client.Start(clientCxt)
712-
recorderCtx, recorderCancel := context.WithCancel(context.Background())
713-
defer recorderCancel()
714-
go recorder.Loop(recorderCtx)
715-
Expect(err).NotTo(HaveOccurred())
716-
defer func() {
717-
clientCancel()
718-
client.Finished.Wait()
719-
}()
720-
721-
// Kill the server
722-
h.ServerCancel()
723-
h.Server.Finished.Wait()
724-
725-
// Check that the client knows it can use node resource updates since the server supports it.
726-
supportsNodeResourceUpdates, err := client.SupportsNodeResourceUpdates(0 * time.Second)
727-
Expect(supportsNodeResourceUpdates).To(BeFalse())
728-
Expect(err).NotTo(BeNil())
729-
})
730-
731693
Describe("with a raw connection", func() {
732694
var rawConn net.Conn
733695
var w *gob.Encoder
@@ -1589,7 +1551,6 @@ var _ = Describe("with server requiring TLS", func() {
15891551

15901552
if !expectConnection {
15911553
// Client connection should have failed, so should not have got any updates.
1592-
Consistently(clientState.recorder.Status).Should(Equal(api.SyncStatus(0)))
15931554
Consistently(clientState.recorder.KVs).Should(Equal(map[string]api.Update{}))
15941555
}
15951556
}

typha/pkg/syncclient/sync_client.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,18 @@ func (s *SyncerClient) Start(cxt context.Context) error {
208208
// We can only do that if the client is restart-aware.
209209
s.Finished.Add(1)
210210
go func() {
211-
defer s.Finished.Done()
211+
defer func() {
212+
s.logCxt.Info("Typha client shutting down.")
213+
s.Finished.Done()
214+
}()
212215

213216
for cxt.Err() == nil {
214217
connectionFinishedWG.Wait()
215218
if rac, ok := s.callbacks.(RestartAwareCallbacks); ok {
216219
log.Info("Typha connection failed but client callback is restart-aware. Restarting connection...")
217220
s.refreshConnID()
218221
rac.OnTyphaConnectionRestarted()
222+
s.callbacks.OnStatusUpdated(api.WaitForDatastore)
219223
} else {
220224
log.Info("Typha client callback is not restart-aware. Exiting...")
221225
return
@@ -234,7 +238,7 @@ func (s *SyncerClient) Start(cxt context.Context) error {
234238
func (s *SyncerClient) startOneConnection(cxt context.Context, connFinished *sync.WaitGroup) error {
235239
// Defensive: in case there's a bug in NextAddr() and it never stops returning values,
236240
// set a sanity limit on the number of tries.
237-
s.callbacks.OnStatusUpdated(api.WaitForDatastore)
241+
s.logCxt.Debug("Signalling WaitForDatastore...")
238242
startTime := time.Now()
239243
maxTries := s.calculateConnectionAttemptLimit(len(s.discoverer.CachedTyphaAddrs()))
240244
remainingTries := maxTries
@@ -255,7 +259,6 @@ func (s *SyncerClient) startOneConnection(cxt context.Context, connFinished *syn
255259
time.Sleep(100 * time.Millisecond) // Avoid tight loop.
256260
} else {
257261
s.logCxt.Infof("Successfully connected to Typha at %s after %v.", addr.Addr, time.Since(startTime))
258-
s.callbacks.OnStatusUpdated(api.ResyncInProgress)
259262
break
260263
}
261264
}
@@ -436,6 +439,7 @@ func (s *SyncerClient) loop(cxt context.Context, cancelFn context.CancelFunc, co
436439

437440
logCxt := s.logCxt.WithField("connection", s.connInfo)
438441
logCxt.Info("Started Typha client main loop")
442+
s.callbacks.OnStatusUpdated(api.ResyncInProgress)
439443

440444
// Always start with basic gob encoding for the handshake. We may upgrade to a compressed version below.
441445
s.encoder = gob.NewEncoder(s.connection)

0 commit comments

Comments
 (0)