Skip to content

Commit

Permalink
Fix the handling of removing disconnected streams to avoid a panic (g…
Browse files Browse the repository at this point in the history
…oogleforgames#3668)

when multiple streams disconnect.
  • Loading branch information
roberthbailey authored Feb 21, 2024
1 parent 029700b commit 4b0f725
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 29 deletions.
24 changes: 16 additions & 8 deletions pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,27 +1243,35 @@ func (s *SDKServer) sendGameServerUpdate(gs *agonesv1.GameServer) {
s.streamMutex.Lock()
defer s.streamMutex.Unlock()

for i, stream := range s.connectedStreams {
// Filter the slice of streams sharing the same backing array and capacity as the original
// so that storage is reused and no memory allocations are made. This modifies the original
// slice.
//
// See https://go.dev/wiki/SliceTricks#filtering-without-allocating
remainingStreams := s.connectedStreams[:0]
for _, stream := range s.connectedStreams {
select {
case <-stream.Context().Done():
s.connectedStreams = append(s.connectedStreams[:i], s.connectedStreams[i+1:]...)
s.logger.Debug("Dropping stream")

err := stream.Context().Err()
switch {
case err != nil:
s.logger.WithError(errors.WithStack(err)).Error("stream closed with error")
default:
s.logger.Debug("stream closed")
s.logger.Debug("Stream closed")
}
continue
default:
}
s.logger.Debug("Keeping stream")
remainingStreams = append(remainingStreams, stream)

if err := stream.Send(convert(gs)); err != nil {
s.logger.WithError(errors.WithStack(err)).
Error("error sending game server update event")
if err := stream.Send(convert(gs)); err != nil {
s.logger.WithError(errors.WithStack(err)).
Error("error sending game server update event")
}
}
}
s.connectedStreams = remainingStreams

if gs.Status.State == agonesv1.GameServerStateShutdown {
// Wrap this in a go func(), just in case pushing to this channel deadlocks since there is only one instance of
Expand Down
42 changes: 21 additions & 21 deletions pkg/sdkserver/sdkserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,28 +833,28 @@ func TestSDKServer_SendGameServerUpdateRemovesDisconnectedStream(t *testing.T) {
return err == nil
}, time.Minute, time.Second, "Could not find the GameServer")

streamCtx, streamCancel := context.WithCancel(context.Background())
t.Cleanup(streamCancel)

// Trigger stream removal by sending an update on a cancelled stream.

stream := newGameServerMockStream()
stream.ctx = streamCtx

asyncWatchGameServer(t, sc, stream)
assert.Nil(t, waitConnectedStreamCount(sc, 1))

<-stream.msgs // Initial msg when WatchGameServer() is called.

streamCancel()

// Create and initialize two streams.
streamOne := newGameServerMockStream()
streamOneCtx, streamOneCancel := context.WithCancel(context.Background())
t.Cleanup(streamOneCancel)
streamOne.ctx = streamOneCtx
asyncWatchGameServer(t, sc, streamOne)

streamTwo := newGameServerMockStream()
streamTwoCtx, streamTwoCancel := context.WithCancel(context.Background())
t.Cleanup(streamTwoCancel)
streamTwo.ctx = streamTwoCtx
asyncWatchGameServer(t, sc, streamTwo)

// Verify that two streams are connected.
assert.Nil(t, waitConnectedStreamCount(sc, 2))
streamOneCancel()
streamTwoCancel()

// Trigger stream removal by sending a game server update.
sc.sendGameServerUpdate(fixture)

select {
case <-stream.msgs:
assert.Fail(t, "Event stream should have been removed.")
case <-time.After(1 * time.Second):
}
// Verify that zero streams are connected.
assert.Nil(t, waitConnectedStreamCount(sc, 0))
}

func TestSDKServerUpdateEventHandler(t *testing.T) {
Expand Down

0 comments on commit 4b0f725

Please sign in to comment.