diff --git a/README.md b/README.md index daab390705..dd631e83e4 100644 --- a/README.md +++ b/README.md @@ -89,7 +89,7 @@ lpms.HandleRTMPPublish( //getStream func(reqPath string) (stream.Stream, error) { streamID := getStreamIDFromPath(reqPath) - stream := stream.NewStream(streamID) + stream := stream.NewVideoStream(streamID) streamDB.db[streamID] = stream return stream, nil }, @@ -132,7 +132,7 @@ lpms.HandleTranscode( //getOutStream func(ctx context.Context, streamID string) (stream.Stream, error) { //For this example, we'll name the transcoded stream "{streamID}_tran" - newStream := stream.NewStream(streamID + "_tran") + newStream := stream.NewVideoStream(streamID + "_tran") streamDB.db[newStream.StreamID] = newStream return newStream, nil }) diff --git a/cmd/lpms.go b/cmd/lpms.go index a402d4299c..fee4f9e973 100644 --- a/cmd/lpms.go +++ b/cmd/lpms.go @@ -37,7 +37,7 @@ func main() { //getStream func(reqPath string) (stream.Stream, error) { streamID := getStreamIDFromPath(reqPath) - stream := stream.NewStream(streamID) + stream := stream.NewVideoStream(streamID) streamDB.db[streamID] = stream return stream, nil }, @@ -61,7 +61,7 @@ func main() { //getOutStream func(ctx context.Context, streamID string) (stream.Stream, error) { //For this example, we'll name the transcoded stream "{streamID}_tran" - newStream := stream.NewStream(streamID + "_tran") + newStream := stream.NewVideoStream(streamID + "_tran") streamDB.db[newStream.GetStreamID()] = newStream return newStream, nil }) diff --git a/data/bunny.mp4 b/data/bunny.mp4 new file mode 100644 index 0000000000..81d11df5f2 Binary files /dev/null and b/data/bunny.mp4 differ diff --git a/data/bunny2.mp4 b/data/bunny2.mp4 new file mode 100644 index 0000000000..3ce062e77b Binary files /dev/null and b/data/bunny2.mp4 differ diff --git a/stream/fileStream.go b/stream/fileStream.go index 11541cc2bd..8e6d64e9af 100644 --- a/stream/fileStream.go +++ b/stream/fileStream.go @@ -1 +1,55 @@ package stream + +import ( + "context" + "io/ioutil" + + "github.com/kz26/m3u8" + "github.com/nareix/joy4/av" +) + +//For now, this class is only for testing purposes (so we can write out the transcoding results and compare) +type FileStream struct { + StreamID string + // RTMPTimeout time.Duration + // HLSTimeout time.Duration + buffer *streamBuffer +} + +var outputDir = "data" + +func (s *FileStream) Len() int64 { + return s.buffer.len() +} + +func NewFileStream(id string) *FileStream { + return &FileStream{buffer: newStreamBuffer(), StreamID: id} +} + +func (s *FileStream) GetStreamID() string { + return s.StreamID +} + +//ReadRTMPFromStream reads the content from the RTMP stream out into the dst. +func (s *FileStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) error { + return nil +} + +func (s *FileStream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error { + return nil +} + +func (s *FileStream) WriteHLSPlaylistToStream(pl m3u8.MediaPlaylist) error { + return nil +} + +func (s *FileStream) WriteHLSSegmentToStream(seg HLSSegment) error { + err := ioutil.WriteFile("./data/"+s.StreamID+"_"+seg.Name, seg.Data, 0644) + // check(err) + + return err +} + +func (s *FileStream) ReadHLSFromStream(buffer HLSMuxer) error { + return nil +} diff --git a/stream/stream.go b/stream/stream.go index 2d441d9c22..d29e9d81d3 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -3,6 +3,7 @@ package stream import ( "context" "errors" + "fmt" "io" "reflect" @@ -30,6 +31,8 @@ func newStreamBuffer() *streamBuffer { } func (b *streamBuffer) push(in interface{}) error { + fmt.Println("PUSH----") + fmt.Println(b) b.q.Put(in) return nil } @@ -83,7 +86,7 @@ func (s *VideoStream) Len() int64 { return s.buffer.len() } -func NewStream(id string) *VideoStream { +func NewVideoStream(id string) *VideoStream { return &VideoStream{buffer: newStreamBuffer(), StreamID: id} } diff --git a/stream/stream_test.go b/stream/stream_test.go index 897f113d9b..e776549333 100644 --- a/stream/stream_test.go +++ b/stream/stream_test.go @@ -52,7 +52,7 @@ func (d NoEOFDemuxer) ReadPacket() (av.Packet, error) { func TestWriteRTMPErrors(t *testing.T) { // stream := Stream{Buffer: &StreamBuffer{}, StreamID: "test"} - stream := NewStream("test") + stream := NewVideoStream("test") err := stream.WriteRTMPToStream(context.Background(), BadStreamsDemuxer{}) if err != ErrStreams { t.Error("Expecting Streams Error, but got: ", err) @@ -87,7 +87,7 @@ func (d PacketsDemuxer) ReadPacket() (av.Packet, error) { func TestWriteRTMP(t *testing.T) { // stream := Stream{Buffer: NewStreamBuffer(), StreamID: "test"} - stream := NewStream("test") + stream := NewVideoStream("test") err := stream.WriteRTMPToStream(context.Background(), PacketsDemuxer{c: &Counter{Count: 0}}) if err != io.EOF { @@ -121,7 +121,7 @@ func (d BadPacketMuxer) WriteTrailer() error { return nil } func (d BadPacketMuxer) WritePacket(av.Packet) error { return ErrBadPacket } func TestReadRTMPError(t *testing.T) { - stream := NewStream("test") + stream := NewVideoStream("test") err := stream.WriteRTMPToStream(context.Background(), PacketsDemuxer{c: &Counter{Count: 0}}) if err != io.EOF { t.Error("Error setting up the test - while inserting packet.") @@ -147,7 +147,7 @@ func (d PacketsMuxer) WriteTrailer() error { return nil } func (d PacketsMuxer) WritePacket(av.Packet) error { return nil } func TestReadRTMP(t *testing.T) { - stream := NewStream("test") + stream := NewVideoStream("test") err := stream.WriteRTMPToStream(context.Background(), PacketsDemuxer{c: &Counter{Count: 0}}) if err != io.EOF { t.Error("Error setting up the test - while inserting packet.") @@ -162,7 +162,7 @@ func TestReadRTMP(t *testing.T) { t.Error("Expecting buffer length to be 0, but got ", stream.Len()) } - stream2 := NewStream("test2") + stream2 := NewVideoStream("test2") stream2.RTMPTimeout = time.Millisecond * 50 err2 := stream.WriteRTMPToStream(context.Background(), NoEOFDemuxer{c: &Counter{Count: 0}}) if err2 != ErrDroppedRTMPStream { @@ -175,7 +175,7 @@ func TestReadRTMP(t *testing.T) { } func TestWriteHLS(t *testing.T) { - stream := NewStream("test") + stream := NewVideoStream("test") err1 := stream.WriteHLSPlaylistToStream(m3u8.MediaPlaylist{}) err2 := stream.WriteHLSSegmentToStream(HLSSegment{}) if err1 != nil { @@ -199,7 +199,7 @@ func TestWriteHLS(t *testing.T) { // } func TestReadHLS(t *testing.T) { - stream := NewStream("test") + stream := NewVideoStream("test") stream.HLSTimeout = time.Millisecond * 100 buffer := NewHLSBuffer() grBefore := runtime.NumGoroutine() @@ -267,7 +267,7 @@ func TestReadHLS(t *testing.T) { // } // func TestWriteHLS(t *testing.T) { -// stream := NewStream("test") +// stream := NewVideoStream("test") // numGR := runtime.NumGoroutine() // ctx, cancel := context.WithCancel(context.Background()) // err := stream.WriteHLSToStream(ctx, GoodHLSDemux{}) @@ -305,7 +305,7 @@ func TestReadHLS(t *testing.T) { // //This test is more for documentation - this is how timeout works here. // func TestWriteHLSTimeout(t *testing.T) { -// stream := NewStream("test") +// stream := NewVideoStream("test") // numGR := runtime.NumGoroutine() // ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) // defer cancel() diff --git a/transcoder/external_test.go b/transcoder/external_test.go index 1dedc93364..3820bf7be2 100644 --- a/transcoder/external_test.go +++ b/transcoder/external_test.go @@ -50,7 +50,7 @@ func TestStartUpload(t *testing.T) { tr := &ExternalTranscoder{} mux := &PacketsMuxer{} demux := &PacketsDemuxer{c: &Counter{}} - stream := stream.NewStream("test") + stream := stream.NewVideoStream("test") stream.WriteRTMPToStream(context.Background(), demux) ctx := context.Background() @@ -79,7 +79,7 @@ func (d Downloader) Download(pc chan *m3u8.MediaPlaylist, sc chan *stream.HLSSeg func TestStartDownload(t *testing.T) { // fmt.Println("Testing Download") d := Downloader{} - s := stream.NewStream("test") + s := stream.NewVideoStream("test") tr := &ExternalTranscoder{downloader: d} err := tr.StartDownload(context.Background(), s) diff --git a/vidlistener/listener_test.go b/vidlistener/listener_test.go index 30f54d9b51..0f39e968e4 100644 --- a/vidlistener/listener_test.go +++ b/vidlistener/listener_test.go @@ -1,6 +1,7 @@ package vidlistener import ( + "fmt" "os/exec" "testing" "time" @@ -9,7 +10,7 @@ import ( joy4rtmp "github.com/nareix/joy4/format/rtmp" ) -func TestError(t *testing.T) { +func TestListener(t *testing.T) { server := &joy4rtmp.Server{Addr: ":1937"} listener := &VidListener{RtmpServer: server} listener.HandleRTMPPublish( @@ -18,17 +19,24 @@ func TestError(t *testing.T) { }, func(reqPath string) (stream.Stream, error) { // return errors.New("Some Error") - return &stream.VideoStream{}, nil + return stream.NewVideoStream("test"), nil }, func(reqPath string) {}) ffmpegCmd := "ffmpeg" ffmpegArgs := []string{"-re", "-i", "../data/bunny2.mp4", "-c", "copy", "-f", "flv", "rtmp://localhost:1937/movie/stream"} - go exec.Command(ffmpegCmd, ffmpegArgs...).Run() + cmd := exec.Command(ffmpegCmd, ffmpegArgs...) + go cmd.Run() go listener.RtmpServer.ListenAndServe() time.Sleep(time.Second * 1) + err := cmd.Process.Kill() + if err != nil { + fmt.Println("Error killing ffmpeg") + } + + // time.Sleep(time.Second * 1) } // Integration test. diff --git a/vidplayer/player_test.go b/vidplayer/player_test.go index 38d9447a94..3785bce0fd 100644 --- a/vidplayer/player_test.go +++ b/vidplayer/player_test.go @@ -56,7 +56,7 @@ func TestRTMP(t *testing.T) { func TestHLS(t *testing.T) { player := &VidPlayer{} - s := stream.NewStream("test") + s := stream.NewVideoStream("test") s.HLSTimeout = time.Second * 5 //Write some packets into the stream s.WriteHLSPlaylistToStream(m3u8.MediaPlaylist{})