diff --git a/core-cpp/kafka/include/kafka/Consumer.hh b/core-cpp/kafka/include/kafka/Consumer.hh index ed26d606..a25c1611 100644 --- a/core-cpp/kafka/include/kafka/Consumer.hh +++ b/core-cpp/kafka/include/kafka/Consumer.hh @@ -19,6 +19,8 @@ class MessageQueue { public: void enqueue(cppkafka::Message &msg); Message dequeue(); + std::vector dequeue_all(); + bool empty() { return queue.empty(); } private: std::queue queue; @@ -35,7 +37,7 @@ public: private: MessageQueue messageQueue; - void handleMessage(); + void handleMessages(); void createTopics(cppkafka::Consumer& consumer, std::vector topics); }; diff --git a/core-cpp/kafka/src/Consumer.cc b/core-cpp/kafka/src/Consumer.cc index a129802a..c697dbcf 100644 --- a/core-cpp/kafka/src/Consumer.cc +++ b/core-cpp/kafka/src/Consumer.cc @@ -61,7 +61,7 @@ void Consumer::run() { // create a signal listener SWA::RealTimeSignalListener listener( - [this](int pid, int uid) { this->handleMessage(); }, + [this](int pid, int uid) { this->handleMessages(); }, SWA::Process::getInstance().getActivityMonitor()); // Now run the dispatcher, providing a callback to handle messages, one to @@ -77,22 +77,22 @@ void Consumer::run() { ); } -void Consumer::handleMessage() { - // handle the next message in the queue - try { - // dequeue the message - Message msg = messageQueue.dequeue(); +void Consumer::handleMessages() { + // drain the message queue + if (!messageQueue.empty()) { + std::vector msgs = messageQueue.dequeue_all(); + for (auto it = msgs.begin(); it != msgs.end(); it++) { + Message msg = *it; - // create an input stream for the parameter data - BufferedInputStream buf(msg.second.begin(), msg.second.end()); + // create an input stream for the parameter data + BufferedInputStream buf(msg.second.begin(), msg.second.end()); - // get the service invoker - Callable service = ProcessHandler::getInstance().getServiceHandler(msg.first).getInvoker(buf); + // get the service invoker + Callable service = ProcessHandler::getInstance().getServiceHandler(msg.first).getInvoker(buf); - // run the service - service(); - } catch (std::out_of_range &e) { - // the queue is empty + // run the service + service(); + } } } @@ -189,4 +189,14 @@ Message MessageQueue::dequeue() { return msg; } +std::vector MessageQueue::dequeue_all() { + std::lock_guard lock(mutex); + std::vector result; + while (!queue.empty()) { + result.push_back(queue.front()); + queue.pop(); + } + return result; +} + } // namespace Kafka diff --git a/core-java/src/main/java/org/xtuml/masl/translate/main/DomainTranslator.java b/core-java/src/main/java/org/xtuml/masl/translate/main/DomainTranslator.java index 6bcd6bfd..ef917741 100644 --- a/core-java/src/main/java/org/xtuml/masl/translate/main/DomainTranslator.java +++ b/core-java/src/main/java/org/xtuml/masl/translate/main/DomainTranslator.java @@ -40,6 +40,7 @@ import org.xtuml.masl.translate.main.object.ObjectTranslator; import java.util.*; +import java.util.stream.Collectors; @Alias("Main") @Default @@ -407,7 +408,7 @@ private void addServices() { servicesEnum = new EnumerationType("ServiceIds", DomainNamespace.get(domain)); interfaceDomainHeader.addEnumerateDeclaration(servicesEnum); - for (final DomainService service : domain.getServices()) { + for (final DomainService service : domain.getServices().stream().sorted(Comparator.comparing(DomainService::getName)).collect(Collectors.toList())) { final String enumName = "serviceId_" + Mangler.mangleName(service); final Enumerator serviceId = servicesEnum.addEnumerator(enumName, null);