Skip to content

Commit

Permalink
Extend Kafka utility to allow marking outbound terminator services as…
Browse files Browse the repository at this point in the history
… polled kafka topics
  • Loading branch information
leviathan747 committed Jun 4, 2024
1 parent 7387d0f commit 12669a8
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 9 deletions.
2 changes: 2 additions & 0 deletions core-cpp/kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ simple_add_shared_library (
NAME Kafka
SOURCES
Consumer.cc
DataConsumer.cc
Kafka.cc
ProcessHandler.cc
Producer.cc
Expand All @@ -24,6 +25,7 @@ simple_add_shared_library (
EXPORT MaslCore
INCLUDES
kafka/Consumer.hh
kafka/DataConsumer.hh
kafka/Kafka.hh
kafka/ProcessHandler.hh
kafka/Producer.hh
Expand Down
6 changes: 5 additions & 1 deletion core-cpp/kafka/include/kafka/Consumer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "cppkafka/consumer.h"
#include "cppkafka/message.h"

#include "DataConsumer.hh"

#include <condition_variable>
#include <mutex>
#include <queue>
Expand All @@ -29,15 +31,17 @@ private:
// Wraps a cppkafka::Consumer subscribed to a fixed set of topics.
// Construction creates the topics (if absent) and subscribes; messages are
// then obtained either via the internal queue (run) or one at a time
// (consumeOne).
class Consumer {

public:
    // Subscribe to a single topic.
    Consumer(std::string topic);
    // Subscribe to each of the given topics.
    Consumer(std::vector<std::string> topics);
    // Poll for one message; on success passes the raw payload to
    // dataConsumer, commits the message, and returns true. Returns false
    // when no message was available.
    bool consumeOne(DataConsumer& dataConsumer);
    // Continuously poll and dispatch messages (blocking loop —
    // see Consumer.cc; exact dispatch path not visible here).
    void run();

private:
    MessageQueue messageQueue;
    std::unique_ptr<cppkafka::Consumer> consumer;

    // Shared constructor logic: read broker options, create topics,
    // subscribe.
    void initialize(std::vector<std::string> topics);
    // Drain queued messages (body not visible in this chunk).
    void handleMessages();

    // Create the given topics on the broker if they do not already exist.
    void createTopics(std::vector<std::string> topics);
};

Expand Down
18 changes: 18 additions & 0 deletions core-cpp/kafka/include/kafka/DataConsumer.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#ifndef Kafka_DataConsumer_HH
#define Kafka_DataConsumer_HH

#include <cstdint>
#include <vector>

namespace Kafka {

class DataConsumer {
public:
virtual void accept(std::vector<std::uint8_t> data) const {
}
virtual ~DataConsumer();
};

} // namespace Kafka

#endif
2 changes: 0 additions & 2 deletions core-cpp/kafka/include/kafka/Producer.hh
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#ifndef Kafka_Producer_HH
#define Kafka_Producer_HH

#include <nlohmann/json.hpp>

#include "cppkafka/message_builder.h"
#include "cppkafka/producer.h"

Expand Down
3 changes: 2 additions & 1 deletion core-cpp/kafka/include/kafka/ServiceHandler.hh
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#ifndef Kafka_ServiceHandler_HH
#define Kafka_ServiceHandler_HH

#include <nlohmann/json.hpp>
#include <cstdint>
#include <functional>
#include <vector>

namespace Kafka {

Expand Down
25 changes: 23 additions & 2 deletions core-cpp/kafka/src/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@
namespace Kafka {

// Construct a consumer subscribed to the given list of topics.
Consumer::Consumer(std::vector<std::string> topics) {
    initialize(topics);
}

// Construct a consumer subscribed to a single topic. Delegates to
// initialize() with a one-element topic list (replaces the previous
// manual vector + push_back construction).
Consumer::Consumer(std::string topic) {
    initialize(std::vector<std::string>{topic});
}

void Consumer::initialize(std::vector<std::string> topics) {
// Get command line options
const std::string brokers = SWA::CommandLine::getInstance().getOption(BrokersOption);
const std::string offsetReset = SWA::CommandLine::getInstance().getOption(OffsetResetOption, "earliest");
Expand All @@ -44,13 +54,13 @@ Consumer::Consumer(std::vector<std::string> topics) {
consumer = std::unique_ptr<cppkafka::Consumer>(new cppkafka::Consumer(config));

// create topics if they don't already exist
createTopics(ProcessHandler::getInstance().getTopicNames());
createTopics(topics);

// short delay to avoid race conditions if other processes initiated topic creation
SWA::delay(SWA::Duration::fromMillis(100));

// Subscribe to topics
consumer->subscribe(ProcessHandler::getInstance().getTopicNames());
consumer->subscribe(topics);
}

void Consumer::run() {
Expand Down Expand Up @@ -102,6 +112,17 @@ void Consumer::handleMessages() {
}
}

// Poll for a single message and dispatch its raw payload to the given
// DataConsumer. Returns true if a message was consumed and committed,
// false if no message was available or only an error event was received.
bool Consumer::consumeOne(DataConsumer& dataConsumer) {
    cppkafka::Message msg = consumer->poll();
    if (!msg) {
        return false;
    }
    if (msg.get_error()) {
        // poll() can return messages carrying an error (e.g. partition EOF
        // notifications); they have no usable payload, so do not dispatch
        // or commit them.
        return false;
    }
    dataConsumer.accept(std::vector<std::uint8_t>(msg.get_payload()));
    // Commit only after accept() returns, so a failure while handling the
    // payload leaves the message uncommitted for redelivery.
    consumer->commit(msg);
    return true;
}

void Consumer::createTopics(std::vector<std::string> topics) {
// TODO clean up error handling in this routine
for (auto it = topics.begin(); it != topics.end(); it++) {
Expand Down
7 changes: 7 additions & 0 deletions core-cpp/kafka/src/DataConsumer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#include "kafka/DataConsumer.hh"

namespace Kafka {

// Out-of-line destructor definition: anchors the class vtable in this
// translation unit.
DataConsumer::~DataConsumer() = default;

} // namespace Kafka
Original file line number Diff line number Diff line change
@@ -1,17 +1,50 @@
package org.xtuml.masl.translate.kafka;


import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.xtuml.masl.cppgen.ArrayAccess;
import org.xtuml.masl.cppgen.BinaryExpression;
import org.xtuml.masl.cppgen.BinaryOperator;
import org.xtuml.masl.cppgen.Class;
import org.xtuml.masl.cppgen.CodeFile;
import org.xtuml.masl.cppgen.Comment;
import org.xtuml.masl.cppgen.DeclarationGroup;
import org.xtuml.masl.cppgen.Expression;
import org.xtuml.masl.cppgen.Function;
import org.xtuml.masl.cppgen.FundamentalType;
import org.xtuml.masl.cppgen.Library;
import org.xtuml.masl.cppgen.Literal;
import org.xtuml.masl.cppgen.Namespace;
import org.xtuml.masl.cppgen.ReturnStatement;
import org.xtuml.masl.cppgen.SharedLibrary;
import org.xtuml.masl.cppgen.Std;
import org.xtuml.masl.cppgen.TypeUsage;
import org.xtuml.masl.cppgen.Variable;
import org.xtuml.masl.cppgen.VariableDefinitionStatement;
import org.xtuml.masl.cppgen.Visibility;
import org.xtuml.masl.metamodel.common.ParameterDefinition;
import org.xtuml.masl.metamodel.common.ParameterDefinition.Mode;
import org.xtuml.masl.metamodel.common.Service;
import org.xtuml.masl.metamodel.domain.Domain;
import org.xtuml.masl.metamodel.type.TypeDeclaration;
import org.xtuml.masl.metamodel.domain.DomainTerminatorService;
import org.xtuml.masl.metamodel.type.BasicType;
import org.xtuml.masl.metamodel.type.TypeDefinition.ActualType;
import org.xtuml.masl.metamodelImpl.type.BooleanType;
import org.xtuml.masl.metamodelImpl.type.StringType;
import org.xtuml.masl.translate.Alias;
import org.xtuml.masl.translate.Default;
import org.xtuml.masl.translate.main.Architecture;
import org.xtuml.masl.translate.main.Mangler;
import org.xtuml.masl.translate.main.NlohmannJson;
import org.xtuml.masl.translate.main.ParameterTranslator;
import org.xtuml.masl.translate.main.TerminatorServiceTranslator;
import org.xtuml.masl.translate.main.Types;



@Alias("Kafka")
Expand Down Expand Up @@ -55,6 +88,7 @@ public void translate ()
{
// create code files
final CodeFile consumerCodeFile = library.createBodyFile("Kafka" + Mangler.mangleFile(domain));
final CodeFile pollerCodeFile = library.createBodyFile("Kafka_pollers" + Mangler.mangleFile(domain));
final CodeFile publisherCodeFile = interfaceLibrary.createBodyFile("Kafka_publishers" + Mangler.mangleFile(domain));

// create service translators
Expand Down Expand Up @@ -92,8 +126,155 @@ public void translate ()
{
serviceTranslator.addCustomTopicName(publisherCodeFile);
}

// TODO temp
final List<DomainTerminatorService> terminatorServices = domain.getTerminators().stream()
.flatMap(terminator -> terminator.getServices().stream())
.filter(service -> Optional.ofNullable(service.getReturnType()).map(t -> t.isAssignableFrom(BooleanType.createAnonymous())).orElse(false))
.filter(service -> service.getDeclarationPragmas().hasPragma(KAFKA_TOPIC_PRAGMA))
.collect(Collectors.toList());

for (final DomainTerminatorService terminatorService : terminatorServices) {

// add service handlers
addServiceHandler2(pollerCodeFile, terminatorService);

// add service overriders

// add consumer initializers

}


}

/**
 * Generates the C++ polling machinery for one outbound terminator service
 * marked as a Kafka topic: a DataConsumer subclass that deserializes a
 * message payload into the service's "out" parameters, a file-scope Kafka
 * consumer for the service's topic, an overrider function that polls one
 * message via consumeOne, and the registration of that overrider.
 *
 * NOTE(review): the "2" suffix suggests this parallels an existing
 * addServiceHandler for non-terminator services — confirm and consider
 * unifying.
 *
 * @param codeFile the generated code file to add declarations/definitions to
 * @param service  the terminator service being translated
 */
void addServiceHandler2 (final CodeFile codeFile, final DomainTerminatorService service)
{

    // if the service has a single string parameter, just pass through without parsing JSON
    final boolean noParseJson = service.getParameters().size() == 1 && service.getParameters().get(0).getType().isAssignableFrom(StringType.createAnonymous());

    // create and add the consumer class to the file
    final Class consumerClass = new Class(Mangler.mangleName(service) + "Consumer", domainNamespace);
    consumerClass.addSuperclass(Kafka.dataConsumerClass, Visibility.PUBLIC);
    final DeclarationGroup functions = consumerClass.createDeclarationGroup();
    codeFile.addClassDeclaration(consumerClass);

    // create the constructor
    final Function constructor = consumerClass.createConstructor(functions, Visibility.PUBLIC);
    constructor.declareInClass(true);

    // create the accept function (overrides Kafka::DataConsumer::accept)
    final Function acceptFn = consumerClass.createMemberFunction(functions, "accept", Visibility.PUBLIC);
    final Expression data = acceptFn.createParameter(new TypeUsage(Std.vector(new TypeUsage(Std.uint8))), "data").asExpression();
    acceptFn.setConst(true);
    // EVENT, DEVICE and ANY_INSTANCE parameters cannot be serialized to JSON
    final Predicate<BasicType> typeIsSerializable = paramType -> !(paramType.getBasicType().getActualType() == ActualType.EVENT ||
        paramType.getBasicType().getActualType() == ActualType.DEVICE || paramType.getBasicType().getActualType() == ActualType.ANY_INSTANCE);
    final Variable paramJson = new Variable(new TypeUsage(NlohmannJson.json), "params", NlohmannJson.parse(data));
    if (!noParseJson && service.getParameters().stream().map(p -> p.getType().getBasicType()).anyMatch(typeIsSerializable)) {
        // Only build a JSON object if it will be used
        acceptFn.getCode().appendStatement(new VariableDefinitionStatement(paramJson));
    }

    // create the overrider function
    final Function overrider = new Function(Mangler.mangleName(service), domainNamespace);
    overrider.setReturnType(Types.getInstance().getType(service.getReturnType()));

    final List<Expression> consumerArgs = new ArrayList<>();
    final DeclarationGroup vars = consumerClass.createDeclarationGroup();
    for (final ParameterDefinition param : service.getParameters()) {
        final TypeUsage type = Types.getInstance().getType(param.getType());

        // add the parameter to the overrider function
        final ParameterTranslator paramTrans = new ParameterTranslator(param, overrider);

        // only process "out" parameters
        if (param.getMode() == Mode.OUT) {

            // capture each parameter as a member variable
            final Variable constructorParam = constructor.createParameter(type.getReferenceType(), Mangler.mangleName(param));
            final Variable memberVar = consumerClass.createMemberVariable(vars, Mangler.mangleName(param), type.getReferenceType(), Visibility.PRIVATE);
            constructor.setInitialValue(memberVar, constructorParam.asExpression());

            // parse out each parameter and assign it to the member variable
            if (typeIsSerializable.test(param.getType().getBasicType())) {
                // default: pass raw bytes through as a string (single string parameter case)
                Expression paramAccess = Std.string.callConstructor(new Function("begin").asFunctionCall(data, false), new Function("end").asFunctionCall(data, false));
                if (!noParseJson) {
                    // multiple parameters are keyed by name in a JSON object;
                    // a single parameter is the whole JSON document
                    paramAccess = NlohmannJson.get(service.getParameters().size() > 1 ? new ArrayAccess(paramJson.asExpression(), Literal.createStringLiteral(param.getName())) : paramJson.asExpression(), type);
                }
                acceptFn.getCode().appendStatement(new BinaryExpression(memberVar.asExpression(), BinaryOperator.ASSIGN, paramAccess).asStatement());
            }

            // add to the list for the consumer call
            consumerArgs.add(paramTrans.getVariable().asExpression());
        }


    }

    // create consumer instance (file-scope variable subscribed to this service's topic)
    final Variable consumer = new Variable(new TypeUsage(Kafka.consumerClass), "consumer_" + Mangler.mangleName(service), domainNamespace,
        Kafka.consumerClass.callConstructor(Std.string.callConstructor(
            getTopicName(service)
        )));

    // add the call to consume to the overrider
    final Variable dataConsumer = new Variable(new TypeUsage(consumerClass), "dataConsumer", consumerClass.callConstructor(consumerArgs));
    overrider.getCode().appendStatement(
        new VariableDefinitionStatement(dataConsumer));
    overrider.getCode().appendStatement(
        new ReturnStatement(
            new Function("consumeOne").asFunctionCall(consumer.asExpression(), false,
                dataConsumer.asExpression()
            )));

    // add the accept function definition to the file
    codeFile.addFunctionDefinition(acceptFn);

    // add the consumer variable to the file
    codeFile.addVariableDefinition(consumer);

    // add the overrider function definition to the file
    codeFile.addFunctionDefinition(overrider);

    // register the overrider function
    final Variable register = new Variable(new TypeUsage(FundamentalType.BOOL), "register_" + Mangler.mangleName(service), domainNamespace,
        TerminatorServiceTranslator.getInstance(service).getRegisterOverride().asFunctionCall(overrider.asFunctionPointer()));
    codeFile.addVariableDefinition(register);


}

/**
 * Determine the topic name expression for a terminator service. A custom
 * topic name may be supplied as the value of the Kafka topic pragma;
 * boolean/numeric pragma values are treated as mere flags, not names, in
 * which case the topic name is resolved at runtime from the domain and
 * service ids via the process handler.
 *
 * (Fix: the pragma value list was previously fetched twice; it is now
 * looked up once.)
 */
private Expression getTopicName(final DomainTerminatorService service) {
    final List<String> pragmaValues = service.getDeclarationPragmas().getPragmaValues(DomainTranslator.KAFKA_TOPIC_PRAGMA);
    if (pragmaValues.size() == 1) {
        final String topicNameString = pragmaValues.get(0);
        // boolean/numeric values mark the service as a topic without naming it
        if (!isBoolean(topicNameString) && !isNumeric(topicNameString)) {
            return Literal.createStringLiteral(topicNameString);
        }
    }
    // fall back to a generated name: getTopicName(domainId, serviceId)
    final Expression processHandler = Kafka.processHandlerClass.callStaticFunction("getInstance");
    final Expression domainId = new Function("getId").asFunctionCall(new Function("getDomain").asFunctionCall(Architecture.process, false, Literal.createStringLiteral(this.getDomain().getName())), false);
    final Expression serviceId = TerminatorServiceTranslator.getInstance(service).getServiceId();
    return new Function("getTopicName").asFunctionCall(processHandler, false, domainId, serviceId);
}

// True when the pragma value is a boolean literal ("true"/"false");
// such values flag the service as a topic rather than naming one.
// Null-safe: a null value is simply not a boolean.
private boolean isBoolean(final String value) {
    for (final String literal : new String[] { "true", "false" }) {
        if (literal.equals(value)) {
            return true;
        }
    }
    return false;
}

/**
 * True when the pragma value parses as a number; such values flag the
 * service as a topic rather than naming one.
 *
 * Fix: {@code Double.parseDouble(null)} throws {@link NullPointerException},
 * which the NumberFormatException handler did not cover — a null value now
 * safely returns false. Note that parseDouble also accepts "Infinity" and
 * "NaN", so those are treated as numeric.
 */
private boolean isNumeric(final String value) {
    if (value == null) {
        return false;
    }
    try {
        Double.parseDouble(value);
        return true;
    } catch (NumberFormatException e) {
        return false;
    }
}






Namespace getNamespace()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ public class Kafka
static CodeFile processHandlerInc = library.createInterfaceHeader("kafka/ProcessHandler.hh");
static CodeFile producerInc = library.createInterfaceHeader("kafka/Producer.hh");
static CodeFile serviceHandlerInc = library.createInterfaceHeader("kafka/ServiceHandler.hh");
static CodeFile dataConsumerInc = library.createInterfaceHeader("kafka/DataConsumer.hh");
static CodeFile consumerInc = library.createInterfaceHeader("kafka/Consumer.hh");

static Class processHandlerClass = new Class("ProcessHandler", kafkaNamespace, processHandlerInc);
static Class producerClass = new Class("Producer", kafkaNamespace, producerInc);
static Class callable = new Class("Callable", kafkaNamespace, processHandlerInc);
static Class serviceHandlerClass = new Class("ServiceHandler", kafkaNamespace, serviceHandlerInc);
static Class dataConsumerClass = new Class("DataConsumer", kafkaNamespace, dataConsumerInc);
static Class consumerClass = new Class("Consumer", kafkaNamespace, consumerInc);

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.xtuml.masl.metamodel.domain.DomainService;
import org.xtuml.masl.metamodel.type.BasicType;
import org.xtuml.masl.metamodel.type.TypeDefinition.ActualType;
import org.xtuml.masl.metamodelImpl.common.PragmaDefinition;
import org.xtuml.masl.metamodelImpl.type.StringType;
import org.xtuml.masl.translate.main.Architecture;
import org.xtuml.masl.translate.main.DomainServiceTranslator;
Expand Down Expand Up @@ -233,7 +232,7 @@ private boolean isBoolean(final String value) {

private boolean isNumeric(final String value) {
try {
double d = Double.parseDouble(value);
Double.parseDouble(value);
return true;
} catch (NumberFormatException e) {
return false;
Expand Down

0 comments on commit 12669a8

Please sign in to comment.