Skip to content

Commit

Permalink
Refine RTMP streaming error.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Sep 3, 2024
1 parent 746d432 commit 63e2e8f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 129 deletions.
134 changes: 20 additions & 114 deletions proxy/rtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,57 +86,19 @@ func (v *rtmpServer) Run(ctx context.Context) error {
defer v.wg.Done()
defer conn.Close()

var backendClosedErr, clientClosedErr bool

handleBackendErr := func(err error) {
if isPeerClosedError(err) {
if !backendClosedErr {
backendClosedErr = true
logger.Df(ctx, "RTMP backend peer closed")
}
} else {
logger.Wf(ctx, "RTMP backend err %+v", err)
}
}

handleClientErr := func(err error) {
if isPeerClosedError(err) {
if !clientClosedErr {
clientClosedErr = true
logger.Df(ctx, "RTMP client peer closed")
}
} else {
logger.Wf(ctx, "RTMP client %v err %+v", conn.RemoteAddr(), err)
}
}

handleErr := func(err error) {
if perr, ok := err.(*RTMPProxyError); ok {
// For proxy error, maybe caused by proxy or client.
if perr.isBackend {
handleBackendErr(perr.err)
} else {
handleClientErr(perr.err)
}
if isPeerClosedError(err) {
logger.Df(ctx, "RTMP peer is closed")
} else {
// Default as client error.
handleClientErr(err)
logger.Wf(ctx, "RTMP serve err %+v", err)
}
}

rc := NewRTMPConnection(func(client *RTMPConnection) {
client.rd = v.rd
})
if err := rc.serve(ctx, conn); err != nil {
if merr, ok := err.(*RTMPMultipleError); ok {
// If multiple errors, handle all of them.
for _, err := range merr.errs {
handleErr(err)
}
} else {
// If single error, directly handle it.
handleErr(err)
}
handleErr(err)
} else {
logger.Df(ctx, "RTMP client done")
}
Expand All @@ -147,60 +109,6 @@ func (v *rtmpServer) Run(ctx context.Context) error {
return nil
}

type RTMPMultipleError struct {
// The caused errors.
errs []error
}

// NewRTMPMultipleError ignore nil errors. If no error, return nil.
func NewRTMPMultipleError(errs ...error) error {
var nerrs []error
for _, err := range errs {
if errors.Cause(err) != nil {
nerrs = append(nerrs, err)
}
}

if len(nerrs) == 0 {
return nil
}

return &RTMPMultipleError{errs: nerrs}
}

func (v *RTMPMultipleError) Error() string {
var b strings.Builder
for i, err := range v.errs {
if i > 0 {
b.WriteString(", ")
}
b.WriteString(err.Error())
}
return b.String()
}

func (v *RTMPMultipleError) Cause() error {
if len(v.errs) == 0 {
return nil
}
return v.errs[0]
}

type RTMPProxyError struct {
// Whether error is caused by backend.
isBackend bool
// The caused error.
err error
}

func (v *RTMPProxyError) Error() string {
return v.err.Error()
}

func (v *RTMPProxyError) Cause() error {
return v.err
}

type RTMPConnection struct {
// The random number generator.
rd *rand.Rand
Expand All @@ -217,13 +125,15 @@ func NewRTMPConnection(opts ...func(*RTMPConnection)) *RTMPConnection {
func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
logger.Df(ctx, "Got RTMP client from %v", conn.RemoteAddr())

// Close the connection when ctx done.
// If any goroutine quit, cancel another one.
parentCtx := ctx
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var backend *RTMPClientToBackend
if true {
connDoneCtx, connDoneCancel := context.WithCancel(ctx)
defer connDoneCancel()
go func() {
<-connDoneCtx.Done()
<-ctx.Done()
conn.Close()
if backend != nil {
backend.Close()
Expand Down Expand Up @@ -380,7 +290,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
defer backend.Close()

if err := backend.Connect(ctx, tcUrl, streamName); err != nil {
return &RTMPProxyError{true, errors.Wrapf(err, "connect backend, tcUrl=%v, stream=%v", tcUrl, streamName)}
return errors.Wrapf(err, "connect backend, tcUrl=%v, stream=%v", tcUrl, streamName)
}

// Start the streaming.
Expand Down Expand Up @@ -424,10 +334,6 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
var wg sync.WaitGroup
defer wg.Wait()

// If any goroutine quit, cancel another one.
parentCtx := ctx
ctx, cancel := context.WithCancel(ctx)

// Proxy all message from backend to client.
wg.Add(1)
var r0 error
Expand All @@ -439,13 +345,13 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
for {
m, err := backend.client.ReadMessage(ctx)
if err != nil {
return &RTMPProxyError{true, errors.Wrapf(err, "read message")}
return errors.Wrapf(err, "read message")
}
//logger.Df(ctx, "client<- %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload))

// TODO: Update the stream ID if not the same.
if err := client.WriteMessage(ctx, m); err != nil {
return &RTMPProxyError{false, errors.Wrapf(err, "write message")}
return errors.Wrapf(err, "write message")
}
}
}()
Expand All @@ -462,13 +368,13 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
for {
m, err := client.ReadMessage(ctx)
if err != nil {
return &RTMPProxyError{false, errors.Wrapf(err, "read message")}
return errors.Wrapf(err, "read message")
}
//logger.Df(ctx, "client-> %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload))

// TODO: Update the stream ID if not the same.
if err := backend.client.WriteMessage(ctx, m); err != nil {
return &RTMPProxyError{true, errors.Wrapf(err, "write message")}
return errors.Wrapf(err, "write message")
}
}
}()
Expand All @@ -478,14 +384,14 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
wg.Wait()

// Reset the error if caused by another goroutine.
if errors.Cause(r0) == context.Canceled && parentCtx.Err() == nil {
r0 = nil
if r0 != nil {
return errors.Wrapf(r0, "proxy backend->client")
}
if errors.Cause(r1) == context.Canceled && parentCtx.Err() == nil {
r1 = nil
if r1 != nil {
return errors.Wrapf(r1, "proxy client->backend")
}

return NewRTMPMultipleError(r0, r1, parentCtx.Err())
return parentCtx.Err()
}

type RTMPClientType string
Expand Down
15 changes: 0 additions & 15 deletions proxy/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,3 @@ func convertURLToStreamURL(r *http.Request) (unifiedURL, fullURL string) {
fullURL = fmt.Sprintf("%v%v", unifiedURL, streamExt)
return
}

// wrapProxyError extract and wrap the proxy and multiple errors with extraMsg.
func wrapProxyError(err error, extraMsg string) error {
if perr, ok := err.(*RTMPProxyError); ok {
return &RTMPProxyError{perr.isBackend, errors.Wrapf(perr.err, extraMsg)}
} else if merr, ok := err.(*RTMPMultipleError); ok {
var errs []error
for _, e := range merr.errs {
errs = append(errs, errors.Wrapf(e, extraMsg))
}
return NewRTMPMultipleError(errs...)
} else {
return errors.Wrapf(err, extraMsg)
}
}

0 comments on commit 63e2e8f

Please sign in to comment.