Skip to content

Commit

Permalink
Initial Tak CoT Conversion MCS Publish classes
Browse files Browse the repository at this point in the history
  • Loading branch information
keith.williams committed Mar 18, 2021
1 parent 52a1e9c commit 2e83e50
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,43 +1,85 @@
package tak.server.plugins;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import atakmap.commoncommo.protobuf.v1.MessageOuterClass.Message;
import tak.server.plugins.dto.EntityDto;
import tak.server.plugins.interfaces.PublishCallback;
import tak.server.plugins.takprocessing.TakCoTConsumer;
import tak.server.plugins.takprocessing.TakCoTProducer;
import tak.server.plugins.utilities.BrokerHelper;
import tak.server.plugins.utilities.CoTMcsConverter;

@TakServerPlugin(name = "MCS COP Receiver Plugin", description = "TAK Server plugin that consumes TAK CoT messages, converts them to MCS COP messages, and sends them to the MCS COP Message Broker")
public class McsLoggerReceiverPlugin extends MessageReceiverBase {
public class McsLoggerReceiverPlugin extends MessageReceiverBase implements PublishCallback {

private static final Logger _logger = LoggerFactory.getLogger(McsLoggerReceiverPlugin.class);

private Boolean _verboseLogging = false;
public static Boolean VerboseLogging = false;
private static int _queueSize = 10;

private TakCoTProducer _messageProducer;
private TakCoTConsumer _messageConsumer;
private BlockingQueue<Message> _blockingQueue;
private ExecutorService _executor = Executors.newFixedThreadPool(2);

public McsLoggerReceiverPlugin() throws ReservedConfigurationException {
_logger.info("create " + getClass().getName());
if (config.containsProperty("verboseLogging")){
_verboseLogging = (boolean)config.getProperty("verboseLogging");
VerboseLogging = (boolean)config.getProperty("verboseLogging");
}

_logger.info("logging = " + _verboseLogging.toString());
_logger.info("logging = " + VerboseLogging.toString());

_blockingQueue = new LinkedBlockingDeque<>(_queueSize);
_messageProducer = new TakCoTProducer(_executor, _blockingQueue);
_messageConsumer = new TakCoTConsumer(_executor, _blockingQueue, this);
}

@Override
public void start() {
_logger.info(getClass().getName() + " started");
_logger.info("Configuration Properties: " + config.getProperties());
_messageProducer.Start();
_messageConsumer.Start();
}

@Override
public void stop() {
try {
if (_messageConsumer != null) _messageConsumer.Stop();
if (_messageProducer != null) _messageProducer.Stop();
_executor.shutdown();
}
catch(Exception e){
_logger.error("Error stopping", e);
}
}

@Override
public void onMessage(Message message) {
if (_verboseLogging)
if (VerboseLogging)
_logger.info("plugin message received: " + message);

if(CoTMcsConverter.messageFromSender(message))
_logger.info("Message is from TAK Plugin");
if(CoTMcsConverter.messageFromSender(message)) {
if (VerboseLogging)
_logger.info("Message is from TAK Plugin");
//Bail - TODO it would be nice maybe to check flowtags or something else
return;
}

_messageProducer.AddMessage(message);
}

EntityDto EntityDto = CoTMcsConverter.convertToEntityDto(message);
String json = CoTMcsConverter.convertToJson(EntityDto);
_logger.info(json);
@Override
public void publishEntityMessage(String message) {
if (BrokerHelper.currentclient == null) return;
BrokerHelper.currentclient.publishEntityMessage(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@
import tak.server.plugins.dto.EventDto;
import tak.server.plugins.dto.EntityDto;
import tak.server.plugins.interfaces.MessageCallback;
import tak.server.plugins.messagebroker.RabbitMQConsumer;
import tak.server.plugins.messagebroker.RabbitMQClient;
import tak.server.plugins.processing.MessageConsumer;
import tak.server.plugins.processing.MessageProducer;
import tak.server.plugins.processing.ProcessingMessage;
import tak.server.plugins.utilities.BrokerHelper;
import tak.server.plugins.utilities.McsCoTConverter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
Expand All @@ -42,7 +43,7 @@ public class McsSenderPlugin extends MessageSenderBase implements MessageCallbac
private static int _queueSize = 10;

private static final Logger _logger = LoggerFactory.getLogger(McsSenderPlugin.class);
private RabbitMQConsumer _rabbitMqConsumer;
private RabbitMQClient _rabbitMqConsumer;
private MessageProducer _messageProducer;
private MessageConsumer _messageConsumer;
private BlockingQueue<ProcessingMessage> _blockingQueue;
Expand All @@ -64,7 +65,7 @@ public McsSenderPlugin() {

_blockingQueue = new LinkedBlockingDeque<>(_queueSize);

_rabbitMqConsumer = new RabbitMQConsumer();
_rabbitMqConsumer = new RabbitMQClient();
_messageProducer = new MessageProducer(_executor, _blockingQueue);
_messageConsumer = new MessageConsumer(_executor, _blockingQueue, this);
}
Expand All @@ -83,6 +84,7 @@ public void start() {

private void setupConnection(){
_rabbitMqConsumer.SetupConsumption(_messageProducer, config);
BrokerHelper.currentclient = _rabbitMqConsumer;
_messageProducer.Start();
_messageConsumer.Start();
}
Expand All @@ -91,6 +93,7 @@ private void setupConnection(){
public void stop() {
try {
if (_rabbitMqConsumer != null) _rabbitMqConsumer.Stop();
BrokerHelper.currentclient = null;
if (_messageConsumer != null) _messageConsumer.Stop();
if (_messageProducer != null) _messageProducer.Stop();
_executor.shutdown();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package tak.server.plugins.interfaces;

public interface PublishCallback {
void publishEntityMessage(String message);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tak.server.plugins.messagebroker;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -15,7 +16,7 @@
import tak.server.plugins.PluginConfiguration;
import tak.server.plugins.McsSenderPlugin;

public class RabbitMQConsumer {
public class RabbitMQClient {
private MessageProducer _producer;
private String _exchangeName = "dragonfly";
private String _entityRoutingKey = "dragonfly.demo_entities";
Expand All @@ -25,7 +26,7 @@ public class RabbitMQConsumer {
private String _username = "some-username";
private boolean _useRapidX = false;

private static final Logger logger = LoggerFactory.getLogger(RabbitMQConsumer.class);
private static final Logger logger = LoggerFactory.getLogger(RabbitMQClient.class);
private Channel _channel;
private String _consumerTag = "";

Expand Down Expand Up @@ -87,6 +88,20 @@ public Boolean isEventKey(String key)
return key.equals(_eventRoutingKey);
}

public void publishEntityMessage(String message)
{
if (!_channel.isOpen()) return;

try {
byte[] entityBytes = message.getBytes(StandardCharsets.UTF_8);
_channel.basicPublish(_exchangeName, _entityRoutingKey, null, entityBytes);
}
catch (Exception e)
{
logger.error("error publishing to rabbitMQ", e);
}
}

private void SetupConfiguration(PluginConfiguration configuration) {
logger.info("Reading configuration");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package tak.server.plugins.takprocessing;

import tak.server.plugins.McsLoggerReceiverPlugin;
import tak.server.plugins.dto.EntityDto;
import tak.server.plugins.interfaces.PublishCallback;
import tak.server.plugins.utilities.CoTMcsConverter;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicBoolean;

import atakmap.commoncommo.protobuf.v1.MessageOuterClass.Message;

public class TakCoTConsumer {
private ExecutorService _executor;
private BlockingQueue<Message> _queue;
private static final Logger _logger = LoggerFactory.getLogger(TakCoTConsumer.class);
private PublishCallback _callback;
private AtomicBoolean _running = new AtomicBoolean(false);

public TakCoTConsumer(ExecutorService executorService, BlockingQueue<Message> queue, PublishCallback callback) {
_executor = executorService;
_queue = queue;
_callback = callback;
}

public boolean isRunning() {
return _running.get();
}

public void Start(){
_logger.info("Starting TakCoTConsumer");
_running.set(true);
Runnable consumerTask = () -> {
try {
while(_running.get()) {
Message cotMessage = _queue.take();
if (McsLoggerReceiverPlugin.VerboseLogging)
_logger.info("CoT message received in consumer converting to Entity and sending to publish");

try {
//TODO - Rework so that nested Try catches don't have to exist
EntityDto entityDto = CoTMcsConverter.convertToEntityDto(cotMessage);
if (entityDto == null) continue;
String json = CoTMcsConverter.convertToJson(entityDto);
_callback.publishEntityMessage(json);
}
catch (Exception e) {
_logger.error("error converting message from queue", e);
}
}
}
catch (InterruptedException e) {
_logger.info("thread interrupted", e);
}
catch (Exception e) {
_logger.error("error taking message from queue", e);
}
};

_executor.execute(consumerTask);
}

public void Stop(){
_logger.info("Stopping TakCoTConsumer");
_running.set(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package tak.server.plugins.takprocessing;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import atakmap.commoncommo.protobuf.v1.MessageOuterClass.Message;

public class TakCoTProducer {
private ExecutorService _executor;
private BlockingQueue<Message> _queue;
private static final Logger _logger = LoggerFactory.getLogger(TakCoTProducer.class);

public TakCoTProducer(ExecutorService executorService, BlockingQueue<Message> queue) {
_executor = executorService;
_queue = queue;
}

public void Start(){
//Notional
_logger.info("Starting TakCoTProducer");
}

public void Stop(){
//Notional
_logger.info("Stopping TakCoTProducer");
}

public void AddMessage(Message message) {
Runnable producerTask = () -> {
try {
_queue.put(message);
}
catch (InterruptedException e) {
_logger.info("thread interrupted", e);
}
catch (Exception e) {
_logger.error("error putting message on queue", e);
}
};

_executor.submit(producerTask);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package tak.server.plugins.utilities;

import tak.server.plugins.messagebroker.RabbitMQClient;

public class BrokerHelper {
public static RabbitMQClient currentclient;
}

0 comments on commit 2e83e50

Please sign in to comment.