Skip to content

Commit

Permalink
add test data, add fileStream shell
Browse files Browse the repository at this point in the history
  • Loading branch information
ericxtang committed Mar 28, 2017
1 parent 433d29b commit 08a4722
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 20 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ lpms.HandleRTMPPublish(
//getStream
func(reqPath string) (stream.Stream, error) {
streamID := getStreamIDFromPath(reqPath)
stream := stream.NewStream(streamID)
stream := stream.NewVideoStream(streamID)
streamDB.db[streamID] = stream
return stream, nil
},
Expand Down Expand Up @@ -132,7 +132,7 @@ lpms.HandleTranscode(
//getOutStream
func(ctx context.Context, streamID string) (stream.Stream, error) {
//For this example, we'll name the transcoded stream "{streamID}_tran"
newStream := stream.NewStream(streamID + "_tran")
newStream := stream.NewVideoStream(streamID + "_tran")
streamDB.db[newStream.StreamID] = newStream
return newStream, nil
})
Expand Down
4 changes: 2 additions & 2 deletions cmd/lpms.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func main() {
//getStream
func(reqPath string) (stream.Stream, error) {
streamID := getStreamIDFromPath(reqPath)
stream := stream.NewStream(streamID)
stream := stream.NewVideoStream(streamID)
streamDB.db[streamID] = stream
return stream, nil
},
Expand All @@ -61,7 +61,7 @@ func main() {
//getOutStream
func(ctx context.Context, streamID string) (stream.Stream, error) {
//For this example, we'll name the transcoded stream "{streamID}_tran"
newStream := stream.NewStream(streamID + "_tran")
newStream := stream.NewVideoStream(streamID + "_tran")
streamDB.db[newStream.GetStreamID()] = newStream
return newStream, nil
})
Expand Down
Binary file added data/bunny.mp4
Binary file not shown.
Binary file added data/bunny2.mp4
Binary file not shown.
54 changes: 54 additions & 0 deletions stream/fileStream.go
Original file line number Diff line number Diff line change
@@ -1 +1,55 @@
package stream

import (
"context"
"io/ioutil"

"github.com/kz26/m3u8"
"github.com/nareix/joy4/av"
)

//For now, this class is only for testing purposes (so we can write out the transcoding results and compare)
type FileStream struct {
StreamID string
// RTMPTimeout time.Duration
// HLSTimeout time.Duration
buffer *streamBuffer
}

var outputDir = "data"

func (s *FileStream) Len() int64 {
return s.buffer.len()
}

func NewFileStream(id string) *FileStream {
return &FileStream{buffer: newStreamBuffer(), StreamID: id}
}

func (s *FileStream) GetStreamID() string {
return s.StreamID
}

//ReadRTMPFromStream reads the content from the RTMP stream out into the dst.
func (s *FileStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) error {
return nil
}

func (s *FileStream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error {
return nil
}

func (s *FileStream) WriteHLSPlaylistToStream(pl m3u8.MediaPlaylist) error {
return nil
}

func (s *FileStream) WriteHLSSegmentToStream(seg HLSSegment) error {
err := ioutil.WriteFile("./data/"+s.StreamID+"_"+seg.Name, seg.Data, 0644)
// check(err)

return err
}

func (s *FileStream) ReadHLSFromStream(buffer HLSMuxer) error {
return nil
}
5 changes: 4 additions & 1 deletion stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stream
import (
"context"
"errors"
"fmt"
"io"
"reflect"

Expand Down Expand Up @@ -30,6 +31,8 @@ func newStreamBuffer() *streamBuffer {
}

func (b *streamBuffer) push(in interface{}) error {
fmt.Println("PUSH----")
fmt.Println(b)
b.q.Put(in)
return nil
}
Expand Down Expand Up @@ -83,7 +86,7 @@ func (s *VideoStream) Len() int64 {
return s.buffer.len()
}

func NewStream(id string) *VideoStream {
func NewVideoStream(id string) *VideoStream {
return &VideoStream{buffer: newStreamBuffer(), StreamID: id}
}

Expand Down
18 changes: 9 additions & 9 deletions stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (d NoEOFDemuxer) ReadPacket() (av.Packet, error) {

func TestWriteRTMPErrors(t *testing.T) {
// stream := Stream{Buffer: &StreamBuffer{}, StreamID: "test"}
stream := NewStream("test")
stream := NewVideoStream("test")
err := stream.WriteRTMPToStream(context.Background(), BadStreamsDemuxer{})
if err != ErrStreams {
t.Error("Expecting Streams Error, but got: ", err)
Expand Down Expand Up @@ -87,7 +87,7 @@ func (d PacketsDemuxer) ReadPacket() (av.Packet, error) {

func TestWriteRTMP(t *testing.T) {
// stream := Stream{Buffer: NewStreamBuffer(), StreamID: "test"}
stream := NewStream("test")
stream := NewVideoStream("test")
err := stream.WriteRTMPToStream(context.Background(), PacketsDemuxer{c: &Counter{Count: 0}})

if err != io.EOF {
Expand Down Expand Up @@ -121,7 +121,7 @@ func (d BadPacketMuxer) WriteTrailer() error { return nil }
func (d BadPacketMuxer) WritePacket(av.Packet) error { return ErrBadPacket }

func TestReadRTMPError(t *testing.T) {
stream := NewStream("test")
stream := NewVideoStream("test")
err := stream.WriteRTMPToStream(context.Background(), PacketsDemuxer{c: &Counter{Count: 0}})
if err != io.EOF {
t.Error("Error setting up the test - while inserting packet.")
Expand All @@ -147,7 +147,7 @@ func (d PacketsMuxer) WriteTrailer() error { return nil }
func (d PacketsMuxer) WritePacket(av.Packet) error { return nil }

func TestReadRTMP(t *testing.T) {
stream := NewStream("test")
stream := NewVideoStream("test")
err := stream.WriteRTMPToStream(context.Background(), PacketsDemuxer{c: &Counter{Count: 0}})
if err != io.EOF {
t.Error("Error setting up the test - while inserting packet.")
Expand All @@ -162,7 +162,7 @@ func TestReadRTMP(t *testing.T) {
t.Error("Expecting buffer length to be 0, but got ", stream.Len())
}

stream2 := NewStream("test2")
stream2 := NewVideoStream("test2")
stream2.RTMPTimeout = time.Millisecond * 50
err2 := stream.WriteRTMPToStream(context.Background(), NoEOFDemuxer{c: &Counter{Count: 0}})
if err2 != ErrDroppedRTMPStream {
Expand All @@ -175,7 +175,7 @@ func TestReadRTMP(t *testing.T) {
}

func TestWriteHLS(t *testing.T) {
stream := NewStream("test")
stream := NewVideoStream("test")
err1 := stream.WriteHLSPlaylistToStream(m3u8.MediaPlaylist{})
err2 := stream.WriteHLSSegmentToStream(HLSSegment{})
if err1 != nil {
Expand All @@ -199,7 +199,7 @@ func TestWriteHLS(t *testing.T) {
// }

func TestReadHLS(t *testing.T) {
stream := NewStream("test")
stream := NewVideoStream("test")
stream.HLSTimeout = time.Millisecond * 100
buffer := NewHLSBuffer()
grBefore := runtime.NumGoroutine()
Expand Down Expand Up @@ -267,7 +267,7 @@ func TestReadHLS(t *testing.T) {
// }

// func TestWriteHLS(t *testing.T) {
// stream := NewStream("test")
// stream := NewVideoStream("test")
// numGR := runtime.NumGoroutine()
// ctx, cancel := context.WithCancel(context.Background())
// err := stream.WriteHLSToStream(ctx, GoodHLSDemux{})
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestReadHLS(t *testing.T) {

// //This test is more for documentation - this is how timeout works here.
// func TestWriteHLSTimeout(t *testing.T) {
// stream := NewStream("test")
// stream := NewVideoStream("test")
// numGR := runtime.NumGoroutine()
// ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
// defer cancel()
Expand Down
4 changes: 2 additions & 2 deletions transcoder/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestStartUpload(t *testing.T) {
tr := &ExternalTranscoder{}
mux := &PacketsMuxer{}
demux := &PacketsDemuxer{c: &Counter{}}
stream := stream.NewStream("test")
stream := stream.NewVideoStream("test")
stream.WriteRTMPToStream(context.Background(), demux)
ctx := context.Background()

Expand Down Expand Up @@ -79,7 +79,7 @@ func (d Downloader) Download(pc chan *m3u8.MediaPlaylist, sc chan *stream.HLSSeg
func TestStartDownload(t *testing.T) {
// fmt.Println("Testing Download")
d := Downloader{}
s := stream.NewStream("test")
s := stream.NewVideoStream("test")
tr := &ExternalTranscoder{downloader: d}
err := tr.StartDownload(context.Background(), s)

Expand Down
14 changes: 11 additions & 3 deletions vidlistener/listener_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package vidlistener

import (
"fmt"
"os/exec"
"testing"
"time"
Expand All @@ -9,7 +10,7 @@ import (
joy4rtmp "github.com/nareix/joy4/format/rtmp"
)

func TestError(t *testing.T) {
func TestListener(t *testing.T) {
server := &joy4rtmp.Server{Addr: ":1937"}
listener := &VidListener{RtmpServer: server}
listener.HandleRTMPPublish(
Expand All @@ -18,17 +19,24 @@ func TestError(t *testing.T) {
},
func(reqPath string) (stream.Stream, error) {
// return errors.New("Some Error")
return &stream.VideoStream{}, nil
return stream.NewVideoStream("test"), nil
},
func(reqPath string) {})

ffmpegCmd := "ffmpeg"
ffmpegArgs := []string{"-re", "-i", "../data/bunny2.mp4", "-c", "copy", "-f", "flv", "rtmp://localhost:1937/movie/stream"}
go exec.Command(ffmpegCmd, ffmpegArgs...).Run()

cmd := exec.Command(ffmpegCmd, ffmpegArgs...)
go cmd.Run()
go listener.RtmpServer.ListenAndServe()

time.Sleep(time.Second * 1)
err := cmd.Process.Kill()
if err != nil {
fmt.Println("Error killing ffmpeg")
}

// time.Sleep(time.Second * 1)
}

// Integration test.
Expand Down
2 changes: 1 addition & 1 deletion vidplayer/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestRTMP(t *testing.T) {

func TestHLS(t *testing.T) {
player := &VidPlayer{}
s := stream.NewStream("test")
s := stream.NewVideoStream("test")
s.HLSTimeout = time.Second * 5
//Write some packets into the stream
s.WriteHLSPlaylistToStream(m3u8.MediaPlaylist{})
Expand Down

0 comments on commit 08a4722

Please sign in to comment.