-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathMessage.js
171 lines (146 loc) · 4.9 KB
/
Message.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
var crc32 = require('crc32');
var buffercrc32 = require('buffer-crc32');
var binary = require('binary');
var BufferMaker = require('buffermaker');
var _ = require('underscore');
// Presumably the magic byte (1) plus the CRC32 checksum (4), i.e. the bytes
// that precede the payload when MAGIC == 0. Not referenced in the visible
// part of this file.
var BASIC_MESSAGE_HEADER_SIZE = 5;
// Presumably the width in bytes of the int32 LENGTH prefix. Not referenced in
// the visible part of this file.
var MESSAGE_SIZE_BYTE_LENGTH = 4;
// Value returned by Message.getHeaderLength(). Presumably LENGTH (4) + MAGIC (1)
// + COMPRESSION (1) + CHECKSUM (4) for the MAGIC == 1 layout.
var V0_7_MESSAGE_HEADER_SIZE = 10;
// Compression value the Message constructor uses unless magic is explicitly 1
// and a valid compression is supplied (0 = none).
var COMPRESSION_DEFAULT = 0;
var assert = require('assert');
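/*
A message. The format of an N byte message with MAGIC == 0 is the following:
1 byte "magic" identifier to allow format changes
4 byte CRC32 of the payload
N - 5 byte payload
When MAGIC == 1, a 1 byte compression attribute follows the magic byte,
which leaves an N - 6 byte payload.
*/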
/**
 * A single Kafka message: a payload plus the magic/compression/checksum
 * fields that accompany it on the wire.
 *
 * Validation happens in this order: compression, magic, payload.
 *
 * @param {Buffer|string} payload      Message content. Should be a Buffer; the
 *                                     empty string is also accepted. Any other
 *                                     falsy value is rejected.
 * @param {number} [checksum]          CRC32 of the payload. Computed with
 *                                     calculateChecksum() when omitted. An
 *                                     explicit 0 is kept as given.
 * @param {number} [magic]             Format version, 0 or 1. Defaults to 1.
 * @param {number} [compression]       0 = none; 1 = gzip; 2 = snappy. Only
 *                                     consulted when `magic` is passed
 *                                     explicitly as 1, in which case it must be
 *                                     one of those values. Otherwise the
 *                                     default (none) is used.
 * @throws {string} "InvalidCompressionType: ..." for an unknown compression
 *                  when magic is 1.
 * @throws {string} "Unsupported Kafka message version: ..." when magic is
 *                  neither 0 nor 1.
 * @throws {string} "payload is a required argument" when payload is missing.
 */
function Message(payload, checksum, magic, compression){
  // Errors are thrown as plain strings on purpose: that is this module's
  // existing contract and callers may compare against the exact text.
  var _compression = COMPRESSION_DEFAULT;
  // The compression attribute only exists at all if MAGIC == 1.
  if (!!magic && magic == 1){
    if (_.include([0,1,2], compression)){
      _compression = compression;
    } else {
      throw "InvalidCompressionType: " + compression;
    }
  }
  this.compression = _compression;

  this.MAGIC_IDENTIFIER_DEFAULT = 1;
  if (magic === undefined){
    this.magic = this.MAGIC_IDENTIFIER_DEFAULT;
  } else {
    this.magic = magic;
  }
  if (this.magic !== 0 && this.magic !== 1){
    throw "Unsupported Kafka message version: " + this.magic;
  }

  if (!payload && payload !== ''){
    throw "payload is a required argument";
  }
  this.payload = payload;

  // 0 is a legitimate CRC32 value, so it must not be mistaken for "no checksum
  // supplied"; otherwise isValid() could never flag such a message as corrupt.
  // Every other falsy value still falls back to the computed checksum.
  if (checksum || checksum === 0){
    this.checksum = checksum;
  } else {
    this.checksum = this.calculateChecksum();
  }
}
/**
 * Returns the encoded size of this message in bytes.
 *
 * The value is cached in `bytesLengthVal`; when no truthy cached value is
 * present, toBytes() is invoked so that it populates the cache.
 *
 * @returns {number} the value of `this.bytesLengthVal`
 */
Message.prototype.byteLength = function(){
  var cachedLength = this.bytesLengthVal;
  if (cachedLength){
    return cachedLength;
  }
  // toBytes() records the encoded length as a side effect.
  this.toBytes();
  return this.bytesLengthVal;
};
/**
 * Computes the checksum of the current payload.
 *
 * @returns the result of buffercrc32.unsigned() applied to `this.payload`
 */
Message.prototype.calculateChecksum = function(){
  // buffer-crc32 is used here because it works for multibyte strings.
  return buffercrc32.unsigned(this.payload);
};
/**
 * Tells whether the stored checksum matches a freshly calculated one.
 *
 * The comparison is deliberately loose (==), so a stored checksum that is a
 * numeric string still matches the calculated number.
 *
 * @returns {boolean}
 */
Message.prototype.isValid = function(){
  var storedChecksum = this.checksum;
  var calculatedChecksum = this.calculateChecksum();
  return storedChecksum == calculatedChecksum;
};
/*
  0                   1                   2                   3
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                            LENGTH                             |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |     MAGIC     |  COMPRESSION  |           CHECKSUM            |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |       CHECKSUM (cont.)        |            PAYLOAD            /
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+                               /
 /                        PAYLOAD (cont.)                        /
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

 LENGTH      = int32   // Length in bytes of entire message (excluding this field)
 MAGIC       = int8    // 0 = COMPRESSION attribute byte does not exist (v0.6 and below)
                       // 1 = COMPRESSION attribute byte exists (v0.7 and above)
 COMPRESSION = int8    // 0 = none; 1 = gzip; 2 = snappy;
                       // Only exists at all if MAGIC == 1
 CHECKSUM    = int32   // CRC32 checksum of the PAYLOAD
 PAYLOAD     = Bytes[] // Message content
*/
/**
 * Serializes this message to its length-prefixed wire form.
 *
 * Layout (integers big-endian):
 *   LENGTH       uint32  byte count of everything after this field
 *   MAGIC        uint8
 *   COMPRESSION  uint8   written only when magic is not 0
 *   CHECKSUM     uint32  freshly calculated from the payload
 *   PAYLOAD      bytes
 *
 * Side effect: stores the total encoded length in `this.bytesLengthVal`,
 * which byteLength() reads.
 *
 * @returns {Buffer} the encoded message, including the LENGTH prefix
 */
Message.prototype.toBytes = function(){
  var body = new BufferMaker().UInt8(this.magic);
  // The compression attribute byte does not exist in the magic 0 format, and
  // fromBytes() does not expect one there; emitting it would shift the
  // checksum and payload by one byte.
  if (this.magic !== 0){
    body = body.UInt8(this.compression);
  }
  var nonLength = body
    .UInt32BE(this.calculateChecksum())
    .string(this.payload)
    .make();
  assert(Buffer.isBuffer(nonLength));

  // Prefix the body with its own length.
  var encodedMessage = new BufferMaker()
    .UInt32BE(nonLength.length)
    .string(nonLength)
    .make();
  assert(Buffer.isBuffer(encodedMessage));

  this.bytesLengthVal = encodedMessage.length;
  return encodedMessage;
};
/**
 * Parses one length-prefixed message from the start of `buf`.
 *
 * Wire format (integers in network / big-endian byte order):
 *   size         32-bit unsigned  byte count of everything after this field
 *   magic        8-bit unsigned
 *   compression  8-bit unsigned   absent when magic is 0
 *   checksum     32-bit unsigned
 *   payload      the remaining (size - header) bytes
 *
 * Bytes in `buf` beyond the first message are ignored.
 *
 * @param {Buffer} buf  bytes beginning at a message boundary
 * @returns {Message}
 * @throws {string} "incomplete message" when `buf` does not yet contain the
 *                  size, the magic byte, or the whole payload (also when the
 *                  size field is 0).
 * @throws {string} "invalid message size: N" when the declared size is too
 *                  small to hold the header for the message's magic value.
 * @throws {string} anything the Message constructor throws (unsupported
 *                  magic, invalid compression, ...).
 */
Message.fromBytes = function(buf){
  assert(Buffer.isBuffer(buf));

  // First pass: only the fields needed to decide which layout follows.
  var header = binary.parse(buf)
    .word32bu('size')
    .word8u('magic')
    .vars;
  if (!header.size){
    throw "incomplete message";
  }
  // NOTE(review): binary.parse appears to leave a field null when the buffer
  // is too short to contain it - confirm against the library docs.
  if (header.magic === null || header.magic === undefined){
    throw "incomplete message";
  }

  var size = header.size;
  // Any non-zero magic is assumed to be 1 here. If it is not, it is invalid
  // and the Message constructor will throw when we construct below.
  var hasCompression = header.magic !== 0;
  var payloadSize = size - (hasCompression ? 6 : 5);
  if (payloadSize < 0){
    // A negative payload size would slip past the length check below and
    // produce a message built from unrelated bytes.
    throw "invalid message size: " + size;
  }

  // Second pass: full parse using the layout selected above.
  var parser = binary.parse(buf)
    .word32bu('size')
    .word8u('magic');
  if (hasCompression){
    parser = parser.word8u('compression');
  }
  var unpacked = parser
    .word32bu('checksum')
    .buffer('payload', payloadSize)
    .vars;

  var payload = unpacked.payload;
  assert(Buffer.isBuffer(payload));
  if (payload.length < payloadSize){
    throw "incomplete message";
  }

  // `compression` is undefined for magic 0; the constructor ignores it then.
  return new Message(payload, unpacked.checksum, unpacked.magic, unpacked.compression);
};
/**
 * Reports the message header length.
 *
 * @returns the module-level V0_7_MESSAGE_HEADER_SIZE constant
 */
Message.getHeaderLength = function(){
  var headerLength = V0_7_MESSAGE_HEADER_SIZE;
  return headerLength;
};
module.exports = Message;