Skip to content

Commit d8da8bf

Browse files
extract common websocket functionality from connection handlers #64 (#72)
extract common code that handles poco websocket
1 parent f81da93 commit d8da8bf

23 files changed

+657
-545
lines changed

goldenmaster/apigear/olink/CMakeLists.txt

+2-6
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,7 @@ set (SOURCES
3838
private/olinkwebsockethandler.cpp
3939
private/requesthandlerfactory.cpp
4040
private/connectionstorage.cpp
41-
private/olinkremote.h
42-
private/olinkwebsockethandler.h
43-
private/requesthandlerfactory.h
44-
private/connectionstorage.h
45-
private/iconnectionstorage.h
41+
private/socketwrapper.cpp
4642
logger/logger.cpp
4743
)
4844
add_library(poco-olink SHARED ${SOURCES})
@@ -64,7 +60,7 @@ install(TARGETS poco-olink
6460
LIBRARY DESTINATION lib COMPONENT Runtime
6561
ARCHIVE DESTINATION lib COMPONENT Development)
6662
# install includes
67-
FILE(GLOB APIGEAR_INCLUDES *.h)
63+
FILE(GLOB_RECURSE APIGEAR_INCLUDES *.h)
6864
install(FILES ${APIGEAR_INCLUDES}
6965
DESTINATION include/apigear/olink)
7066

goldenmaster/apigear/olink/olinkconnection.cpp

+27-86
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
#include <Poco/Net/HTTPClientSession.h>
33
#include <Poco/Net/HTTPRequest.h>
44
#include <Poco/Net/HTTPResponse.h>
5+
#include <Poco/Net/WebSocket.h>
56

7+
#include "olinkconnection.h"
68
#include "olink/clientregistry.h"
79
#include "olink/iobjectsink.h"
8-
#include "olinkconnection.h"
910

1011
#include <iostream>
1112
#include <memory>
@@ -15,7 +16,6 @@ using namespace ApiGear::PocoImpl;
1516
namespace{
1617
const std::string jsonContentType = "application/json";
1718
const std::string defaultGatewayUrl = "ws://localhost:8000/ws";
18-
const std::string closeFramePayload = "bye";
1919
const std::string pingFramePayload = "ping";
2020
long retryInterval = 500; //Milliseconds
2121
long smallDelay = 10; //Milliseconds
@@ -24,8 +24,8 @@ namespace{
2424

2525
OlinkConnection::OlinkConnection(ApiGear::ObjectLink::ClientRegistry& registry)
2626
: m_node(ApiGear::ObjectLink::ClientNode::create(registry)),
27-
m_isConnecting(false),
28-
m_disconnectRequested(false)
27+
m_socket(*this),
28+
m_isConnecting(false)
2929
{
3030
auto writeFunction = [this](const auto& msg) {
3131
std::unique_lock<std::timed_mutex> lock(m_queueMutex);
@@ -36,58 +36,24 @@ OlinkConnection::OlinkConnection(ApiGear::ObjectLink::ClientRegistry& registry)
3636
m_node->onWrite(writeFunction);
3737
}
3838

39+
void OlinkConnection::onConnectionClosedFromNetwork()
40+
{
41+
onDisconnected();
42+
}
43+
3944
OlinkConnection::~OlinkConnection()
4045
{
41-
m_disconnectRequested = true;
4246
// DisconnectAndUnlink modifies collection.
4347
auto copyObjectLinkStatus = m_objectLinkStatus;
4448
for(auto& object : copyObjectLinkStatus){
4549
disconnectAndUnlink(object.first);
4650
}
4751
closeQueue();
48-
if (m_receivingDone.valid()){
49-
m_receivingDone.wait();
50-
}
51-
onDisconnected();
52-
}
53-
54-
void OlinkConnection::receiveInLoop()
55-
{
56-
onConnected();
57-
auto serverClosedConnection = false;
58-
do{
59-
try {
60-
// receiveFrame requires pocobuffer with initial size 0, as it always extends it with adding frame content.
61-
Poco::Buffer<char> pocobuffer(0);
62-
int flags;
63-
auto canSocketRead = m_socket ? m_socket->poll(Poco::Timespan(10000), Poco::Net::WebSocket::SELECT_READ) : false;
64-
if (canSocketRead && !m_disconnectRequested && m_socket) {
65-
std::unique_lock<std::timed_mutex> lock(m_socketMutex);
66-
auto frameSize = m_socket->receiveFrame(pocobuffer, flags);
67-
lock.unlock();
68-
auto messagePayload = std::string(pocobuffer.begin(), frameSize);
69-
auto frameOpCode = flags & Poco::Net::WebSocket::FRAME_OP_BITMASK;
70-
if (frameOpCode == Poco::Net::WebSocket::FRAME_OP_PING){
71-
writeMessage(messagePayload, Poco::Net::WebSocket::FRAME_OP_PONG);
72-
} else if (frameOpCode == Poco::Net::WebSocket::FRAME_OP_PONG) {
73-
// handle pong
74-
} else if (frameSize == 0 || frameOpCode == Poco::Net::WebSocket::FRAME_OP_CLOSE){
75-
std::cout << "close connection" << std::endl;
76-
serverClosedConnection = true;
77-
} else {
78-
handleTextMessage(messagePayload);
79-
}
80-
}
81-
}
82-
catch(Poco::Exception& e) {
83-
serverClosedConnection = true;
84-
std::cout << "connection closed with exception:" << e.what() << std::endl;
85-
}
86-
} while (!serverClosedConnection && !m_disconnectRequested);
87-
if (serverClosedConnection)
52+
if (!m_socket.isClosed())
8853
{
89-
onDisconnected();
54+
m_socket.close();
9055
}
56+
onDisconnected();
9157
}
9258

9359
void OlinkConnection::connectAndLinkObject(std::shared_ptr<ApiGear::ObjectLink::IObjectSink> object)
@@ -98,7 +64,7 @@ void OlinkConnection::connectAndLinkObject(std::shared_ptr<ApiGear::ObjectLink::
9864
}
9965

10066
m_node->registry().addSink(object);
101-
if (m_socket){
67+
if (!m_socket.isClosed()){
10268
m_node->linkRemote(object->olinkObjectName());
10369
m_objectLinkStatus[object->olinkObjectName()] = LinkStatus::Linked;
10470
}
@@ -125,15 +91,15 @@ void OlinkConnection::connectToHost(Poco::URI url)
12591
if (m_processMessagesTask){
12692
m_processMessagesTask->cancel();
12793
}
128-
m_disconnectRequested = false;
94+
12995
if(url.empty()) {
13096
m_serverUrl = Poco::URI(defaultGatewayUrl);
13197
std::clog << "No host url provided" << std::endl;
13298
} else {
13399
m_serverUrl = url;
134100
}
135101

136-
if(!m_socket && !m_isConnecting) {
102+
if(m_socket.isClosed() && !m_isConnecting) {
137103
std::clog << "Connecting to host " << url.toString() << std::endl;
138104
try {
139105
m_isConnecting = true;
@@ -142,15 +108,12 @@ void OlinkConnection::connectToHost(Poco::URI url)
142108
request.setKeepAlive(true);
143109
request.setContentType(jsonContentType);
144110
Poco::Net::HTTPResponse response;
145-
146-
m_socket = std::make_unique<Poco::Net::WebSocket>(session, request, response);
147-
if (m_socket){
148-
// Common default maximum frame size is 1Mb
149-
m_socket->setMaxPayloadSize(1048576);
150-
}
151-
m_receivingDone = std::async(std::launch::async, [this](){receiveInLoop(); });
111+
auto socket = std::make_unique<Poco::Net::WebSocket>(session, request, response);
112+
m_socket.changeSocket(std::move(socket));
113+
onConnected();
114+
152115
} catch (std::exception &e) {
153-
m_socket.reset();
116+
m_socket.close();
154117
std::cerr << "Exception " << e.what() << std::endl;
155118
}
156119
m_isConnecting = false;
@@ -162,17 +125,13 @@ void OlinkConnection::connectToHost(Poco::URI url)
162125

163126
void OlinkConnection::disconnect() {
164127
std::clog << " request to disconnect socket" << std::endl;
165-
m_disconnectRequested = true;
166-
167128
for (auto& object : m_objectLinkStatus){
168129
if (object.second != LinkStatus::NotLinked){
169130
m_node->unlinkRemote(object.first);
170131
}
171132
}
172133
closeQueue();
173-
if (m_receivingDone.valid()){
174-
m_receivingDone.wait();
175-
}
134+
m_socket.close();
176135
onDisconnected();
177136
}
178137

@@ -182,7 +141,6 @@ void OlinkConnection::closeQueue()
182141
m_processMessagesTask->cancel();
183142
}
184143
flushMessages();
185-
writeMessage(closeFramePayload, Poco::Net::WebSocket::FRAME_OP_CLOSE);
186144
}
187145

188146

@@ -208,7 +166,6 @@ void OlinkConnection::onDisconnected()
208166
{
209167
object.second = LinkStatus::NotLinked;
210168
}
211-
m_socket.reset();
212169
std::clog << " socket disconnected" << std::endl;
213170
}
214171

@@ -233,18 +190,19 @@ void OlinkConnection::scheduleProcessMessages(long delayMiliseconds)
233190

234191
void OlinkConnection::processMessages(Poco::Util::TimerTask& /*task*/)
235192
{
236-
if (!m_socket && !m_disconnectRequested) {
193+
if (m_socket.isClosed()) {
237194
connectToHost(m_serverUrl);
238195
scheduleProcessMessages(tryReconnectDelay);
239196
return;
240197
}
241-
writeMessage(pingFramePayload, Poco::Net::WebSocket::FRAME_OP_PING);
198+
199+
m_socket.writeMessage(pingFramePayload, Poco::Net::WebSocket::FRAME_OP_PING);
242200
flushMessages();
243201
}
244202

245203
void OlinkConnection::flushMessages()
246204
{
247-
if (m_socket)
205+
if (!m_socket.isClosed())
248206
{
249207
std::deque<std::string> copyQueue;
250208
std::unique_lock<std::timed_mutex> lock(m_queueMutex);
@@ -255,12 +213,12 @@ void OlinkConnection::flushMessages()
255213
auto message = copyQueue.front();
256214
std::clog << "write message to socket " << message << std::endl;
257215
// if we are using JSON we need to use txt message otherwise binary messages
258-
auto messageSent = writeMessage(message, Poco::Net::WebSocket::FRAME_TEXT);
216+
auto messageSent = m_socket.writeMessage(message, Poco::Net::WebSocket::FRAME_TEXT);
259217
if (messageSent){
260218
copyQueue.pop_front();
261219
}
262220
else {
263-
if (!m_disconnectRequested){
221+
if (!m_socket.isClosed()){
264222
lock.lock();
265223
// push again not sent elements to queue front
266224
m_queue.insert(m_queue.begin(), copyQueue.begin(), copyQueue.end());
@@ -272,20 +230,3 @@ void OlinkConnection::flushMessages()
272230
}
273231
}
274232
}
275-
276-
bool OlinkConnection::writeMessage(std::string message, int frameOpCode)
277-
{
278-
bool succeed = false;
279-
try {
280-
if (m_socket) {
281-
std::unique_lock<std::timed_mutex> lock(m_socketMutex);
282-
m_socket->sendFrame(message.c_str(), static_cast<int>(message.size()), frameOpCode);
283-
lock.unlock();
284-
succeed = true;
285-
}
286-
}
287-
catch (std::exception& e) {
288-
std::cerr << "Exception " << e.what() << std::endl;
289-
}
290-
return succeed;
291-
}

goldenmaster/apigear/olink/olinkconnection.h

+20-48
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,20 @@
11
#pragma once
22

3-
#if defined _WIN32 || defined __CYGWIN__
4-
#ifdef __GNUC__
5-
#define APIGEAR_OLINK_EXPORT __attribute__ ((dllexport))
6-
#else
7-
#define APIGEAR_OLINK_EXPORT __declspec(dllexport)
8-
#endif
9-
#else
10-
#if __GNUC__ >= 4
11-
#define APIGEAR_OLINK_EXPORT __attribute__ ((visibility ("default")))
12-
#else
13-
#define APIGEAR_OLINK_EXPORT
14-
#endif
15-
#endif
3+
#include "iolinkconnector.h"
4+
#include "private/socketwrapper.h"
5+
#include "private/apigear_olink.h"
6+
#include "private/isocketuser.h"
7+
8+
#include "olink/clientnode.h"
169

1710
#include <Poco/URI.h>
1811
#include <Poco/Util/Timer.h>
1912
#include <Poco/Util/TimerTask.h>
20-
#include <Poco/Mutex.h>
21-
#include <Poco/Net/WebSocket.h>
22-
23-
#include "olink/clientnode.h"
24-
#include "olink/consolelogger.h"
25-
26-
#include "iolinkconnector.h"
2713

2814
#include <atomic>
29-
#include <queue>
3015
#include <deque>
31-
#include <set>
3216
#include <memory>
33-
#include <future>
17+
#include <map>
3418

3519
namespace ApiGear {
3620

@@ -47,7 +31,7 @@ namespace PocoImpl {
4731
* It handles linking and unlinking with remote service for the sink with regard to the connection state.
4832
* Implements a message queue.
4933
*/
50-
class APIGEAR_OLINK_EXPORT OlinkConnection: public ApiGear::PocoImpl::IOlinkConnector
34+
class APIGEAR_OLINK_EXPORT OlinkConnection: public ApiGear::PocoImpl::IOlinkConnector, public ISocketUser
5135
{
5236
public:
5337
/**
@@ -82,11 +66,18 @@ class APIGEAR_OLINK_EXPORT OlinkConnection: public ApiGear::PocoImpl::IOlinkConn
8266
/** IOlinkConnector::disconnectAndUnlink implementation*/
8367
void disconnectAndUnlink(const std::string& objectId) override;
8468

85-
private:
86-
/**
87-
* Implements handling incoming messages in a loop.
69+
/**
70+
* ISocketUser::handleTextMessage implementation.
71+
* Handler for raw message received.
8872
*/
89-
void receiveInLoop();
73+
void handleTextMessage(const std::string& message) override;
74+
/**
75+
* ISocketUser::onConnectionClosedFromNetwork implementation
76+
* A callback to inform the socket user that connection was closed with close frame received from network.
77+
*/
78+
void onConnectionClosedFromNetwork() override;
79+
80+
private:
9081
/** Sends all the waiting messages when the connection is up. */
9182
void onConnected();
9283
/** Cleans up resources after connection closed either from server side(close frame received)
@@ -96,9 +87,6 @@ class APIGEAR_OLINK_EXPORT OlinkConnection: public ApiGear::PocoImpl::IOlinkConn
9687

9788
/* Sends all queued messages and sends close frame*/
9889
void closeQueue();
99-
100-
/** Handler for raw messages.*/
101-
void handleTextMessage(const std::string& message);
10290
/**
10391
* Processes queued messages.
10492
* @param task. Parameter is not used. The function uses most recent task stored as a member.
@@ -112,15 +100,6 @@ class APIGEAR_OLINK_EXPORT OlinkConnection: public ApiGear::PocoImpl::IOlinkConn
112100
/** Sends all stored messages*/
113101
void flushMessages();
114102

115-
/** Tries to send message immediately without queuing.
116-
* If the connection is not working, message will not be stored to resend.
117-
* @param message a message in network format to be send as it is.
118-
* @param the frame opcode. Use FRAME_TEXT or FRAME_BINARY for regular text messages
119-
* see Poco::Net::WebSocket Frame Opcodes for more info.
120-
* @return true if message was sent, false otherwise.
121-
*/
122-
bool writeMessage(std::string message, int frameOpCode);
123-
124103
/** Client node that separates sinks Objects from created socket, and handles incoming and outgoing messages. */
125104
std::shared_ptr<ApiGear::ObjectLink::ClientNode> m_node;
126105

@@ -134,13 +113,7 @@ class APIGEAR_OLINK_EXPORT OlinkConnection: public ApiGear::PocoImpl::IOlinkConn
134113
/** The server url to which socket connects. */
135114
Poco::URI m_serverUrl;
136115
/** The websocket used for connection.*/
137-
std::unique_ptr<Poco::Net::WebSocket> m_socket;
138-
/** A mutex for the socket */
139-
std::timed_mutex m_socketMutex;
140-
/** Flag handled between the threads with information that the connection should be closed. */
141-
std::atomic<bool> m_disconnectRequested;
142-
/** Result of receiveInLoop. Used to wait for end of its work after m_disconnectRequested is set to true*/
143-
std::future<void> m_receivingDone;
116+
SocketWrapper m_socket;
144117

145118
/** Flag to protect against opening a connection from many threads at the same time*/
146119
std::atomic<bool> m_isConnecting;
@@ -156,6 +129,5 @@ class APIGEAR_OLINK_EXPORT OlinkConnection: public ApiGear::PocoImpl::IOlinkConn
156129
/** A mutex for the message queue */
157130
std::timed_mutex m_queueMutex;
158131

159-
160132
};
161133
}} // namespace ApiGear::PocoImpl

0 commit comments

Comments
 (0)