Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dphan/extracting protocol format #42

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ add_subdirectory (src)
add_subdirectory (tests)

if(PERFORMANCE_TESTS)
add_subdirectory (performance)
#add_subdirectory (performance)
endif()

if(BUILD_EXAMPLES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ std::future<int> TestApiClient::funcIntAsync(int paramInt)
static const auto operationId = ApiGear::ObjectLink::Name::createMemberId(olinkObjectName(), "funcInt");
m_node->invokeRemote(operationId,
nlohmann::json::array({paramInt}), [&resultPromise](ApiGear::ObjectLink::InvokeReplyArg arg) {
const int& value = arg.value.get<int>();
const int& value = arg.value.content.get<int>();
resultPromise.set_value(value);
});
return resultPromise.get_future().get();
Expand Down Expand Up @@ -157,7 +157,7 @@ std::future<float> TestApiClient::funcFloatAsync(float paramFloat)
static const auto operationId = ApiGear::ObjectLink::Name::createMemberId(olinkObjectName(), "funcFloat");
m_node->invokeRemote(operationId,
nlohmann::json::array({paramFloat}), [&resultPromise](ApiGear::ObjectLink::InvokeReplyArg arg) {
const float& value = arg.value.get<float>();
const float& value = arg.value.content.get<float>();
resultPromise.set_value(value);
});
return resultPromise.get_future().get();
Expand Down Expand Up @@ -188,7 +188,7 @@ std::future<std::string> TestApiClient::funcStringAsync(const std::string& param
static const auto operationId = ApiGear::ObjectLink::Name::createMemberId(olinkObjectName(), "funcString");
m_node->invokeRemote(operationId,
nlohmann::json::array({paramString}), [&resultPromise](ApiGear::ObjectLink::InvokeReplyArg arg) {
const std::string& value = arg.value.get<std::string>();
const std::string& value = arg.value.content.get<std::string>();
resultPromise.set_value(value);
});
return resultPromise.get_future().get();
Expand Down
58 changes: 44 additions & 14 deletions src/olink/clientnode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ void ClientNode::linkRemote(const std::string& objectId)
{
static const std::string linkRemoteLog = "ClientNode.linkRemote: ";
emitLog(LogLevel::Info, linkRemoteLog, objectId);
emitWrite(Protocol::linkMessage(objectId));
auto serializer = getSerializer();
if (serializer)
{
auto writer = serializer->createWriter();
if (writer)
{
emitWrite(Protocol::linkMessage(*writer, objectId));
}
}
m_registry.unsetNode(objectId);
m_registry.setNode(m_nodeId, objectId);
}
Expand All @@ -41,28 +49,50 @@ void ClientNode::unlinkRemote(const std::string& objectId)
if (sink){
sink->olinkOnRelease();
}
emitWrite(Protocol::unlinkMessage(objectId));
auto serializer = getSerializer();
if (serializer)
{
auto writer = serializer->createWriter();
if (writer)
{
emitWrite(Protocol::unlinkMessage(*writer, objectId));
}
}
m_registry.unsetNode(objectId);
}

void ClientNode::invokeRemote(const std::string& methodId, const nlohmann::json& args, InvokeReplyFunc func)
void ClientNode::invokeRemote(const std::string& methodId, const OLinkContent& args, InvokeReplyFunc func)
{
static const std::string invokeRemoteLog = "ClientNode.invokeRemote: ";
emitLog(LogLevel::Info, invokeRemoteLog, methodId);
int requestId = nextRequestId();
std::unique_lock<std::mutex> lock(m_pendingInvokesMutex);
m_invokesPending[requestId] = func;
lock.unlock();
nlohmann::json msg = Protocol::invokeMessage(requestId, methodId, args);
emitWrite(msg);
auto serializer = getSerializer();
if (serializer)
{
auto writer = serializer->createWriter();
if (writer)
{
emitWrite(Protocol::invokeMessage(*writer, requestId, methodId, args));
}
}
}

void ClientNode::setRemoteProperty(const std::string& propertyId, const nlohmann::json& value)
void ClientNode::setRemoteProperty(const std::string& propertyId, const OLinkContent& value)
{
static const std::string setRemotePropertyLog = "ClientNode.setRemoteProperty: ";
emitLog(LogLevel::Info, setRemotePropertyLog, propertyId);
nlohmann::json msg = Protocol::setPropertyMessage(propertyId, value);
emitWrite(msg);
auto serializer = getSerializer();
if (serializer)
{
auto writer = serializer->createWriter();
if (writer)
{
emitWrite(Protocol::setPropertyMessage(*writer, propertyId, value));
}
}
}

ClientRegistry& ClientNode::registry()
Expand All @@ -85,7 +115,7 @@ unsigned long ClientNode::getNodeId() const
return m_nodeId;
}

void ClientNode::handleInit(const std::string& objectId, const nlohmann::json& props)
void ClientNode::handleInit(const std::string& objectId, const OLinkContent& props)
{
static const std::string handeInitLog = "ClientNode.handleInit: ";
emitLogWithPayload(LogLevel::Info, props, handeInitLog, objectId);
Expand All @@ -99,7 +129,7 @@ void ClientNode::handleInit(const std::string& objectId, const nlohmann::json& p
}
}

void ClientNode::handlePropertyChange(const std::string& propertyId, const nlohmann::json& value)
void ClientNode::handlePropertyChange(const std::string& propertyId, const OLinkContent& value)
{
static const std::string handlePropertyChangedlog = "ClientNode.handlePropertyChange: ";
emitLogWithPayload(LogLevel::Info, value, handlePropertyChangedlog, propertyId);
Expand All @@ -113,7 +143,7 @@ void ClientNode::handlePropertyChange(const std::string& propertyId, const nlohm
}
}

void ClientNode::handleInvokeReply(int requestId, const std::string& methodId, const nlohmann::json& value)
void ClientNode::handleInvokeReply(int requestId, const std::string& methodId, const OLinkContent& value)
{
static const std::string handleInvokeLog = "ClientNode.handleInvokeReply: ";
emitLogWithPayload(LogLevel::Info, value, handleInvokeLog, methodId);
Expand All @@ -135,7 +165,7 @@ void ClientNode::handleInvokeReply(int requestId, const std::string& methodId, c
}
}

void ClientNode::handleSignal(const std::string& signalId, const nlohmann::json& args)
void ClientNode::handleSignal(const std::string& signalId, const OLinkContent& args)
{
static const std::string handleSignalLog ="ClientNode.handleSignal: ";
emitLog(LogLevel::Info, handleSignalLog, signalId);
Expand All @@ -148,10 +178,10 @@ void ClientNode::handleSignal(const std::string& signalId, const nlohmann::json&
}
}

void ClientNode::handleError(int msgType, int requestId, const std::string& error)
void ClientNode::handleError(MsgType msgType, int requestId, const std::string& error)
{
static const std::string errorLog = "ClientNode.handleError: ";
emitLog(LogLevel::Info, errorLog, std::to_string(msgType), std::to_string(requestId), error);
emitLog(LogLevel::Info, errorLog, toString(msgType), std::to_string(requestId), error);
}

int ClientNode::nextRequestId()
Expand Down
15 changes: 8 additions & 7 deletions src/olink/clientnode.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "core/olink_common.h"
#include "core/olinkcontent.h"
#include "core/types.h"
#include "iclientnode.h"
#include "core/basenode.h"
Expand Down Expand Up @@ -55,9 +56,9 @@ class OLINK_EXPORT ClientNode : public BaseNode, public IClientNode, public std:
/** IClientNode::unlinkRemote implementation. */
void unlinkRemote(const std::string& objectId) override;
/** IClientNode::invokeRemote implementation. */
void invokeRemote(const std::string& methodId, const nlohmann::json& args=nlohmann::json{}, InvokeReplyFunc func=nullptr) override;
void invokeRemote(const std::string& methodId, const OLinkContent& args= OLinkContent(), InvokeReplyFunc func=nullptr) override;
/** IClientNode::setRemoteProperty implementation. */
void setRemoteProperty(const std::string& propertyId, const nlohmann::json& value) override;
void setRemoteProperty(const std::string& propertyId, const OLinkContent& value) override;

/* The registry in which client is registered*/
ClientRegistry& registry();
Expand All @@ -70,15 +71,15 @@ class OLINK_EXPORT ClientNode : public BaseNode, public IClientNode, public std:

protected:
/** IProtocolListener::handleInit implementation */
void handleInit(const std::string& objectId, const nlohmann::json& props) override;
void handleInit(const std::string& objectId, const OLinkContent& props) override;
/** IProtocolListener::handlePropertyChange implementation */
void handlePropertyChange(const std::string& propertyId, const nlohmann::json& value) override;
void handlePropertyChange(const std::string& propertyId, const OLinkContent& value) override;
/** IProtocolListener::handleInvokeReply implementation */
void handleInvokeReply(int requestId, const std::string& methodId, const nlohmann::json& value) override;
void handleInvokeReply(int requestId, const std::string& methodId, const OLinkContent& value) override;
/** IProtocolListener::handleSignal implementation */
void handleSignal(const std::string& signalId, const nlohmann::json& args) override;
void handleSignal(const std::string& signalId, const OLinkContent& args) override;
/** IProtocolListener::handleError implementation */
void handleError(int msgType, int requestId, const std::string& error) override;
void handleError(MsgType msgType, int requestId, const std::string& error) override;

/**
* Returns a request id for outgoing messages.
Expand Down
38 changes: 21 additions & 17 deletions src/olink/core/basenode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,29 @@ namespace
static const std::string notImplementedLog = "not implemented ";
}

void BaseNode::onWrite(WriteMessageFunc func)
void BaseNode::onWrite(WriteMessageFunc func, std::shared_ptr<IMessageSerializer> serializer)
{
m_writeFunc = func;
m_serializer = serializer;
}

void BaseNode::emitWrite(const nlohmann::json& msg)
void BaseNode::emitWrite(const OLinkMessage& msg)
{
static const std::string writeMessageLog = "writeMessage: ";
emitLogWithPayload(LogLevel::Debug, msg, writeMessageLog);
if(m_writeFunc) {
m_writeFunc(m_converter.toString(msg));
if(m_writeFunc && m_serializer) {
m_writeFunc(m_serializer->toNetworkFormat(msg));
} else {
static const std::string noWriterSetLog = "Messages are not sent if the write function is not set";
emitLog(LogLevel::Warning, noWriterSetLog);
}
}
void BaseNode::setMessageFormat(MessageFormat format)
{
m_converter.setMessageFormat(format);
}

void BaseNode::handleMessage(const std::string& data)
{
m_protocol.handleMessage(m_converter.fromString(data), *this);
auto msg = m_serializer->fromNetworkFormat(data);
auto deserializer = m_serializer->createReader(msg);
m_protocol.handleMessage(*(deserializer.get()), *this);
}

void BaseNode::handleLink(const std::string& objectId)
Expand All @@ -44,39 +43,44 @@ void BaseNode::handleUnlink(const std::string& objectId)
emitLog(LogLevel::Warning, notImplementedLog, std::string(__func__), objectId);
}

void BaseNode::handleInvoke(int, const std::string& methodId, const nlohmann::json& args)
void BaseNode::handleInvoke(int, const std::string& methodId, const OLinkContent& args)
{
emitLogWithPayload(LogLevel::Warning, args, notImplementedLog, std::string(__func__), methodId, " args ");
}

void BaseNode::handleSetProperty(const std::string& propertyId, const nlohmann::json& value)
void BaseNode::handleSetProperty(const std::string& propertyId, const OLinkContent& value)
{
emitLogWithPayload(LogLevel::Warning, value, notImplementedLog, std::string(__func__), propertyId, " value ");
}

void BaseNode::handleInit(const std::string& objectId, const nlohmann::json& props)
void BaseNode::handleInit(const std::string& objectId, const OLinkContent& props)
{
emitLogWithPayload(LogLevel::Warning, props, notImplementedLog, std::string(__func__), objectId, " props ");
}

void BaseNode::handleInvokeReply(int requestId, const std::string& methodId, const nlohmann::json& value)
void BaseNode::handleInvokeReply(int requestId, const std::string& methodId, const OLinkContent& value)
{
emitLog(LogLevel::Warning, notImplementedLog, std::string(__func__), methodId, " requestId ", std::to_string(requestId), " value ", value);
emitLogWithPayload(LogLevel::Warning, value, notImplementedLog, std::string(__func__), methodId, " requestId ", std::to_string(requestId), " value ");
}

void BaseNode::handleSignal(const std::string& signalId, const nlohmann::json& args)
void BaseNode::handleSignal(const std::string& signalId, const OLinkContent& args)
{
emitLogWithPayload(LogLevel::Warning, args, notImplementedLog, std::string(__func__), signalId, " args ");
}

void BaseNode::handlePropertyChange(const std::string& propertyId, const nlohmann::json& value)
void BaseNode::handlePropertyChange(const std::string& propertyId, const OLinkContent& value)
{
emitLogWithPayload(LogLevel::Warning, value, notImplementedLog, std::string(__func__), propertyId, " value ");
}

void BaseNode::handleError(int, int requestId, const std::string& error)
void BaseNode::handleError(MsgType, int requestId, const std::string& error)
{
emitLog(LogLevel::Warning, notImplementedLog, std::string(__func__), " requestId ", std::to_string(requestId), " error ", error);
}

std::shared_ptr<IMessageSerializer> BaseNode::getSerializer()
{
return m_serializer;
}

} } // ApiGear::ObjectLink
30 changes: 13 additions & 17 deletions src/olink/core/basenode.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#include "protocol.h"
#include "types.h"
#include "olink_common.h"
#include "nlohmann/json.hpp"
#include <cstring>
#include "imessageserializer.h"

namespace ApiGear { namespace ObjectLink {

Expand All @@ -21,18 +21,13 @@ class OLINK_EXPORT BaseNode: public LoggerBase,
* Network layer implementation should deliver this function,
* with which messages are sent through network.
*/
void onWrite(WriteMessageFunc func);
void onWrite(WriteMessageFunc func, std::shared_ptr<IMessageSerializer> serializer);
/**
* Use this function to format message and send it through the network.
* It uses the WriteMessageFunc provided by network layer implementation with onWrite(WriteMessageFunc) call.
* @param j The data to send, translated according to chosen network message format before sending.
*/
virtual void emitWrite(const nlohmann::json& j);

/**
* Use to change messages network format.
*/
void setMessageFormat(MessageFormat format);
virtual void emitWrite(const OLinkMessage& j);

// Implementation::IMessageHandler
void handleMessage(const std::string& data) override;
Expand All @@ -42,24 +37,25 @@ class OLINK_EXPORT BaseNode: public LoggerBase,
// Empty, logging only implementation of IProtocolListener::handleUnlink, should be overwritten on server side.
void handleUnlink(const std::string& objectId) override;
// Empty, logging only implementation of IProtocolListener::handleInvoke, should be overwritten on server side.
void handleInvoke(int requestId, const std::string& methodId, const nlohmann::json& args) override;
void handleInvoke(int requestId, const std::string& methodId, const OLinkContent& args) override;
// Empty, logging only implementation of IProtocolListener::handleSetProperty, should be overwritten on server side.
void handleSetProperty(const std::string& propertyId, const nlohmann::json& value) override;
void handleSetProperty(const std::string& propertyId, const OLinkContent& value) override;
// Empty, logging only implementation of IProtocolListener::handleInit, should be overwritten on client side.
void handleInit(const std::string& objectId, const nlohmann::json& props) override;
void handleInit(const std::string& objectId, const OLinkContent& props) override;
// Empty, logging only implementation of IProtocolListener::handleInvokeReply, should be overwritten on client side.
void handleInvokeReply(int requestId, const std::string& methodId, const nlohmann::json& value) override;
void handleInvokeReply(int requestId, const std::string& methodId, const OLinkContent& value) override;
// Empty, logging only implementation of IProtocolListener::handleSignal, should be overwritten on client side.
void handleSignal(const std::string& signalId, const nlohmann::json& args) override;
void handleSignal(const std::string& signalId, const OLinkContent& args) override;
// Empty, logging only implementation of IProtocolListener::handlePropertyChange, should be overwritten on client side.
void handlePropertyChange(const std::string& propertyId, const nlohmann::json& value) override;
void handlePropertyChange(const std::string& propertyId, const OLinkContent& value) override;
// Empty, logging only implementation of IProtocolListener::handleError, should be overwritten on both client and server side.
void handleError(int msgType, int requestId, const std::string& error) override;
void handleError(MsgType msgType, int requestId, const std::string& error) override;
std::shared_ptr<IMessageSerializer> getSerializer();
private:
/** Function with which messages are sent through network after translation to chosen network format */
WriteMessageFunc m_writeFunc = nullptr;
/** A message converter, translates messages to and from chosen network format*/
MessageConverter m_converter = MessageFormat::JSON;
/** A message serializer, that specifies how to encode and decode messages. Used by protocol. */
std::shared_ptr<IMessageSerializer> m_serializer;
/** ObjectLink protocol*/
Protocol m_protocol;
};
Expand Down
Loading