Skip to content

Commit

Permalink
save state
Browse files Browse the repository at this point in the history
  • Loading branch information
leviathan747 committed Feb 9, 2024
1 parent 6b41d9c commit 3a92bfc
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 16 deletions.
4 changes: 3 additions & 1 deletion core-cpp/kafka/include/kafka/Consumer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class MessageQueue {
public:
void enqueue(cppkafka::Message &msg);
Message dequeue();
std::vector<Message> dequeue_all();
bool empty() { return queue.empty(); }

private:
std::queue<Message> queue;
Expand All @@ -35,7 +37,7 @@ public:
private:
MessageQueue messageQueue;

void handleMessage();
void handleMessages();

void createTopics(cppkafka::Consumer& consumer, std::vector<std::string> topics);
};
Expand Down
38 changes: 24 additions & 14 deletions core-cpp/kafka/src/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void Consumer::run() {

// create a signal listener
SWA::RealTimeSignalListener listener(
[this](int pid, int uid) { this->handleMessage(); },
[this](int pid, int uid) { this->handleMessages(); },
SWA::Process::getInstance().getActivityMonitor());

// Now run the dispatcher, providing a callback to handle messages, one to
Expand All @@ -77,22 +77,22 @@ void Consumer::run() {
);
}

void Consumer::handleMessage() {
// handle the next message in the queue
try {
// dequeue the message
Message msg = messageQueue.dequeue();
void Consumer::handleMessages() {
// drain the message queue
if (!messageQueue.empty()) {
std::vector<Message> msgs = messageQueue.dequeue_all();
for (auto it = msgs.begin(); it != msgs.end(); it++) {
Message msg = *it;

// create an input stream for the parameter data
BufferedInputStream buf(msg.second.begin(), msg.second.end());
// create an input stream for the parameter data
BufferedInputStream buf(msg.second.begin(), msg.second.end());

// get the service invoker
Callable service = ProcessHandler::getInstance().getServiceHandler(msg.first).getInvoker(buf);
// get the service invoker
Callable service = ProcessHandler::getInstance().getServiceHandler(msg.first).getInvoker(buf);

// run the service
service();
} catch (std::out_of_range &e) {
// the queue is empty
// run the service
service();
}
}
}

Expand Down Expand Up @@ -189,4 +189,14 @@ Message MessageQueue::dequeue() {
return msg;
}

std::vector<Message> MessageQueue::dequeue_all() {
std::lock_guard<std::mutex> lock(mutex);
std::vector<Message> result;
while (!queue.empty()) {
result.push_back(queue.front());
queue.pop();
}
return result;
}

} // namespace Kafka
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.xtuml.masl.translate.main.object.ObjectTranslator;

import java.util.*;
import java.util.stream.Collectors;

@Alias("Main")
@Default
Expand Down Expand Up @@ -407,7 +408,7 @@ private void addServices() {
servicesEnum = new EnumerationType("ServiceIds", DomainNamespace.get(domain));
interfaceDomainHeader.addEnumerateDeclaration(servicesEnum);

for (final DomainService service : domain.getServices()) {
for (final DomainService service : domain.getServices().stream().sorted(Comparator.comparing(DomainService::getName)).collect(Collectors.toList())) {
final String enumName = "serviceId_" + Mangler.mangleName(service);
final Enumerator serviceId = servicesEnum.addEnumerator(enumName, null);

Expand Down

0 comments on commit 3a92bfc

Please sign in to comment.