Skip to content

Commit

Permalink
Merge pull request #1549 from TerraMA2/b4.0
Browse files Browse the repository at this point in the history
fix tcp messages
  • Loading branch information
janosimas authored Apr 24, 2018
2 parents fda6592 + 94e0294 commit 0cf14bf
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 65 deletions.
38 changes: 33 additions & 5 deletions src/terrama2/core/network/TcpManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,13 @@ void terrama2::core::TcpManager::sendSignalSlot(QTcpSocket* tcpSocket, TcpSignal
{
TERRAMA2_LOG_DEBUG() << QObject::tr("Sending signal information...");

const std::string bom = "(BOM)";
const std::string eom = "(EOM)";

QByteArray bytearray;
QDataStream out(&bytearray, QIODevice::WriteOnly);
///////////////////////////////////////////
// Prepare message
QByteArray message;
QDataStream out(&message, QIODevice::WriteOnly);

out << static_cast<uint32_t>(0);
out << static_cast<uint32_t>(signal);
Expand All @@ -389,12 +393,36 @@ void terrama2::core::TcpManager::sendSignalSlot(QTcpSocket* tcpSocket, TcpSignal
out << answer.toJson(QJsonDocument::Compact);
}

bytearray.remove(8, 4);//Remove QByteArray header
message.remove(8, 4);
out.device()->seek(0);
out << static_cast<uint32_t>(bytearray.size() - sizeof(uint32_t));
out << static_cast<uint32_t>(message.size() - sizeof(uint32_t));
///////////////////////////////////////////

// for(auto it = message.cbegin(); it != message.cend(); ++it)
// std::cout << *it;
// std::cout << std::endl;

///////////////////////////////////////////
// add header
QByteArray buffer;
QDataStream bufferStream(&buffer, QIODevice::WriteOnly);
bufferStream << bom.c_str() << message << eom.c_str();

// for(auto it = buffer.cbegin(); it != buffer.cend(); ++it)
// std::cout << *it;
// std::cout << std::endl;

buffer.remove(buffer.size() - eom.size() - 5, 4);
buffer.remove(bom.size()+5, 4);
buffer.remove(0,4);
///////////////////////////////////////////

for(auto it = buffer.cbegin(); it != buffer.cend(); ++it)
std::cout << *it;
std::cout << std::endl;

// wait while sending message
qint64 written = tcpSocket->write(bytearray);
qint64 written = tcpSocket->write(buffer);
if(written == -1 || !tcpSocket->waitForBytesWritten(30000))
TERRAMA2_LOG_WARNING() << QObject::tr("Unable to write to server.");
}
Expand Down
152 changes: 92 additions & 60 deletions webapp/core/Service.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ var EventEmitter = require('events').EventEmitter;
var NodeUtils = require('util');
var Signals = require('./Signals');

let beginOfMessage = "(BOM)\0";
let endOfMessage = "(EOM)\0";

/**
This method parses the bytearray received.
Expand Down Expand Up @@ -99,90 +101,115 @@ var Service = module.exports = function(serviceInstance) {
let size = 0;
for (var i = 0; i < arguments.length; i++) {
if(arguments[i]) {
size += arguments[i].buffer.byteLength;
size += arguments[i].length;
}
}

let tmp = new Buffer(size);
let offset = 0;
for (var i = 0; i < arguments.length; i++) {
if(arguments[i]) {
tmp.set(new Buffer.from(arguments[i].buffer), offset);
offset+=arguments[i].buffer.byteLength;
tmp.set(new Buffer.from(arguments[i]), offset);
offset+=arguments[i].length;
}
}

return tmp;
}

let tempBuffer = undefined;
let extraData = undefined;
self.socket.on('data', function(byteArray) {
self.answered = true;
var formatMessage = "Socket %s received %s";
logger.debug(Utils.format(formatMessage, self.service.name, byteArray));
logger.debug(Utils.format(formatMessage, self.service.name, byteArray.toString()));

try {
// append and check if the complete message has arrived
tempBuffer = _createBufferFrom(tempBuffer, byteArray);
const messageSizeReceived = tempBuffer.readUInt32BE(0);
if(tempBuffer.length < (messageSizeReceived + 4)) {
// if we don't have the complete message
// wait for the rest
return;
}

let extraData = undefined;
if(tempBuffer.length > (messageSizeReceived + 4)) {
// append and check if the complete message has arrived
tempBuffer = _createBufferFrom(tempBuffer, byteArray);

let completeMessage = true;
while(tempBuffer && completeMessage) {
try {
let bom = tempBuffer.toString('utf-8', 0, beginOfMessage.length);
while(tempBuffer.length > beginOfMessage.length && bom !== beginOfMessage) {
tempBuffer = new Buffer.from(tempBuffer.slice(1));
bom = tempBuffer.toString('utf-8', 0, beginOfMessage.length);
}

if(bom !== beginOfMessage) {
tempBuffer = undefined;
throw new Error("Invalid message (BOM)");
}

const messageSizeReceived = tempBuffer.readUInt32BE(beginOfMessage.length);
const headerSize = beginOfMessage.length + endOfMessage.length;
const expectedLength = messageSizeReceived + 4;
if(tempBuffer.length < expectedLength+headerSize) {
// if we don't have the complete message
// wait for the rest
completeMessage = false;
return;
}

const eom = tempBuffer.toString('ascii', expectedLength + beginOfMessage.length, expectedLength+headerSize);
if(eom !== endOfMessage) {
tempBuffer = undefined;
throw new Error("Invalid message (EOM)");
}

// hold extra data for next message
extraData = new Buffer.from(tempBuffer.buffer.slice(messageSizeReceived + 5));
if(tempBuffer.length > expectedLength+headerSize) {
extraData = new Buffer.from(tempBuffer.slice(expectedLength + headerSize));
} else {
extraData = undefined;
}

// free any extra byte from the message
tempBuffer = new Buffer.from(tempBuffer.buffer, 0, (messageSizeReceived + 4));
}

const parsed = parseByteArray(tempBuffer);

// we got the message, empty buffer.
tempBuffer = extraData;

switch(parsed.signal) {
case Signals.LOG_SIGNAL:
self.emit("log", parsed.message);
break;
case Signals.STATUS_SIGNAL:
self.emit("status", parsed.message);
break;
case Signals.TERMINATE_SERVICE_SIGNAL:
self.emit("stop", parsed);
break;
case Signals.PROCESS_FINISHED_SIGNAL:
/**
* Used to notify when a process has been finished. C++ service emits a processed data to save
* and delivery to user
*
* @event Service#processFinished
* @type {Object}
*/
self.emit("processFinished", parsed.message);
break;
case Signals.VALIDATE_PROCESS_SIGNAL:
self.emit("validateProcess", parsed.message);
break;
}
tempBuffer = new Buffer.from(tempBuffer.slice(beginOfMessage.length, expectedLength+beginOfMessage.length));

const parsed = parseByteArray(tempBuffer);

// we got the message, empty buffer.
tempBuffer = extraData;

switch(parsed.signal) {
case Signals.LOG_SIGNAL:
self.emit("log", parsed.message);
break;
case Signals.STATUS_SIGNAL:
self.emit("status", parsed.message);
break;
case Signals.TERMINATE_SERVICE_SIGNAL:
self.emit("stop", parsed);
break;
case Signals.PROCESS_FINISHED_SIGNAL:
/**
* Used to notify when a process has been finished. C++ service emits a processed data to save
* and delivery to user
*
* @event Service#processFinished
* @type {Object}
*/
self.emit("processFinished", parsed.message);
break;
case Signals.VALIDATE_PROCESS_SIGNAL:
self.emit("validateProcess", parsed.message);
break;
}

if (callbackSuccess) {
callbackSuccess(parsed);
}
} catch (e) {
// we got an error, empty buffer.
tempBuffer = undefined;
logger.debug(Utils.format("Error parsing bytearray received from %s. %s", self.service.name, e.toString()));
self.emit("serviceError", e);
if (callbackError) {
callbackError(e);
if (callbackSuccess) {
callbackSuccess(parsed);
}
} catch (e) {
// we got an error, empty buffer.
tempBuffer = undefined;
logger.debug(Utils.format("Error parsing bytearray received from %s. %s", self.service.name, e.toString()));
self.emit("serviceError", e);
if (callbackError) {
callbackError(e);
}
}
}

});

self.socket.on('drain', function() {
Expand Down Expand Up @@ -254,6 +281,11 @@ var Service = module.exports = function(serviceInstance) {
return;
}

// let tmp = new Buffer(buffer.length+beginOfMessage.length+endOfMessage.length);
// tmp.set(new Buffer.from(beginOfMessage), 0);
// tmp.set(new Buffer.from(buffer), beginOfMessage.length);
// tmp.set(new Buffer.from(endOfMessage), buffer.length+beginOfMessage.length);

self.writeData(buffer, null, null, function() {
logger.debug("Sent all");
});
Expand Down

0 comments on commit 0cf14bf

Please sign in to comment.