Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rtmp Client Fallback Stream #159

Open
hasnhasan opened this issue Oct 28, 2024 · 1 comment
Open

Rtmp Client Fallback Stream #159

hasnhasan opened this issue Oct 28, 2024 · 1 comment

Comments

@hasnhasan
Copy link

hasnhasan commented Oct 28, 2024

I am relaying the broadcast that arrives at the RTMP server to another client. If the incoming broadcast is interrupted, I send a fallback video until the broadcast comes back. When I do this, I get the following errors.

[h264 @ 0x7f79a20414c0] cabac_init_idc 32 overflow
[h264 @ 0x7f79a20414c0] decode_slice_header error
[h264 @ 0x7f79a20414c0] no frame!
[h264 @ 0x7f79b2046c80] co located POCs unavailable
[NULL @ 0x7f79617045c0] illegal reordering_of_pic_nums_idc 4
[h264 @ 0x7f7982219d40] illegal modification_of_pic_nums_idc 4
[h264 @ 0x7f7982219d40] decode_slice_header error
[h264 @ 0x7f7982219d40] no frame!

package main

import (
	"fmt"
	"io"
	"log"
	"net"
	"net/url"
	"os"
	"sync"

	"github.com/yapingcat/gomedia/go-codec"
	"github.com/yapingcat/gomedia/go-flv"
	"github.com/yapingcat/gomedia/go-rtmp"
)

// RTMPRelay is the per-stream state used to forward a published RTMP stream
// to an upstream RTMP client, with an FLV file as a stand-in source.
type RTMPRelay struct {
	// conn is the publisher connection this relay was last attached to.
	conn net.Conn
	// server is the RTMP server-side handle that parses the publisher's data.
	server *rtmp.RtmpServerHandle
	// client is the outgoing RTMP publishing client that frames are written to.
	client *rtmp.RtmpClient
	// fallbackFLV is the path of the FLV file read by startFallback.
	fallbackFLV string
	// mutex is taken by startFallback and stopFallback when they update the relay.
	mutex sync.Mutex
	// isFallback is true while the fallback file should be sent.
	isFallback bool
	// lastPts is the presentation timestamp of the last forwarded video frame.
	lastPts uint32
	// lastDts is the decoding timestamp of the last forwarded video frame.
	lastDts uint32
}

// Broadcast maps a published stream name to the relay serving it.
type Broadcast map[string]*RTMPRelay

var (
	// Broadcasts is the process-wide registry of relays, keyed by stream name.
	Broadcasts = Broadcast{}
	// mtx is the lock Broadcast.Find holds while reading the registry.
	mtx sync.Mutex
)

// Find returns the relay registered under name, or nil when there is none.
// The lookup is performed while holding the package-level mtx.
func (b *Broadcast) Find(name string) *RTMPRelay {
	mtx.Lock()
	defer mtx.Unlock()
	// A missing key yields the zero value of *RTMPRelay, which is nil.
	return (*b)[name]
}

// NewRtmpClient dials the RTMP server named by rtmpUrl (port 1935 when the URL
// has none), creates a publishing client wired to that TCP connection, starts
// it, and launches a goroutine that feeds the server's replies back into the
// client.
//
// It panics when rtmpUrl cannot be parsed. It returns nil when the TCP
// connection cannot be established, so callers must check the result before
// using it.
func NewRtmpClient(rtmpUrl string) *rtmp.RtmpClient {
	u, err := url.Parse(rtmpUrl)
	if err != nil {
		panic(err)
	}
	host := u.Host
	if u.Port() == "" {
		host += ":1935"
	}
	clientConn, err := net.Dial("tcp4", host)
	if err != nil {
		// Without a connection every later Write/Read would dereference a nil
		// net.Conn, so stop here instead of carrying on.
		fmt.Println("connect failed", err)
		return nil
	}

	client := rtmp.NewRtmpClient(rtmp.WithComplexHandshake(), rtmp.WithEnablePublish())
	client.SetOutput(func(data []byte) error {
		_, err := clientConn.Write(data)
		return err
	})

	client.Start(rtmpUrl)
	go func() {
		// The goroutine owns the socket: release it once reading stops.
		defer clientConn.Close()
		buf := make([]byte, 4096)
		for {
			// err is local to the loop; it is not shared with the enclosing function.
			n, err := clientConn.Read(buf)
			if n > 0 {
				// Read may return data together with an error; consume the data first.
				if inErr := client.Input(buf[:n]); inErr != nil {
					log.Printf("rtmp client input error: %v", inErr)
					return
				}
			}
			if err != nil {
				log.Printf("rtmp client connection closed: %v", err)
				return
			}
		}
	}()
	return client
}

