-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathProduceRequest.js
76 lines (67 loc) · 2.74 KB
/
ProduceRequest.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
var BufferMaker = require('buffermaker');
var Message = require('./Message');
var Request = require('./Request');
var _ = require('underscore');
var binary = require('binary');
var assert = require('assert');
var ProduceRequest = function(topic, partition, messages){
this.topic = topic;
this.partition = partition;
this.messages = messages;
this.requestType = Request.Types.PRODUCE;
};
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ REQUEST HEADER /
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| MESSAGES_LENGTH |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
/ MESSAGES /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
MESSAGES_LENGTH = int32 // Length in bytes of the MESSAGES section
MESSAGES = Collection of MESSAGES (see above)
*/
ProduceRequest.prototype.toBytes = function(){
var messages = this.messages;
var that = this;
var messageBuffers = [];
_.each(messages, function(message){
var bytes = message.toBytes();
messageBuffers.push(bytes);
});
var messagesBuffer = Buffer.concat(messageBuffers);
assert(Buffer.isBuffer(messagesBuffer));
var messagesLengthBuffer = new BufferMaker().UInt32BE(messagesBuffer.length).make();
var body = Buffer.concat([messagesLengthBuffer, messagesBuffer]);
var req = new Request(this.topic, this.partition, Request.Types.PRODUCE, body);
return req.toBytes();
};
ProduceRequest.fromBytes = function(buffer) {
var request = Request.fromBytes(buffer);
var messages = toMessages(getMessagesSegment(request.body));
return new ProduceRequest(request.topic, request.partition, messages);
};
module.exports = ProduceRequest;
var MESSAGES_LENGTH_SIZE = 4;
var getLengthOfMessagesSegment = function(buffer) {
assert(!_.isString(binary.parse(buffer).word32bu('length').vars));
return binary.parse(buffer).word32bu('length').vars.length;
};
var toMessages = function(messagesBuffer) {
var messages = [];
while(messagesBuffer.length > 0) {
var message = Message.fromBytes(messagesBuffer);
messages.push(message);
messagesBuffer = messagesBuffer.slice(message.toBytes().length);
}
return messages;
};
var getMessagesSegment = function(body) {
var start = MESSAGES_LENGTH_SIZE;
var stop = getLengthOfMessagesSegment(body) + MESSAGES_LENGTH_SIZE;
return body.slice(start, stop);
};