diff --git a/simple-test-page/beautiful.html b/simple-test-page/beautiful.html index b584b91..1a81c37 100644 --- a/simple-test-page/beautiful.html +++ b/simple-test-page/beautiful.html @@ -67,9 +67,9 @@ - + - + diff --git a/simple-test-page/js/main.js b/simple-test-page/js/main.js index c725318..112b0e3 100644 --- a/simple-test-page/js/main.js +++ b/simple-test-page/js/main.js @@ -1,4 +1,4 @@ -const servers = { iceServers: [{ "urls": ["stun:stun.l.google.com:19302"] }] } +//const servers = { iceServers: [{ "urls": ["stun:stun.l.google.com:19302"] }] } const downloadTasks = new Map() var stompClient = null @@ -28,7 +28,8 @@ function connect(token) { } function createPeer() { - let pc = new RTCPeerConnection(servers) + //let pc = new RTCPeerConnection(servers) + let pc = new RTCPeerConnection(null) console.log(pc) pc.onicecandidate = (event) => { if (event.candidate) { @@ -156,7 +157,7 @@ function setProgress(task) { - + ` table.append(tr) } @@ -176,9 +177,11 @@ function download(path) { protocolDataChannel.send(JSON.stringify({ handler: 'download', data: path })) } +const downloadpreffix = 'download-' function recieveData(channel) { - let path = channel.label.split('-')[1] + let path = channel.label.substring(downloadpreffix.length) let myTask = downloadTasks.get(path) + console.log(myTask) if (myTask) { channel.binaryType = 'arraybuffer' channel.onmessage = ({ data }) => { diff --git a/webrtc-cli/app.js b/webrtc-cli/app.js index 7ed3f42..1b9d142 100644 --- a/webrtc-cli/app.js +++ b/webrtc-cli/app.js @@ -133,7 +133,7 @@ ipc.on('download', (event, task) => { } const readStream = fs.createReadStream(filepath) readStream.on('data', chunk => { - task.received += chunk.length - win.webContents.send('sendData', task, chunk) + // task.received += chunk.length + win.webContents.send('sendData', task.channelId, chunk) }) }) \ No newline at end of file diff --git a/webrtc-cli/index.js b/webrtc-cli/index.js index 836085c..7135b5a 100644 --- a/webrtc-cli/index.js +++ b/webrtc-cli/index.js @@ -1,8 +1,9 @@ const peerMap = new Map() const channels = new Map() const peer_channel = new Map() +const tasks = new Map() -const servers = { iceServers: [{ "urls": ["stun:stun.l.google.com:19302"] }] } +// const servers = { iceServers: [{ "urls": ["stun:stun.l.google.com:19302"] }] } var stompClient = null function connect(signal) { @@ -54,7 +55,8 @@ function oncandidate(msg) { } function createPeer(remote) { - let pc = new RTCPeerConnection(servers) + // let pc = new RTCPeerConnection(servers) + let pc = new RTCPeerConnection(null) console.log(pc) peerMap.set(remote, pc) peer_channel.set(remote, new Array()) @@ -118,39 +120,105 @@ async function listFiles(dir) { return files } +const MAX_CHUNK_SIZE = 262144; async function download(filepath, size, remote) { let pc = peerMap.get(remote) if (!pc) { console.log(`no peerconnection for ${remote}`) return } - let channel = pc.createDataChannel(`download-${filepath}`) const channelId = nextChannelId() - console.log(`channelId ${channelId}`) - channels.set(channelId, channel) - peer_channel.get(remote).push(channelId) - channel.binaryType = 'arraybuffer' - channel.onclose = () => { - console.log(`remote ${remote} channel ${channel.label} close`) - channels.delete(channelId) - } - const task = { filepath: filepath, size: size, received: 0, channelId: channelId, timeout: null } - // tasks.push(task) + let channel = pc.createDataChannel(`download-${filepath}`) + let task = { + filepath: filepath, + chunkSize: 0, + lowWaterMark: 0, + highWaterMark: 0, + bufferedAmount: 0, + size: size, + received: 0, + send: 0, + buffered: [], + channelId: channelId, + pause: false + } + tasks.set(channelId, task) + channel.onopen = () => { + task.chunkSize = Math.min(pc.sctp.maxMessageSize, MAX_CHUNK_SIZE) + task.lowWaterMark = task.chunkSize + task.highWaterMark = Math.max(task.chunkSize * 8, 1048576) + console.log('Send buffer low water threshold: ', task.lowWaterMark) + console.log('Send buffer high water threshold: ', task.highWaterMark) + channel.bufferedAmountLowThreshold = task.lowWaterMark + channel.onbufferedamountlow = (e) => { + console.log('BufferedAmountLow event:', e); + if (task.pause) { + task.pause = false + sendData(channel, task) + } + } + console.log(`channelId ${channelId}`) + channels.set(channelId, channel) + peer_channel.get(remote).push(channelId) + channel.binaryType = 'arraybuffer' + channel.onclose = () => { + console.log(`remote ${remote} channel ${channel.label} close`) + channels.delete(channelId) + } + } + window.electron.send('download', task) } -window.electron.on('sendData', (task, data) => { - const channel = channels.get(task.channelId) - if (channel && channel.readyState == 'open') { - channel.send(data) - console.log(`${task.received} / ${task.size}`) - if (task.received == task.size) { - console.log('send finished') - channel.close() - } +window.electron.on('sendData', (channelId, data) => { + const channel = channels.get(channelId) + const task = tasks.get(channelId) + if (channel && task) { + task.received += data.length + task.buffered.push(data) + sendData(channel, task) } + // if (channel && task && channel.readyState == 'open') { + // channel.send(data) + // console.log(`${task.received} / ${task.size}`) + // if (task.received == task.size) { + // console.log('send finished') + // channel.close() + // } + // } }) +function sendData(channel, task) { + console.log(task.pause) + if (task.pause) { + return + } + if (channel.readyState == 'open') { + let bufferedAmount = channel.bufferedAmount; + let data = task.buffered.shift() + while (data) { + channel.send(data.buffer); + task.send += data.length; + bufferedAmount += data.length; + console.log(`${task.send} / ${task.size}`) + if (bufferedAmount >= task.highWaterMark) { + // if (channel.bufferedAmount < task.lowWaterMark) { + // task.pause = true; + // // task.timeout = setTimeout(() => sendData(channel, task), 0); + // } + task.pause = true; + console.log(`Paused sending, buffered amount: ${task.bufferedAmount} (announced: ${channel.bufferedAmount})`); + break; + } + data = task.buffered.shift() + } + } + if (task.send == task.size) { + console.log('send finished') + channel.close() + } +} + window.electron.on('readFileErr', (task, err) => { console.log(err) const channel = channels.get(task.channelId) @@ -174,9 +242,9 @@ function makeid(length) { return result } -var nextChannelId = function() { - let channelId = 0 - return function() { +var nextChannelId = function () { + let channelId = 0 + return function () { channelId++ return channelId }