// NewRTMPRelay serves one incoming publisher connection. It creates an RTMP
// server handle for conn, forwards every received frame to the relay's
// upstream client, and starts a goroutine that reads conn until it fails.
//
// On publish, the relay registered in Broadcasts under the stream name is
// reused (its fallback is stopped and it is re-attached to conn); otherwise a
// new relay is created and registered. When reading from conn fails, conn is
// closed and, if conn is still the relay's current connection, the relay's
// fallback is started.
//
// The returned relay is the empty placeholder created before any publish has
// been seen; the relay used for forwarding is resolved in the publish callback.
// The returned error is always nil.
func NewRTMPRelay(conn net.Conn) (*RTMPRelay, error) {
	relay := &RTMPRelay{}
	// Keep the value handed back to the caller separate from the variable the
	// callbacks reassign, so the return does not race with the read goroutine.
	initial := relay

	handle := rtmp.NewRtmpServerHandle()
	handle.OnPublish(func(app, streamName string) rtmp.StatusCode {
		if existing := Broadcasts.Find(streamName); existing != nil {
			// The publisher is back: stop the fallback and attach the relay to
			// this connection.
			existing.stopFallback()
			existing.mutex.Lock()
			existing.conn = conn
			existing.server = handle
			existing.mutex.Unlock()
			relay = existing
		} else {
			relay = &RTMPRelay{
				conn:        conn,
				server:      handle,
				client:      NewRtmpClient("rtmp://127.0.0.1/live/SyXTJSpx1e"),
				isFallback:  false,
				fallbackFLV: "videos/test.flv",
				lastPts:     uint32(0),
				lastDts:     uint32(0),
			}
		}
		// Find reads the registry under mtx, so the write must hold it too.
		mtx.Lock()
		Broadcasts[streamName] = relay
		mtx.Unlock()
		return rtmp.NETSTREAM_PUBLISH_START
	})

	handle.SetOutput(func(b []byte) error {
		_, err := conn.Write(b)
		return err
	})

	handle.OnFrame(func(cid codec.CodecID, pts, dts uint32, frame []byte) {
		if relay.client == nil {
			// No publish has been seen yet, or there is no upstream client.
			return
		}
		// Take the relay mutex so the timestamp update and the write are
		// serialised with any other goroutine that uses the relay under the
		// same mutex.
		relay.mutex.Lock()
		defer relay.mutex.Unlock()
		if cid == codec.CODECID_VIDEO_H264 || cid == codec.CODECID_VIDEO_H265 {
			relay.lastDts = dts
			relay.lastPts = pts
		}
		if err := relay.client.WriteFrame(cid, frame, pts, dts); err != nil {
			log.Printf("forwarding frame failed: %v", err)
		}
	})

	go func() {
		buf := make([]byte, 4096)
		interrupted := false
		for {
			n, err := conn.Read(buf)
			if n > 0 {
				// Read may return data together with an error; parse the data first.
				// NOTE(review): the publish and frame callbacks are presumably
				// invoked synchronously from Input on this goroutine — confirm.
				if inErr := handle.Input(buf[0:n]); inErr != nil {
					log.Printf("Package reading error: %v", inErr)
					break
				}
			}
			if err != nil {
				// Any read failure, not only a clean EOF, means the publisher is gone.
				if err != io.EOF {
					log.Printf("Connection read error: %v", err)
				}
				interrupted = true
				break
			}
		}
		conn.Close()

		if !interrupted || relay.client == nil {
			return
		}
		// A newer connection may already have taken over this relay; in that
		// case the stream is live and must not be replaced by the fallback.
		relay.mutex.Lock()
		current := relay.conn == conn
		relay.mutex.Unlock()
		if !current {
			return
		}
		log.Println("Broadcast interrupted, switching to fallback")
		relay.startFallback()
	}()

	return initial, nil
}

