diff --git a/src/remote/blockingUDPTransport.cpp b/src/remote/blockingUDPTransport.cpp index 81d741d4..caa2d0d3 100644 --- a/src/remote/blockingUDPTransport.cpp +++ b/src/remote/blockingUDPTransport.cpp @@ -435,8 +435,8 @@ bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSock { if (IS_LOGGABLE(logLevelDebug)) { - LOG(logLevelDebug, "UDP Tx (%zu) %s -> %s.", - length, _remoteName.c_str(), inetAddressToString(address).c_str()); + LOG(logLevelDebug, "UDP Tx (%lu) %s -> %s.", + static_cast(length), _remoteName.c_str(), inetAddressToString(address).c_str()); } int retval = sendto(_channel, buffer, @@ -460,8 +460,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr& address) if (IS_LOGGABLE(logLevelDebug)) { - LOG(logLevelDebug, "Sending %zu bytes %s -> %s.", - buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(address).c_str()); + LOG(logLevelDebug, "Sending %lu bytes %s -> %s.", + static_cast(buffer->getRemaining()), _remoteName.c_str(), inetAddressToString(address).c_str()); } int retval = sendto(_channel, buffer->getBuffer(), @@ -498,8 +498,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, InetAddressType target) { if (IS_LOGGABLE(logLevelDebug)) { - LOG(logLevelDebug, "Sending %zu bytes %s -> %s.", - buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str()); + LOG(logLevelDebug, "Sending %lu bytes %s -> %s.", + static_cast(buffer->getRemaining()), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str()); } int retval = sendto(_channel, buffer->getBuffer(), @@ -684,7 +684,7 @@ void initializeUDPTransports(bool serverFlag, } } LOG(logLevelDebug, - "Broadcast address #%zu: %s. (%sunicast)", i, inetAddressToString(list[i]).c_str(), + "Broadcast address #%lu: %s. (%sunicast)", static_cast(i), inetAddressToString(list[i]).c_str(), isunicast[i]?"":"not "); } @@ -714,7 +714,7 @@ void initializeUDPTransports(bool serverFlag, getSocketAddressList(ignoreAddressVector, ignoreAddressList, 0, 0); // - // Setup UDP trasport(s) (per interface) + // Setup UDP transport(s) (per interface) // InetAddrVector tappedNIF; diff --git a/src/remote/channelSearchManager.cpp b/src/remote/channelSearchManager.cpp index 6dd9b33e..545288e5 100644 --- a/src/remote/channelSearchManager.cpp +++ b/src/remote/channelSearchManager.cpp @@ -71,12 +71,14 @@ static const int MAX_FALLBACK_COUNT_VALUE = (1 << 7) + 1; static const int MAX_FRAMES_AT_ONCE = 10; static const int DELAY_BETWEEN_FRAMES_MS = 50; +static const int MAX_NAME_SERVER_SEARCH_COUNT = 3; ChannelSearchManager::ChannelSearchManager(Context::shared_pointer const & context) : m_context(context), m_responseAddress(), // initialized in activate() m_canceled(), m_sequenceNumber(0), + m_nsSearchCounter(0), m_sendBuffer(MAX_UDP_UNFRAGMENTED_SEND), m_channels(), m_lastTimeSent(), @@ -135,6 +137,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer if (m_canceled.get()) return; + LOG(logLevelDebug, "Registering search instance: %s", channel->getSearchInstanceName().c_str()); bool immediateTrigger; { Lock guard(m_channelMutex); @@ -154,6 +157,7 @@ void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer void ChannelSearchManager::unregisterSearchInstance(SearchInstance::shared_pointer const & channel) { + LOG(logLevelDebug, "Unregistering search instance: %s", channel->getSearchInstanceName().c_str()); Lock guard(m_channelMutex); pvAccessID id = channel->getSearchInstanceID(); m_channels.erase(id); @@ -180,6 +184,7 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci SearchInstance::shared_pointer si(channelsIter->second.lock()); // remove from search list + LOG(logLevelDebug, "Removing cid %d from the channel map", cid); m_channels.erase(cid); guard.unlock(); @@ -188,6 +193,30 @@ void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID ci if(si) si->searchResponse(guid, minorRevision, serverAddress); } + // Release all NS connections if there are + // no more channels to search for + releaseNameServerTransport(); +} + +void ChannelSearchManager::releaseNameServerTransport(bool forceRelease) +{ + bool releaseAllConnections = true; + if(m_channels.size() == 0) + { + // No more channels to search for, release all connections + m_nsSearchCounter = 0; + m_context.lock()->releaseNameServerSearchTransport(m_nsTransport, releaseAllConnections); + m_nsTransport.reset(); + } + else if(forceRelease) + { + // There are channels to search for, release only connection + // that is currently used + releaseAllConnections = false; + m_nsSearchCounter = 0; + m_context.lock()->releaseNameServerSearchTransport(m_nsTransport, releaseAllConnections); + m_nsTransport.reset(); + } } void ChannelSearchManager::newServerDetected() @@ -196,6 +225,46 @@ void ChannelSearchManager::newServerDetected() callback(); } +void ChannelSearchManager::send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) +{ + control->startMessage(CMD_SEARCH, 4+1+3+16+2+1+4+2); + buffer->putInt(m_sequenceNumber); + buffer->putByte((int8_t)QOS_REPLY_REQUIRED); // CAST_POSITION + buffer->putByte((int8_t)0); // reserved + buffer->putShort((int16_t)0); // reserved + + osiSockAddr anyAddress; + memset(&anyAddress, 0, sizeof(anyAddress)); + anyAddress.ia.sin_family = AF_INET; + anyAddress.ia.sin_port = htons(0); + anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); + encodeAsIPv6Address(buffer, &anyAddress); + buffer->putShort((int16_t)ntohs(anyAddress.ia.sin_port)); + buffer->putByte((int8_t)1); + SerializeHelper::serializeString("tcp", buffer, control); + buffer->putShort((int16_t)0); // initial channel count + + vector toSend; + { + Lock guard(m_channelMutex); + toSend.reserve(m_channels.size()); + + for(m_channels_t::iterator channelsIter = m_channels.begin(); + channelsIter != m_channels.end(); channelsIter++) + { + SearchInstance::shared_pointer inst(channelsIter->second.lock()); + if(!inst) continue; + toSend.push_back(inst); + } + } + + vector::iterator siter = toSend.begin(); + for (; siter != toSend.end(); siter++) + { + generateSearchRequestMessage(*siter, buffer, control); + } +} + void ChannelSearchManager::initializeSendBuffer() { // for now OK, since it is only set here @@ -236,15 +305,41 @@ void ChannelSearchManager::flushSendBuffer() { Lock guard(m_mutex); + // UDP transport Transport::shared_pointer tt = m_context.lock()->getSearchTransport(); BlockingUDPTransport::shared_pointer ut = std::tr1::static_pointer_cast(tt); + // UDP search m_sendBuffer.putByte(CAST_POSITION, (int8_t)0x80); // unicast, no reply required ut->send(&m_sendBuffer, inetAddressType_unicast); m_sendBuffer.putByte(CAST_POSITION, (int8_t)0x00); // b/m-cast, no reply required ut->send(&m_sendBuffer, inetAddressType_broadcast_multicast); + // Name server transport + if(m_nsTransport) + { + // Reset transport (current connection only) + // after max. number of attempts is reached. + if (m_nsSearchCounter >= MAX_NAME_SERVER_SEARCH_COUNT) + { + LOG(logLevelDebug, "Resetting name server transport after %d search attempts", m_nsSearchCounter); + releaseNameServerTransport(true); + } + } + + if(!m_nsTransport) + { + m_nsTransport = m_context.lock()->getNameServerSearchTransport(); + } + + // Name server search + if(m_nsTransport) + { + m_nsSearchCounter++; + LOG(logLevelDebug, "Initiating name server search for %d channels, search attempt %d", int(m_channels.size()), m_nsSearchCounter); + m_nsTransport->enqueueSendRequest(shared_from_this()); + } initializeSendBuffer(); } @@ -253,7 +348,6 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p ByteBuffer* requestMessage, TransportSendControl* control) { epics::pvData::int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION); - dataCount++; /* @@ -262,6 +356,8 @@ bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_p */ const std::string& name(channel->getSearchInstanceName()); + LOG(logLevelDebug, "Searching for channel: %s", name.c_str()); + // not nice... const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length()); if(((int)requestMessage->getRemaining()) < addedPayloadSize) diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 7d4b9a16..e3497de3 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -1608,8 +1608,8 @@ void BlockingServerTCPTransportCodec::destroyAllChannels() { { LOG( logLevelDebug, - "Transport to %s still has %zu channel(s) active and closing...", - _socketName.c_str(), _channels.size()); + "Transport to %s still has %lu channel(s) active and closing...", + _socketName.c_str(), static_cast(_channels.size())); } _channels_t temp; @@ -1761,7 +1761,7 @@ bool BlockingClientTCPTransportCodec::acquire(ClientChannelImpl::shared_pointer if (IS_LOGGABLE(logLevelDebug)) { - LOG(logLevelDebug, "Acquiring transport to %s.", _socketName.c_str()); + LOG(logLevelDebug, "Acquiring transport to %s for channel cid %d.", _socketName.c_str(), client->getID()); } _owners[client->getID()] = ClientChannelImpl::weak_pointer(client); @@ -1789,8 +1789,8 @@ void BlockingClientTCPTransportCodec::internalClose() { { LOG( logLevelDebug, - "Transport to %s still has %zu client(s) active and closing...", - _socketName.c_str(), refs); + "Transport to %s still has %lu client(s) active and closing...", + _socketName.c_str(), static_cast(refs)); } TransportClientMap_t::iterator it = _owners.begin(); @@ -1828,6 +1828,11 @@ void BlockingClientTCPTransportCodec::release(pvAccessID clientID) { } } +bool BlockingClientTCPTransportCodec::isUsed() +{ + return (_owners.size() > 0); +} + void BlockingClientTCPTransportCodec::send(ByteBuffer* buffer, TransportSendControl* control) { diff --git a/src/remote/pv/channelSearchManager.h b/src/remote/pv/channelSearchManager.h index 5d8b5bb8..e57ba717 100644 --- a/src/remote/pv/channelSearchManager.h +++ b/src/remote/pv/channelSearchManager.h @@ -53,6 +53,7 @@ class SearchInstance { class ChannelSearchManager : public epics::pvData::TimerCallback, + public TransportSender, public std::tr1::enable_shared_from_this { public: @@ -99,6 +100,9 @@ class ChannelSearchManager : /// Timer stooped callback. virtual void timerStopped() OVERRIDE FINAL; + // Transport sender interface. + virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL; + /** * Private constructor. * @param context @@ -106,6 +110,9 @@ class ChannelSearchManager : ChannelSearchManager(Context::shared_pointer const & context); void activate(); + // Releases name server transport. + void releaseNameServerTransport(bool forceRelease=false); + private: bool generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, bool allowNewFrame, bool flush); @@ -140,6 +147,16 @@ class ChannelSearchManager : */ int32_t m_sequenceNumber; + /** + * Name server search attempt counter. + */ + int m_nsSearchCounter; + + /** + * Name server transport + */ + Transport::shared_pointer m_nsTransport; + /** * Send byte buffer (frame) */ diff --git a/src/remote/pv/codec.h b/src/remote/pv/codec.h index 32e86410..0eb8f886 100644 --- a/src/remote/pv/codec.h +++ b/src/remote/pv/codec.h @@ -612,6 +612,8 @@ class BlockingClientTCPTransportCodec : virtual void release(pvAccessID clientId) OVERRIDE FINAL; + virtual bool isUsed() OVERRIDE FINAL; + virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL; diff --git a/src/remote/pv/remote.h b/src/remote/pv/remote.h index b70b0e38..46c6cfe3 100644 --- a/src/remote/pv/remote.h +++ b/src/remote/pv/remote.h @@ -18,7 +18,6 @@ #include #include -#include #include #include #include @@ -185,6 +184,11 @@ class epicsShareClass Transport : public epics::pvData::DeserializableControl { */ virtual void release(pvAccessID clientId) = 0; + /** + * Is transport used + */ + virtual bool isUsed() {return false;} + /** * Get protocol type (tcp, udp, ssl, etc.). * @return protocol type. @@ -305,6 +309,8 @@ class Context { virtual std::tr1::shared_ptr getChannel(pvAccessID id) = 0; virtual Transport::shared_pointer getSearchTransport() = 0; + virtual Transport::shared_pointer getNameServerSearchTransport() {return Transport::shared_pointer();} + virtual void releaseNameServerSearchTransport(const Transport::shared_pointer& nsTransport=Transport::shared_pointer(), bool releaseAllConnections=true) {} }; /** diff --git a/src/remote/transportRegistry.cpp b/src/remote/transportRegistry.cpp index 13fdc24e..6c32bed9 100644 --- a/src/remote/transportRegistry.cpp +++ b/src/remote/transportRegistry.cpp @@ -127,7 +127,7 @@ void TransportRegistry::clear() if(temp.empty()) return; - LOG(logLevelDebug, "Context still has %zu transport(s) active and closing...", temp.size()); + LOG(logLevelDebug, "Context still has %lu transport(s) active and closing...", static_cast(temp.size())); for(transports_t::iterator it(temp.begin()), end(temp.end()); it != end; ++it) diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index eff79d4a..0ff0330b 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -17,6 +18,7 @@ #include #include +#include #include #include #include @@ -43,6 +45,9 @@ //#include +#define PVA_CHANNEL_SEARCH_PRIORITY 98 +#define MAX_NS_TRANSPORT_WAIT_TIME 0.1 + using std::tr1::dynamic_pointer_cast; using std::tr1::static_pointer_cast; @@ -2584,25 +2589,43 @@ class SearchResponseHandler : public AbstractClientResponseHandler { // NOTE: htons might be a macro (e.g. vxWorks) int16 port = payloadBuffer->getShort(); serverAddress.ia.sin_port = htons(port); + char strBuffer[24]; + ipAddrToDottedIP(&serverAddress.ia, strBuffer, sizeof(strBuffer)); + LOG(logLevelDebug, "Server address decoded as %s, transport type is %s", strBuffer, transport->getType().c_str()); /*string protocol =*/ SerializeHelper::deserializeString(payloadBuffer, transport.get()); + // Get channel search manager + ClientContextImpl::shared_pointer context(_context.lock()); + if(!context) + { + return; + } + std::tr1::shared_ptr csm = context->getChannelSearchManager(); + + // Did we find anything? transport->ensureData(1); bool found = payloadBuffer->getByte() != 0; if (!found) + { + if (transport->getType() == "tcp") + { + // Release only current NS connection + LOG(logLevelDebug, "No channels found, releasing current name server transport"); + csm->releaseNameServerTransport(true); + } return; + } // reads CIDs // TODO optimize - ClientContextImpl::shared_pointer context(_context.lock()); - if(!context) - return; - std::tr1::shared_ptr csm = context->getChannelSearchManager(); int16 count = payloadBuffer->getShort(); + LOG(logLevelDebug, "Found %hd channels", count); for (int i = 0; i < count; i++) { transport->ensureData(4); pvAccessID cid = payloadBuffer->getInt(); + LOG(logLevelDebug, "Invoking search response for channel cid: %d", cid); csm->searchResponse(guid, cid, searchSequenceId, version, &serverAddress); } @@ -2999,7 +3022,8 @@ class ClientResponseHandler : public ResponseHandler { { if (command < 0 || command >= (int8)m_handlerTable.size()) { - if(IS_LOGGABLE(logLevelError)) { + if(IS_LOGGABLE(logLevelError)) + { std::ios::fmtflags initialflags = std::cerr.flags(); std::cerr<<"Invalid (or unsupported) command: "< m_handlerTable; + +public: + + virtual ~NameServerClientResponseHandler() {} + /** + * @param context + */ + NameServerClientResponseHandler(ClientContextImpl::shared_pointer const & context) + :ResponseHandler(context.get(), "NameServerClientResponseHandler") + { + ResponseHandler::shared_pointer ignoreResponse(new NoopResponse(context, "Ignore")); + m_handlerTable.resize(CMD_CANCEL_REQUEST+1); + m_handlerTable[CMD_BEACON] = ignoreResponse; /* 0 */ + m_handlerTable[CMD_CONNECTION_VALIDATION].reset(new ClientConnectionValidationHandler(context)); /* 1 */ + m_handlerTable[CMD_ECHO] = ignoreResponse; /* 2 */ + m_handlerTable[CMD_SEARCH] = ignoreResponse; /* 3 */ + m_handlerTable[CMD_SEARCH_RESPONSE].reset(new SearchResponseHandler(context)); /* 4 */ + m_handlerTable[CMD_AUTHNZ].reset(new AuthNZHandler(context.get())); /* 5 */ + m_handlerTable[CMD_ACL_CHANGE] = ignoreResponse; /* 6 */ + m_handlerTable[CMD_CREATE_CHANNEL] = ignoreResponse; /* 7 */ + m_handlerTable[CMD_DESTROY_CHANNEL] = ignoreResponse; /* 8 */ + m_handlerTable[CMD_CONNECTION_VALIDATED].reset(new ClientConnectionValidatedHandler(context)); /* 9 */ + m_handlerTable[CMD_GET] = ignoreResponse; /* 10 */ + m_handlerTable[CMD_PUT] = ignoreResponse; /* 11 */ + m_handlerTable[CMD_PUT_GET] = ignoreResponse; /* 12 */ + m_handlerTable[CMD_MONITOR] = ignoreResponse; /* 13 */ + m_handlerTable[CMD_ARRAY] = ignoreResponse; /* 14 */ + m_handlerTable[CMD_DESTROY_REQUEST] = ignoreResponse; /* 15 */ + m_handlerTable[CMD_PROCESS] = ignoreResponse; /* 16 */ + m_handlerTable[CMD_GET_FIELD] = ignoreResponse; /* 17 */ + m_handlerTable[CMD_MESSAGE] = ignoreResponse; /* 18 */ + m_handlerTable[CMD_MULTIPLE_DATA] = ignoreResponse; /* 19 */ + m_handlerTable[CMD_RPC] = ignoreResponse; /* 20 */ + m_handlerTable[CMD_CANCEL_REQUEST] = ignoreResponse; /* 21 */ + } + virtual void handleResponse(osiSockAddr* responseFrom, + Transport::shared_pointer const & transport, int8 version, int8 command, + size_t payloadSize, ByteBuffer* payloadBuffer) OVERRIDE FINAL + { + if (command < 0 || command >= (int8)m_handlerTable.size()) + { + if(IS_LOGGABLE(logLevelError)) + { + std::ios::fmtflags initialflags = std::cerr.flags(); + std::cerr<<"Invalid (or unsupported) command: "<handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + } +}; /** * Context state enum. @@ -3037,9 +3126,6 @@ enum ContextState { CONTEXT_DESTROYED }; - - - class InternalClientContextImpl : public ClientContextImpl, public ChannelProvider @@ -3097,9 +3183,19 @@ class InternalClientContextImpl : std::string const & addressesStr) OVERRIDE FINAL { InetAddrVector addresses; - getSocketAddressList(addresses, addressesStr, PVA_SERVER_PORT); + if (addressesStr.empty()) + { + LOG(logLevelDebug, "Creating channel using default address list: %s", m_addressList.c_str()); + getSocketAddressList(addresses, m_addressList, m_serverPort); + } + else + { + LOG(logLevelDebug, "Creating channel using address list: %s", addressesStr.c_str()); + getSocketAddressList(addresses, addressesStr, m_serverPort); + } + bool initiateSearch = true; - Channel::shared_pointer channel = createChannelInternal(channelName, channelRequester, priority, addresses); + Channel::shared_pointer channel = createChannelInternal(channelName, channelRequester, priority, addresses, initiateSearch); if (channel.get()) channelRequester->channelCreated(Status::Ok, channel); return channel; @@ -3213,7 +3309,7 @@ class InternalClientContextImpl : Mutex m_channelMutex; private: /** - * Flag indicting what message to send. + * Flag indicating what message to send. */ bool m_issueCreateMessage; @@ -3225,6 +3321,11 @@ class InternalClientContextImpl : */ ServerGUID m_guid; + /** + * Initiate search flag + */ + bool m_initiateSearch; + public: static size_t num_instances; static size_t num_active; @@ -3243,7 +3344,7 @@ class InternalClientContextImpl : string const & name, ChannelRequester::shared_pointer const & requester, short priority, - const InetAddrVector& addresses) : + const InetAddrVector& addresses, bool initiateSearch) : m_context(context), m_channelID(channelID), m_name(name), @@ -3255,7 +3356,8 @@ class InternalClientContextImpl : m_needSubscriptionUpdate(false), m_allowCreation(true), m_serverChannelID(0xFFFFFFFF), - m_issueCreateMessage(true) + m_issueCreateMessage(true), + m_initiateSearch(initiateSearch) { REFTRACE_INCREMENT(num_instances); } @@ -3278,10 +3380,11 @@ class InternalClientContextImpl : string const & name, ChannelRequester::shared_pointer requester, short priority, - const InetAddrVector& addresses) + const InetAddrVector& addresses, + bool initiateSearch) { std::tr1::shared_ptr internal( - new InternalChannelImpl(context, channelID, name, requester, priority, addresses)), + new InternalChannelImpl(context, channelID, name, requester, priority, addresses, initiateSearch)), external(internal.get(), epics::pvAccess::Destroyable::cleaner(internal)); const_cast(internal->m_internal_this) = internal; const_cast(internal->m_external_this) = external; @@ -3308,9 +3411,12 @@ class InternalClientContextImpl : m_getfield.reset(); - // stop searching... shared_pointer thisChannelPointer = internal_from_this(); - m_context->getChannelSearchManager()->unregisterSearchInstance(thisChannelPointer); + // stop searching... + if (m_initiateSearch) + { + m_context->getChannelSearchManager()->unregisterSearchInstance(thisChannelPointer); + } disconnectPendingIO(true); @@ -3530,9 +3636,13 @@ class InternalClientContextImpl : if (m_connectionState != CONNECTED) return; - if (!initiateSearch) { - // stop searching... - m_context->getChannelSearchManager()->unregisterSearchInstance(internal_from_this()); + if (!initiateSearch) + { + if (m_initiateSearch) + { + // stop searching... + m_context->getChannelSearchManager()->unregisterSearchInstance(internal_from_this()); + } } setConnectionState(DISCONNECTED); @@ -3567,6 +3677,7 @@ class InternalClientContextImpl : #define STATIC_SEARCH_BASE_DELAY_SEC 5 #define STATIC_SEARCH_MAX_MULTIPLIER 10 +#define STATIC_SEARCH_MIN_DELAY_SEC 1 /** * Initiate search (connect) procedure. @@ -3574,24 +3685,43 @@ class InternalClientContextImpl : void initiateSearch(bool penalize = false) { Lock guard(m_channelMutex); + if (!m_initiateSearch) + { + LOG(logLevelDebug, "Search will not be initiated for channel %s", m_name.c_str()); + return; + } m_allowCreation = true; - if (m_addresses.empty()) - { - m_context->getChannelSearchManager()->registerSearchInstance(internal_from_this(), penalize); - } - else - { - m_context->getTimer()->scheduleAfterDelay(internal_from_this(), - (m_addressIndex / m_addresses.size())*STATIC_SEARCH_BASE_DELAY_SEC); - } + // The following code forces direct tcp connection to server + // if (!m_addresses.empty()) { + // char strBuffer[24]; + // int index = m_addressIndex % m_addresses.size(); + // osiSockAddr* serverAddress = &m_addresses[index]; + // ipAddrToDottedIP(&serverAddress->ia, strBuffer, sizeof(strBuffer)); + // uint16_t port = ntohs(serverAddress->ia.sin_port); + // if (port > 0) { + // double delay = (m_addressIndex / m_addresses.size())*STATIC_SEARCH_BASE_DELAY_SEC+STATIC_SEARCH_MIN_DELAY_SEC; + // LOG(logLevelDebug, "Scheduling direct channel connection attempt for address %s with delay of %.3f seconds.", strBuffer, delay); + // m_context->getTimer()->scheduleAfterDelay(internal_from_this(), delay); + // } + // else { + // LOG(logLevelDebug, "Cannot schedule direct channel connection attempt for address %s (port not specified).", strBuffer); + // } + // } + m_context->getChannelSearchManager()->registerSearchInstance(internal_from_this(), penalize); } virtual void callback() OVERRIDE FINAL { // TODO cancellaction?! // TODO not in this timer thread !!! // TODO boost when a server (from address list) is started!!! IP vs address !!! + Transport::shared_pointer transport(m_transport); + if (transport) { + LOG(logLevelDebug, "Transport for channel %s is already active.", transport->getRemoteName().c_str()); + return; + } + int ix = m_addressIndex % m_addresses.size(); m_addressIndex++; if (m_addressIndex >= static_cast(m_addresses.size()*(STATIC_SEARCH_MAX_MULTIPLIER+1))) @@ -3667,7 +3797,12 @@ class InternalClientContextImpl : old_transport.swap(m_transport); m_transport.swap(transport); - m_transport->enqueueSendRequest(internal_from_this()); + shared_pointer thisChannelPointer = internal_from_this(); + m_transport->enqueueSendRequest(thisChannelPointer); + if (m_initiateSearch) + { + m_context->getChannelSearchManager()->unregisterSearchInstance(thisChannelPointer); + } } } @@ -3957,7 +4092,7 @@ class InternalClientContextImpl : static size_t num_instances; InternalClientContextImpl(const Configuration::shared_pointer& conf) : - m_addressList(""), m_autoAddressList(true), m_connectionTimeout(30.0f), m_beaconPeriod(15.0f), + m_addressList(""), m_autoAddressList(true), m_serverPort(PVA_SERVER_PORT), m_nsAddressList(""), m_nsAddressIndex(0), m_connectionTimeout(30.0f), m_beaconPeriod(15.0f), m_broadcastPort(PVA_BROADCAST_PORT), m_receiveBufferSize(MAX_TCP_RECV), m_lastCID(0x10203040), m_lastIOID(0x80706050), @@ -3999,6 +4134,91 @@ class InternalClientContextImpl : return m_searchTransport; } + virtual Transport::shared_pointer getNameServerSearchTransport() + { + if (!m_nsAddresses.size()) + { + return Transport::shared_pointer(); + } + createNameServerConnector(); + if (!m_nsConnector.get()) + { + return Transport::shared_pointer(); + } + + m_nsTransportEvent.tryWait(); + Transport::shared_pointer nsTransport; + bool nsSearchScheduled = false; + for (unsigned int i = 0; i < m_nsAddresses.size(); i++) + { + { + Lock L(m_nsTransportMapMutex); + NsTransportMap::iterator it = m_nsTransportMap.find(m_nsAddressIndex); + if (it != m_nsTransportMap.end()) + { + nsTransport = it->second; + break; + } + else + { + NsTransportConnectCallback::shared_pointer c = m_nsTransportConnectCallbackMap[m_nsAddressIndex]; + Timer::shared_pointer t = m_nsTransportConnectCallbackTimerMap[m_nsAddressIndex]; + if (!c->inProgress()) + { + t->scheduleAfterDelay(c, 0); + nsSearchScheduled = true; + } + } + m_nsAddressIndex = (m_nsAddressIndex+1) % m_nsAddresses.size(); + } + } + + if (!nsTransport && nsSearchScheduled) + { + // Wait to get transport if we do not have it and if + // we just scheduled name server search + m_nsTransportEvent.wait(MAX_NS_TRANSPORT_WAIT_TIME); + Lock L(m_nsTransportMapMutex); + if (!m_nsTransportMap.empty()) + { + nsTransport = m_nsTransportMap.begin()->second; + } + } + return nsTransport; + } + + virtual void releaseNameServerSearchTransport(const Transport::shared_pointer& nsTransport=Transport::shared_pointer(), bool releaseAllConnections=true) + { + if (nsTransport) + { + LOG(logLevelDebug, "Releasing transport used for name server channel %d", m_nsChannel->getID()); + nsTransport->release(m_nsChannel->getID()); + } + { + Lock L(m_nsTransportMapMutex); + for (unsigned int i = 0; i < m_nsAddresses.size(); i++) + { + NsTransportMap::iterator it = m_nsTransportMap.find(i); + if (it != m_nsTransportMap.end()) + { + Transport::shared_pointer nst = it->second; + if (!nst->isUsed()) + { + // Release unused given connection only, unless + // other connections are no longer needed + if (nst == nsTransport || releaseAllConnections) + { + LOG(logLevelDebug, "Closing name server transport for address %s", m_nsTransportConnectCallbackMap[i]->getNsAddress().c_str()); + nst->close(); + nst.reset(); + m_nsTransportMap.erase(it); + } + } + } + } + } + } + virtual void initialize() OVERRIDE FINAL { Lock lock(m_contextMutex); @@ -4019,6 +4239,8 @@ class InternalClientContextImpl : out << "VERSION : " << m_version.getVersionString() << std::endl; out << "ADDR_LIST : " << m_addressList << std::endl; out << "AUTO_ADDR_LIST : " << (m_autoAddressList ? "true" : "false") << std::endl; + out << "SERVER_PORT : " << m_serverPort << std::endl; + out << "NAME_SERVERS : " << m_nsAddressList << std::endl; out << "CONNECTION_TIMEOUT : " << m_connectionTimeout << std::endl; out << "BEACON_PERIOD : " << m_beaconPeriod << std::endl; out << "BROADCAST_PORT : " << m_broadcastPort << std::endl;; @@ -4055,6 +4277,7 @@ class InternalClientContextImpl : // // cleanup // + releaseNameServerSearchTransport(); m_timer->close(); @@ -4110,11 +4333,42 @@ class InternalClientContextImpl : SET_LOG_LEVEL(logLevelDebug); m_addressList = m_configuration->getPropertyAsString("EPICS_PVA_ADDR_LIST", m_addressList); + LOG(logLevelDebug, "Configured PVA address list: %s", m_addressList.c_str()); m_autoAddressList = m_configuration->getPropertyAsBoolean("EPICS_PVA_AUTO_ADDR_LIST", m_autoAddressList); + m_serverPort = m_configuration->getPropertyAsInteger("EPICS_PVA_SERVER_PORT", m_serverPort); + LOG(logLevelDebug, "Configured server port: %d", m_serverPort); + m_nsAddressList = m_configuration->getPropertyAsString("EPICS_PVA_NAME_SERVERS", m_nsAddressList); + LOG(logLevelDebug, "Configured name server address list: %s", m_nsAddressList.c_str()); m_connectionTimeout = m_configuration->getPropertyAsFloat("EPICS_PVA_CONN_TMO", m_connectionTimeout); m_beaconPeriod = m_configuration->getPropertyAsFloat("EPICS_PVA_BEACON_PERIOD", m_beaconPeriod); m_broadcastPort = m_configuration->getPropertyAsInteger("EPICS_PVA_BROADCAST_PORT", m_broadcastPort); + LOG(logLevelDebug, "Configured broadcast port: %d", m_broadcastPort); m_receiveBufferSize = m_configuration->getPropertyAsInteger("EPICS_PVA_MAX_ARRAY_BYTES", m_receiveBufferSize); + getSocketAddressList(m_nsAddresses, m_nsAddressList, m_serverPort); + } + + void createNameServerConnector() + { + if (!m_nsConnector.get() && m_nsAddresses.size()) + { + // Create NS connector + LOG(logLevelDebug, "Creating internal name server channel and connector"); + InetAddrVector nsAddresses; // avoid direct connection attempts + InternalClientContextImpl::shared_pointer thisPointer(internal_from_this()); + std::string nsChannelName = "__NS_CHANNEL__"; + bool initiateSearch = false; + m_nsChannel = createChannelInternal(nsChannelName, DefaultChannelRequester::build(), PVA_CHANNEL_SEARCH_PRIORITY, nsAddresses, initiateSearch); + m_nsConnector.reset(new BlockingTCPConnector(thisPointer, m_receiveBufferSize, m_connectionTimeout)); + + // Create callback timers for each address + for (unsigned int i = 0; i < m_nsAddresses.size(); i++) + { + NsTransportConnectCallback::shared_pointer c(new NsTransportConnectCallback(thisPointer, i)); + m_nsTransportConnectCallbackMap[i] = c; + Timer::shared_pointer t(new Timer("NS connection timer", highPriority)); + m_nsTransportConnectCallbackTimerMap[i] = t; + } + } } void internalInitialize() { @@ -4127,6 +4381,7 @@ class InternalClientContextImpl : // stores many weak_ptr m_responseHandler.reset(new ClientResponseHandler(thisPointer)); + m_nsResponseHandler.reset(new NameServerClientResponseHandler(thisPointer)); m_channelSearchManager.reset(new ChannelSearchManager(thisPointer)); @@ -4385,8 +4640,7 @@ class InternalClientContextImpl : */ // TODO no minor version with the addresses // TODO what if there is an channel with the same name, but on different host! - ClientChannelImpl::shared_pointer createChannelInternal(std::string const & name, ChannelRequester::shared_pointer const & requester, short priority, - const InetAddrVector& addresses) OVERRIDE FINAL { // TODO addresses + ClientChannelImpl::shared_pointer createChannelInternal(std::string const & name, ChannelRequester::shared_pointer const & requester, short priority, const InetAddrVector& addresses, bool initiateSearch) OVERRIDE FINAL { // TODO addresses checkState(); checkChannelName(name); @@ -4404,7 +4658,7 @@ class InternalClientContextImpl : * as our channels. */ pvAccessID cid = generateCID(); - return InternalChannelImpl::create(internal_from_this(), cid, name, requester, priority, addresses); + return InternalChannelImpl::create(internal_from_this(), cid, name, requester, priority, addresses, initiateSearch); } catch(std::exception& e) { LOG(logLevelError, "createChannelInternal() exception: %s\n", e.what()); return ClientChannelImpl::shared_pointer(); @@ -4430,6 +4684,118 @@ class InternalClientContextImpl : */ bool m_autoAddressList; + /** + * Define server port + */ + int m_serverPort; + + /** + * A space-separated list of name server addresses for process variable name resolution. + * Each address must be of the form: ip.number:port or host.name:port + */ + string m_nsAddressList; + + /** + * List of name server addresses. + */ + InetAddrVector m_nsAddresses; + + /** + * Index of currently used name server address (rollover pointer in a list). + */ + int m_nsAddressIndex; + + /** + * Name server channel + */ + ClientChannelImpl::shared_pointer m_nsChannel; + + /** + * Name server connector + */ + epics::auto_ptr m_nsConnector; + + /** + * Name server transport + */ + Event m_nsTransportEvent; + typedef std::map NsTransportMap; + NsTransportMap m_nsTransportMap; + Mutex m_nsTransportMapMutex; + class NsTransportConnectCallback : public TimerCallback + { + public: + POINTER_DEFINITIONS(NsTransportConnectCallback); + NsTransportConnectCallback(const InternalClientContextImpl::shared_pointer& iccImpl, int nsAddrIndex) : + TimerCallback(), + m_iccImpl(iccImpl), + m_nsAddrIndex(nsAddrIndex), + m_inProgress(false) + { + char strBuffer[24]; + osiSockAddr* serverAddress = &iccImpl->m_nsAddresses[nsAddrIndex]; + ipAddrToDottedIP(&serverAddress->ia, strBuffer, sizeof(strBuffer)); + LOG(logLevelDebug, "Initializing name server transport callback for address %s", strBuffer); + m_nsAddr = strBuffer; + } + virtual ~NsTransportConnectCallback() {} + virtual void callback() OVERRIDE FINAL + { + { + Lock L(m_iccImpl->m_nsTransportMapMutex); + if (m_iccImpl->m_nsTransportMap.find(m_nsAddrIndex) != m_iccImpl->m_nsTransportMap.end()) + { + LOG(logLevelDebug, "Already have name server transport for address %s", m_nsAddr.c_str()); + return; + } + LOG(logLevelDebug, "No name server transport for address %s", m_nsAddr.c_str()); + } + if (m_inProgress) + { + LOG(logLevelDebug, "Connection in progress to the name server with address %s", m_nsAddr.c_str()); + return; + } + m_inProgress = true; + try + { + osiSockAddr* serverAddress = &m_iccImpl->m_nsAddresses[m_nsAddrIndex]; + LOG(logLevelDebug, "Getting name server transport for address %s", m_nsAddr.c_str()); + Transport::shared_pointer nsTransport = m_iccImpl->m_nsConnector->connect(m_iccImpl->m_nsChannel, m_iccImpl->m_nsResponseHandler, *serverAddress, EPICS_PVA_MINOR_VERSION, PVA_CHANNEL_SEARCH_PRIORITY); + LOG(logLevelDebug, "Got name server transport for address %s", m_nsAddr.c_str()); + { + Lock L(m_iccImpl->m_nsTransportMapMutex); + m_iccImpl->m_nsTransportMap[m_nsAddrIndex] = nsTransport; + m_iccImpl->m_nsTransportEvent.signal(); + } + } + catch (std::exception& e) + { + LOG(logLevelDebug, "Could not get name server transport for %s: %s", m_nsAddr.c_str(), e.what()); + } + m_inProgress = false; + } + virtual void timerStopped() OVERRIDE FINAL + { + } + string getNsAddress() const + { + return m_nsAddr; + } + bool inProgress() const + { + return m_inProgress; + } + private: + InternalClientContextImpl::shared_pointer m_iccImpl; + int m_nsAddrIndex; + string m_nsAddr; + bool m_inProgress; + }; + typedef std::map NsTransportConnectCallbackMap; + NsTransportConnectCallbackMap m_nsTransportConnectCallbackMap; + typedef std::map NsTransportConnectCallbackTimerMap; + NsTransportConnectCallbackTimerMap m_nsTransportConnectCallbackTimerMap; + /** * If the context doesn't see a beacon from a server that it is connected to for * connectionTimeout seconds then a state-of-health message is sent to the server over TCP/IP. @@ -4484,6 +4850,11 @@ class InternalClientContextImpl : */ ClientResponseHandler::shared_pointer m_responseHandler; + /** + * Name server search response handler. + */ + NameServerClientResponseHandler::shared_pointer m_nsResponseHandler; + /** * Map of channels (keys are CIDs). */ diff --git a/src/remoteClient/pv/clientContextImpl.h b/src/remoteClient/pv/clientContextImpl.h index 77fe1e49..a713e471 100644 --- a/src/remoteClient/pv/clientContextImpl.h +++ b/src/remoteClient/pv/clientContextImpl.h @@ -94,10 +94,7 @@ class ClientContextImpl : public Context virtual void registerChannel(ClientChannelImpl::shared_pointer const & channel) = 0; virtual void unregisterChannel(ClientChannelImpl::shared_pointer const & channel) = 0; - virtual ClientChannelImpl::shared_pointer createChannelInternal(std::string const &name, - ChannelRequester::shared_pointer const & requester, - short priority, - const InetAddrVector& addresses) = 0; + virtual ClientChannelImpl::shared_pointer createChannelInternal(std::string const &name, ChannelRequester::shared_pointer const & requester, short priority, const InetAddrVector& addresses, bool initiateSearch) = 0; virtual ResponseRequest::shared_pointer getResponseRequest(pvAccessID ioid) = 0; virtual pvAccessID registerResponseRequest(ResponseRequest::shared_pointer const & request) = 0; diff --git a/src/server/pv/responseHandlers.h b/src/server/pv/responseHandlers.h index fb17a6d0..9888d5f0 100644 --- a/src/server/pv/responseHandlers.h +++ b/src/server/pv/responseHandlers.h @@ -148,7 +148,7 @@ class ServerChannelFindRequesterImpl: virtual ~ServerChannelFindRequesterImpl() {} void clear(); ServerChannelFindRequesterImpl* set(std::string _name, epics::pvData::int32 searchSequenceId, - epics::pvData::int32 cid, osiSockAddr const & sendTo, bool responseRequired, bool serverSearch); + epics::pvData::int32 cid, osiSockAddr const & sendTo, Transport::shared_pointer const & transport, bool responseRequired, bool serverSearch); virtual void channelFindResult(const epics::pvData::Status& status, ChannelFind::shared_pointer const & channelFind, bool wasFound) OVERRIDE FINAL; virtual std::tr1::shared_ptr getPeerInfo() OVERRIDE FINAL; @@ -163,6 +163,7 @@ class ServerChannelFindRequesterImpl: epics::pvData::int32 _searchSequenceId; epics::pvData::int32 _cid; osiSockAddr _sendTo; + Transport::shared_pointer _transport; bool _responseRequired; bool _wasFound; const ServerContextImpl::shared_pointer _context; diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index daa22b99..0b4b31bb 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -123,7 +123,6 @@ ServerResponseHandler::ServerResponseHandler(ServerContextImpl::shared_pointer c ,handle_cancel(context) ,m_handlerTable(CMD_CANCEL_REQUEST+1, &handle_bad) { - m_handlerTable[CMD_BEACON] = &handle_beacon; /* 0 */ m_handlerTable[CMD_CONNECTION_VALIDATION] = &handle_validation; /* 1 */ m_handlerTable[CMD_ECHO] = &handle_echo; /* 2 */ @@ -250,6 +249,9 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, size_t payloadSize, ByteBuffer* payloadBuffer) { + char strBuffer[24]; + ipAddrToDottedIP(&responseFrom->ia, strBuffer, sizeof(strBuffer)); + LOG(logLevelDebug, "Server search handler is handling request from %s", strBuffer); AbstractServerResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); @@ -277,7 +279,15 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, // NOTE: htons might be a macro (e.g. vxWorks) int16 port = payloadBuffer->getShort(); - responseAddress.ia.sin_port = htons(port); + if (port) + { + responseAddress.ia.sin_port = htons(port); + } + else + { + LOG(logLevelDebug, "Server search handler is reusing connection port %d", ntohs(responseFrom->ia.sin_port)); + responseAddress.ia.sin_port = responseFrom->ia.sin_port; + } size_t protocolsCount = SerializeHelper::readSize(payloadBuffer, transport.get()); bool allowed = (protocolsCount == 0); @@ -285,22 +295,24 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, { string protocol = SerializeHelper::deserializeString(payloadBuffer, transport.get()); if (SUPPORTED_PROTOCOL == protocol) + { allowed = true; + } } // NOTE: we do not stop reading the buffer - transport->ensureData(2); const int32 count = payloadBuffer->getShort() & 0xFFFF; + LOG(logLevelDebug, "Search request from %s is allowed: %d, payload count is %d", strBuffer, int(allowed), count); // TODO DoS attack? // You bet! With a reply address encoded in the request we don't even need a forged UDP header. const bool responseRequired = (QOS_REPLY_REQUIRED & qosCode) != 0; // - // locally broadcast if unicast (qosCode & 0x80 == 0x80) via UDP + // locally broadcast if unicast (qosCode & QOS_GET_PUT == QOS_GET_PUT) via UDP // - if ((qosCode & 0x80) == 0x80) + if ((qosCode & QOS_GET_PUT) == QOS_GET_PUT) { BlockingUDPTransport::shared_pointer bt = dynamic_pointer_cast(transport); if (bt && bt->hasLocalMulticastAddress()) @@ -350,14 +362,14 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, const int32 cid = payloadBuffer->getInt(); const string name = SerializeHelper::deserializeString(payloadBuffer, transport.get()); // no name check here... - + LOG(logLevelDebug, "Search for channel %s, cid %d", name.c_str(), cid); if (allowed) { const std::vector& _providers = _context->getChannelProviders(); int providerCount = _providers.size(); std::tr1::shared_ptr tp(new ServerChannelFindRequesterImpl(_context, info, providerCount)); - tp->set(name, searchSequenceId, cid, responseAddress, responseRequired, false); + tp->set(name, searchSequenceId, cid, responseAddress, transport, responseRequired, false); for (int i = 0; i < providerCount; i++) _providers[i]->channelFind(name, tp); @@ -375,7 +387,7 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, delay = delay*0.1 + 0.05; std::tr1::shared_ptr tp(new ServerChannelFindRequesterImpl(_context, info, 1)); - tp->set("", searchSequenceId, 0, responseAddress, true, true); + tp->set("", searchSequenceId, 0, responseAddress, transport, true, true); TimerCallback::shared_pointer tc = tp; _context->getTimer()->scheduleAfterDelay(tc, delay); @@ -414,6 +426,7 @@ void ServerChannelFindRequesterImpl::timerStopped() } ServerChannelFindRequesterImpl* ServerChannelFindRequesterImpl::set(std::string name, int32 searchSequenceId, int32 cid, osiSockAddr const & sendTo, + Transport::shared_pointer const & transport, bool responseRequired, bool serverSearch) { Lock guard(_mutex); @@ -421,6 +434,7 @@ ServerChannelFindRequesterImpl* ServerChannelFindRequesterImpl::set(std::string _searchSequenceId = searchSequenceId; _cid = cid; _sendTo = sendTo; + _transport = transport; _responseRequired = responseRequired; _serverSearch = serverSearch; return this; @@ -436,7 +450,7 @@ void ServerChannelFindRequesterImpl::channelFindResult(const Status& /*status*/, { if ((_responseCount+1) == _expectedResponseCount) { - LOG(logLevelDebug,"[ServerChannelFindRequesterImpl::channelFindResult] More responses received than expected fpr channel '%s'!", _name.c_str()); + LOG(logLevelDebug,"[ServerChannelFindRequesterImpl::channelFindResult] More responses received than expected for channel '%s'!", _name.c_str()); } return; } @@ -455,12 +469,19 @@ void ServerChannelFindRequesterImpl::channelFindResult(const Status& /*status*/, _context->s_channelNameToProvider[_name] = channelFind->getChannelProvider(); } _wasFound = wasFound; - - BlockingUDPTransport::shared_pointer bt = _context->getBroadcastTransport(); - if (bt) + if (_transport && _transport->getType() == "tcp") { TransportSender::shared_pointer thisSender = shared_from_this(); - bt->enqueueSendRequest(thisSender); + _transport->enqueueSendRequest(thisSender); + } + else + { + BlockingUDPTransport::shared_pointer bt = _context->getBroadcastTransport(); + if (bt) + { + TransportSender::shared_pointer thisSender = shared_from_this(); + bt->enqueueSendRequest(thisSender); + } } } } @@ -472,6 +493,9 @@ std::tr1::shared_ptr ServerChannelFindRequesterImpl::getPeerInfo void ServerChannelFindRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) { + char ipAddrStr[24]; + ipAddrToDottedIP(&_sendTo.ia, ipAddrStr, sizeof(ipAddrStr)); + LOG(logLevelDebug, "Search response will be sent to %s, was found: %d", ipAddrStr, int(_wasFound)); control->startMessage(CMD_SEARCH_RESPONSE, 12+4+16+2); Lock guard(_mutex); @@ -499,6 +523,9 @@ void ServerChannelFindRequesterImpl::send(ByteBuffer* buffer, TransportSendContr } control->setRecipient(_sendTo); + + // send immediately + control->flush(true); } /****************************************************************************************/ @@ -2028,7 +2055,7 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* // This really shouldn't happen as the above ensures that _window_open *was* non-zero, // and only we (the sender) will decrement. message("Monitor Logic Error: send outside of window", epics::pvData::warningMessage); - LOG(logLevelError, "Monitor Logic Error: send outside of window %zu", _window_closed.size()); + LOG(logLevelError, "Monitor Logic Error: send outside of window %lu", static_cast(_window_closed.size())); } else { _window_closed.push_back(element.letGo()); diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index c4f47289..b75b31b0 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -9,6 +9,20 @@ ifneq (RTEMS,$(OS_CLASS)) TESTS += testChannelAccess endif +TESTPROD_HOST += testUdpChannelDiscovery +testUdpChannelDiscovery_SRCS = udpChannelDiscoveryTest.cpp +testUdpChannelDiscovery_SRCS += channelDiscoveryTest.cpp +TESTPROD_HOST += testNsChannelDiscovery +testNsChannelDiscovery_SRCS = nsChannelDiscoveryTest.cpp +testNsChannelDiscovery_SRCS += channelDiscoveryTest.cpp +ifneq (RTEMS,$(OS_CLASS)) +testHarness_SRCS += udpChannelDiscoveryTest.cpp +testHarness_SRCS += nsChannelDiscoveryTest.cpp +testHarness_SRCS += channelDiscoveryTest.cpp +TESTS += testUdpChannelDiscovery +TESTS += testNsChannelDiscovery +endif + TESTPROD_HOST += testCodec testCodec_SRCS = testCodec testHarness_SRCS += testCodec.cpp diff --git a/testApp/remote/channelAccessIFTest.cpp b/testApp/remote/channelAccessIFTest.cpp index 97f545ce..66786b18 100644 --- a/testApp/remote/channelAccessIFTest.cpp +++ b/testApp/remote/channelAccessIFTest.cpp @@ -59,7 +59,7 @@ inline bool compareWithTol(const T v1, const T v2, const T tol) int ChannelAccessIFTest::runAllTest() { - testPlan(152+EXTRA_STRESS_TESTS); + testPlan(153+EXTRA_STRESS_TESTS); epics::pvAccess::Configuration::shared_pointer base_config(ConfigurationBuilder() //.add("EPICS_PVA_DEBUG", "3") @@ -2007,6 +2007,13 @@ void ChannelAccessIFTest::test_channelArray() { return; } + succStatus = arrayReq->syncGetLength(false, getTimeoutSec()); + if (!succStatus) { + testFail("%s: an array getLength failed (3)", CURRENT_FUNCTION); + return; + } + testOk(arrayReq->getLength() == bigCapacity, "%s: retrieved length should be %lu", CURRENT_FUNCTION, (unsigned long) bigCapacity); + succStatus = arrayReq->syncGet(false, 0, 0, getTimeoutSec()); if (!succStatus) { testFail("%s: an array syncGet failed (9) ", CURRENT_FUNCTION); @@ -2019,6 +2026,12 @@ void ChannelAccessIFTest::test_channelArray() { CURRENT_FUNCTION, (unsigned long) bigCapacity); testOk(data4[0] == 1.1 , "%s: 4.check 0: %f == 1.1", CURRENT_FUNCTION, data4[0]); testOk(data4[1] == 2.2 , "%s: 4.check 1: %f == 2.2", CURRENT_FUNCTION, data4[1]); + if (data4.size() == bigCapacity) { + size_t i = bigCapacity-5; + for (; i < bigCapacity; i++) { + testDiag("%s: 4.check %d: %f", CURRENT_FUNCTION, int(i), data4[i]); + } + } /* if (data4.size() == bigCapacity) { size_t i = newCap; diff --git a/testApp/remote/channelDiscoveryTest.cpp b/testApp/remote/channelDiscoveryTest.cpp new file mode 100644 index 00000000..d722cc30 --- /dev/null +++ b/testApp/remote/channelDiscoveryTest.cpp @@ -0,0 +1,106 @@ +// disable buggy boost enable_shared_from_this assert code +#define BOOST_DISABLE_ASSERTS + +#include + +#include +#include + +#include +#include + +#include "channelDiscoveryTest.h" + +namespace TR1 = std::tr1; +namespace EPVA = epics::pvAccess; + +// int value, increment on process +const std::string ChannelDiscoveryTest::TEST_SIMPLECOUNTER_CHANNEL_NAME = "testSimpleCounter"; + +int ChannelDiscoveryTest::getNumberOfTests() +{ + return 1; +} + +int ChannelDiscoveryTest::runAllTests() +{ + testDiag("Starting channel discovery tests"); + m_provider = ChannelProviderRegistry::clients()->getProvider("pva"); + test_channelDiscovery(); + return testDone(); +} + +Channel::shared_pointer ChannelDiscoveryTest::createChannel(string channelName, bool debug ) +{ + TR1::shared_ptr channelReq(new SyncChannelRequesterImpl(debug)); + Channel::shared_pointer channel = getChannelProvider()->createChannel(channelName, channelReq); + return channel; +} + +Channel::shared_pointer ChannelDiscoveryTest::syncCreateChannel(string channelName, bool debug ) +{ + + TR1::shared_ptr channelReq(new SyncChannelRequesterImpl(debug)); + Channel::shared_pointer channel = getChannelProvider()->createChannel(channelName, channelReq); + bool isConnected = channelReq->waitUntilConnected(getTimeoutSec()); + if (!isConnected) { + std::cerr << "[" << channelName << "] failed to connect to the channel. " << std::endl; + return TR1::shared_ptr(); + } + + return channel; +} + + +SyncChannelGetRequesterImpl::shared_pointer ChannelDiscoveryTest::syncCreateChannelGet( + Channel::shared_pointer const & channel, string const & request, bool debug ) +{ + + TR1::shared_ptr + channelGetReq(new SyncChannelGetRequesterImpl(channel->getChannelName(), debug)); + + PVStructure::shared_pointer pvRequest = createRequest(request); + + ChannelGet::shared_pointer op(channel->createChannelGet(channelGetReq,pvRequest)); + bool succStatus = channelGetReq->waitUntilGetDone(getTimeoutSec()); + if (!succStatus) { + std::cerr << "[" << channel->getChannelName() << "] failed to get. " << std::endl; + return TR1::shared_ptr(); + } + return channelGetReq; +} + +void ChannelDiscoveryTest::test_channelGetInt(Channel::shared_pointer channel, string const & testMethodName) +{ + + string request = "record[process=true]field(value)"; + + SyncChannelGetRequesterImpl::shared_pointer channelGetReq = syncCreateChannelGet(channel, request); + if (!channelGetReq.get()) { + testFail("%s: channel get not created ", testMethodName.c_str()); + return; + } + + TR1::shared_ptr value = channelGetReq->getPVStructure()->getSubField("value"); + int previousValue = value->get(); + epicsThreadSleep(1.0); + bool succStatus = channelGetReq->syncGet(false, getTimeoutSec()); + if (!succStatus) { + testFail("%s: sync get failed ", testMethodName.c_str()); + return; + } + testOk((previousValue +1) == value->get(), "%s: testing the counter value change %d == %d", testMethodName.c_str(), previousValue +1, (int)value->get()); + channelGetReq->getChannelGet()->destroy(); + channel->destroy(); +} + +void ChannelDiscoveryTest::test_channelDiscovery() +{ + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + Channel::shared_pointer channel = syncCreateChannel(TEST_SIMPLECOUNTER_CHANNEL_NAME); + if (!channel.get()) { + testFail("%s: channel not created ", CURRENT_FUNCTION); + return; + } + test_channelGetInt(channel, CURRENT_FUNCTION); +} diff --git a/testApp/remote/channelDiscoveryTest.h b/testApp/remote/channelDiscoveryTest.h new file mode 100644 index 00000000..374ee078 --- /dev/null +++ b/testApp/remote/channelDiscoveryTest.h @@ -0,0 +1,37 @@ +#ifndef CHANNEL_DISCOVERY_H +#define CHANNEL_DISCOVERY_H + +#include +#include +#include + +#include "syncTestRequesters.h" + +class ChannelDiscoveryTest { + +public: + + int getNumberOfTests(); + int runAllTests(); + virtual ~ChannelDiscoveryTest() {} + +protected: + + static const std::string TEST_SIMPLECOUNTER_CHANNEL_NAME; + + ChannelProvider::shared_pointer getChannelProvider() { return m_provider; } + long getTimeoutSec() {return 5;} + bool isLocal() {return false;} + + Channel::shared_pointer createChannel(std::string channelName, bool debug=false); + Channel::shared_pointer syncCreateChannel(std::string channelName, bool debug=false); + SyncChannelGetRequesterImpl::shared_pointer syncCreateChannelGet(Channel::shared_pointer const & channel, std::string const & request, bool debug=false); + +private: + ChannelProvider::shared_pointer m_provider; + + void test_channelGetInt(Channel::shared_pointer channel, std::string const & testMethodName); + void test_channelDiscovery(); +}; + +#endif diff --git a/testApp/remote/nsChannelDiscoveryTest.cpp b/testApp/remote/nsChannelDiscoveryTest.cpp new file mode 100644 index 00000000..c74ec0b0 --- /dev/null +++ b/testApp/remote/nsChannelDiscoveryTest.cpp @@ -0,0 +1,71 @@ +// Test channel discovery using name server search + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include "channelDiscoveryTest.h" + +#define TESTSERVERNOMAIN +#include "testServer.cpp" + +namespace EPVA = epics::pvAccess; + + +int runAllTests() { + ChannelDiscoveryTest cdTest; + testPlan(cdTest.getNumberOfTests()); + + testDiag("Channel discovery type: TCP (name server) search"); + EPVA::Configuration::shared_pointer baseConfig(ConfigurationBuilder() + .add("EPICS_PVAS_INTF_ADDR_LIST", "127.0.0.1") + .add("EPICS_PVA_SERVER_PORT", "0") + .add("EPICS_PVAS_BROADCAST_PORT", "0") + .push_map() + .build()); + + EPVA::Configuration::shared_pointer serverConfig(ConfigurationBuilder() + .push_config(baseConfig) + .push_map() + .build()); + + TestServer::shared_pointer testServer(new TestServer(serverConfig)); + std::ostringstream portStr; + portStr << "127.0.0.1:" << testServer->getServerPort(); + testDiag("Test server is using ports TCP: %u, UDP Broadcast: %u", + testServer->getServerPort(), + testServer->getBroadcastPort()); + + EPVA::Configuration::shared_pointer clientConfig(ConfigurationBuilder() + .push_config(baseConfig) + .add("EPICS_PVA_NAME_SERVERS", portStr.str()) + .push_map() + .build()); + + ConfigurationFactory::registerConfiguration("pvAccess-client", clientConfig); + testDiag("Starting client factory"); + epics::pvAccess::ClientFactory::start(); + + return cdTest.runAllTests(); +} + +MAIN(testNsChannelDiscovery) +{ + try{ + SET_LOG_LEVEL(logLevelError); + return runAllTests(); + } + catch(std::exception& e) { + PRINT_EXCEPTION(e); + std::cerr << "Unhandled exception: " << e.what() << "\n"; + return 1; + } +} diff --git a/testApp/remote/syncTestRequesters.h b/testApp/remote/syncTestRequesters.h index 0781cea1..d61f8aff 100644 --- a/testApp/remote/syncTestRequesters.h +++ b/testApp/remote/syncTestRequesters.h @@ -223,6 +223,14 @@ class SyncChannelRequesterImpl : public epics::pvAccess::ChannelRequester, publi { Lock lock(m_pointerMutex); m_stateChangeCount++; + if (connectionState == Channel::CONNECTED) + { + setConnectedStatus(true); + } + else + { + setConnectedStatus(false); + } } signalEvent(); diff --git a/testApp/remote/testCodec.cpp b/testApp/remote/testCodec.cpp index ce99443f..c6ea1640 100644 --- a/testApp/remote/testCodec.cpp +++ b/testApp/remote/testCodec.cpp @@ -345,9 +345,11 @@ class TestCodec: public AbstractCodec { void release(pvAccessID clientId) {} + bool isUsed() {return false;} + std::string getType() const { - return std::string("TCP"); + return "tcp"; } const osiSockAddr& getRemoteAddress() const { diff --git a/testApp/remote/udpChannelDiscoveryTest.cpp b/testApp/remote/udpChannelDiscoveryTest.cpp new file mode 100644 index 00000000..480a8d9c --- /dev/null +++ b/testApp/remote/udpChannelDiscoveryTest.cpp @@ -0,0 +1,69 @@ +// Test channel discovery over UDP + +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include "channelDiscoveryTest.h" + +#define TESTSERVERNOMAIN +#include "testServer.cpp" + +namespace EPVA = epics::pvAccess; + + +int runAllTests() { + ChannelDiscoveryTest cdTest; + testPlan(cdTest.getNumberOfTests()); + testDiag("Channel discovery type: UDP search"); + + EPVA::Configuration::shared_pointer baseConfig(ConfigurationBuilder() + .add("EPICS_PVAS_INTF_ADDR_LIST", "127.0.0.1") + .add("EPICS_PVA_ADDR_LIST", "127.0.0.1") + .add("EPICS_PVA_AUTO_ADDR_LIST", "0") + .add("EPICS_PVA_SERVER_PORT", "0") + .add("EPICS_PVA_BROADCAST_PORT", "0") + .push_map() + .build()); + + EPVA::Configuration::shared_pointer serverConfig(ConfigurationBuilder() + .push_config(baseConfig) + .push_map() + .build()); + + TestServer::shared_pointer testServer(new TestServer(serverConfig)); + testDiag("Test server is using ports TCP: %u, UDP Broadcast: %u", + testServer->getServerPort(), + testServer->getBroadcastPort()); + + EPVA::Configuration::shared_pointer clientConfig(ConfigurationBuilder() + .push_config(baseConfig) + .add("EPICS_PVA_BROADCAST_PORT", testServer->getBroadcastPort()) + .push_map() + .build()); + + ConfigurationFactory::registerConfiguration("pvAccess-client", clientConfig); + testDiag("Starting client factory"); + epics::pvAccess::ClientFactory::start(); + return cdTest.runAllTests(); +} + +MAIN(testUdpChannelDiscovery) +{ + try{ + SET_LOG_LEVEL(logLevelError); + return runAllTests(); + } + catch(std::exception& e) { + PRINT_EXCEPTION(e); + std::cerr << "Unhandled exception: " << e.what() << "\n"; + return 1; + } +}