Skip to content

Commit

Permalink
Refine names.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Sep 9, 2024
1 parent 62c280c commit 79161b9
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 47 deletions.
17 changes: 11 additions & 6 deletions proxy/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,28 @@ import (
"srs-proxy/logger"
)

type httpAPI struct {
// srsHTTPAPIServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP,
// to proxy other HTTP API of SRS like the streams and clients, etc.
type srsHTTPAPIServer struct {
// The underlayer HTTP server.
server *http.Server
// The WebRTC server.
rtc *rtcServer
rtc *srsWebRTCServer
// The gracefully quit timeout, wait server to quit.
gracefulQuitTimeout time.Duration
// The wait group for all goroutines.
wg sync.WaitGroup
}

func NewHttpAPI(opts ...func(*httpAPI)) *httpAPI {
v := &httpAPI{}
func NewSRSHTTPAPIServer(opts ...func(*srsHTTPAPIServer)) *srsHTTPAPIServer {
v := &srsHTTPAPIServer{}
for _, opt := range opts {
opt(v)
}
return v
}

func (v *httpAPI) Close() error {
func (v *srsHTTPAPIServer) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()
v.server.Shutdown(ctx)
Expand All @@ -44,7 +46,7 @@ func (v *httpAPI) Close() error {
return nil
}

func (v *httpAPI) Run(ctx context.Context) error {
func (v *srsHTTPAPIServer) Run(ctx context.Context) error {
// Parse address to listen.
addr := envHttpAPI()
if !strings.Contains(addr, ":") {
Expand Down Expand Up @@ -111,6 +113,9 @@ func (v *httpAPI) Run(ctx context.Context) error {
return nil
}

// systemAPI is the system HTTP API of the proxy server, for SRS media server to register the service
// to proxy server. It also provides some other system APIs like the status of proxy server, like exporter
// for Prometheus metrics.
type systemAPI struct {
// The underlayer HTTP server.
server *http.Server
Expand Down
13 changes: 8 additions & 5 deletions proxy/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"srs-proxy/logger"
)

type httpServer struct {
// srsHTTPStreamServer is the proxy server for SRS HTTP stream server, for HTTP-FLV, HTTP-TS,
// HLS, etc. The proxy server will figure out which SRS origin server to proxy to, then proxy
// the request to the origin server.
type srsHTTPStreamServer struct {
// The underlayer HTTP server.
server *http.Server
// The gracefully quit timeout, wait server to quit.
Expand All @@ -28,15 +31,15 @@ type httpServer struct {
wg stdSync.WaitGroup
}

func NewHttpServer(opts ...func(*httpServer)) *httpServer {
v := &httpServer{}
func NewSRSHTTPStreamServer(opts ...func(*srsHTTPStreamServer)) *srsHTTPStreamServer {
v := &srsHTTPStreamServer{}
for _, opt := range opts {
opt(v)
}
return v
}

func (v *httpServer) Close() error {
func (v *srsHTTPStreamServer) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
defer cancel()
v.server.Shutdown(ctx)
Expand All @@ -45,7 +48,7 @@ func (v *httpServer) Close() error {
return nil
}

func (v *httpServer) Run(ctx context.Context) error {
func (v *srsHTTPStreamServer) Run(ctx context.Context) error {
// Parse address to listen.
addr := envHttpServer()
if !strings.Contains(addr, ":") {
Expand Down
32 changes: 16 additions & 16 deletions proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,32 +68,32 @@ func doMain(ctx context.Context) error {
}

// Start the RTMP server.
rtmpServer := NewRtmpServer()
defer rtmpServer.Close()
if err := rtmpServer.Run(ctx); err != nil {
srsRTMPServer := NewSRSRTMPServer()
defer srsRTMPServer.Close()
if err := srsRTMPServer.Run(ctx); err != nil {
return errors.Wrapf(err, "rtmp server")
}

// Start the WebRTC server.
rtcServer := newRTCServer()
defer rtcServer.Close()
if err := rtcServer.Run(ctx); err != nil {
srsWebRTCServer := NewSRSWebRTCServer()
defer srsWebRTCServer.Close()
if err := srsWebRTCServer.Run(ctx); err != nil {
return errors.Wrapf(err, "rtc server")
}

// Start the HTTP API server.
httpAPI := NewHttpAPI(func(server *httpAPI) {
server.gracefulQuitTimeout, server.rtc = gracefulQuitTimeout, rtcServer
srsHTTPAPIServer := NewSRSHTTPAPIServer(func(server *srsHTTPAPIServer) {
server.gracefulQuitTimeout, server.rtc = gracefulQuitTimeout, srsWebRTCServer
})
defer httpAPI.Close()
if err := httpAPI.Run(ctx); err != nil {
defer srsHTTPAPIServer.Close()
if err := srsHTTPAPIServer.Run(ctx); err != nil {
return errors.Wrapf(err, "http api server")
}

// Start the SRT server.
srtServer := newSRTServer()
defer srtServer.Close()
if err := srtServer.Run(ctx); err != nil {
srsSRTServer := NewSRSSRTServer()
defer srsSRTServer.Close()
if err := srsSRTServer.Run(ctx); err != nil {
return errors.Wrapf(err, "srt server")
}

Expand All @@ -107,11 +107,11 @@ func doMain(ctx context.Context) error {
}

// Start the HTTP web server.
httpServer := NewHttpServer(func(server *httpServer) {
srsHTTPStreamServer := NewSRSHTTPStreamServer(func(server *srsHTTPStreamServer) {
server.gracefulQuitTimeout = gracefulQuitTimeout
})
defer httpServer.Close()
if err := httpServer.Run(ctx); err != nil {
defer srsHTTPStreamServer.Close()
if err := srsHTTPStreamServer.Run(ctx); err != nil {
return errors.Wrapf(err, "http server")
}

Expand Down
21 changes: 12 additions & 9 deletions proxy/rtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"srs-proxy/sync"
)

type rtcServer struct {
// srsWebRTCServer is the proxy for SRS WebRTC server via WHIP or WHEP protocol. It will figure out
// which backend server to proxy to. It will also replace the UDP port to the proxy server's in the
// SDP answer.
type srsWebRTCServer struct {
// The UDP listener for WebRTC server.
listener *net.UDPConn

Expand All @@ -35,15 +38,15 @@ type rtcServer struct {
wg stdSync.WaitGroup
}

func newRTCServer(opts ...func(*rtcServer)) *rtcServer {
v := &rtcServer{}
func NewSRSWebRTCServer(opts ...func(*srsWebRTCServer)) *srsWebRTCServer {
v := &srsWebRTCServer{}
for _, opt := range opts {
opt(v)
}
return v
}

func (v *rtcServer) Close() error {
func (v *srsWebRTCServer) Close() error {
if v.listener != nil {
_ = v.listener.Close()
}
Expand All @@ -52,7 +55,7 @@ func (v *rtcServer) Close() error {
return nil
}

func (v *rtcServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
func (v *srsWebRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
defer r.Body.Close()
ctx = logger.WithContext(ctx)

Expand Down Expand Up @@ -89,7 +92,7 @@ func (v *rtcServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter,
return nil
}

func (v *rtcServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
func (v *srsWebRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
defer r.Body.Close()
ctx = logger.WithContext(ctx)

Expand Down Expand Up @@ -126,7 +129,7 @@ func (v *rtcServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter,
return nil
}

func (v *rtcServer) proxyApiToBackend(
func (v *srsWebRTCServer) proxyApiToBackend(
ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer,
remoteSDPOffer string, streamURL string,
) error {
Expand Down Expand Up @@ -226,7 +229,7 @@ func (v *rtcServer) proxyApiToBackend(
return nil
}

func (v *rtcServer) Run(ctx context.Context) error {
func (v *srsWebRTCServer) Run(ctx context.Context) error {
// Parse address to listen.
endpoint := envWebRTCServer()
if !strings.Contains(endpoint, ":") {
Expand Down Expand Up @@ -268,7 +271,7 @@ func (v *rtcServer) Run(ctx context.Context) error {
return nil
}

func (v *rtcServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
var connection *RTCConnection

// If STUN binding request, parse the ufrag and identify the connection.
Expand Down
13 changes: 8 additions & 5 deletions proxy/rtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
"srs-proxy/rtmp"
)

type rtmpServer struct {
// srsRTMPServer is the proxy for SRS RTMP server, to proxy the RTMP stream to backend SRS
// server. It will figure out the backend server to proxy to. Unlike the edge server, it will
// not cache the stream, but just proxy the stream to backend.
type srsRTMPServer struct {
// The TCP listener for RTMP server.
listener *net.TCPListener
// The random number generator.
Expand All @@ -27,8 +30,8 @@ type rtmpServer struct {
wg sync.WaitGroup
}

func NewRtmpServer(opts ...func(*rtmpServer)) *rtmpServer {
v := &rtmpServer{
func NewSRSRTMPServer(opts ...func(*srsRTMPServer)) *srsRTMPServer {
v := &srsRTMPServer{
rd: rand.New(rand.NewSource(time.Now().UnixNano())),
}
for _, opt := range opts {
Expand All @@ -37,7 +40,7 @@ func NewRtmpServer(opts ...func(*rtmpServer)) *rtmpServer {
return v
}

func (v *rtmpServer) Close() error {
func (v *srsRTMPServer) Close() error {
if v.listener != nil {
v.listener.Close()
}
Expand All @@ -46,7 +49,7 @@ func (v *rtmpServer) Close() error {
return nil
}

func (v *rtmpServer) Run(ctx context.Context) error {
func (v *srsRTMPServer) Run(ctx context.Context) error {
endpoint := envRtmpServer()
if !strings.Contains(endpoint, ":") {
endpoint = ":" + endpoint
Expand Down
15 changes: 9 additions & 6 deletions proxy/srt.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
"srs-proxy/sync"
)

type srtServer struct {
// srsSRTServer is the proxy for SRS server via SRT. It will figure out which backend server to
// proxy to. It only parses the SRT handshake messages, parses the stream id, and proxy to the
// backend server.
type srsSRTServer struct {
// The UDP listener for SRT server.
listener *net.UDPConn

Expand All @@ -31,8 +34,8 @@ type srtServer struct {
wg stdSync.WaitGroup
}

func newSRTServer(opts ...func(*srtServer)) *srtServer {
v := &srtServer{
func NewSRSSRTServer(opts ...func(*srsSRTServer)) *srsSRTServer {
v := &srsSRTServer{
start: time.Now(),
}

Expand All @@ -42,7 +45,7 @@ func newSRTServer(opts ...func(*srtServer)) *srtServer {
return v
}

func (v *srtServer) Close() error {
func (v *srsSRTServer) Close() error {
if v.listener != nil {
v.listener.Close()
}
Expand All @@ -51,7 +54,7 @@ func (v *srtServer) Close() error {
return nil
}

func (v *srtServer) Run(ctx context.Context) error {
func (v *srsSRTServer) Run(ctx context.Context) error {
// Parse address to listen.
endpoint := envSRTServer()
if !strings.Contains(endpoint, ":") {
Expand Down Expand Up @@ -93,7 +96,7 @@ func (v *srtServer) Run(ctx context.Context) error {
return nil
}

func (v *srtServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
func (v *srsSRTServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
socketID := srtParseSocketID(data)

var pkt *SRTHandshakePacket
Expand Down

0 comments on commit 79161b9

Please sign in to comment.