Skip to content

Commit

Permalink
Refine errors for HTTP streaming.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Aug 31, 2024
1 parent eb69ad5 commit 15f132a
Showing 1 changed file with 101 additions and 15 deletions.
116 changes: 101 additions & 15 deletions proxy/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ package main
import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"path"
"srs-proxy/errors"
"strconv"
"strings"
"sync"
"time"

"srs-proxy/errors"
"srs-proxy/logger"
)

Expand Down Expand Up @@ -168,7 +170,7 @@ func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

if err := v.serve(ctx, w, r); err != nil {
handleErr := func(err error) {
if perr, ok := err.(*RTMPProxyError); ok {
if perr.isBackend {
handleBackendErr(perr.err)
Expand All @@ -178,10 +180,24 @@ func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} else {
handleClientErr(err)
}
}

if err := v.serve(ctx, w, r); err != nil {
if merr, ok := err.(*RTMPMultipleError); ok {
// If multiple errors, handle all of them.
for _, err := range merr.errs {
handleErr(err)
}
} else {
// If single error, directly handle it.
handleErr(err)
}

if !v.written {
apiError(ctx, w, r, err)
}
} else {
logger.Df(ctx, "HTTP client done")
}
}

Expand Down Expand Up @@ -218,11 +234,18 @@ func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *htt
}

if err = v.serveByBackend(ctx, w, r, backend, streamURL); err != nil {
wrappedErr := errors.Wrapf(err, "serve %v by backend %+v", originalURL, backend)
extraMsg := fmt.Sprintf("serve %v by backend %+v", originalURL, backend)
if perr, ok := err.(*RTMPProxyError); ok {
return &RTMPProxyError{perr.isBackend, wrappedErr}
return &RTMPProxyError{perr.isBackend, errors.Wrapf(perr.err, extraMsg)}
} else if merr, ok := err.(*RTMPMultipleError); ok {
var errs []error
for _, e := range merr.errs {
errs = append(errs, errors.Wrapf(e, extraMsg))
}
return NewRTMPMultipleError(errs...)
} else {
return errors.Wrapf(err, extraMsg)
}
return wrappedErr
}

return nil
Expand All @@ -241,6 +264,19 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite
httpPort = int(iv)
}

// If any goroutine quit, cancel another one.
parentCtx := ctx
ctx, cancel := context.WithCancel(ctx)

go func() {
select {
case <-ctx.Done():
case <-r.Context().Done():
// If client request cancelled, cancel the proxy goroutine.
cancel()
}
}()

// Connect to backend SRS server via HTTP client.
backendURL := fmt.Sprintf("http://%v:%v%s", backend.IP, httpPort, r.URL.Path)
req, err := http.NewRequestWithContext(ctx, "GET", backendURL, nil)
Expand All @@ -250,6 +286,14 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite

resp, err := http.DefaultClient.Do(req)
if err != nil {
if urlErr, ok := err.(*url.Error); ok {
if urlErr.Err == io.EOF {
return &RTMPProxyError{true, errors.Errorf("do request to %v EOF", backendURL)}
}
if urlErr.Err == context.Canceled && r.Context().Err() != nil {
return &RTMPProxyError{false, errors.Wrapf(io.EOF, "client closed")}
}
}
return &RTMPProxyError{true, errors.Wrapf(err, "do request to %v", backendURL)}
}
defer resp.Body.Close()
Expand All @@ -267,19 +311,61 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite
}

v.written = true
logger.Df(ctx, "HTTP start streaming")

// For all proxy goroutines.
var wg sync.WaitGroup
defer wg.Wait()

// Detect the client closed.
wg.Add(1)
var r0 error
go func() {
defer wg.Done()
defer cancel()

r0 = func() error {
select {
case <-ctx.Done():
return nil
case <-r.Context().Done():
return &RTMPProxyError{false, errors.Wrapf(io.EOF, "client closed")}
}
}()
}()

// Copy all data from backend to client.
buf := make([]byte, 4096)
for {
n, err := resp.Body.Read(buf)
if err != nil {
return &RTMPProxyError{true, errors.Wrapf(err, "read stream from %v", backendURL)}
}
wg.Add(1)
var r1 error
go func() {
defer wg.Done()
defer cancel()

if _, err := w.Write(buf[:n]); err != nil {
return &RTMPProxyError{false, errors.Wrapf(err, "write stream client")}
}
r1 = func() error {
buf := make([]byte, 4096)
for {
n, err := resp.Body.Read(buf)
if err != nil {
return &RTMPProxyError{true, errors.Wrapf(err, "read stream from %v", backendURL)}
}

if _, err := w.Write(buf[:n]); err != nil {
return &RTMPProxyError{false, errors.Wrapf(err, "write stream client")}
}
}
}()
}()

// Wait until all goroutine quit.
wg.Wait()

// Reset the error if caused by another goroutine.
if errors.Cause(r0) == context.Canceled && parentCtx.Err() == nil {
r0 = nil
}
if errors.Cause(r1) == context.Canceled && parentCtx.Err() == nil {
r1 = nil
}

return nil
return NewRTMPMultipleError(r0, r1, parentCtx.Err())
}

0 comments on commit 15f132a

Please sign in to comment.