Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interleaved Message Handling and Frame Size handling in Decrypt #14

Open
wants to merge 5 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/f1x/aasdk/Messenger/Cryptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class Cryptor: public ICryptor
void deinit() override;
bool doHandshake() override;
size_t encrypt(common::Data& output, const common::DataConstBuffer& buffer) override;
size_t decrypt(common::Data& output, const common::DataConstBuffer& buffer) override;
size_t decrypt(common::Data& output, const common::DataConstBuffer& buffer, int length) override;

common::Data readHandshakeBuffer() override;
void writeHandshakeBuffer(const common::DataConstBuffer& buffer) override;
Expand Down
1 change: 1 addition & 0 deletions include/f1x/aasdk/Messenger/FrameSize.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class FrameSize

common::Data getData() const;
size_t getSize() const;
size_t getTotalSize() const;

static size_t getSizeOf(FrameSizeType type);

Expand Down
2 changes: 1 addition & 1 deletion include/f1x/aasdk/Messenger/ICryptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ICryptor
virtual void deinit() = 0;
virtual bool doHandshake() = 0;
virtual size_t encrypt(common::Data& output, const common::DataConstBuffer& buffer) = 0;
virtual size_t decrypt(common::Data& output, const common::DataConstBuffer& buffer) = 0;
virtual size_t decrypt(common::Data& output, const common::DataConstBuffer& buffer, int length) = 0;
virtual common::Data readHandshakeBuffer() = 0;
virtual void writeHandshakeBuffer(const common::DataConstBuffer& buffer) = 0;
virtual bool isActive() const = 0;
Expand Down
1 change: 1 addition & 0 deletions include/f1x/aasdk/Messenger/IMessageInStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class IMessageInStream
virtual ~IMessageInStream() = default;

virtual void startReceive(ReceivePromise::Pointer promise) = 0;
virtual void setInterleavedHandler(ReceivePromise::Pointer promise) = 0;
};

}
Expand Down
15 changes: 13 additions & 2 deletions include/f1x/aasdk/Messenger/MessageInStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class MessageInStream: public IMessageInStream, public std::enable_shared_from_t
MessageInStream(boost::asio::io_service& ioService, transport::ITransport::Pointer transport, ICryptor::Pointer cryptor);

void startReceive(ReceivePromise::Pointer promise) override;
void setInterleavedHandler(ReceivePromise::Pointer promise) override;

private:
using std::enable_shared_from_this<MessageInStream>::shared_from_this;
Expand All @@ -48,11 +49,21 @@ class MessageInStream: public IMessageInStream, public std::enable_shared_from_t
boost::asio::io_service::strand strand_;
transport::ITransport::Pointer transport_;
ICryptor::Pointer cryptor_;
FrameType recentFrameType_;
FrameType thisFrameType_;
ReceivePromise::Pointer promise_;
ReceivePromise::Pointer interleavedPromise_;
Message::Pointer message_;
int frameSize_;

std::map<messenger::ChannelId, Message::Pointer> channel_assembly_buffers;

ChannelId currentChannelId_;
ChannelId originalMessageChannelId_;
std::map<messenger::ChannelId, Message::Pointer> messageBuffer_;


bool isInterleaved_;
bool haveOriginalChannel_;
bool isNewMessage_;
};

}
Expand Down
2 changes: 2 additions & 0 deletions include/f1x/aasdk/Messenger/Messenger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ class Messenger: public IMessenger, public std::enable_shared_from_this<Messenge
typedef std::list<std::pair<Message::Pointer, SendPromise::Pointer>> ChannelSendQueue;
void doSend();
void inStreamMessageHandler(Message::Pointer message);
void randomInStreamMessageHandler(Message::Pointer message);
void outStreamMessageHandler(ChannelSendQueue::iterator queueElement);
void rejectReceivePromiseQueue(const error::Error& e);
void rejectSendPromiseQueue(const error::Error& e);
void randomRejectReceivePromiseQueue(const error::Error& e);
void parseMessage(Message::Pointer message, ReceivePromise::Pointer promise);

boost::asio::io_service::strand receiveStrand_;
Expand Down
19 changes: 13 additions & 6 deletions src/Messenger/Cryptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <functional>
#include <f1x/aasdk/Messenger/Cryptor.hpp>
#include <f1x/aasdk/Error/Error.hpp>
#include <f1x/aasdk/Common/Log.hpp>

