From 4b0f7255300675a574bf2f9a8a764bc67586942e Mon Sep 17 00:00:00 2001 From: Robert Bailey Date: Wed, 21 Feb 2024 11:13:36 -0800 Subject: [PATCH] Fix the handling of removing disconnected streams to avoid a panic (#3668) when multiple streams disconnect. --- pkg/sdkserver/sdkserver.go | 24 ++++++++++++------- pkg/sdkserver/sdkserver_test.go | 42 ++++++++++++++++----------------- 2 files changed, 37 insertions(+), 29 deletions(-) diff --git a/pkg/sdkserver/sdkserver.go b/pkg/sdkserver/sdkserver.go index 7102d5a796..15a142349c 100644 --- a/pkg/sdkserver/sdkserver.go +++ b/pkg/sdkserver/sdkserver.go @@ -1243,27 +1243,35 @@ func (s *SDKServer) sendGameServerUpdate(gs *agonesv1.GameServer) { s.streamMutex.Lock() defer s.streamMutex.Unlock() - for i, stream := range s.connectedStreams { + // Filter the slice of streams sharing the same backing array and capacity as the original + // so that storage is reused and no memory allocations are made. This modifies the original + // slice. + // + // See https://go.dev/wiki/SliceTricks#filtering-without-allocating + remainingStreams := s.connectedStreams[:0] + for _, stream := range s.connectedStreams { select { case <-stream.Context().Done(): - s.connectedStreams = append(s.connectedStreams[:i], s.connectedStreams[i+1:]...) + s.logger.Debug("Dropping stream") err := stream.Context().Err() switch { case err != nil: s.logger.WithError(errors.WithStack(err)).Error("stream closed with error") default: - s.logger.Debug("stream closed") + s.logger.Debug("Stream closed") } - continue default: - } + s.logger.Debug("Keeping stream") + remainingStreams = append(remainingStreams, stream) - if err := stream.Send(convert(gs)); err != nil { - s.logger.WithError(errors.WithStack(err)). - Error("error sending game server update event") + if err := stream.Send(convert(gs)); err != nil { + s.logger.WithError(errors.WithStack(err)). + Error("error sending game server update event") + } } } + s.connectedStreams = remainingStreams if gs.Status.State == agonesv1.GameServerStateShutdown { // Wrap this in a go func(), just in case pushing to this channel deadlocks since there is only one instance of diff --git a/pkg/sdkserver/sdkserver_test.go b/pkg/sdkserver/sdkserver_test.go index 95b2713fd2..d8e1d2adab 100644 --- a/pkg/sdkserver/sdkserver_test.go +++ b/pkg/sdkserver/sdkserver_test.go @@ -833,28 +833,28 @@ func TestSDKServer_SendGameServerUpdateRemovesDisconnectedStream(t *testing.T) { return err == nil }, time.Minute, time.Second, "Could not find the GameServer") - streamCtx, streamCancel := context.WithCancel(context.Background()) - t.Cleanup(streamCancel) - - // Trigger stream removal by sending an update on a cancelled stream. - - stream := newGameServerMockStream() - stream.ctx = streamCtx - - asyncWatchGameServer(t, sc, stream) - assert.Nil(t, waitConnectedStreamCount(sc, 1)) - - <-stream.msgs // Initial msg when WatchGameServer() is called. - - streamCancel() - + // Create and initialize two streams. + streamOne := newGameServerMockStream() + streamOneCtx, streamOneCancel := context.WithCancel(context.Background()) + t.Cleanup(streamOneCancel) + streamOne.ctx = streamOneCtx + asyncWatchGameServer(t, sc, streamOne) + + streamTwo := newGameServerMockStream() + streamTwoCtx, streamTwoCancel := context.WithCancel(context.Background()) + t.Cleanup(streamTwoCancel) + streamTwo.ctx = streamTwoCtx + asyncWatchGameServer(t, sc, streamTwo) + + // Verify that two streams are connected. + assert.Nil(t, waitConnectedStreamCount(sc, 2)) + streamOneCancel() + streamTwoCancel() + + // Trigger stream removal by sending a game server update. sc.sendGameServerUpdate(fixture) - - select { - case <-stream.msgs: - assert.Fail(t, "Event stream should have been removed.") - case <-time.After(1 * time.Second): - } + // Verify that zero streams are connected. + assert.Nil(t, waitConnectedStreamCount(sc, 0)) } func TestSDKServerUpdateEventHandler(t *testing.T) {