Skip to content

Commit

Permalink
add the Stream interface, pass tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ericxtang committed Mar 28, 2017
1 parent 7c7bd74 commit 433d29b
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 28 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ lpms.HandleRTMPPublish(
return getStreamIDFromPath(reqPath), nil
},
//getStream
func(reqPath string) (*stream.Stream, error) {
func(reqPath string) (stream.Stream, error) {
streamID := getStreamIDFromPath(reqPath)
stream := stream.NewStream(streamID)
streamDB.db[streamID] = stream
Expand Down Expand Up @@ -122,15 +122,15 @@ To handle transcode request:
```
lpms.HandleTranscode(
//getInStream
func(ctx context.Context, streamID string) (*stream.Stream, error) {
func(ctx context.Context, streamID string) (stream.Stream, error) {
if stream := streamDB.db[streamID]; stream != nil {
return stream, nil
}
return nil, stream.ErrNotFound
},
//getOutStream
func(ctx context.Context, streamID string) (*stream.Stream, error) {
func(ctx context.Context, streamID string) (stream.Stream, error) {
//For this example, we'll name the transcoded stream "{streamID}_tran"
newStream := stream.NewStream(streamID + "_tran")
streamDB.db[newStream.StreamID] = newStream
Expand Down
12 changes: 6 additions & 6 deletions cmd/lpms.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type StreamDB struct {
db map[string]*stream.Stream
db map[string]stream.Stream
}

type BufferDB struct {
Expand All @@ -26,7 +26,7 @@ func main() {
flag.Parse()

lpms := lpms.New("1935", "8000", "2435", "7935")
streamDB := &StreamDB{db: make(map[string]*stream.Stream)}
streamDB := &StreamDB{db: make(map[string]stream.Stream)}
bufferDB := &BufferDB{db: make(map[string]*stream.HLSBuffer)}

lpms.HandleRTMPPublish(
Expand All @@ -35,7 +35,7 @@ func main() {
return getStreamIDFromPath(reqPath), nil
},
//getStream
func(reqPath string) (*stream.Stream, error) {
func(reqPath string) (stream.Stream, error) {
streamID := getStreamIDFromPath(reqPath)
stream := stream.NewStream(streamID)
streamDB.db[streamID] = stream
Expand All @@ -51,18 +51,18 @@ func main() {

lpms.HandleTranscode(
//getInStream
func(ctx context.Context, streamID string) (*stream.Stream, error) {
func(ctx context.Context, streamID string) (stream.Stream, error) {
if stream := streamDB.db[streamID]; stream != nil {
return stream, nil
}

return nil, stream.ErrNotFound
},
//getOutStream
func(ctx context.Context, streamID string) (*stream.Stream, error) {
func(ctx context.Context, streamID string) (stream.Stream, error) {
//For this example, we'll name the transcoded stream "{streamID}_tran"
newStream := stream.NewStream(streamID + "_tran")
streamDB.db[newStream.StreamID] = newStream
streamDB.db[newStream.GetStreamID()] = newStream
return newStream, nil
})

Expand Down
10 changes: 5 additions & 5 deletions lpms.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (l *LPMS) Start() error {
//HandleRTMPPublish offload to the video listener
func (l *LPMS) HandleRTMPPublish(
getStreamID func(reqPath string) (string, error),
stream func(reqPath string) (*stream.Stream, error),
stream func(reqPath string) (stream.Stream, error),
endStream func(reqPath string)) error {

return l.vidListen.HandleRTMPPublish(getStreamID, stream, endStream)
Expand All @@ -83,7 +83,7 @@ func (l *LPMS) HandleHLSPlay(getStream func(reqPath string) (*stream.HLSBuffer,

//HandleTranscode kicks off a transcoding process, keeps a local HLS buffer, and returns the new stream ID.
//stream is the video stream you want to be transcoded. getNewStreamID gives you a way to name the transcoded stream.
func (l *LPMS) HandleTranscode(getInStream func(ctx context.Context, streamID string) (*stream.Stream, error), getOutStream func(ctx context.Context, streamID string) (*stream.Stream, error)) {
func (l *LPMS) HandleTranscode(getInStream func(ctx context.Context, streamID string) (stream.Stream, error), getOutStream func(ctx context.Context, streamID string) (stream.Stream, error)) {
http.HandleFunc("/transcode", func(w http.ResponseWriter, r *http.Request) {
ctx, _ := context.WithCancel(context.Background())
// defer cancel()
Expand Down Expand Up @@ -117,12 +117,12 @@ func (l *LPMS) HandleTranscode(getInStream func(ctx context.Context, streamID st
ec := make(chan error, 1)
go func() { ec <- l.doTranscoding(ctx, inStream, newStream) }()

w.Write([]byte("New Stream: " + newStream.StreamID))
w.Write([]byte("New Stream: " + newStream.GetStreamID()))
})
}

func (l *LPMS) doTranscoding(ctx context.Context, inStream *stream.Stream, newStream *stream.Stream) error {
t := transcoder.New(l.srsRTMPPort, l.srsHTTPPort, newStream.StreamID)
func (l *LPMS) doTranscoding(ctx context.Context, inStream stream.Stream, newStream stream.Stream) error {
t := transcoder.New(l.srsRTMPPort, l.srsHTTPPort, newStream.GetStreamID())
//Should kick off a goroutine for this, so we can return the new streamID rightaway.

tranMux, err := t.LocalSRSUploadMux()
Expand Down
1 change: 1 addition & 0 deletions stream/fileStream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package stream
33 changes: 24 additions & 9 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,38 @@ type HLSSegment struct {
Data []byte
}

type Stream struct {
type Stream interface {
GetStreamID() string
Len() int64
// NewStream() Stream
ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) error
WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error
WriteHLSPlaylistToStream(pl m3u8.MediaPlaylist) error
WriteHLSSegmentToStream(seg HLSSegment) error
ReadHLSFromStream(buffer HLSMuxer) error
}

type VideoStream struct {
StreamID string
RTMPTimeout time.Duration
HLSTimeout time.Duration
buffer *streamBuffer
}

func (s *Stream) Len() int64 {
func (s *VideoStream) Len() int64 {
return s.buffer.len()
}

func NewStream(id string) *Stream {
return &Stream{buffer: newStreamBuffer(), StreamID: id}
func NewStream(id string) *VideoStream {
return &VideoStream{buffer: newStreamBuffer(), StreamID: id}
}

func (s *VideoStream) GetStreamID() string {
return s.StreamID
}

//ReadRTMPFromStream reads the content from the RTMP stream out into the dst.
func (s *Stream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) error {
func (s *VideoStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) error {
defer dst.Close()

//TODO: Make sure to listen to ctx.Done()
Expand Down Expand Up @@ -117,7 +132,7 @@ func (s *Stream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) error
}

//WriteRTMPToStream writes a video stream from src into the stream.
func (s *Stream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error {
func (s *VideoStream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error {
defer src.Close()

c := make(chan error, 1)
Expand Down Expand Up @@ -165,16 +180,16 @@ func (s *Stream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) erro
}
}

func (s *Stream) WriteHLSPlaylistToStream(pl m3u8.MediaPlaylist) error {
func (s *VideoStream) WriteHLSPlaylistToStream(pl m3u8.MediaPlaylist) error {
return s.buffer.push(pl)
}

func (s *Stream) WriteHLSSegmentToStream(seg HLSSegment) error {
func (s *VideoStream) WriteHLSSegmentToStream(seg HLSSegment) error {
return s.buffer.push(seg)
}

//ReadHLSFromStream reads an HLS stream into an HLSBuffer
func (s *Stream) ReadHLSFromStream(buffer HLSMuxer) error {
func (s *VideoStream) ReadHLSFromStream(buffer HLSMuxer) error {
for {
item, err := s.buffer.poll(s.HLSTimeout)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions transcoder/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (et *ExternalTranscoder) LocalSRSUploadMux() (av.MuxCloser, error) {

//StartUpload takes a io.Stream of RTMP stream, and loads it into a local RTMP endpoint. The streamID will be used as the streaming endpoint.
//So if you want to create a new stream, make sure to do that before passing in the stream.
func (et *ExternalTranscoder) StartUpload(ctx context.Context, rtmpMux av.MuxCloser, src *stream.Stream) error {
func (et *ExternalTranscoder) StartUpload(ctx context.Context, rtmpMux av.MuxCloser, src stream.Stream) error {
upErrC := make(chan error, 1)

go func() { upErrC <- src.ReadRTMPFromStream(ctx, rtmpMux) }()
Expand All @@ -68,7 +68,7 @@ func (et *ExternalTranscoder) StartUpload(ctx context.Context, rtmpMux av.MuxClo
}

//StartDownload pushes hls playlists and segments into the stream as they become available from the transcoder.
func (et *ExternalTranscoder) StartDownload(ctx context.Context, hlsMux *stream.Stream) error {
func (et *ExternalTranscoder) StartDownload(ctx context.Context, hlsMux stream.Stream) error {
pc := make(chan *m3u8.MediaPlaylist)
sc := make(chan *stream.HLSSegment)
ec := make(chan error)
Expand Down
2 changes: 1 addition & 1 deletion vidlistener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type VidListener struct {
//user can name the stream, and getStream so the user can keep track of all the streams.
func (s *VidListener) HandleRTMPPublish(
getStreamID func(reqPath string) (string, error),
getStream func(reqPath string) (*stream.Stream, error),
getStream func(reqPath string) (stream.Stream, error),
endStream func(reqPath string)) error {

s.RtmpServer.HandlePublish = func(conn *joy4rtmp.Conn) {
Expand Down
4 changes: 2 additions & 2 deletions vidlistener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ func TestError(t *testing.T) {
func(reqPath string) (string, error) {
return "test", nil
},
func(reqPath string) (*stream.Stream, error) {
func(reqPath string) (stream.Stream, error) {
// return errors.New("Some Error")
return &stream.Stream{}, nil
return &stream.VideoStream{}, nil
},
func(reqPath string) {})

Expand Down

0 comments on commit 433d29b

Please sign in to comment.