namespace f1x
{
Expand Down Expand Up @@ -179,18 +180,23 @@ size_t Cryptor::encrypt(common::Data& output, const common::DataConstBuffer& buf
return this->read(output);
}

size_t Cryptor::decrypt(common::Data& output, const common::DataConstBuffer& buffer)
size_t Cryptor::decrypt(common::Data& output, const common::DataConstBuffer& buffer, int frameLength)
{
int overhead = 29;
int length = frameLength - overhead;
std::lock_guard<decltype(mutex_)> lock(mutex_);

this->write(buffer);
const size_t beginOffset = output.size();
output.resize(beginOffset + 1);

size_t availableBytes = 1;
size_t totalReadSize = 0;
size_t totalReadSize = 0; // Initialise
size_t availableBytes = length;
size_t readBytes = (length - totalReadSize) > 2048 ? 2048 : length - totalReadSize; // Calculate How many Bytes to Read
output.resize(output.size() + readBytes); // Resize Output to match the bytes we want to read

// We try to be a bit more explicit here, using the frame length from the frame itself rather than just blindly reading from the SSL buffer.

while(availableBytes > 0)
while(readBytes > 0)
{
const auto& currentBuffer = common::DataBuffer(output, totalReadSize + beginOffset);
auto readSize = sslWrapper_->sslRead(ssl_, currentBuffer.data, currentBuffer.size);
Expand All @@ -202,7 +208,8 @@ size_t Cryptor::decrypt(common::Data& output, const common::DataConstBuffer& buf

totalReadSize += readSize;
availableBytes = sslWrapper_->getAvailableBytes(ssl_);
output.resize(output.size() + availableBytes);
readBytes = (length - totalReadSize) > 2048 ? 2048 : length - totalReadSize;
output.resize(output.size() + readBytes);
}

return totalReadSize;
Expand Down
1 change: 1 addition & 0 deletions src/Messenger/FrameSize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ FrameSize::FrameSize(const common::DataConstBuffer& buffer)
{
frameSizeType_ = FrameSizeType::SHORT;
frameSize_ = boost::endian::big_to_native(reinterpret_cast<const uint16_t&>(buffer.cdata[0]));
totalSize_ = frameSize_;
}

if(buffer.size >= 6)
Expand Down
112 changes: 79 additions & 33 deletions src/Messenger/MessageInStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ MessageInStream::MessageInStream(boost::asio::io_service& ioService, transport::
, transport_(std::move(transport))
, cryptor_(std::move(cryptor))
{

isNewMessage_ = true;
}

void MessageInStream::startReceive(ReceivePromise::Pointer promise)
Expand All @@ -42,7 +42,7 @@ void MessageInStream::startReceive(ReceivePromise::Pointer promise)
if(promise_ == nullptr)
{
promise_ = std::move(promise);

isNewMessage_ = true;
auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_);
transportPromise->then(
[this, self = this->shared_from_this()](common::Data data) mutable {
Expand All @@ -62,36 +62,68 @@ void MessageInStream::startReceive(ReceivePromise::Pointer promise)
});
}

void MessageInStream::setInterleavedHandler(ReceivePromise::Pointer promise)
{
interleavedPromise_ = std::move(promise);
}

void MessageInStream::receiveFrameHeaderHandler(const common::DataConstBuffer& buffer)
{
FrameHeader frameHeader(buffer);
if (buffer.cdata[0] != 3) {
AASDK_LOG(debug) << "Message from channel " << std::to_string(buffer.cdata[0]);

isInterleaved_ = false;

// Store the ChannelId if this is a new message.
if (isNewMessage_) {
originalMessageChannelId_ = frameHeader.getChannelId();
isNewMessage_ = false;
}

// If Frame Channel does not match Message Channel, store Existing Message in Buffer.
if(message_ != nullptr && message_->getChannelId() != frameHeader.getChannelId())
{
// we have interleaved channels, stop working on the old one and store it for later; Switch to the new one
channel_assembly_buffers[message_->getChannelId()] = message_;
AASDK_LOG(debug) << "[MessageInStream] ChannelId mismatch -- Frame " << channelIdToString(frameHeader.getChannelId()) << " -- Message -- " << channelIdToString(message_->getChannelId());
isInterleaved_ = true;

messageBuffer_[message_->getChannelId()] = message_;
message_ = nullptr;
// message_.reset();
// promise_->reject(error::Error(error::ErrorCode::MESSENGER_INTERTWINED_CHANNELS));
// promise_.reset();
// return;
}
auto prevBuffer = channel_assembly_buffers.find(frameHeader.getChannelId());
if(prevBuffer != channel_assembly_buffers.end()){ // is there previous data in our map?
if(frameHeader.getType()!=FrameType::FIRST) //only use the data if we're not on a new frame, otherwise disregard
message_ = prevBuffer->second;
else{
message_ = std::make_shared<Message>(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType());

if (frameHeader.getType() == FrameType::FIRST || frameHeader.getType() == FrameType::BULK) {
// If it's a First or Bulk Frame, we need to start a new message.
message_ = std::make_shared<Message>(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType());
} else {
// This is a Middle or Last Frame. We must find an existing message.
auto bufferedMessage = messageBuffer_.find(frameHeader.getChannelId());
if (bufferedMessage != messageBuffer_.end()) {
/*
* If the original channel does not match, then this is an interleaved frame.
* It is no good just to match the channelid on the message we recovered from the queue, we need
* to go back to the original message channel as even this frame may be ANOTHER interleaved
* message frame within an existing interleaved message.
* Our promise must resolve only the channel id we've been tasked to work on.
* Everything else is incidental.
*/

// We can restore the original message, and and if the current frame matches the chnnale id
// then we're not interleaved anymore.
if (originalMessageChannelId_ == frameHeader.getChannelId()) {
isInterleaved_ = false;
AASDK_LOG(debug) << "[MessageInStream] Restored Message from Buffer";
}

message_ = bufferedMessage->second;
messageBuffer_.erase(bufferedMessage);
}
channel_assembly_buffers.erase(prevBuffer); // get rid of the previously stored data because it's now our working data.
}
else if(message_ == nullptr){

// If we have nothing at this point, start a new message.
if(message_ == nullptr)
{
message_ = std::make_shared<Message>(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType());
}
recentFrameType_ = frameHeader.getType();

thisFrameType_ = frameHeader.getType();
const size_t frameSize = FrameSize::getSizeOf(frameHeader.getType() == FrameType::FIRST ? FrameSizeType::EXTENDED : FrameSizeType::SHORT);

auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_);
Expand Down Expand Up @@ -122,6 +154,7 @@ void MessageInStream::receiveFrameSizeHandler(const common::DataConstBuffer& buf
});

FrameSize frameSize(buffer);
frameSize_ = (int) frameSize.getSize();
transport_->receive(frameSize.getSize(), std::move(transportPromise));
}

Expand All @@ -131,7 +164,7 @@ void MessageInStream::receiveFramePayloadHandler(const common::DataConstBuffer&
{
try
{
cryptor_->decrypt(message_->getPayload(), buffer);
cryptor_->decrypt(message_->getPayload(), buffer, frameSize_);
}
catch(const error::Error& e)
{
Expand All @@ -146,23 +179,36 @@ void MessageInStream::receiveFramePayloadHandler(const common::DataConstBuffer&
message_->insertPayload(buffer);
}

if(recentFrameType_ == FrameType::BULK || recentFrameType_ == FrameType::LAST)
bool isResolved = false;

// If this is the LAST frame or a BULK frame...
if(thisFrameType_ == FrameType::BULK || thisFrameType_ == FrameType::LAST)
{
promise_->resolve(std::move(message_));
promise_.reset();
// If this isn't an interleaved frame, then we can resolve the promise
if (!isInterleaved_) {
AASDK_LOG(debug) << "[MessageInStream] Resolving message.";
isResolved = true;
promise_->resolve(std::move(message_));
promise_.reset();
} else {
// Otherwise resolve through our random promise
AASDK_LOG(debug) << "[MessageInStream] Resolving interleaved frame";
interleavedPromise_->resolve(std::move(message_));
}
}
else
{

// If the main promise isn't resolved, then carry on retrieving frame headers.
if (!isResolved) {
auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_);
transportPromise->then(
[this, self = this->shared_from_this()](common::Data data) mutable {
this->receiveFrameHeaderHandler(common::DataConstBuffer(data));
},
[this, self = this->shared_from_this()](const error::Error& e) mutable {
message_.reset();
promise_->reject(e);
promise_.reset();
});
[this, self = this->shared_from_this()](common::Data data) mutable {
this->receiveFrameHeaderHandler(common::DataConstBuffer(data));
},
[this, self = this->shared_from_this()](const error::Error& e) mutable {
message_.reset();
promise_->reject(e);
promise_.reset();
});

transport_->receive(FrameHeader::getSizeOf(), std::move(transportPromise));
}
Expand Down
29 changes: 27 additions & 2 deletions src/Messenger/Messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ void Messenger::enqueueReceive(ChannelId channelId, ReceivePromise::Pointer prom
receiveStrand_.dispatch([this, self = this->shared_from_this(), channelId, promise = std::move(promise)]() mutable {
if(!channelReceiveMessageQueue_.empty(channelId))
{
this->parseMessage(channelReceiveMessageQueue_.pop(channelId), promise);
//TODO: Use this to check on the Frame/Channel Id?
//this->parseMessage(channelReceiveMessageQueue_.pop(channelId), promise);
promise->resolve(std::move(channelReceiveMessageQueue_.pop(channelId)));
//TODO: Problem with resolving like this, is that if we resolve an interleave frame, our resolve goes to the wrong channel - eg Audio on VideoServiceChannel
}
else
{
Expand All @@ -54,6 +57,11 @@ void Messenger::enqueueReceive(ChannelId channelId, ReceivePromise::Pointer prom
inStreamPromise->then(std::bind(&Messenger::inStreamMessageHandler, this->shared_from_this(), std::placeholders::_1),
std::bind(&Messenger::rejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1));
messageInStream_->startReceive(std::move(inStreamPromise));

auto randomInStreamPromise = ReceivePromise::defer(receiveStrand_);
randomInStreamPromise->then(std::bind(&Messenger::randomInStreamMessageHandler, this->shared_from_this(), std::placeholders::_1),
std::bind(&Messenger::randomRejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1));
messageInStream_->setInterleavedHandler(std::move(randomInStreamPromise));
}
}
});
Expand All @@ -80,7 +88,8 @@ void Messenger::inStreamMessageHandler(Message::Pointer message)

if(channelReceivePromiseQueue_.isPending(channelId))
{
this->parseMessage(message, channelReceivePromiseQueue_.pop(channelId));
//this->parseMessage(message, channelReceivePromiseQueue_.pop(channelId));
channelReceivePromiseQueue_.pop(channelId)->resolve(std::move(message));
}
else
{
Expand All @@ -96,6 +105,17 @@ void Messenger::inStreamMessageHandler(Message::Pointer message)
}
}

void Messenger::randomInStreamMessageHandler(Message::Pointer message)
{
//AASDK_LOG(debug) << "Interleaved Message Pushed to Queue";;
channelReceiveMessageQueue_.push(std::move(message));

auto randomInStreamPromise = ReceivePromise::defer(receiveStrand_);
randomInStreamPromise->then(std::bind(&Messenger::randomInStreamMessageHandler, this->shared_from_this(), std::placeholders::_1),
std::bind(&Messenger::randomRejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1));
messageInStream_->setInterleavedHandler(std::move(randomInStreamPromise));
}

void Messenger::parseMessage(Message::Pointer message, ReceivePromise::Pointer promise) {
if (message->getChannelId() != ChannelId::VIDEO) {
//AASDK_LOG(debug) << channelIdToString(message->getChannelId()) << " " << MessageId(message->getPayload());
Expand Down Expand Up @@ -132,6 +152,11 @@ void Messenger::rejectReceivePromiseQueue(const error::Error& e)
}
}

void Messenger::randomRejectReceivePromiseQueue(const error::Error& e)
{
// Dummy
}

void Messenger::rejectSendPromiseQueue(const error::Error& e)
{
while(!channelSendPromiseQueue_.empty())
Expand Down
6 changes: 6 additions & 0 deletions src/Transport/SSLWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <openssl/ssl.h>
#include <openssl/conf.h>
#include <f1x/aasdk/Transport/SSLWrapper.hpp>
#include <f1x/aasdk/Common/Log.hpp>

namespace f1x
{
Expand Down Expand Up @@ -49,6 +50,8 @@ SSLWrapper::~SSLWrapper()
ERR_remove_state(0);
#endif
ERR_free_strings();
ERR_load_crypto_strings();
ERR_load_ERR_strings();
}

X509* SSLWrapper::readCertificate(const std::string& certificate)
Expand Down Expand Up @@ -188,6 +191,9 @@ int SSLWrapper::sslWrite(SSL *ssl, const void *buf, int num)

int SSLWrapper::getError(SSL* ssl, int returnCode)
{
while (auto err = ERR_get_error()) {
AASDK_LOG(error) << "[SSLWrapper] SSL Error " << ERR_error_string(err, NULL);
}
return SSL_get_error(ssl, returnCode);
}

Expand Down