From f1ae6aaa68206c53698f7db43a868c5aefac7cfb Mon Sep 17 00:00:00 2001 From: Samuel Huebl Date: Mon, 7 Nov 2022 15:39:57 +0100 Subject: [PATCH] Feature: Enable Fragmentation for Text Frames in WebSocket Gateway #126 https://github.com/ASNeG/OpcUaWebServer/issues/126 --- .../WebSocket/WebSocketChannel.h | 2 +- .../WebSocket/WebSocketMessageContext.cpp | 123 ++++++++++++++++ .../WebSocket/WebSocketMessageContext.h | 62 ++++++++ .../WebSocket/WebSocketServerBase.cpp | 135 ++++++++++++------ .../WebSocket/WebSocketServerBase.h | 15 +- 5 files changed, 289 insertions(+), 48 deletions(-) create mode 100644 src/OpcUaWebServer/WebSocket/WebSocketMessageContext.cpp create mode 100644 src/OpcUaWebServer/WebSocket/WebSocketMessageContext.h diff --git a/src/OpcUaWebServer/WebSocket/WebSocketChannel.h b/src/OpcUaWebServer/WebSocket/WebSocketChannel.h index a65a764..403feaa 100644 --- a/src/OpcUaWebServer/WebSocket/WebSocketChannel.h +++ b/src/OpcUaWebServer/WebSocket/WebSocketChannel.h @@ -51,7 +51,7 @@ namespace OpcUaWebServer bool timeout_; uint32_t id_; - uint8_t opcode_; + uint8_t opcode_ { 0x01 }; // OP_TEXT_FRAME boost::asio::streambuf recvBuffer_; boost::asio::streambuf sendBuffer_; WebSocketRequest webSocketRequest_; diff --git a/src/OpcUaWebServer/WebSocket/WebSocketMessageContext.cpp b/src/OpcUaWebServer/WebSocket/WebSocketMessageContext.cpp new file mode 100644 index 0000000..db045c0 --- /dev/null +++ b/src/OpcUaWebServer/WebSocket/WebSocketMessageContext.cpp @@ -0,0 +1,123 @@ +/* + Copyright 2015-2022 ASNeG (info@asneg.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Samuel Huebl + + */ + +#include + +#include "OpcUaStackCore/Base/Log.h" +#include "OpcUaWebServer/WebSocket/WebSocketMessageContext.h" + +using namespace OpcUaStackCore; + +namespace OpcUaWebServer +{ + + void + WebSocketMessageContext::setHeader(bool fin, uint8_t opcode, bool mask, uint32_t length) + { + fin_ = fin; + opcode_ = opcode; + mask_ = mask; + length_ = length; + + Log(Info, "WebSocketServer set new message header") + .parameter("fin", fin_) + .parameter("opcode", opcode_) + .parameter("mask", mask_) + .parameter("length", length_); + } + + bool + WebSocketMessageContext::fin(void) + { + return fin_; + } + + uint8_t + WebSocketMessageContext::opcode(void) + { + return opcode_; + } + + bool + WebSocketMessageContext::mask(void) + { + return mask_; + } + + uint32_t + WebSocketMessageContext::length(void) + { + return length_; + } + + void + WebSocketMessageContext::setMessageFragment(WebSocketMessage::SPtr message) + { + Log(Info, "WebSocketServer add container for message fragmentation."); + + if (webSocketMessageFragmentation_ != nullptr) { + webSocketMessageFragmentation_.reset(); + webSocketMessageFragmentation_ = nullptr; + } + + webSocketMessageFragmentation_ = message; + } + + WebSocketMessage::SPtr + WebSocketMessageContext::getMessageFragments(void) + { + return webSocketMessageFragmentation_; + } + + void + WebSocketMessageContext::clear(void) + { + Log(Info, "WebSocketServer clears message context."); + + fin_ = true; + opcode_ = 1; + mask_ = true; + length_ = 0; + + if (webSocketMessageFragmentation_ != nullptr) { + webSocketMessageFragmentation_.reset(); + webSocketMessageFragmentation_ = nullptr; + } + } + + + WebSocketMessageContext::EnumFrameType + WebSocketMessageContext::getFrameType(void) + { + // https://www.rfc-editor.org/rfc/rfc6455#section-5.4 + // opcode == 0 && fin == true => LastFrame + // opcode == 0 && fin == false => Frame + // opcode != 0 && fin == false => FirstFrame + // opcode != 0 && fin == true => SingleFrame + + if (opcode_ == 0x00 /*OP_CONTINUATION_FRAME*/) { + if (fin_) return EnumFrameType::FrameType_LastFrame; + else return EnumFrameType::FrameType_Frame; + } + + if (!fin_) return EnumFrameType::FrameType_FirstFrame; + + return EnumFrameType::FrameType_SingleFrame; + } + +} diff --git a/src/OpcUaWebServer/WebSocket/WebSocketMessageContext.h b/src/OpcUaWebServer/WebSocket/WebSocketMessageContext.h new file mode 100644 index 0000000..4e9e854 --- /dev/null +++ b/src/OpcUaWebServer/WebSocket/WebSocketMessageContext.h @@ -0,0 +1,62 @@ +/* + Copyright 2015-2022 ASNeG (info@asneg.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Samuel Huebl + */ + +#include "OpcUaStackCore/Base/os.h" +#include "OpcUaWebServer/WebSocket/WebSocketMessage.h" + +namespace OpcUaWebServer +{ + + class DLLEXPORT WebSocketMessageContext + { + public: + typedef enum { + FrameType_Unknown = 0, + FrameType_SingleFrame = 1, + FrameType_Frame = 2, + FrameType_FirstFrame = 3, + FrameType_LastFrame = 4 + } EnumFrameType; + + typedef boost::shared_ptr SPtr; + + WebSocketMessageContext(void) = default; + virtual ~WebSocketMessageContext(void) = default; + + void setHeader(bool fin, uint8_t opcode, bool mask, uint32_t length); + bool fin(void); + uint8_t opcode(void); + bool mask(void); + uint32_t length(void); + + void setMessageFragment(WebSocketMessage::SPtr message); + WebSocketMessage::SPtr getMessageFragments(void); + + void clear(void); + + EnumFrameType getFrameType(void); + + private: + bool fin_ { true }; + uint8_t opcode_ { 0x1 }; + bool mask_ { true }; + uint32_t length_ { 0x0 }; + + WebSocketMessage::SPtr webSocketMessageFragmentation_ { nullptr }; + }; + +} diff --git a/src/OpcUaWebServer/WebSocket/WebSocketServerBase.cpp b/src/OpcUaWebServer/WebSocket/WebSocketServerBase.cpp index 1019c6f..ff59a7f 100644 --- a/src/OpcUaWebServer/WebSocket/WebSocketServerBase.cpp +++ b/src/OpcUaWebServer/WebSocket/WebSocketServerBase.cpp @@ -503,7 +503,9 @@ namespace OpcUaWebServer // ------------------------------------------------------------------------ // ------------------------------------------------------------------------ void - WebSocketServerBase::receiveMessage(WebSocketChannel* webSocketChannel) + WebSocketServerBase::receiveMessage( + WebSocketChannel* webSocketChannel, + WebSocketMessageContext::SPtr webSocketMessageContext) { // start request timer webSocketChannel->slotTimerElement_->expireFromNow(webSocketConfig_->idleTimeout()); @@ -518,8 +520,8 @@ namespace OpcUaWebServer webSocketConfig_->strand(), webSocketChannel->recvBuffer_, 2, - [this, webSocketChannel](const boost::system::error_code& error, std::size_t bytes_transfered) { - handleReceiveMessageHeader(error, bytes_transfered, webSocketChannel); + [this, webSocketChannel, webSocketMessageContext](const boost::system::error_code& error, std::size_t bytes_transfered) { + handleReceiveMessageHeader(error, bytes_transfered, webSocketChannel, webSocketMessageContext); } ); } @@ -528,13 +530,14 @@ namespace OpcUaWebServer WebSocketServerBase::handleReceiveMessageHeader( const boost::system::error_code& error, std::size_t bytes_transfered, - WebSocketChannel* webSocketChannel + WebSocketChannel* webSocketChannel, + WebSocketMessageContext::SPtr webSocketMessageContext ) { webSocketChannel->asyncRead_ = false; if (webSocketChannel->timeout_ || error || webSocketChannel->shutdown_) { - Log(Debug, "WebSocketServer handle receive message header timeout, error or shutdown"); + Log(Error, "WebSocketServer handle receive message header timeout, error or shutdown"); closeWebSocketChannel(webSocketChannel); return; } @@ -543,7 +546,7 @@ namespace OpcUaWebServer webSocketConfig_->ioThread()->slotTimer()->stop(webSocketChannel->slotTimerElement_); if (error) { - Log(Debug, "WebSocketServer receive message header error; close channel") + Log(Error, "WebSocketServer receive message header error; close channel") .parameter("Address", webSocketChannel->partner_.address().to_string()) .parameter("Port", webSocketChannel->partner_.port()) .parameter("ChannelId", webSocketChannel->id_); @@ -561,15 +564,10 @@ namespace OpcUaWebServer bool mask = (headerBytes[1] & 0x80) == 0x80; // true - mask uint32_t length = headerBytes[1] & 0x7f; - if (webSocketChannel->opcode_ == OP_CONTINUATION_FRAME) { - Log(Debug, "WebSocketServer do not support continuation frame messages; close channel") - .parameter("Address", webSocketChannel->partner_.address().to_string()) - .parameter("Port", webSocketChannel->partner_.port()) - .parameter("ChannelId", webSocketChannel->id_); - - closeWebSocketChannel(webSocketChannel); - return; + if (webSocketMessageContext == nullptr) { + webSocketMessageContext = boost::make_shared(); } + webSocketMessageContext->setHeader(fin, webSocketChannel->opcode_, mask, length); if (webSocketChannel->opcode_ == OP_BINARY_FRAME) { Log(Error, "WebSocketServer do not support binary frame messages; close channel") @@ -582,7 +580,7 @@ namespace OpcUaWebServer } if (webSocketChannel->opcode_ == OP_CLOSE_FRAME) { - Log(Debug, "WebSocketServer receive close frame messages; close channel") + Log(Error, "WebSocketServer receive close frame messages; close channel") .parameter("Address", webSocketChannel->partner_.address().to_string()) .parameter("Port", webSocketChannel->partner_.port()) .parameter("ChannelId", webSocketChannel->id_); @@ -591,11 +589,14 @@ namespace OpcUaWebServer return; } - if (webSocketChannel->opcode_ != OP_TEXT_FRAME && webSocketChannel->opcode_ != OP_PING_FRAME) { - Log(Debug, "WebSocketServer do not support continuation text messages; close channel") + if (webSocketChannel->opcode_ != OP_TEXT_FRAME && + webSocketChannel->opcode_ != OP_PING_FRAME && + webSocketChannel->opcode_ != OP_CONTINUATION_FRAME) { + Log(Error, "WebSocketServer do not support opcode; close channel") .parameter("Address", webSocketChannel->partner_.address().to_string()) .parameter("Port", webSocketChannel->partner_.port()) - .parameter("ChannelId", webSocketChannel->id_); + .parameter("ChannelId", webSocketChannel->id_) + .parameter("OpCode", webSocketChannel->opcode_); closeWebSocketChannel(webSocketChannel); return; @@ -615,8 +616,8 @@ namespace OpcUaWebServer webSocketConfig_->strand(), webSocketChannel->recvBuffer_, length+4, - [this, webSocketChannel](const boost::system::error_code& error, std::size_t bytes_transfered) { - handleReceiveMessageContent(error, bytes_transfered, webSocketChannel); + [this, webSocketChannel, webSocketMessageContext](const boost::system::error_code& error, std::size_t bytes_transfered) { + handleReceiveMessageContent(error, bytes_transfered, webSocketChannel, webSocketMessageContext); } ); return; @@ -636,8 +637,8 @@ namespace OpcUaWebServer webSocketConfig_->strand(), webSocketChannel->recvBuffer_, 2, - [this, webSocketChannel](const boost::system::error_code& error, std::size_t bytes_transfered) { - handleReceiveMessageLength2(error, bytes_transfered, webSocketChannel); + [this, webSocketChannel, webSocketMessageContext](const boost::system::error_code& error, std::size_t bytes_transfered) { + handleReceiveMessageLength2(error, bytes_transfered, webSocketChannel, webSocketMessageContext); } ); return; @@ -657,8 +658,8 @@ namespace OpcUaWebServer webSocketConfig_->strand(), webSocketChannel->recvBuffer_, 8, - [this, webSocketChannel](const boost::system::error_code& error, std::size_t bytes_transfered) { - handleReceiveMessageLength8(error, bytes_transfered, webSocketChannel); + [this, webSocketChannel, webSocketMessageContext](const boost::system::error_code& error, std::size_t bytes_transfered) { + handleReceiveMessageLength8(error, bytes_transfered, webSocketChannel, webSocketMessageContext); } ); return; @@ -670,13 +671,14 @@ namespace OpcUaWebServer WebSocketServerBase::handleReceiveMessageLength2( const boost::system::error_code& error, std::size_t bytes_transfered, - WebSocketChannel* webSocketChannel + WebSocketChannel* webSocketChannel, + WebSocketMessageContext::SPtr webSocketMessageContext ) { webSocketChannel->asyncRead_ = false; if (webSocketChannel->timeout_ || error || webSocketChannel->shutdown_) { - Log(Debug, "WebSocketServer handle receive message header length2 timeout, error or shutdown"); + Log(Error, "WebSocketServer handle receive message header length2 timeout, error or shutdown"); closeWebSocketChannel(webSocketChannel); return; } @@ -685,7 +687,7 @@ namespace OpcUaWebServer webSocketConfig_->ioThread()->slotTimer()->stop(webSocketChannel->slotTimerElement_); if (error) { - Log(Debug, "WebSocketServer receive message content2 error; close channel") + Log(Error, "WebSocketServer receive message content2 error; close channel") .parameter("Address", webSocketChannel->partner_.address().to_string()) .parameter("Port", webSocketChannel->partner_.port()) .parameter("ChannelId", webSocketChannel->id_); @@ -715,19 +717,23 @@ namespace OpcUaWebServer webSocketConfig_->strand(), webSocketChannel->recvBuffer_, length+4, - [this, webSocketChannel](const boost::system::error_code& error, std::size_t bytes_transfered) { - handleReceiveMessageContent(error, bytes_transfered, webSocketChannel); + [this, webSocketChannel, webSocketMessageContext](const boost::system::error_code& error, std::size_t bytes_transfered) { + handleReceiveMessageContent(error, bytes_transfered, webSocketChannel, webSocketMessageContext); } ); } void - WebSocketServerBase::handleReceiveMessageLength8(const boost::system::error_code& error, std::size_t bytes_transfered, WebSocketChannel* webSocketChannel) + WebSocketServerBase::handleReceiveMessageLength8( + const boost::system::error_code& error, + std::size_t bytes_transfered, + WebSocketChannel* webSocketChannel, + WebSocketMessageContext::SPtr webSocketMessageContext) { webSocketChannel->asyncRead_ = false; if (webSocketChannel->timeout_ || error || webSocketChannel->shutdown_) { - Log(Debug, "WebSocketServer handle receive message header length8 timeout, error or shutdown"); + Log(Error, "WebSocketServer handle receive message header length8 timeout, error or shutdown"); closeWebSocketChannel(webSocketChannel); return; } @@ -736,7 +742,7 @@ namespace OpcUaWebServer webSocketConfig_->ioThread()->slotTimer()->stop(webSocketChannel->slotTimerElement_); if (error) { - Log(Debug, "WebSocketServer receive message content2 error; close channel") + Log(Error, "WebSocketServer receive message content8 error; close channel") .parameter("Address", webSocketChannel->partner_.address().to_string()) .parameter("Port", webSocketChannel->partner_.port()) .parameter("ChannelId", webSocketChannel->id_); @@ -772,19 +778,23 @@ namespace OpcUaWebServer webSocketConfig_->strand(), webSocketChannel->recvBuffer_, length+4, - [this, webSocketChannel](const boost::system::error_code& error, std::size_t bytes_transfered) { - handleReceiveMessageContent(error, bytes_transfered, webSocketChannel); + [this, webSocketChannel, webSocketMessageContext](const boost::system::error_code& error, std::size_t bytes_transfered) { + handleReceiveMessageContent(error, bytes_transfered, webSocketChannel, webSocketMessageContext); } ); } void - WebSocketServerBase::handleReceiveMessageContent(const boost::system::error_code& error, std::size_t bytes_transfered, WebSocketChannel* webSocketChannel) + WebSocketServerBase::handleReceiveMessageContent( + const boost::system::error_code& error, + std::size_t bytes_transfered, + WebSocketChannel* webSocketChannel, + WebSocketMessageContext::SPtr webSocketMessageContext) { webSocketChannel->asyncRead_ = false; if (webSocketChannel->timeout_ || error || webSocketChannel->shutdown_) { - Log(Debug, "WebSocketServer handle receive message content timeout, error or shutdown"); + Log(Error, "WebSocketServer handle receive message content timeout, error or shutdown"); closeWebSocketChannel(webSocketChannel); return; } @@ -793,7 +803,7 @@ namespace OpcUaWebServer webSocketConfig_->ioThread()->slotTimer()->stop(webSocketChannel->slotTimerElement_); if (error) { - Log(Debug, "WebSocketServer receive message content error; close channel") + Log(Error, "WebSocketServer receive message content error; close channel") .parameter("Address", webSocketChannel->partner_.address().to_string()) .parameter("Port", webSocketChannel->partner_.port()) .parameter("ChannelId", webSocketChannel->id_); @@ -802,9 +812,45 @@ namespace OpcUaWebServer return; } - auto webSocketMessage = boost::make_shared(); - webSocketMessage->channelId_ = webSocketChannel->id_; - webSocketMessage->message_ = ""; + WebSocketMessage::SPtr webSocketMessage = nullptr; + + auto frameType = webSocketMessageContext->getFrameType(); + switch (frameType) + { + case WebSocketMessageContext::EnumFrameType::FrameType_SingleFrame: + { + webSocketMessage = boost::make_shared(); + webSocketMessage->channelId_ = webSocketChannel->id_; + webSocketMessage->message_ = ""; + break; + } + case WebSocketMessageContext::EnumFrameType::FrameType_FirstFrame: + { + webSocketMessage = boost::make_shared(); + webSocketMessage->channelId_ = webSocketChannel->id_; + webSocketMessage->message_ = ""; + webSocketMessageContext->setMessageFragment(webSocketMessage); + break; + } + case WebSocketMessageContext::EnumFrameType::FrameType_Frame: + case WebSocketMessageContext::EnumFrameType::FrameType_LastFrame: + { + webSocketMessage = webSocketMessageContext->getMessageFragments(); + break; + } + default: + break; + } + + if (webSocketMessage == nullptr) { + Log(Error, "WebSocketServer missing web socket message; close channel") + .parameter("Address", webSocketChannel->partner_.address().to_string()) + .parameter("Port", webSocketChannel->partner_.port()) + .parameter("ChannelId", webSocketChannel->id_); + + closeWebSocketChannel(webSocketChannel); + return; + } std::istream is(&webSocketChannel->recvBuffer_); @@ -847,8 +893,13 @@ namespace OpcUaWebServer return; } - if (receiveMessageCallback_) receiveMessageCallback_(webSocketMessage); - receiveMessage(webSocketChannel); + if (frameType == WebSocketMessageContext::EnumFrameType::FrameType_LastFrame || + frameType == WebSocketMessageContext::EnumFrameType::FrameType_SingleFrame) { + webSocketMessageContext->clear(); + if (receiveMessageCallback_) receiveMessageCallback_(webSocketMessage); + } + + receiveMessage(webSocketChannel, webSocketMessageContext); } void diff --git a/src/OpcUaWebServer/WebSocket/WebSocketServerBase.h b/src/OpcUaWebServer/WebSocket/WebSocketServerBase.h index 040a379..d1142c4 100644 --- a/src/OpcUaWebServer/WebSocket/WebSocketServerBase.h +++ b/src/OpcUaWebServer/WebSocket/WebSocketServerBase.h @@ -21,6 +21,7 @@ #include "OpcUaWebServer/WebSocket/WebSocketConfig.h" #include "OpcUaWebServer/WebSocket/WebSocketChannel.h" #include "OpcUaWebServer/WebSocket/WebSocketMessage.h" +#include "OpcUaWebServer/WebSocket/WebSocketMessageContext.h" namespace OpcUaWebServer { @@ -103,11 +104,15 @@ namespace OpcUaWebServer // // handle receive message // - void receiveMessage(WebSocketChannel* webSocketChannel); - void handleReceiveMessageHeader(const boost::system::error_code& error, std::size_t bytes_transfered, WebSocketChannel* webSocketChannel); - void handleReceiveMessageLength2(const boost::system::error_code& error, std::size_t bytes_transfered, WebSocketChannel* webSocketChannel); - void handleReceiveMessageLength8(const boost::system::error_code& error, std::size_t bytes_transfered, WebSocketChannel* webSocketChannel); - void handleReceiveMessageContent(const boost::system::error_code& error, std::size_t bytes_transfered, WebSocketChannel* webSocketChannel); + void receiveMessage(WebSocketChannel* webSocketChannel, WebSocketMessageContext::SPtr webSocketMessageContext = nullptr); + void handleReceiveMessageHeader(const boost::system::error_code& error, std::size_t bytes_transfered + , WebSocketChannel* webSocketChannel, WebSocketMessageContext::SPtr webSocketMessageContext); + void handleReceiveMessageLength2(const boost::system::error_code& error, std::size_t bytes_transfered + , WebSocketChannel* webSocketChannel, WebSocketMessageContext::SPtr webSocketMessageContext); + void handleReceiveMessageLength8(const boost::system::error_code& error, std::size_t bytes_transfered + , WebSocketChannel* webSocketChannel, WebSocketMessageContext::SPtr webSocketMessageContext); + void handleReceiveMessageContent(const boost::system::error_code& error, std::size_t bytes_transfered, + WebSocketChannel* webSocketChannel, WebSocketMessageContext::SPtr webSocketMessageContext); void idleTimeoutWebSocketChannel(WebSocketChannel* webSocketChannel, const std::string& location);