Skip to content

Commit

Permalink
Merge pull request livepeer#22 from livepeer/videoPlayerFix
Browse files Browse the repository at this point in the history
Video player fix
  • Loading branch information
ericxtang authored Jun 12, 2017
2 parents 97da3db + 851c83f commit e97eb09
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 36 deletions.
11 changes: 6 additions & 5 deletions cmd/rtmp_segment_packer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"io"
"net/url"
"strings"

"time"
Expand Down Expand Up @@ -265,17 +266,17 @@ func main() {
flag.Set("logtostderr", "true")
flag.Parse()

lpms := lpms.New("1935", "8000", "2435", "7935", "")
lpms := lpms.New("1935", "8000", "", "")
streamDB := &StreamDB{db: make(map[string]stream.Stream)}

lpms.HandleRTMPPublish(
//getStreamID
func(reqPath string) (string, error) {
return getStreamIDFromPath(reqPath), nil
func(url *url.URL) (string, error) {
return getStreamIDFromPath(url.Path), nil
},
//getStream
func(reqPath string) (stream.Stream, stream.Stream, error) {
streamID := getStreamIDFromPath(reqPath)
func(url *url.URL) (stream.Stream, stream.Stream, error) {
streamID := getStreamIDFromPath(url.Path)
stream1 := NewSegmentStream(streamID)
stream2 := NewSegmentStream(streamID)
streamDB.db[streamID] = stream1
Expand Down
11 changes: 6 additions & 5 deletions cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"net/http"
"net/url"
"strings"
"time"

Expand All @@ -26,18 +27,18 @@ func main() {
flag.Set("logtostderr", "true")
flag.Parse()

lpms := lpms.New("1935", "8000", "2435", "7935", "")
lpms := lpms.New("1935", "8000", "", "")
streamDB := &StreamDB{db: make(map[string]stream.Stream)}
bufferDB := &BufferDB{db: make(map[string]*stream.HLSBuffer)}

lpms.HandleRTMPPublish(
//getStreamID
func(reqPath string) (string, error) {
return getStreamIDFromPath(reqPath), nil
func(url *url.URL) (string, error) {
return getStreamIDFromPath(url.Path), nil
},
//getStream
func(reqPath string) (stream.Stream, stream.Stream, error) {
rtmpStreamID := getStreamIDFromPath(reqPath)
func(url *url.URL) (stream.Stream, stream.Stream, error) {
rtmpStreamID := getStreamIDFromPath(url.Path)
hlsStreamID := rtmpStreamID + "_hls"
rtmpStream := stream.NewVideoStream(rtmpStreamID, stream.RTMP)
hlsStream := stream.NewVideoStream(hlsStreamID, stream.HLS)
Expand Down
11 changes: 6 additions & 5 deletions lpms.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"encoding/json"
"net/http"
"net/url"

"github.com/golang/glog"
"github.com/livepeer/lpms/stream"
Expand Down Expand Up @@ -37,11 +38,11 @@ type transcodeReq struct {
}

//New creates a new LPMS server object. It really just brokers everything to the components.
func New(rtmpPort string, httpPort string, srsRTMPPort string, srsHTTPPort string, ffmpegPath string) *LPMS {
func New(rtmpPort, httpPort, ffmpegPath, vodPath string) *LPMS {
server := &joy4rtmp.Server{Addr: (":" + rtmpPort)}
player := &vidplayer.VidPlayer{RtmpServer: server}
player := &vidplayer.VidPlayer{RtmpServer: server, VodPath: vodPath}
listener := &vidlistener.VidListener{RtmpServer: server, FfmpegPath: ffmpegPath}
return &LPMS{rtmpServer: server, vidPlayer: player, vidListen: listener, srsRTMPPort: srsRTMPPort, srsHTTPPort: srsHTTPPort, httpPort: httpPort, ffmpegPath: ffmpegPath}
return &LPMS{rtmpServer: server, vidPlayer: player, vidListen: listener, httpPort: httpPort, ffmpegPath: ffmpegPath}
}

//Start starts the rtmp and http server
Expand All @@ -65,8 +66,8 @@ func (l *LPMS) Start() error {

//HandleRTMPPublish offload to the video listener
func (l *LPMS) HandleRTMPPublish(
getStreamID func(reqPath string) (string, error),
getStream func(reqPath string) (stream.Stream, stream.Stream, error),
getStreamID func(url *url.URL) (string, error),
getStream func(url *url.URL) (stream.Stream, stream.Stream, error),
endStream func(rtmpStrmID string, hlsStrmID string)) error {

return l.vidListen.HandleRTMPPublish(getStreamID, getStream, endStream)
Expand Down
19 changes: 16 additions & 3 deletions segmenter/video_segmenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"time"

Expand Down Expand Up @@ -64,7 +65,7 @@ func NewFFMpegVideoSegmenter(workDir string, strmID string, localRtmpUrl string,
}

//RTMPToHLS invokes the FFMpeg command to do the segmenting. This method blocks unless killed.
func (s *FFMpegVideoSegmenter) RTMPToHLS(ctx context.Context, opt SegmenterOptions) error {
func (s *FFMpegVideoSegmenter) RTMPToHLS(ctx context.Context, opt SegmenterOptions, cleanup bool) error {
//Set up local workdir
if _, err := os.Stat(s.WorkDir); os.IsNotExist(err) {
err := os.Mkdir(s.WorkDir, 0700)
Expand All @@ -91,7 +92,7 @@ func (s *FFMpegVideoSegmenter) RTMPToHLS(ctx context.Context, opt SegmenterOptio

var cmd *exec.Cmd

cmd = exec.Command(path.Join(s.ffmpegPath, "ffmpeg"), "-i", s.LocalRtmpUrl, "-vcodec", "copy", "-acodec", "copy", "-bsf:v", "h264_mp4toannexb", "-f", "segment", "-muxdelay", "0", "-segment_list", plfn, tsfn)
cmd = exec.Command(path.Join(s.ffmpegPath, "ffmpeg"), "-i", s.LocalRtmpUrl, "-vcodec", "copy", "-acodec", "copy", "-bsf:v", "h264_mp4toannexb", "-f", "segment", "-segment_time", "8", "-muxdelay", "0", "-segment_list", plfn, tsfn)

err = cmd.Start()
if err != nil {
Expand All @@ -109,6 +110,10 @@ func (s *FFMpegVideoSegmenter) RTMPToHLS(ctx context.Context, opt SegmenterOptio
case <-ctx.Done():
//Can't close RTMP server, joy4 doesn't support it.
//server.Stop()
glog.Infof("VideoSegmenter stopped for %v", s.StrmID)
if cleanup {
s.Cleanup()
}
cmd.Process.Kill()
return ctx.Err()
}
Expand All @@ -129,7 +134,8 @@ func (s *FFMpegVideoSegmenter) PollSegment(ctx context.Context) (*VideoSegment,

for i := 0; i < PlaylistRetryCount; i++ {
pl, _ := m3u8.NewMediaPlaylist(uint(s.curSegment+1), uint(s.curSegment+1))
pl.DecodeFrom(bytes.NewReader(readPlaylist(plfn)), true)
content := readPlaylist(plfn)
pl.DecodeFrom(bytes.NewReader(content), true)
for _, plSeg := range pl.Segments {
if plSeg.URI == name {
length, err = time.ParseDuration(fmt.Sprintf("%vs", plSeg.Duration))
Expand Down Expand Up @@ -260,3 +266,10 @@ func (s *FFMpegVideoSegmenter) pollSegment(ctx context.Context, curFn string, ne
s.curSegWaitTime = s.curSegWaitTime + sleepTime
}
}

func (s *FFMpegVideoSegmenter) Cleanup() {
files, _ := filepath.Glob(path.Join(s.WorkDir, s.StrmID) + "*")
for _, fn := range files {
os.Remove(fn)
}
}
19 changes: 16 additions & 3 deletions segmenter/video_segmenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestSegmenter(t *testing.T) {
defer cancel()

//Kick off FFMpeg to create segments
go func() { se <- func() error { return vs.RTMPToHLS(ctx, opt) }() }()
go func() { se <- func() error { return vs.RTMPToHLS(ctx, opt, false) }() }()
select {
case err := <-se:
if err != context.DeadlineExceeded {
Expand Down Expand Up @@ -128,7 +128,8 @@ func TestSegmenter(t *testing.T) {
t.Errorf("Expecting HLS segment, got %v", seg.Format)
}

if seg.Length != time.Second*2 {
timeDiff := seg.Length - time.Second*2
if timeDiff > time.Millisecond*500 || timeDiff < -time.Millisecond*500 {
t.Errorf("Expecting 2 sec segments, got %v", seg.Length)
}

Expand Down Expand Up @@ -253,8 +254,19 @@ func TestPollSegTimeout(t *testing.T) {
os.RemoveAll(workDir)
os.Mkdir(workDir, 0700)

newPl := `#EXTM3U
#EXT-X-VERSION:3
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-ALLOW-CACHE:YES
#EXT-X-TARGETDURATION:7
#EXTINF:2.066000,
test_0.ts
#EXTINF:2.066000,
test_1.ts
`
err := ioutil.WriteFile(workDir+"/test.m3u8", []byte(newPl), 0755)
newSeg := `some random data`
err := ioutil.WriteFile(workDir+"/test_0.ts", []byte(newSeg), 0755)
err = ioutil.WriteFile(workDir+"/test_0.ts", []byte(newSeg), 0755)
err = ioutil.WriteFile(workDir+"/test_1.ts", []byte(newSeg), 0755)
if err != nil {
t.Errorf("Error writing playlist: %v", err)
Expand All @@ -272,4 +284,5 @@ func TestPollSegTimeout(t *testing.T) {
t.Errorf("Expecting timeout, got %v", err)
}

os.RemoveAll(workDir)
}
20 changes: 18 additions & 2 deletions stream/stream_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,22 @@ func (s *StreamSubscriber) UnsubscribeHLS(muxID string) error {
return nil
}

func (s *StreamSubscriber) UnsubscribeAll() error {
if s.hlsSubscribers != nil {
for k := range s.hlsSubscribers {
delete(s.hlsSubscribers, k)
}
}

if s.rtmpSubscribers != nil {
for k := range s.rtmpSubscribers {
delete(s.rtmpSubscribers, k)
}
}

return nil
}

func (s *StreamSubscriber) StartHLSWorker(ctx context.Context, segWaitTime time.Duration) error {
lastSegTimer := time.Now()
for {
Expand Down Expand Up @@ -170,15 +186,15 @@ func (s *StreamSubscriber) GetRTMPBuffer(subID string) av.Muxer {
return s.rtmpSubscribers[subID]
}

func (s *StreamSubscriber) HLSSubscribers() []string {
func (s *StreamSubscriber) HLSSubscribersReport() []string {
var res []string
for sub, v := range s.hlsSubscribers {
res = append(res, fmt.Sprintf("%v: %v", reflect.TypeOf(v), sub))
}
return res
}

func (s *StreamSubscriber) RTMPSubscribers() []string {
func (s *StreamSubscriber) RTMPSubscribersReport() []string {
var res []string
for sub, v := range s.rtmpSubscribers {
res = append(res, fmt.Sprintf("%v: %v", reflect.TypeOf(v), sub))
Expand Down
13 changes: 7 additions & 6 deletions vidlistener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package vidlistener

import (
"context"
"net/url"
"os"
"time"

Expand All @@ -26,20 +27,20 @@ type VidListener struct {
//HandleRTMPPublish immediately turns the RTMP stream into segmented HLS, and writes it into a stream.
//It exposes getStreamID so the user can name the stream, and getStream so the user can keep track of all the streams.
func (self *VidListener) HandleRTMPPublish(
getStreamID func(reqPath string) (string, error),
getStream func(reqPath string) (rtmpStrm stream.Stream, hlsStrm stream.Stream, err error),
getStreamID func(url *url.URL) (string, error),
getStream func(url *url.URL) (rtmpStrm stream.Stream, hlsStrm stream.Stream, err error),
endStream func(rtmpStrmID string, hlsStrmID string)) error {

self.RtmpServer.HandlePublish = func(conn *joy4rtmp.Conn) {
glog.Infof("RTMP server got upstream")
glog.Infof("RTMP server got upstream: %v", conn.URL)

_, err := getStreamID(conn.URL.Path)
_, err := getStreamID(conn.URL)
if err != nil {
glog.Errorf("RTMP Stream Publish Error: %v", err)
return
}

rs, hs, err := getStream(conn.URL.Path)
rs, hs, err := getStream(conn.URL)
if err != nil {
glog.Errorf("RTMP Publish couldn't get a destination stream for %v", conn.URL.Path)
return
Expand Down Expand Up @@ -75,7 +76,7 @@ func (self *VidListener) segmentStream(ctx context.Context, rs stream.Stream, hs
localRtmpUrl := "rtmp://localhost" + self.RtmpServer.Addr + "/stream/" + rs.GetStreamID()
s := segmenter.NewFFMpegVideoSegmenter(workDir, hs.GetStreamID(), localRtmpUrl, segOptions.SegLength, self.FfmpegPath)
c := make(chan error, 1)
go func() { c <- s.RTMPToHLS(ctx, segOptions) }()
go func() { c <- s.RTMPToHLS(ctx, segOptions, true) }()

go func() {
c <- func() error {
Expand Down
5 changes: 3 additions & 2 deletions vidlistener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package vidlistener

import (
"fmt"
"net/url"
"os/exec"
"testing"
"time"
Expand All @@ -14,10 +15,10 @@ func TestListener(t *testing.T) {
server := &joy4rtmp.Server{Addr: ":1937"}
listener := &VidListener{RtmpServer: server}
listener.HandleRTMPPublish(
func(reqPath string) (string, error) {
func(url *url.URL) (string, error) {
return "test", nil
},
func(reqPath string) (stream.Stream, stream.Stream, error) {
func(url *url.URL) (stream.Stream, stream.Stream, error) {
// return errors.New("Some Error")
return stream.NewVideoStream("test", stream.RTMP), stream.NewVideoStream("test", stream.HLS), nil
},
Expand Down
Loading

0 comments on commit e97eb09

Please sign in to comment.