Skip to content

Commit

Permalink
feat: clint端Sender实例的创建及buffer函数的编写 (#27)
Browse files Browse the repository at this point in the history

* feat: clint端Sender实例的创建及buffer函数的编写
  • Loading branch information
yuhao423 authored Jul 1, 2024
1 parent 4e6a44e commit 3da2420
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 8 deletions.
7 changes: 7 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,10 @@ const yuFlux = new YuFlux("http://localhost:8000");
yuFlux.on("upgrade", (res) => {
console.error(res.headers, "res.headers");
});

yuFlux.on("open", () => {
console.error("open");
yuFlux.send("yuyu", () => {
console.error("我发送了");
});
});
188 changes: 185 additions & 3 deletions src/sender.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,95 @@
const { randomFillSync } = require("crypto");

const maskBuffer = Buffer.alloc(4);
const kByteLength = Symbol("kByteLength");
const { toBuffer, _mask } = require("./utils");
const RANDOM_POOL_SIZE = 1024 * 8;
let randomPool;
let randomPoolPointer = RANDOM_POOL_SIZE;

class Sender {
constructor() {}
constructor(socket, extensions, generateMask) {
this._socket = socket;
this._extensions = extensions || {};

//提供了生成mask的函数
if (generateMask) {
this._generateMask = generateMask;
this._maskBuffer = Buffer.alloc(4);
}

this._firstFragment = true; //是不是第一个数据帧
this._compress = false; //是否压缩

//todo 扩展,压缩使用的
this._bufferedBytes = 0;
this._deflating = false;
this._dequeue = [];
}

send(data, options, cb) {
//1.fin
//2.compass
//3.binary
//4.mask

//1.流程
let opcode;
let byteLength;
let readOnly;
let offset;
//特殊处理的rsv1
let rsv1 = options.compass; //将rsv1 和是否压缩绑定,默认是不压缩的,就是rsv1为false
options.binary ? (opcode = 2) : (opcode = 1); //1是文本帧 2是二进制帧

const perMessageDeflate = this._extensions["extensionName"];

send(data, options, cb) {}
if (data === "string") {
byteLength = Buffer.byteLength(data);
//加一个readonly
readOnly = false;
} else {
data = toBuffer(data);
byteLength = data.length;
readOnly = toBuffer.readOnly;
//转化为buffer,并且获取长度
//...
}

//是不是第一个帧,和后面的扩展有关,很难
if (this._firstFragment) {
this._firstFragment = false;
//...
this._compress = rsv1; //rsv1还是设置为0,后面再扩展
} else {
//不是第一个帧,那就是一个持续帧,持续帧opcode为0,看文档
opcode = 0;
rsv1 = 0;
}

//如果设置了options.fin,就是说这个是最后一个,需要重置this._firstFragment为true
//这里很关键啊
if (options.fin) this._firstFragment = true;

//todo 压缩的扩展,队列及dispatch
if (perMessageDeflate) {
console.error("perMessageDeflate");
} else {
this.sendFrame(
Sender.frame(data, {
[kByteLength]: byteLength, //字节长度
fin: options.fin, //是不是最后一个
generateMask: this._generateMask, //是否提供generateMask函数
mask: options.mask, //是否需要掩码
maskBuffer: this._maskBuffer, //提供了maskBuffer吗?
opcode, //数据类型
readOnly, //是否只读
rsv1: false, //写死的rsv1
}),
cb,
);
}
}

/*
* @param {(Buffer|String)} data 要进行帧处理的数据
Expand All @@ -18,13 +105,108 @@ class Sender {
*/
static frame(data, options) {
let mask;
//1. 客户端发送消息,必须掩码
let offset = 2;
let skipMasking = false;
let merge = false;
//1. 客户端发送消息,必须掩码,如果用户提供掩码,则不需要处理,否则需要掩码处理
if (options.mask) {
mask = options.maskBuffer || maskBuffer;
//提供了生成 掩码键 masking-key 的函数
if (options.generateMask) {
options.generateMask(mask);
} else {
//生成随机的buffer用来掩码
if (randomPoolPointer === RANDOM_POOL_SIZE) {
if (randomPool === undefined) {
randomPool = Buffer.alloc(RANDOM_POOL_SIZE);
}

//crypto 导入这个函数,buffer全变成随机
randomFillSync(randomPool, 0, RANDOM_POOL_SIZE);
randomPoolPointer = 0;
}

mask[0] = randomPool[randomPoolPointer++];
mask[1] = randomPool[randomPoolPointer++];
mask[2] = randomPool[randomPoolPointer++];
mask[3] = randomPool[randomPoolPointer++];

//跳过掩码,有特殊情况可以跳过掩码
skipMasking = (mask[0] | mask[1] | mask[2] | mask[3]) === 0;

//申请buffer长度为6
offset = 6;
}
}

//data的长度 - payloadLen
let dataLength;

if (typeof data === "string") {
if ((!options.mask || skipMasking) && options[kByteLength] !== undefined) {
dataLength = options[kByteLength];
} else {
data = Buffer.from(data);
dataLength = data.length;
}
} else {
dataLength = data.length;
//todo merge的判断
merge = false;
}

//确定载荷的长度
let payloadLength = dataLength;

if (payloadLength >= 65536) {
offset += 8;
payloadLength = 127;
} else if (payloadLength > 125) {
offset += 2;
payloadLength = 126;
}

//生成buffer了
const target = Buffer.allocUnsafe(offset);
//fin为true,则必须为1,采用位运算来
target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
target[1] = payloadLength;

//改变数据帧的第二个字节
if (payloadLength === 126) {
target.writeUInt16BE(dataLength, 2);
} else if (payloadLength === 127) {
target[2] = target[3] = 0;
target.writeUIntBE(dataLength, 4, 6);
}

if (!options.mask) return [target, data];

//需要mask
target[1] |= 0x80;
target[offset - 4] = mask[0];
target[offset - 3] = mask[1];
target[offset - 2] = mask[2];
target[offset - 1] = mask[3];

if (skipMasking) return [target, data];

_mask(data, mask, data, 0, dataLength);

console.error([target, data]);
return [target, data];
}

sendFrame(list, cb) {
if (list.length === 2) {
this._socket.cork();
this._socket.write(list[0]);
this._socket.write(list[1], cb);
this._socket.uncork();
} else {
this._socket.write(list[0], cb);
}
}
}

module.exports = Sender;
27 changes: 27 additions & 0 deletions src/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
*
* @param {*} data
* @returns {Buffer} buffer
*/
function toBuffer(data) {
toBuffer.readOnly = true;

if (Buffer.isBuffer(data)) return data;

let buf;
buf = Buffer.from(data);
toBuffer.readOnly = false;

return buf;
}

function _mask(source, mask, output, offset, length) {
for (let i = 0; i < length; i++) {
output[i + offset] = source[i] ^ mask[i & 3];
}
}

module.exports = {
toBuffer,
_mask,
};
5 changes: 3 additions & 2 deletions src/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ const EventEmitter = require("events");
const { URL } = require("url");
const { randomBytes, createHash } = require("crypto");

const Sender = require("./sender");
const { protocolVersions, readyStates, GUID, emptyBuffer } = require("./constant");

class YuFlux extends EventEmitter {
constructor(address, protocols, options) {
super();
this._readyState = readyStates[0];

this._extensions = {};
if (address !== undefined) {
this._isServer = false;
this._redirect = 0;
Expand Down Expand Up @@ -41,7 +42,7 @@ class YuFlux extends EventEmitter {
//1. receiver

//2. sender

this._sender = new Sender(socket, this._extensions, option.generateMask);
//3.socket
this._socket = socket;

Expand Down
13 changes: 10 additions & 3 deletions test/websocketClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,19 @@ wsClient.on('open', function open() {
array[i] = i / 2;
}

wsClient.send('yuyu',()=>{
console.error('我发送了');
wsClient.send('yuyu',{

},()=>{
console.log('我发送了1');
});

let a = []
for(let i = 0;i<200;i++){
a.push(3)
}
const longString = a.join('')
setTimeout(()=>{
wsClient.send('yuyu1',()=>{
wsClient.send(longString,()=>{
console.error('我发送了2');
});
},2000)
Expand Down

0 comments on commit 3da2420

Please sign in to comment.