Skip to content

[Collector] WebRtc  #74

Open
Open
@saraswatpuneet

Description

@saraswatpuneet

Details

  • Users should be able to connect a live video stream using the WebRTC collector.
  • Users should be able to stream in a collection of audio/video datasets.
  • In general, try to support P2P transports; WebRTC is one flavor of P2P.
import asyncio

import cv2
from aiortc import RTCIceCandidate, RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
from aiortc.contrib.media import MediaPlayer

class VideoTransformTrack(VideoStreamTrack):
    """Video track that wraps another track and forwards its frames.

    Frames are currently passed through unchanged; ``recv`` is the place to
    add per-frame processing.
    """

    def __init__(self, track):
        """Store the wrapped source ``track`` whose frames are relayed."""
        super().__init__()
        self.track = track

    async def recv(self):
        """Await the next frame from the wrapped track and return it as-is."""
        source_frame = await self.track.recv()
        # Hook point: transform ``source_frame`` here before handing it on.
        return source_frame

async def consume_signaling(queue, pc):
    """Process signaling messages from ``queue`` and apply them to ``pc``.

    Loops forever (until the surrounding task is cancelled or a call raises).
    Each message is a dict with a ``"type"`` key:

    * ``"offer"`` -- ``obj["sdp"]`` is applied as the remote description, an
      answer is created and set as the local description, and
      ``{"type": "answer", "sdp": ...}`` is put on ``queue``.
    * ``"candidate"`` -- ``obj["candidate"]`` is turned into an
      ``RTCIceCandidate`` and added to ``pc``.

    Messages with any other ``"type"`` are ignored.

    Args:
        queue: Queue of signaling dicts with awaitable ``get()``/``put()``.
        pc: Peer connection the messages are applied to.
    """
    while True:
        obj = await queue.get()
        kind = obj["type"]
        if kind == "offer":
            offer = RTCSessionDescription(sdp=obj["sdp"], type="offer")
            await pc.setRemoteDescription(offer)
            await pc.setLocalDescription(await pc.createAnswer())
            # NOTE(review): the answer goes onto the same queue this loop
            # reads from, so unless another consumer takes it first this loop
            # dequeues it again and ignores it. Confirm the intended transport.
            await queue.put({"type": "answer", "sdp": pc.localDescription.sdp})
        elif kind == "candidate":
            # RTCIceCandidate takes its fields as separate arguments, so the
            # payload mapping is unpacked instead of being passed as a single
            # positional value.
            # NOTE(review): assumes the payload keys match the constructor's
            # field names -- confirm against whatever produces these messages.
            candidate = RTCIceCandidate(**obj["candidate"])
            await pc.addIceCandidate(candidate)

async def main():
    """Set up a peer connection for ``test.mp4`` and run the signaling loop.

    Steps:

    1. Create the peer connection and register logging / ping-pong handlers.
    2. Attach the video track of ``test.mp4`` (wrapped in
       ``VideoTransformTrack``) and a ``"chat"`` data channel.
    3. Create the offer and set it as the local description.
    4. Hand the shared queue to ``consume_signaling`` until it stops, then
       close the peer connection.

    NOTE(review): the local offer is never placed on the queue or sent
    anywhere in this function; a real signaling transport still has to
    deliver it to the remote peer.
    """
    pc = RTCPeerConnection()
    pc_id = "PeerConnection(%s)" % id(pc)
    queue = asyncio.Queue()

    def log_info(msg, *args):
        # Interpolate printf-style arguments so "%s" placeholders are filled
        # in rather than printed literally next to their values.
        print(pc_id, msg % args if args else msg)

    # A single handler per incoming channel: logging plus the ping/pong echo.
    @pc.on("datachannel")
    def on_datachannel(channel):
        def channel_log_info(msg, *args):
            log_info("[datachannel] " + msg, *args)

        @channel.on("open")
        def on_open():
            channel_log_info("opened")

        @channel.on("close")
        def on_close():
            channel_log_info("closed")

        @channel.on("message")
        def on_message(message):
            # Format via "%s" so non-str payloads (e.g. bytes) can be logged
            # and "%" characters inside the message are not treated as
            # format directives.
            channel_log_info("%s", message)
            if isinstance(message, str) and message.startswith("ping"):
                channel.send("pong" + message[4:])

    @pc.on("iceconnectionstatechange")
    async def on_iceconnectionstatechange():
        log_info("ICE connection state is %s", pc.iceConnectionState)
        if pc.iceConnectionState == "failed":
            # close() is a coroutine; it must be awaited to take effect.
            await pc.close()
            print("Connection failed.")

    @pc.on("icecandidate")
    def on_icecandidate(candidate):
        # This is a synchronous callback, so use put_nowait(); calling
        # queue.put() here would only create a coroutine that never runs.
        # NOTE(review): confirm that the peer connection emits "icecandidate"
        # and that the candidate object provides to_dict(); neither is
        # established in this file.
        queue.put_nowait({"type": "candidate", "candidate": candidate.to_dict()})

    # Media and the data channel must exist before the offer is created so
    # that they are part of the negotiated session.
    player = MediaPlayer("test.mp4")
    if player.video is not None:
        pc.addTrack(VideoTransformTrack(player.video))
    pc.createDataChannel("chat")

    await pc.setLocalDescription(await pc.createOffer())

    try:
        await consume_signaling(queue, pc)
    finally:
        # Release ICE/DTLS resources however the signaling loop ends
        # (error or cancellation).
        await pc.close()


Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions