Skip to content

Commit

Permalink
fix the 'can't buffer any more data for the data channel' issue
Browse files Browse the repository at this point in the history
  • Loading branch information
RTM945 committed May 7, 2020
1 parent bcaa978 commit 1401024
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 33 deletions.
4 changes: 2 additions & 2 deletions simple-test-page/beautiful.html
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ <h4 class="modal-title">输入提取码</h4>
<table id="tasks" class="table table-hover" style="display: none;">
<thead>
<tr>
<th style="width: 40%;">文件名</th>
<th style="width: 50%;">文件名</th>
<th style="width: 50%;">进度</th>
<th style="width: 9%;">操作</th>
<!-- <th style="width: 9%;">操作</th> -->
</tr>
</thead>
<tbody>
Expand Down
11 changes: 7 additions & 4 deletions simple-test-page/js/main.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const servers = { iceServers: [{ "urls": ["stun:stun.l.google.com:19302"] }] }
//const servers = { iceServers: [{ "urls": ["stun:stun.l.google.com:19302"] }] }
const downloadTasks = new Map()

var stompClient = null
Expand Down Expand Up @@ -28,7 +28,8 @@ function connect(token) {
}

function createPeer() {
let pc = new RTCPeerConnection(servers)
//let pc = new RTCPeerConnection(servers)
let pc = new RTCPeerConnection(null)
console.log(pc)
pc.onicecandidate = (event) => {
if (event.candidate) {
Expand Down Expand Up @@ -156,7 +157,7 @@ function setProgress(task) {
</div>
</div>
</td>
<td><span class="glyphicon glyphicon-remove" aria-hidden="true"></span> 取消</td>
<!--<td><span class="glyphicon glyphicon-remove" aria-hidden="true"></span> 取消</td>-->
</tr>`
table.append(tr)
}
Expand All @@ -176,9 +177,11 @@ function download(path) {
protocolDataChannel.send(JSON.stringify({ handler: 'download', data: path }))
}

const downloadpreffix = 'download-'
function recieveData(channel) {
let path = channel.label.split('-')[1]
let path = channel.label.substring(downloadpreffix.length)
let myTask = downloadTasks.get(path)
console.log(myTask)
if (myTask) {
channel.binaryType = 'arraybuffer'
channel.onmessage = ({ data }) => {
Expand Down
4 changes: 2 additions & 2 deletions webrtc-cli/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ ipc.on('download', (event, task) => {
}
const readStream = fs.createReadStream(filepath)
readStream.on('data', chunk => {
task.received += chunk.length
win.webContents.send('sendData', task, chunk)
// task.received += chunk.length
win.webContents.send('sendData', task.channelId, chunk)
})
})
118 changes: 93 additions & 25 deletions webrtc-cli/index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
const peerMap = new Map()
const channels = new Map()
const peer_channel = new Map()
const tasks = new Map()

const servers = { iceServers: [{ "urls": ["stun:stun.l.google.com:19302"] }] }
// const servers = { iceServers: [{ "urls": ["stun:stun.l.google.com:19302"] }] }
var stompClient = null

function connect(signal) {
Expand Down Expand Up @@ -54,7 +55,8 @@ function oncandidate(msg) {
}

function createPeer(remote) {
let pc = new RTCPeerConnection(servers)
// let pc = new RTCPeerConnection(servers)
let pc = new RTCPeerConnection(null)
console.log(pc)
peerMap.set(remote, pc)
peer_channel.set(remote, new Array())
Expand Down Expand Up @@ -118,39 +120,105 @@ async function listFiles(dir) {
return files
}

const MAX_CHUNK_SIZE = 262144;
async function download(filepath, size, remote) {
let pc = peerMap.get(remote)
if (!pc) {
console.log(`no peerconnection for ${remote}`)
return
}
let channel = pc.createDataChannel(`download-${filepath}`)
const channelId = nextChannelId()
console.log(`channelId ${channelId}`)
channels.set(channelId, channel)
peer_channel.get(remote).push(channelId)
channel.binaryType = 'arraybuffer'
channel.onclose = () => {
console.log(`remote ${remote} channel ${channel.label} close`)
channels.delete(channelId)
}
const task = { filepath: filepath, size: size, received: 0, channelId: channelId, timeout: null }
// tasks.push(task)
let channel = pc.createDataChannel(`download-${filepath}`)
let task = {
filepath: filepath,
chunkSize: 0,
lowWaterMark: 0,
highWaterMark: 0,
bufferedAmount: 0,
size: size,
received: 0,
send: 0,
buffered: [],
channelId: channelId,
pause: false
}
tasks.set(channelId, task)
channel.onopen = () => {
task.chunkSize = Math.min(pc.sctp.maxMessageSize, MAX_CHUNK_SIZE)
task.lowWaterMark = task.chunkSize
task.highWaterMark = Math.max(task.chunkSize * 8, 1048576)
console.log('Send buffer low water threshold: ', task.lowWaterMark)
console.log('Send buffer high water threshold: ', task.highWaterMark)
channel.bufferedAmountLowThreshold = task.lowWaterMark
channel.onbufferedamountlow = (e) => {
console.log('BufferedAmountLow event:', e);
if (task.pause) {
task.pause = false
sendData(channel, task)
}
}
console.log(`channelId ${channelId}`)
channels.set(channelId, channel)
peer_channel.get(remote).push(channelId)
channel.binaryType = 'arraybuffer'
channel.onclose = () => {
console.log(`remote ${remote} channel ${channel.label} close`)
channels.delete(channelId)
}
}

window.electron.send('download', task)
}

window.electron.on('sendData', (task, data) => {
const channel = channels.get(task.channelId)
if (channel && channel.readyState == 'open') {
channel.send(data)
console.log(`${task.received} / ${task.size}`)
if (task.received == task.size) {
console.log('send finished')
channel.close()
}
window.electron.on('sendData', (channelId, data) => {
const channel = channels.get(channelId)
const task = tasks.get(channelId)
if (channel && task) {
task.received += data.length
task.buffered.push(data)
sendData(channel, task)
}
// if (channel && task && channel.readyState == 'open') {
// channel.send(data)
// console.log(`${task.received} / ${task.size}`)
// if (task.received == task.size) {
// console.log('send finished')
// channel.close()
// }
// }
})

function sendData(channel, task) {
console.log(task.pause)
if (task.pause) {
return
}
if (channel.readyState == 'open') {
let bufferedAmount = channel.bufferedAmount;
let data = task.buffered.shift()
while (data) {
channel.send(data.buffer);
task.send += data.length;
bufferedAmount += data.length;
console.log(`${task.send} / ${task.size}`)
if (bufferedAmount >= task.highWaterMark) {
// if (channel.bufferedAmount < task.lowWaterMark) {
// task.pause = true;
// // task.timeout = setTimeout(() => sendData(channel, task), 0);
// }
task.pause = true;
console.log(`Paused sending, buffered amount: ${task.bufferedAmount} (announced: ${channel.bufferedAmount})`);
break;
}
data = task.buffered.shift()
}
}
if (task.send == task.size) {
console.log('send finished')
channel.close()
}
}

window.electron.on('readFileErr', (task, err) => {
console.log(err)
const channel = channels.get(task.channelId)
Expand All @@ -174,9 +242,9 @@ function makeid(length) {
return result
}

var nextChannelId = function() {
let channelId = 0
return function() {
var nextChannelId = function () {
let channelId = 0
return function () {
channelId++
return channelId
}
Expand Down

0 comments on commit 1401024

Please sign in to comment.