Skip to content

Commit 3fe1534

Browse files
committed
fix graceful shutdown, add missing srt init/cleanup calls
1 parent 93ab9a1 commit 3fe1534

File tree

3 files changed

+35
-6
lines changed

3 files changed

+35
-6
lines changed

Diff for: api/server.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"log"
77
"net/http"
8+
"sync"
89
"time"
910

1011
"github.com/voc/srtrelay/config"
@@ -15,6 +16,7 @@ import (
1516
type Server struct {
1617
conf config.APIConfig
1718
srtServer srt.Server
19+
done sync.WaitGroup
1820
}
1921

2022
func NewServer(conf config.APIConfig, srtServer srt.Server) *Server {
@@ -36,10 +38,17 @@ func (s *Server) Listen(ctx context.Context) error {
3638
MaxHeaderBytes: 1 << 14,
3739
}
3840

41+
s.done.Add(1)
3942
go func() {
40-
log.Fatal(serv.ListenAndServe())
43+
defer s.done.Done()
44+
err := serv.ListenAndServe()
45+
if err != nil {
46+
log.Println(err)
47+
}
4148
}()
49+
s.done.Add(1)
4250
go func() {
51+
defer s.done.Done()
4352
<-ctx.Done()
4453
ctx2, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
4554
defer cancel()
@@ -49,6 +58,11 @@ func (s *Server) Listen(ctx context.Context) error {
4958
return nil
5059
}
5160

61+
// Wait blocks until listening sockets have been closed
62+
func (s *Server) Wait() {
63+
s.done.Wait()
64+
}
65+
5266
func (s *Server) HandleStreams(w http.ResponseWriter, r *http.Request) {
5367
w.Header().Set("Content-Type", "application/json")
5468
stats := s.srtServer.GetStatistics()

Diff for: main.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88
"os"
99
"os/signal"
1010
"syscall"
11-
"time"
1211

12+
"github.com/haivision/srtgo"
1313
"github.com/voc/srtrelay/api"
1414
"github.com/voc/srtrelay/config"
1515
"github.com/voc/srtrelay/relay"
@@ -86,17 +86,18 @@ func main() {
8686
// setup graceful shutdown
8787
ctx, cancel := context.WithCancel(context.Background())
8888
handleSignal(ctx, cancel)
89-
defer cancel()
9089

9190
// create server
91+
srtgo.InitSRT()
9292
srtServer := srt.NewServer(&serverConfig)
9393
err = srtServer.Listen(ctx)
9494
if err != nil {
9595
log.Fatal(err)
9696
}
9797

98+
var apiServer *api.Server
9899
if conf.API.Enabled {
99-
apiServer := api.NewServer(conf.API, srtServer)
100+
apiServer = api.NewServer(conf.API, srtServer)
100101
err := apiServer.Listen(ctx)
101102
if err != nil {
102103
log.Fatal(err)
@@ -105,6 +106,9 @@ func main() {
105106
}
106107

107108
// Wait for graceful shutdown
108-
<-ctx.Done()
109-
time.Sleep(200 * time.Millisecond)
109+
srtServer.Wait()
110+
if apiServer != nil {
111+
apiServer.Wait()
112+
}
113+
srtgo.CleanupSRT()
110114
}

Diff for: srt/server.go

+11
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type ServerConfig struct {
4141
// Server is an interface for a srt relay server
4242
type Server interface {
4343
Listen(context.Context) error
44+
Wait()
4445
Handle(context.Context, *srtgo.SrtSocket, *net.UDPAddr)
4546
GetStatistics() []*relay.StreamStatistics
4647
GetSocketStatistics() []*SocketStatistics
@@ -53,6 +54,7 @@ type ServerImpl struct {
5354

5455
mutex sync.Mutex
5556
conns map[*srtConn]bool
57+
done sync.WaitGroup
5658
}
5759

5860
// NewServer creates a server
@@ -97,6 +99,11 @@ func (s *ServerImpl) Listen(ctx context.Context) error {
9799
return nil
98100
}
99101

102+
// Wait blocks until listening sockets have been closed
103+
func (s *ServerImpl) Wait() {
104+
s.done.Wait()
105+
}
106+
100107
func (s *ServerImpl) listenCallback(socket *srtgo.SrtSocket, version int, addr *net.UDPAddr, idstring string) bool {
101108
var streamid stream.StreamID
102109

@@ -130,12 +137,16 @@ func (s *ServerImpl) listenAt(ctx context.Context, host string, port uint16) err
130137
return fmt.Errorf("Listen failed: %v", err)
131138
}
132139

140+
s.done.Add(1)
133141
go func() {
142+
defer s.done.Done()
134143
<-ctx.Done()
135144
sck.Close()
136145
}()
137146

147+
s.done.Add(1)
138148
go func() {
149+
defer s.done.Done()
139150
for {
140151
sock, addr, err := sck.Accept()
141152
if err != nil {

0 commit comments

Comments
 (0)