Open
Description
Details
- user should be able to connect a live video stream using webrtc collector
- users should be able to stream in a collection of audio video dataset
- in general try supporting P2P and webrtc is one flavor of it
import asyncio
import cv2
from aiortc import VideoStreamTrack, RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer
# Create a simple video track that captures video from a webcam
class VideoTransformTrack(VideoStreamTrack):
def __init__(self, track):
super().__init__()
self.track = track
async def recv(self):
frame = await self.track.recv()
# Perform any processing on the video frame here if needed
return frame
async def consume_signaling(queue, pc):
while True:
obj = await queue.get()
if obj["type"] == "offer":
await pc.setRemoteDescription(RTCSessionDescription(sdp=obj["sdp"], type="offer"))
await pc.setLocalDescription(await pc.createAnswer())
await queue.put({"type": "answer", "sdp": pc.localDescription.sdp})
elif obj["type"] == "candidate":
candidate = RTCIceCandidate(obj["candidate"])
await pc.addIceCandidate(candidate)
async def main():
pc = RTCPeerConnection()
@pc.on("datachannel")
def on_datachannel(channel):
@channel.on("message")
def on_message(message):
if isinstance(message, str) and message.startswith("ping"):
channel.send("pong" + message[4:])
player = MediaPlayer("test.mp4")
await pc.setLocalDescription(await pc.createOffer())
queue = asyncio.Queue()
pc_id = "PeerConnection(%s)" % id(pc)
def log_info(msg, *args):
print(pc_id, msg, *args)
@pc.on("datachannel")
def on_datachannel(channel):
def channel_log_info(msg, *args):
log_info("[datachannel] " + msg, *args)
channel.on("message")(channel_log_info)
channel.on("open")(lambda: channel_log_info("opened"))
channel.on("close")(lambda: channel_log_info("closed"))
@pc.on("iceconnectionstatechange")
def on_iceconnectionstatechange():
log_info("ICE connection state is %s", pc.iceConnectionState)
if pc.iceConnectionState == "failed":
pc.close()
print("Connection failed.")
@pc.on("icecandidate")
def on_icecandidate(candidate):
queue.put({"type": "candidate", "candidate": candidate.to_dict()})
pc.createDataChannel("chat")
await consume_signaling(queue, pc)
Metadata
Metadata
Assignees
Labels
No labels