diff --git a/.gitmodules b/.gitmodules index d88eb124579..cb5d20fdadd 100644 --- a/.gitmodules +++ b/.gitmodules @@ -10,9 +10,6 @@ [submodule "esp/services/ws_sql/libantlr3c"] path = esp/services/ws_sql/libantlr3c url = https://github.com/hpcc-systems/libantlr3c.git -[submodule "system/aeron"] - path = system/aeron - url = https://github.com/hpcc-systems/aeron.git [submodule "vcpkg"] path = vcpkg url = https://github.com/hpcc-systems/vcpkg.git diff --git a/cmake_modules/commonSetup.cmake b/cmake_modules/commonSetup.cmake index 1ccf2ee5607..c4ff5f4c911 100644 --- a/cmake_modules/commonSetup.cmake +++ b/cmake_modules/commonSetup.cmake @@ -136,11 +136,6 @@ IF ("${COMMONSETUP_DONE}" STREQUAL "") endif() # The following options need to be set after the project() command - if (WIN32) - option(USE_AERON "Include the Aeron message protocol" OFF) - else() - option(USE_AERON "Include the Aeron message protocol" ON) - endif() if (APPLE OR WIN32) option(USE_NUMA "Configure use of numa" OFF) else() @@ -748,10 +743,6 @@ IF ("${COMMONSETUP_DONE}" STREQUAL "") SET(CPPUNIT_LIBRARIES "") ENDIF(USE_CPPUNIT) - IF (USE_AERON) - add_definitions (-D_USE_AERON) - ENDIF(USE_AERON) - IF (CONTAINERIZED) add_definitions (-D_CONTAINERIZED) ENDIF(CONTAINERIZED) diff --git a/initfiles/componentfiles/configschema/xsd/roxie.xsd b/initfiles/componentfiles/configschema/xsd/roxie.xsd index d62276b30b0..14da617e884 100644 --- a/initfiles/componentfiles/configschema/xsd/roxie.xsd +++ b/initfiles/componentfiles/configschema/xsd/roxie.xsd @@ -338,9 +338,6 @@ - diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 422153cdfb8..c8786b9f79c 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -313,7 +313,6 @@ extern bool debugPermitted; extern bool useRemoteResources; extern bool checkFileDate; extern bool lazyOpen; -extern bool useAeron; extern bool ignoreOrphans; extern bool doIbytiDelay; extern bool copyResources; diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 9f20d67fc63..9726d5643ab 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -127,7 +127,6 @@ bool checkFileDate; bool lazyOpen; bool localAgent = false; bool encryptInTransit; -bool useAeron; bool ignoreOrphans; bool doIbytiDelay = true; bool copyResources; @@ -1008,7 +1007,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) envInstallNASHooks(nas); } #endif - useAeron = topology->getPropBool("@useAeron", false); doIbytiDelay = topology->getPropBool("@doIbytiDelay", true); minIbytiDelay = topology->getPropInt("@minIbytiDelay", 2); initIbytiDelay = topology->getPropInt("@initIbytiDelay", 50); @@ -1095,7 +1093,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) udpFlowSocketsSize = topology->getPropInt("@udpFlowSocketsSize", udpFlowSocketsSize); udpLocalWriteSocketSize = topology->getPropInt("@udpLocalWriteSocketSize", udpLocalWriteSocketSize); #if !defined(_CONTAINERIZED) && !defined(SUBCHANNELS_IN_HEADER) - roxieMulticastEnabled = topology->getPropBool("@roxieMulticastEnabled", true) && !useAeron; // enable use of multicast for sending requests to agents + roxieMulticastEnabled = topology->getPropBool("@roxieMulticastEnabled", true); // enable use of multicast for sending requests to agents #endif udpResendLostPackets = topology->getPropBool("@udpResendLostPackets", true); @@ -1337,9 +1335,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) #else topology->addPropBool("@linuxOS", true); #endif - if (useAeron) - setAeronProperties(topology); - #ifdef _CONTAINERIZED allQuerySetNames.append(roxieName); #else @@ -1693,8 +1688,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) #ifndef _CONTAINERIZED perfMonHook.clear(); #endif - stopAeronDriver(); - leakChecker = strdup("Make sure leak checking is working"); roxiemem::releaseRoxieHeap(); UseSysLogForOperatorMessages(false); diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 8b340dba1d0..916daccdc7d 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -2947,25 +2947,6 @@ class RoxieUdpSocketQueueManager : public RoxieSocketQueueManager }; -class RoxieAeronSocketQueueManager : public RoxieSocketQueueManager -{ -public: - RoxieAeronSocketQueueManager(unsigned _numWorkers, bool encryptionInTransit) : RoxieSocketQueueManager(_numWorkers) - { - unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT); - SocketEndpoint ep(dataPort, myNode.getIpAddress()); - receiveManager.setown(createAeronReceiveManager(ep, encryptionInTransit)); - assertex(!myNode.getIpAddress().isNull()); - sendManager.setown(createAeronSendManager(dataPort, fastLaneQueue ? 3 : 2, myNode.getIpAddress(), encryptionInTransit)); - } - - virtual void abortPendingData(const SocketEndpoint &ep) override - { - } - -}; - - #ifdef _MSC_VER #pragma warning( pop ) #endif @@ -3768,8 +3749,6 @@ extern IRoxieOutputQueueManager *createOutputQueueManager(unsigned numWorkers, b { if (localAgent) return new RoxieLocalQueueManager(numWorkers); - else if (useAeron) - return new RoxieAeronSocketQueueManager(numWorkers, encrypted); else return new RoxieUdpSocketQueueManager(numWorkers, encrypted); diff --git a/roxie/roxie/roxie.hpp b/roxie/roxie/roxie.hpp index b0bece3235a..4025d40e24f 100644 --- a/roxie/roxie/roxie.hpp +++ b/roxie/roxie/roxie.hpp @@ -88,14 +88,6 @@ #define ROXIE_LAYOUT_MISMATCH ROXIE_ERROR_START+66 #define ROXIE_TLS_ERROR ROXIE_ERROR_START+67 -// Aeron layer errors - -#define ROXIE_AERON_ERROR ROXIE_ERROR_START+70 -#define ROXIE_PUBLICATION_CLOSED ROXIE_ERROR_START+71 -#define ROXIE_PUBLICATION_NOT_CONNECTED ROXIE_ERROR_START+72 - - - // MORE: move back to ccd.hpp when no longer public (used by roxieconfig) #define ROXIE_SLA_LOGIC diff --git a/roxie/udplib/udpaeron.cpp b/roxie/udplib/udpaeron.cpp deleted file mode 100644 index b86c773dce7..00000000000 --- a/roxie/udplib/udpaeron.cpp +++ /dev/null @@ -1,465 +0,0 @@ -/*############################################################################## - - HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -############################################################################## */ - -#include -#include "jexcept.hpp" -#include "jqueue.tpp" -#include "udplib.hpp" -#include "udpipmap.hpp" -#include "udpmsgpk.hpp" -#include "udpsha.hpp" -#include "udptrs.hpp" -#include "roxie.hpp" -#ifdef _USE_AERON -#include - -extern "C" { -#include "aeronmd.h" -#include "concurrent/aeron_atomic.h" -#include "aeron_driver_context.h" -#include "util/aeron_properties_util.h" -} - -// Configurable variables // MORE - add relevant code to Roxie -bool useEmbeddedAeronDriver = true; -unsigned aeronConnectTimeout = 5000; -unsigned aeronPollFragmentsLimit = 10; -unsigned aeronIdleSleepMs = 1; - -unsigned aeronMtuLength = 0; -unsigned aeronSocketRcvbuf = 0; -unsigned aeronSocketSndbuf = 0; -unsigned aeronInitialWindow = 0; - -extern UDPLIB_API void setAeronProperties(const IPropertyTree *config) -{ - useEmbeddedAeronDriver = config->getPropBool("@aeronUseEmbeddedDriver", true); - aeronConnectTimeout = config->getPropInt("@aeronConnectTimeout", 5000); - aeronPollFragmentsLimit = config->getPropInt("@aeronPollFragmentsLimit", 10); - aeronIdleSleepMs = config->getPropInt("@aeronIdleSleepMs", 1); - - aeronMtuLength = config->getPropInt("@aeronMtuLength", 0); - aeronSocketRcvbuf = config->getPropInt("@aeronSocketRcvbuf", 0); - aeronSocketSndbuf = config->getPropInt("@aeronSocketSndbuf", 0); - aeronInitialWindow = config->getPropInt("@aeronInitialWindow", 0); - -} - -static std::thread aeronDriverThread; -static InterruptableSemaphore driverStarted; - -std::atomic aeronDriverRunning = { false }; - -extern UDPLIB_API void stopAeronDriver() -{ - aeronDriverRunning = false; - if (aeronDriverThread.joinable()) - aeronDriverThread.join(); -} - -void sigint_handler(int signal) -{ - stopAeronDriver(); -} - -void termination_hook(void *state) -{ - stopAeronDriver(); -} - -inline bool is_running() -{ - return aeronDriverRunning; -} - -int startAeronDriver() -{ - aeron_driver_context_t *context = nullptr; - aeron_driver_t *driver = nullptr; - try - { - if (aeron_driver_context_init(&context) < 0) - throw makeStringExceptionV(MSGAUD_operator, -1, "AERON: error initializing context (%d) %s", aeron_errcode(), aeron_errmsg()); - - context->termination_hook_func = termination_hook; - context->dirs_delete_on_start = true; - context->warn_if_dirs_exist = false; - context->term_buffer_sparse_file = false; - - if (aeronMtuLength) context->mtu_length = aeronMtuLength; - if (aeronSocketRcvbuf) context->socket_rcvbuf = aeronSocketRcvbuf; - if (aeronSocketSndbuf) context->socket_sndbuf = aeronSocketSndbuf; - if (aeronInitialWindow) context->initial_window_length = aeronInitialWindow; - - if (aeron_driver_init(&driver, context) < 0) - throw makeStringExceptionV(MSGAUD_operator, -1, "AERON: error initializing driver (%d) %s", aeron_errcode(), aeron_errmsg()); - - if (aeron_driver_start(driver, true) < 0) - throw makeStringExceptionV(MSGAUD_operator, -1, "AERON: error starting driver (%d) %s", aeron_errcode(), aeron_errmsg()); - - driverStarted.signal(); - aeronDriverRunning = true; - while (is_running()) - { - aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)); - } - aeron_driver_close(driver); - aeron_driver_context_close(context); - } - catch (IException *E) - { - aeron_driver_close(driver); - aeron_driver_context_close(context); - driverStarted.interrupt(E); - } - catch (...) - { - aeron_driver_close(driver); - aeron_driver_context_close(context); - driverStarted.interrupt(makeStringException(0, "failed to start Aeron (unknown exception)")); - } - return 0; -} - -class CRoxieAeronReceiveManager : public CInterfaceOf -{ -private: - typedef std::map uid_map; - uid_map collators; - CriticalSection collatorsLock; // protects access to collators map - - std::shared_ptr aeron; - std::shared_ptr loSub; - std::shared_ptr hiSub; - std::shared_ptr slaSub; - std::thread receiveThread; - std::atomic running = { true }; - const std::chrono::duration idleSleepMs; - bool encrypted; -public: - CRoxieAeronReceiveManager(const SocketEndpoint &myEndpoint, bool _encrypted) - : idleSleepMs(aeronIdleSleepMs), encrypted(_encrypted) - { - if (useEmbeddedAeronDriver && !is_running()) - { - aeronDriverThread = std::thread([]() { startAeronDriver(); }); - driverStarted.wait(); - } - aeron::Context context; - - if (udpTraceLevel) - { - context.newSubscriptionHandler( - [](const std::string& channel, std::int32_t streamId, std::int64_t correlationId) - { - DBGLOG("AeronReceiver: Subscription: %s %" I64F "d %d", channel.c_str(), (__int64) correlationId, streamId); - }); - context.availableImageHandler([](aeron::Image &image) - { - DBGLOG("AeronReceiver: Available image correlationId=%" I64F "d, sessionId=%d at position %" I64F "d from %s", (__int64) image.correlationId(), image.sessionId(), (__int64) image.position(), image.sourceIdentity().c_str()); - }); - context.unavailableImageHandler([](aeron::Image &image) - { - DBGLOG("AeronReceiver: Unavailable image correlationId=%" I64F "d, sessionId=%d at position %" I64F "d from %s", (__int64) image.correlationId(), image.sessionId(), (__int64) image.position(), image.sourceIdentity().c_str()); - }); - } - aeron = aeron::Aeron::connect(context); - loSub = addSubscription(myEndpoint, 0); - hiSub = addSubscription(myEndpoint, 1); - slaSub = addSubscription(myEndpoint, 2); - aeron::fragment_handler_t handler = [this](const aeron::AtomicBuffer& buffer, aeron::util::index_t offset, aeron::util::index_t length, const aeron::Header& header) - { - collatePacket(buffer.buffer() + offset, length); - }; - - receiveThread = std::thread([this, handler]() - { - while (running) - { - int fragmentsRead = slaSub->poll(handler, aeronPollFragmentsLimit); - if (!fragmentsRead) - fragmentsRead = hiSub->poll(handler, aeronPollFragmentsLimit); - if (!fragmentsRead) - fragmentsRead = loSub->poll(handler, aeronPollFragmentsLimit); - if (!fragmentsRead) - std::this_thread::sleep_for(idleSleepMs); - } - }); - } - ~CRoxieAeronReceiveManager() - { - running = false; - receiveThread.join(); - } - - void collatePacket( std::uint8_t *buffer, aeron::util::index_t length) - { - const UdpPacketHeader *pktHdr = (UdpPacketHeader*) buffer; - assert(pktHdr->length == length); - - if (udpTraceLevel >= 4) - { - StringBuffer s; - DBGLOG("AeronReceiver: CPacketCollator - unQed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%d node=%s", - pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->length, pktHdr->node.getTraceText(s).str()); - } - - Linked msgColl; - bool isDefault = false; // Don't trace inside the spinBlock! - { - CriticalBlock b(collatorsLock); - try - { - msgColl.set(collators[pktHdr->ruid]); - if (!msgColl) - { - msgColl.set(collators[RUID_DISCARD]); - // We could consider sending an abort to the agent, but it should have already been done by ccdserver code - isDefault = true; - unwantedDiscarded++; - } - } - catch (IException *E) - { - EXCLOG(E); - E->Release(); - } - catch (...) - { - IException *E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception caught in CPacketCollator::run"); - EXCLOG(E); - E->Release(); - } - } - if (udpTraceLevel && isDefault) - { - StringBuffer s; - DBGLOG("AeronReceiver: CPacketCollator NO msg collator found - using default - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s", pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str()); - } - if (msgColl) - msgColl->attach_data(buffer, length); - } - - // Note - some of this code could be in a common base class with udpreceivemanager, but hope to kill that at some point - virtual IMessageCollator *createMessageCollator(roxiemem::IRowManager *rowManager, ruid_t ruid) override - { - CMessageCollator *msgColl = new CMessageCollator(rowManager, ruid, encrypted); - if (udpTraceLevel >= 2) - DBGLOG("AeronReceiver: createMessageCollator %p %u", msgColl, ruid); - { - CriticalBlock b(collatorsLock); - collators[ruid] = msgColl; - } - msgColl->Link(); - return msgColl; - } - - virtual void detachCollator(const IMessageCollator *msgColl) override - { - ruid_t ruid = msgColl->queryRUID(); - if (udpTraceLevel >= 2) - DBGLOG("AeronReceiver: detach %p %u", msgColl, ruid); - { - CriticalBlock b(collatorsLock); - collators.erase(ruid); - } - msgColl->Release(); - } - -private: - - std::shared_ptr addSubscription(const SocketEndpoint &myEndpoint, int queue) - { - StringBuffer channel("aeron:udp?endpoint="); - myEndpoint.getEndpointHostText(channel); - std::int64_t id = aeron->addSubscription(channel.str(), queue); - std::shared_ptr subscription = aeron->findSubscription(id); - while (!subscription) - { - std::this_thread::yield(); - subscription = aeron->findSubscription(id); - } - return subscription; - } -}; - -class UdpAeronReceiverEntry : public IUdpReceiverEntry -{ -private: - std::shared_ptr aeron; - unsigned numQueues; - std::vector> publications; - const IpAddress dest; - -public: - UdpAeronReceiverEntry(const IpAddress &_ip, unsigned _dataPort, std::shared_ptr _aeron, unsigned _numQueues) - : dest(_ip), aeron(_aeron), numQueues(_numQueues) - { - StringBuffer channel("aeron:udp?endpoint="); - dest.getHostText(channel); - channel.append(':').append(_dataPort); - for (unsigned queue = 0; queue < numQueues; queue++) - { - if (udpTraceLevel) - DBGLOG("AeronSender: Creating publication to channel %s for queue %d", channel.str(), queue); - std::int64_t id = aeron->addPublication(channel.str(), queue); - std::shared_ptr publication = aeron->findPublication(id); - // Wait for the publication to be valid - while (!publication) - { - std::this_thread::yield(); - publication = aeron->findPublication(id); - } - if ((unsigned) publication->maxPayloadLength() < DATA_PAYLOAD) - throw makeStringExceptionV(ROXIE_AERON_ERROR, "AeronSender: maximum payload %u too small (%u required)", (unsigned) publication->maxPayloadLength(), (unsigned) DATA_PAYLOAD); - if (udpTraceLevel <= 4) - DBGLOG("AeronSender: Publication maxima: %d %d", publication->maxPayloadLength(), publication->maxMessageLength()); - publications.push_back(publication); - // Wait for up to 5 seconds to connect to a subscriber - unsigned start = msTick(); - while (!publication->isConnected()) - { - Sleep(10); - if (msTick()-start > aeronConnectTimeout) - throw makeStringExceptionV(ROXIE_PUBLICATION_NOT_CONNECTED, "AeronSender: Publication not connected to channel %s after %d seconds ", channel.str(), aeronConnectTimeout); - } - } - } - void write(roxiemem::DataBuffer *buffer, unsigned len, unsigned queue) - { - unsigned backoff = 1; - aeron::concurrent::AtomicBuffer srcBuffer(reinterpret_cast(&buffer->data), len); - for (;;) - { - const std::int64_t result = publications[queue]->offer(srcBuffer, 0, len); - if (result < 0) - { - if (aeron::BACK_PRESSURED == result || aeron::ADMIN_ACTION == result) - { - // MORE - experiment with best policy. spinning without delay may be appropriate too, depending on cpu availability - // and whether data write thread is high priority - MilliSleep(backoff-1); // MilliSleep(0) just does a threadYield - if (backoff < 256) - backoff = backoff*2; - continue; - } - StringBuffer target; - dest.getHostText(target); - if (aeron::NOT_CONNECTED == result) - throw makeStringExceptionV(ROXIE_PUBLICATION_NOT_CONNECTED, "AeronSender: Offer failed because publisher is not connected to subscriber %s", target.str()); - else if (aeron::PUBLICATION_CLOSED == result) - throw makeStringExceptionV(ROXIE_PUBLICATION_CLOSED, "AeronSender: Offer failed because publisher is closed sending to %s", target.str()); - else - throw makeStringExceptionV(ROXIE_AERON_ERROR, "AeronSender: Offer failed for unknown reason %" I64F "d sending to %s", (__int64) result, target.str()); - } - break; - } - } -}; - -class CRoxieAeronSendManager : public CInterfaceOf -{ - std::shared_ptr aeron; - const unsigned dataPort = 0; - const unsigned numQueues = 0; - IpMapOf receiversTable; - const IpAddress myIP; - bool encrypted; - - std::atomic msgSeq{0}; - - inline unsigned getNextMessageSequence() - { - unsigned res; - do - { - res = ++msgSeq; - } while (unlikely(!res)); - return res; - } -public: - CRoxieAeronSendManager(unsigned _dataPort, unsigned _numQueues, const IpAddress &_myIP, bool _encrypted) - : dataPort(_dataPort), - numQueues(_numQueues), - receiversTable([this](const ServerIdentifier ip) { return new UdpAeronReceiverEntry(ip.getIpAddress(), dataPort, aeron, numQueues);}), - myIP(_myIP), - encrypted(_encrypted) - { - if (useEmbeddedAeronDriver && !is_running()) - { - aeronDriverThread = std::thread([]() { startAeronDriver(); }); - driverStarted.wait(); - } - aeron::Context context; - if (udpTraceLevel) - context.newPublicationHandler( - [](const std::string& channel, std::int32_t streamId, std::int32_t sessionId, std::int64_t correlationId) - { - DBGLOG("AeronSender: Publication %s, correlation %" I64F "d, streamId %d, sessionId %d", channel.c_str(), (__int64) correlationId, streamId, sessionId); - }); - - aeron = aeron::Aeron::connect(context); - } - virtual void writeOwn(IUdpReceiverEntry &receiver, roxiemem::DataBuffer *buffer, unsigned len, unsigned queue) override - { - assert(queue < numQueues); - static_cast(receiver).write(buffer, len, queue); - buffer->Release(); - } - virtual IMessagePacker *createMessagePacker(ruid_t id, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue) override; - virtual bool dataQueued(ruid_t ruid, unsigned sequence, const ServerIdentifier &destNode) override { return false; } - virtual bool abortData(ruid_t ruid, unsigned sequence, const ServerIdentifier &destNode) override { return false; } - virtual void abortAll(const ServerIdentifier &destNode) override { } - virtual bool allDone() override { return true; } -}; - -IMessagePacker *CRoxieAeronSendManager::createMessagePacker(ruid_t ruid, unsigned sequence, const void *messageHeader, unsigned headerSize, const ServerIdentifier &destNode, int queue) -{ - const IpAddress dest = destNode.getIpAddress(); - return ::createMessagePacker(ruid, sequence, messageHeader, headerSize, *this, receiversTable[dest], myIP, getNextMessageSequence(), queue, encrypted); -} - -extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep, bool encrypted) -{ - return new CRoxieAeronReceiveManager(ep, encrypted); -} - -extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP, bool encrypted) -{ - return new CRoxieAeronSendManager(dataPort, numQueues, myIP, encrypted); -} - -#else - -extern UDPLIB_API void setAeronProperties(const IPropertyTree *config) -{ -} - -extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep, bool encrypted) -{ - UNIMPLEMENTED; -} - -extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP, bool encrypted) -{ - UNIMPLEMENTED; -} - -extern UDPLIB_API void stopAeronDriver() -{ -} -#endif diff --git a/roxie/udplib/udplib.cmake b/roxie/udplib/udplib.cmake index c95f7b11f85..44e8e220012 100644 --- a/roxie/udplib/udplib.cmake +++ b/roxie/udplib/udplib.cmake @@ -27,7 +27,6 @@ project( udplib ) set ( SRCS udpmsgpk.cpp - udpaeron.cpp udpsha.cpp udptrr.cpp udptrs.cpp @@ -42,8 +41,6 @@ include_directories ( ./../../roxie/ccd ${HPCC_SOURCE_DIR}/testing/unittests ./../../roxie/roxie - ./../../system/aeron/aeron-client/src/main/cpp - ./../../system/aeron/aeron-driver/src/main/c/ ) HPCC_ADD_LIBRARY( udplib SHARED ${SRCS} ) @@ -56,13 +53,3 @@ target_link_libraries ( udplib jlib roxiemem ) - -if (USE_AERON) - target_link_libraries ( udplib - aeron_client - aeron_driver - ) - - install( TARGETS aeron_driver RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_DIR} ) -endif() - diff --git a/roxie/udplib/udplib.hpp b/roxie/udplib/udplib.hpp index 3d3061227df..79c90c534f9 100644 --- a/roxie/udplib/udplib.hpp +++ b/roxie/udplib/udplib.hpp @@ -155,14 +155,10 @@ interface ISendManager : extends IInterface extern UDPLIB_API IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port, int queue_size, bool encrypted); extern UDPLIB_API ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int queue_size_pr_server, int queues_pr_server, const IpAddress &myIP, TokenBucket *rateLimiter, bool encryptionInTransit); -extern UDPLIB_API void setAeronProperties(const IPropertyTree *config); -extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep, bool encrypted); -extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsigned numQueues, const IpAddress &myIP, bool encrypted); #if defined( __linux__) || defined(__APPLE__) extern UDPLIB_API void setLinuxThreadPriority(int level); #endif extern UDPLIB_API void queryMemoryPoolStats(StringBuffer &memStats); -extern UDPLIB_API void stopAeronDriver(); interface IRoxieQueryPacket; class RoxiePacketHeader; diff --git a/roxie/udplib/udptransport.cmake b/roxie/udplib/udptransport.cmake index a45d774e70d..3914e01e666 100644 --- a/roxie/udplib/udptransport.cmake +++ b/roxie/udplib/udptransport.cmake @@ -33,9 +33,6 @@ include_directories ( ./../../roxie/roxiemem ./../../system/include ./../../system/jlib - ./../../system/aeron/aeron-client/src/main/cpp - ./../../system/aeron/aeron-driver/src/main/c/ - ./../../system/aeron/aeron-samples/src/main/cpp # temporary ./../../roxie/ccd ) @@ -50,12 +47,4 @@ target_link_libraries ( udptransport jlib roxiemem udplib - ) - -if (USE_AERON) - target_link_libraries ( udptransport - aeron_client - aeron_driver - ) -endif() - + ) \ No newline at end of file diff --git a/roxie/udplib/uttest.cpp b/roxie/udplib/uttest.cpp index a5c19254f5e..1ca82b49a4a 100644 --- a/roxie/udplib/uttest.cpp +++ b/roxie/udplib/uttest.cpp @@ -42,7 +42,6 @@ void usage() printf("Options are:\n"); printf( "--jumboFrames\n" - "--useAeron\n" "--udpLocalWriteSocketSize nn\n" "--udpRetryBusySenders nn\n" "--maxPacketsPerSender nn\n" @@ -77,7 +76,6 @@ bool doSortSimulator = false; bool simpleSequential = true; float slowNodeSkew = 1.0; unsigned numSortSlaves = 50; -bool useAeron = false; bool doRawTest = false; unsigned rawBufferSize = 1024; @@ -177,13 +175,7 @@ class Receiver : public Thread virtual int run() { Owned rcvMgr; - if (useAeron) - { - SocketEndpoint myEP(7000, myNode.getIpAddress()); - rcvMgr.setown(createAeronReceiveManager(myEP, false)); - } - else - rcvMgr.setown(createReceiveManager(7000, 7001, 7002, udpQueueSize, false)); + rcvMgr.setown(createReceiveManager(7000, 7001, 7002, udpQueueSize, false)); Owned rowMgr = roxiemem::createRowManager(0, NULL, queryDummyContextLogger(), NULL, false); Owned collator = rcvMgr->createMessageCollator(rowMgr, 1); unsigned lastReport = 0; @@ -290,10 +282,7 @@ void testNxN() if (maxPacketsPerSender > udpQueueSize) maxPacketsPerSender = udpQueueSize; Owned sendMgr; - if (useAeron) - sendMgr.setown(createAeronSendManager(7000, udpNumQs, myNode.getIpAddress(), false)); - else - sendMgr.setown(createSendManager(7000, 7001, 7002, 100, udpNumQs, myNode.getIpAddress(), nullptr, false)); + sendMgr.setown(createSendManager(7000, 7001, 7002, 100, udpNumQs, myNode.getIpAddress(), nullptr, false)); Receiver receiver; IMessagePacker **packers = new IMessagePacker *[numNodes]; @@ -667,10 +656,6 @@ int main(int argc, char * argv[] ) { roxiemem::setDataAlignmentSize(0x2000); } - else if (strcmp(ip, "--useAeron")==0) - { - useAeron = true; - } else if (strcmp(ip, "--rawSpeedTest")==0) { doRawTest = true; diff --git a/system/CMakeLists.txt b/system/CMakeLists.txt index e9c18182a4b..49845c4e32f 100644 --- a/system/CMakeLists.txt +++ b/system/CMakeLists.txt @@ -26,18 +26,4 @@ if (NOT JLIB_ONLY) HPCC_ADD_SUBDIRECTORY (xmllib) HPCC_ADD_SUBDIRECTORY (xmllibtest "PLATFORM") HPCC_ADD_SUBDIRECTORY (masking) - - if (USE_AERON) - project (aeron_include) - SET(CMAKE_UNITY_BUILD FALSE) - remove_definitions(-fvisibility=hidden) - if (CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANGXX) - SET (CMAKE_SAVED_AERON_CXX_FLAGS "${CMAKE_CXX_FLAGS}") - SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format-nonliteral") - endif() - HPCC_ADD_SUBDIRECTORY (aeron) - if (CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANGXX) - SET (CMAKE_CXX_FLAGS "${CMAKE_SAVED_AERON_CXX_FLAGS}") - endif() - endif() endif( ) diff --git a/system/aeron b/system/aeron deleted file mode 160000 index 80317713655..00000000000 --- a/system/aeron +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 8031771365551e5b4d5b19cef4510757fb16d619