// startFallback reads the relay's fallback FLV file and writes its frames to
// the upstream client until the file ends, an error occurs, or stopFallback
// clears the fallback flag. It blocks for the duration of the file.
//
// Frame timestamps are shifted by the last video timestamps recorded on the
// relay when fallback began. The shift is applied to every frame so audio and
// video stay on the same timeline. When the file is fully sent while fallback
// is still active, the relay's last timestamps are advanced to those of the
// last fallback video frame.
//
// NOTE(review): frames are pushed as fast as the file can be read; nothing
// here paces them to their timestamps, and the file is sent only once.
func (r *RTMPRelay) startFallback() {
	if r.client == nil {
		log.Println("fallback skipped: relay has no upstream client")
		return
	}

	fd, err := os.Open(r.fallbackFLV)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer fd.Close()

	// Snapshot the timestamp base once, under the mutex.
	r.mutex.Lock()
	r.isFallback = true
	basePts, baseDts := r.lastPts, r.lastDts
	r.mutex.Unlock()

	lastPts, lastDts := basePts, baseDts

	f := flv.CreateFlvReader()
	f.OnFrame = func(cid codec.CodecID, frame []byte, pts, dts uint32) {
		// Check the flag and write under the mutex so a frame is never written
		// after stopFallback has returned, and writes from this goroutine are
		// serialised with other users of the mutex.
		r.mutex.Lock()
		defer r.mutex.Unlock()
		if !r.isFallback {
			return
		}

		pts += basePts
		dts += baseDts
		if cid == codec.CODECID_VIDEO_H264 || cid == codec.CODECID_VIDEO_H265 {
			lastPts = pts
			lastDts = dts
		}

		if wErr := r.client.WriteFrame(cid, frame, pts, dts); wErr != nil {
			log.Printf("fallback frame write failed: %v", wErr)
		}
	}

	cache := make([]byte, 4096)
	for {
		r.mutex.Lock()
		active := r.isFallback
		r.mutex.Unlock()
		if !active {
			// Fallback was stopped; leave the relay's timestamps untouched.
			return
		}

		n, err := fd.Read(cache)
		if n > 0 {
			if inErr := f.Input(cache[0:n]); inErr != nil {
				log.Printf("fallback FLV parse error: %v", inErr)
				break
			}
		}
		if err != nil {
			if err != io.EOF {
				fmt.Println(err)
			}
			break
		}
	}

	r.mutex.Lock()
	// Only advance the timeline if fallback is still the active source.
	if r.isFallback {
		r.lastDts = lastDts
		r.lastPts = lastPts
	}
	r.mutex.Unlock()
}

// stopFallback clears the relay's fallback flag while holding its mutex.
func (r *RTMPRelay) stopFallback() {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	r.isFallback = false
}


// main listens on TCP port 1936 and starts a relay goroutine for every
// accepted connection. It exits with a logged error if the listener cannot be
// created or if accepting a connection fails.
func main() {
	listen, err := net.Listen("tcp4", ":1936")
	if err != nil {
		log.Fatalf("listen on :1936: %v", err)
	}
	for {
		conn, err := listen.Accept()
		if err != nil {
			// A nil conn must never reach NewRTMPRelay; stop instead.
			log.Fatalf("accept: %v", err)
		}
		go NewRTMPRelay(conn)
	}
}
@hasnhasan
Copy link
Author

@yapingcat I think the “picture numbers” need to be changed in f.OnFrame. I would be very grateful if you could help me with this 🙏

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant