Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for TCP Searches #192

Open
wants to merge 83 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
83 commits
Select commit Hold shift + click to select a range
9f76c2f
add ability to connect to server channels when UDP search is not feas…
sveseli Jan 24, 2024
63548a1
remove un-needed code
sveseli Jan 24, 2024
99bfee2
allow control of the UDP sender port via the EPICS_PVA_UDP_SENDER_POR…
sveseli Jan 25, 2024
a0c436f
add tests for udp search, name server search, and direct connection
sveseli Mar 11, 2024
6e7420a
set connected status on state change
sveseli Mar 11, 2024
4de9a83
add method for retrieving search server port
sveseli Mar 11, 2024
6976fdc
add new tests to Makefile
sveseli Mar 11, 2024
6fe5a79
fix typo
sveseli Mar 11, 2024
8af89e7
add name server search
sveseli Mar 11, 2024
7caa5d3
add protocol constants
sveseli Mar 11, 2024
424090f
use constant for type
sveseli Mar 11, 2024
57c03c8
support for name server search
sveseli Mar 11, 2024
291d97f
use constant for tcp transport
sveseli Mar 11, 2024
03feff9
add interface for getting name server transport
sveseli Mar 11, 2024
f1bdbb2
add support for tcp search
sveseli Mar 11, 2024
c5dc3af
add support for name server channel discovery
sveseli Mar 12, 2024
8b32dba
add tcp acceptor for channel search
sveseli Mar 12, 2024
bcaa794
support for tcp channel search
sveseli Mar 12, 2024
48fb839
introduce new interfaces for managing tcp transport
sveseli Mar 13, 2024
d78b5a1
use different channel priority for name server searches; move name se…
sveseli Mar 13, 2024
98ec0bc
stop search if channel is found via direct connection
sveseli Mar 13, 2024
9425a18
use default address list if provided string is empty
sveseli Mar 15, 2024
bac2944
allow initializing server context with different set of response hand…
sveseli Mar 18, 2024
581f611
expose additional header
sveseli Mar 18, 2024
0cc8c2b
fix logging typo
sveseli Mar 19, 2024
53e15c4
add utility function to convert string to inet address
sveseli Mar 20, 2024
b3cb0d7
move utility functions to pvutils.cpp
sveseli Mar 20, 2024
d8a1dba
add PVA name server utility
sveseli Mar 20, 2024
797aa77
add name server utility description
sveseli Mar 20, 2024
fd115a8
add utilities for manipulating strings
sveseli Mar 21, 2024
9dcd131
add additional utility methods
sveseli Mar 21, 2024
6719d20
add support for a static channel map read from a file
sveseli Mar 21, 2024
3c598c2
update description
sveseli Mar 21, 2024
18a7e2a
send search response immediately
sveseli Mar 21, 2024
f629385
send name server search response immediately
sveseli Mar 21, 2024
cdf914c
merge changes for udp sender port
sveseli Mar 21, 2024
94e9042
update for older gcc compiler
sveseli Mar 22, 2024
db58c6f
fix windows build
sveseli Mar 22, 2024
f63cb89
move base nameserver code to the library
sveseli Mar 25, 2024
249e7f0
fix build on older gcc version
sveseli Mar 25, 2024
56f504a
fixes for mingw builds
sveseli Mar 25, 2024
0e237d9
fix few log formats
sveseli Mar 25, 2024
1007a9c
fix few codacy complaints; avoid duplicate symbold on rtems
sveseli Mar 26, 2024
8d0135a
move string constants to pvaConstants
sveseli Mar 27, 2024
26ff188
include missing header
sveseli Mar 27, 2024
70e52cf
fix few log statement formats
sveseli Mar 28, 2024
270b472
add diagnostics output for channel discovery tests
sveseli Mar 28, 2024
3223142
initialize udp sender port
sveseli Mar 28, 2024
278e24a
add more diagnostics
sveseli Mar 28, 2024
e75b24a
prevent direct connection attempt to unknown port
sveseli Mar 28, 2024
7b4604d
restore logging level
sveseli Mar 28, 2024
c9e23d1
resolve codacy complaints
sveseli Mar 29, 2024
bfbde63
reduce test logging level
sveseli Mar 29, 2024
d14f6d8
fix setsockopt call for mingw
sveseli Mar 29, 2024
b4a9f7a
add missing 2 bytes
sveseli Mar 29, 2024
fc33afb
revert protocol type back to hardcoded string
sveseli Mar 29, 2024
658d57d
use static cast for %lu format
sveseli Mar 29, 2024
10242e8
move stuff around to avoid exposing additional headers
sveseli Mar 30, 2024
c9227f9
few more changes for mingw
sveseli Mar 31, 2024
4527a0b
fix codacy complaint about explicit constructors
sveseli Mar 31, 2024
7832fc4
restore original export defs; remove unused constants
sveseli Mar 31, 2024
6b19c20
fix format warning
sveseli Apr 1, 2024
55bd40e
add test disgnostics output
sveseli Apr 1, 2024
cc5d2d6
new test added
sveseli Apr 1, 2024
1e530ba
remove pvans utility
sveseli Apr 12, 2024
afbd308
remove nameserver support classes
sveseli Apr 12, 2024
b521694
remove string utility tests
sveseli Apr 15, 2024
731bba1
remove ability to set udp sender port
sveseli Apr 15, 2024
0dca5d7
disable code that initiates direct connection to server based on EPIC…
sveseli Apr 15, 2024
899cde9
remove search acceptor; name server searches have to go through the r…
sveseli Apr 15, 2024
487b11e
remove unused method
sveseli Apr 15, 2024
4e8569a
add default implementation of isUsed()
sveseli Apr 15, 2024
046c1f2
add default implementation name server search methods
sveseli Apr 15, 2024
80c708c
remove pvaConstants.cpp, restore pvaVersion.cpp, remove PVA_UDP_SENDE…
sveseli Apr 17, 2024
5f001f5
remove udp sender port variables
sveseli Apr 17, 2024
fd15386
remove unused utility method for converting string to inet address
sveseli May 16, 2024
0179b3c
remove unused methods, restore original initialize() method for serve…
sveseli May 16, 2024
fe0be26
remove unused server search response handler class
sveseli May 16, 2024
39c6106
reduce logging for request start
sveseli May 16, 2024
2383407
force name server transport release if channel is not found after max…
sveseli May 16, 2024
7a64947
remove blocking call for getting name server connections, and instead…
sveseli May 29, 2024
bb5fce6
do not release all name server connections if channel search was unsu…
sveseli May 29, 2024
0a1f8f6
wait for name server transport only if the search has been just sched…
sveseli Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/remote/blockingUDPTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,8 @@ bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSock
{
if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug, "UDP Tx (%zu) %s -> %s.",
length, _remoteName.c_str(), inetAddressToString(address).c_str());
LOG(logLevelDebug, "UDP Tx (%lu) %s -> %s.",
static_cast<unsigned long>(length), _remoteName.c_str(), inetAddressToString(address).c_str());
}

int retval = sendto(_channel, buffer,
Expand All @@ -460,8 +460,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr& address)

if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(address).c_str());
LOG(logLevelDebug, "Sending %lu bytes %s -> %s.",
static_cast<unsigned long>(buffer->getRemaining()), _remoteName.c_str(), inetAddressToString(address).c_str());
}

int retval = sendto(_channel, buffer->getBuffer(),
Expand Down Expand Up @@ -498,8 +498,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, InetAddressType target) {

if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
LOG(logLevelDebug, "Sending %lu bytes %s -> %s.",
static_cast<unsigned long>(buffer->getRemaining()), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
}

int retval = sendto(_channel, buffer->getBuffer(),
Expand Down Expand Up @@ -684,7 +684,7 @@ void initializeUDPTransports(bool serverFlag,
}
}
LOG(logLevelDebug,
"Broadcast address #%zu: %s. (%sunicast)", i, inetAddressToString(list[i]).c_str(),
"Broadcast address #%lu: %s. (%sunicast)", static_cast<unsigned long>(i), inetAddressToString(list[i]).c_str(),
isunicast[i]?"":"not ");
}

Expand Down Expand Up @@ -714,7 +714,7 @@ void initializeUDPTransports(bool serverFlag,
getSocketAddressList(ignoreAddressVector, ignoreAddressList, 0, 0);

//
// Setup UDP trasport(s) (per interface)
// Setup UDP transport(s) (per interface)
//

InetAddrVector tappedNIF;
Expand Down
98 changes: 97 additions & 1 deletion src/remote/channelSearchManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@ static const int MAX_FALLBACK_COUNT_VALUE = (1 << 7) + 1;
static const int MAX_FRAMES_AT_ONCE = 10;
static const int DELAY_BETWEEN_FRAMES_MS = 50;

static const int MAX_NAME_SERVER_SEARCH_COUNT = 3;

ChannelSearchManager::ChannelSearchManager(Context::shared_pointer const & context) :
m_context(context),
m_responseAddress(), // initialized in activate()
m_canceled(),
m_sequenceNumber(0),
m_nsSearchCounter(0),
m_sendBuffer(MAX_UDP_UNFRAGMENTED_SEND),
m_channels(),
m_lastTimeSent(),
Expand Down Expand Up @@ -135,6 +137,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer
if (m_canceled.get())
return;

LOG(logLevelDebug, "Registering search instance: %s", channel->getSearchInstanceName().c_str());
bool immediateTrigger;
{
Lock guard(m_channelMutex);
Expand All @@ -154,6 +157,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer

void ChannelSearchManager::unregisterSearchInstance(SearchInstance::shared_pointer const & channel)
{
LOG(logLevelDebug, "Unregistering search instance: %s", channel->getSearchInstanceName().c_str());
Lock guard(m_channelMutex);
pvAccessID id = channel->getSearchInstanceID();
m_channels.erase(id);
Expand All @@ -180,6 +184,7 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
SearchInstance::shared_pointer si(channelsIter->second.lock());

// remove from search list
LOG(logLevelDebug, "Removing cid %d from the channel map", cid);
m_channels.erase(cid);

guard.unlock();
Expand All @@ -188,6 +193,30 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci
if(si)
si->searchResponse(guid, minorRevision, serverAddress);
}
// Release all NS connections if there are
// no more channels to search for
releaseNameServerTransport();
}

void ChannelSearchManager::releaseNameServerTransport(bool forceRelease)
{
bool releaseAllConnections = true;
if(m_channels.size() == 0)
{
// No more channels to search for, release all connections
m_nsSearchCounter = 0;
m_context.lock()->releaseNameServerSearchTransport(m_nsTransport, releaseAllConnections);
m_nsTransport.reset();
}
else if(forceRelease)
{
// There are channels to search for, release only connection
// that is currently used
releaseAllConnections = false;
m_nsSearchCounter = 0;
m_context.lock()->releaseNameServerSearchTransport(m_nsTransport, releaseAllConnections);
m_nsTransport.reset();
}
}

void ChannelSearchManager::newServerDetected()
Expand All @@ -196,6 +225,46 @@ void ChannelSearchManager::newServerDetected()
callback();
}

void ChannelSearchManager::send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control)
{
control->startMessage(CMD_SEARCH, 4+1+3+16+2+1+4+2);
buffer->putInt(m_sequenceNumber);
buffer->putByte((int8_t)QOS_REPLY_REQUIRED); // CAST_POSITION
buffer->putByte((int8_t)0); // reserved
buffer->putShort((int16_t)0); // reserved

osiSockAddr anyAddress;
memset(&anyAddress, 0, sizeof(anyAddress));
anyAddress.ia.sin_family = AF_INET;
anyAddress.ia.sin_port = htons(0);
anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
encodeAsIPv6Address(buffer, &anyAddress);
buffer->putShort((int16_t)ntohs(anyAddress.ia.sin_port));
buffer->putByte((int8_t)1);
SerializeHelper::serializeString("tcp", buffer, control);
buffer->putShort((int16_t)0); // initial channel count

vector<SearchInstance::shared_pointer> toSend;
{
Lock guard(m_channelMutex);
toSend.reserve(m_channels.size());

for(m_channels_t::iterator channelsIter = m_channels.begin();
channelsIter != m_channels.end(); channelsIter++)
{
SearchInstance::shared_pointer inst(channelsIter->second.lock());
if(!inst) continue;
toSend.push_back(inst);
}
}

vector<SearchInstance::shared_pointer>::iterator siter = toSend.begin();
for (; siter != toSend.end(); siter++)
{
generateSearchRequestMessage(*siter, buffer, control);
}
}

void ChannelSearchManager::initializeSendBuffer()
{
// for now OK, since it is only set here
Expand Down Expand Up @@ -236,15 +305,41 @@ void ChannelSearchManager::flushSendBuffer()
{
Lock guard(m_mutex);

// UDP transport
Transport::shared_pointer tt = m_context.lock()->getSearchTransport();
BlockingUDPTransport::shared_pointer ut = std::tr1::static_pointer_cast<BlockingUDPTransport>(tt);

// UDP search
m_sendBuffer.putByte(CAST_POSITION, (int8_t)0x80); // unicast, no reply required
ut->send(&m_sendBuffer, inetAddressType_unicast);

m_sendBuffer.putByte(CAST_POSITION, (int8_t)0x00); // b/m-cast, no reply required
ut->send(&m_sendBuffer, inetAddressType_broadcast_multicast);

// Name server transport
if(m_nsTransport)
{
// Reset transport (current connection only)
// after max. number of attempts is reached.
if (m_nsSearchCounter >= MAX_NAME_SERVER_SEARCH_COUNT)
{
LOG(logLevelDebug, "Resetting name server transport after %d search attempts", m_nsSearchCounter);
releaseNameServerTransport(true);
}
}

if(!m_nsTransport)
{
m_nsTransport = m_context.lock()->getNameServerSearchTransport();
}

// Name server search
if(m_nsTransport)
{
m_nsSearchCounter++;
LOG(logLevelDebug, "Initiating name server search for %d channels, search attempt %d", int(m_channels.size()), m_nsSearchCounter);
m_nsTransport->enqueueSendRequest(shared_from_this());
}
initializeSendBuffer();
}

Expand All @@ -253,7 +348,6 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
ByteBuffer* requestMessage, TransportSendControl* control)
{
epics::pvData::int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION);

dataCount++;

/*
Expand All @@ -262,6 +356,8 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p
*/

const std::string& name(channel->getSearchInstanceName());
LOG(logLevelDebug, "Searching for channel: %s", name.c_str());

// not nice...
const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length());
if(((int)requestMessage->getRemaining()) < addedPayloadSize)
Expand Down
15 changes: 10 additions & 5 deletions src/remote/codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1608,8 +1608,8 @@ void BlockingServerTCPTransportCodec::destroyAllChannels() {
{
LOG(
logLevelDebug,
"Transport to %s still has %zu channel(s) active and closing...",
_socketName.c_str(), _channels.size());
"Transport to %s still has %lu channel(s) active and closing...",
_socketName.c_str(), static_cast<unsigned long>(_channels.size()));
}

_channels_t temp;
Expand Down Expand Up @@ -1761,7 +1761,7 @@ bool BlockingClientTCPTransportCodec::acquire(ClientChannelImpl::shared_pointer

if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug, "Acquiring transport to %s.", _socketName.c_str());
LOG(logLevelDebug, "Acquiring transport to %s for channel cid %d.", _socketName.c_str(), client->getID());
}

_owners[client->getID()] = ClientChannelImpl::weak_pointer(client);
Expand Down Expand Up @@ -1789,8 +1789,8 @@ void BlockingClientTCPTransportCodec::internalClose() {
{
LOG(
logLevelDebug,
"Transport to %s still has %zu client(s) active and closing...",
_socketName.c_str(), refs);
"Transport to %s still has %lu client(s) active and closing...",
_socketName.c_str(), static_cast<unsigned long>(refs));
}

TransportClientMap_t::iterator it = _owners.begin();
Expand Down Expand Up @@ -1828,6 +1828,11 @@ void BlockingClientTCPTransportCodec::release(pvAccessID clientID) {
}
}

bool BlockingClientTCPTransportCodec::isUsed()
{
return (_owners.size() > 0);
}

void BlockingClientTCPTransportCodec::send(ByteBuffer* buffer,
TransportSendControl* control)
{
Expand Down
17 changes: 17 additions & 0 deletions src/remote/pv/channelSearchManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class SearchInstance {

class ChannelSearchManager :
public epics::pvData::TimerCallback,
public TransportSender,
public std::tr1::enable_shared_from_this<ChannelSearchManager>
{
public:
Expand Down Expand Up @@ -99,13 +100,19 @@ class ChannelSearchManager :
/// Timer stooped callback.
virtual void timerStopped() OVERRIDE FINAL;

// Transport sender interface.
virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL;

/**
* Private constructor.
* @param context
*/
ChannelSearchManager(Context::shared_pointer const & context);
void activate();

// Releases name server transport.
void releaseNameServerTransport(bool forceRelease=false);

private:

bool generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, bool allowNewFrame, bool flush);
Expand Down Expand Up @@ -140,6 +147,16 @@ class ChannelSearchManager :
*/
int32_t m_sequenceNumber;

/**
* Name server search attempt counter.
*/
int m_nsSearchCounter;

/**
* Name server transport
*/
Transport::shared_pointer m_nsTransport;

/**
* Send byte buffer (frame)
*/
Expand Down
2 changes: 2 additions & 0 deletions src/remote/pv/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,8 @@ class BlockingClientTCPTransportCodec :

virtual void release(pvAccessID clientId) OVERRIDE FINAL;

virtual bool isUsed() OVERRIDE FINAL;

virtual void send(epics::pvData::ByteBuffer* buffer,
TransportSendControl* control) OVERRIDE FINAL;

Expand Down
8 changes: 7 additions & 1 deletion src/remote/pv/remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <osiSock.h>

#include <pv/serialize.h>
#include <pv/pvType.h>
#include <pv/byteBuffer.h>
#include <pv/timer.h>
#include <pv/pvData.h>
Expand Down Expand Up @@ -185,6 +184,11 @@ class epicsShareClass Transport : public epics::pvData::DeserializableControl {
*/
virtual void release(pvAccessID clientId) = 0;

/**
* Is transport used
*/
virtual bool isUsed() {return false;}

/**
* Get protocol type (tcp, udp, ssl, etc.).
* @return protocol type.
Expand Down Expand Up @@ -305,6 +309,8 @@ class Context {

virtual std::tr1::shared_ptr<Channel> getChannel(pvAccessID id) = 0;
virtual Transport::shared_pointer getSearchTransport() = 0;
virtual Transport::shared_pointer getNameServerSearchTransport() {return Transport::shared_pointer();}
virtual void releaseNameServerSearchTransport(const Transport::shared_pointer& nsTransport=Transport::shared_pointer(), bool releaseAllConnections=true) {}
};

/**
Expand Down
2 changes: 1 addition & 1 deletion src/remote/transportRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ void TransportRegistry::clear()
if(temp.empty())
return;

LOG(logLevelDebug, "Context still has %zu transport(s) active and closing...", temp.size());
LOG(logLevelDebug, "Context still has %lu transport(s) active and closing...", static_cast<unsigned long>(temp.size()));

for(transports_t::iterator it(temp.begin()), end(temp.end());
it != end; ++it)
Expand Down
Loading
Loading