Skip to content

Commit

Permalink
Issue 187: Added MQTT to Pravega bridge sample
Browse files Browse the repository at this point in the history
Signed-off-by: Claudio Fahey <[email protected]>
  • Loading branch information
Claudio Fahey committed Aug 13, 2019
1 parent 13661b4 commit 0c6deca
Show file tree
Hide file tree
Showing 16 changed files with 576 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ The related documentation and instructions are [here](hadoop-connector-examples)
| [`turbineheatprocessor`](scenarios/turbine-heat-processor) | A Flink streaming application for processing temperature data from a Pravega stream produced by the `turbineheatsensor` app. The application computes a daily summary of the temperature range observed on that day by each sensor. | [Java](scenarios/turbine-heat-processor/src/main/java/io/pravega/turbineheatprocessor), [Scala](scenarios/turbine-heat-processor/src/main/scala/io/pravega/turbineheatprocessor)
| [`anomaly-detection`](scenarios/anomaly-detection) | A Flink streaming application for detecting anomalous input patterns using a finite-state machine. | [Java](scenarios/anomaly-detection/src/main/java/io/pravega/anomalydetection)
| [`pravega-flink-connector-sql-samples`](scenarios/pravega-flink-connector-sql-samples) | Flink connector table api/sql samples. | [Java](scenarios/pravega-flink-connector-sql-samples/src/main/java/io/pravega/connectors.nytaxi)
| [`mqtt-pravega-bridge`](scenarios/mqtt-pravega-bridge) | A sample application reads events from MQTT and writes them to a Pravega stream. | [Java](scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega)


# Build Instructions
Expand Down
7 changes: 7 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
### Pravega dependencies
pravegaVersion=0.6.0-50.076afef-SNAPSHOT

#3rd party Versions
guavaVersion=20.0
junitVersion=4.12
qosLogbackVersion=1.2.3
pahoClientMqttv3Version=1.2.0
slf4jApiVersion=1.7.25

### Pravega-samples output library
samplesVersion=0.6.0-SNAPSHOT

Expand Down
41 changes: 41 additions & 0 deletions scenarios/mqtt-pravega-bridge/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# MQTT to Pravega Bridge

This sample application reads events from MQTT and writes them to a Pravega stream.

## Usage

- Install Mosquitto MQTT broker and clients.
```
sudo apt-get install mosquitto mosquitto-clients
```

- If not automatically started, start Mosquitto broker.
```
mosquitto
```

- Edit the file src/main/dist/conf/bridge.properties
to specify your Pravega controller URI (controllerUri) as
`tcp://HOST_IP:9090`.

- Run the application:
```
../../gradlew run
```

- Alternatively, you may run in IntelliJ.
Run the class ApplicationMain with the following parameters:
```
scenarios/mqtt-pravega-bridge/src/main/dist/conf
```

- Publish a sample MQTT message.
Note that the topic must be formatted as "topic/car_id" as shown below.
```
mosquitto_pub -t center/0001 -m "12,34,56.78"
```

- You should see the following application output:
```
[MQTT Call: CanDataReader] io.pravega.example.mqtt.MqttListener: Writing Data Packet: CarID: 0001 Timestamp: 1551671403118 Payload: [B@2813d92f annotation: null
```
55 changes: 55 additions & 0 deletions scenarios/mqtt-pravega-bridge/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
plugins {
id 'com.github.johnrengelman.shadow' version '1.2.4'
}

apply plugin: 'java'
apply plugin: "distribution"
apply plugin: 'application'

version = samplesVersion
sourceCompatibility = 1.8
targetCompatibility = 1.8
mainClassName = 'io.pravega.example.mqtt.ApplicationMain'
applicationDefaultJvmArgs = ["-Dlog4j.configuration=file:conf/log4j.properties"]
archivesBaseName = 'pravega-mqtt-bridge'

repositories {
mavenCentral()
maven {
url "https://repository.apache.org/snapshots"
}
maven {
url "https://oss.sonatype.org/content/repositories/snapshots"
}
}

dependencies {
compile "org.eclipse.paho:org.eclipse.paho.client.mqttv3:${pahoClientMqttv3Version}"
compile "org.slf4j:slf4j-api:${slf4jApiVersion}"
compile "ch.qos.logback:logback-classic:${qosLogbackVersion}"
compile "com.google.guava:guava:${guavaVersion}"
compile "io.pravega:pravega-client:${pravegaVersion}"
testCompile "junit:junit:${junitVersion}"
}

shadowJar {
dependencies {
include dependency("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0")
}
}

distributions {
main {
baseName = archivesBaseName
contents {
into('lib') {
from shadowJar
from(project.configurations.shadow)
}
}
}
}

run {
args = ["src/main/dist/conf"]
}
14 changes: 14 additions & 0 deletions scenarios/mqtt-pravega-bridge/src/main/dist/conf/bridge.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Pravega Properties
controllerUri=tcp://127.0.0.1:9090
scope=examples
stream=mqtt-example
scaling.targetRate=100
scaling.scaleFactor=3
scaling.minNumSegments=3

# MQTT Properties
brokerUri=tcp://127.0.0.1:1883
topic=center/#
clientId=CanDataReader
userName=admin
password=password
13 changes: 13 additions & 0 deletions scenarios/mqtt-pravega-bridge/src/main/dist/conf/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
log4j.rootLogger=INFO, stdout, file

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=workspace.log
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
24 changes: 24 additions & 0 deletions scenarios/mqtt-pravega-bridge/src/main/dist/conf/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
-->
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<encoder>
<pattern>%-5level [%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %logger{36}: %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
32 changes: 32 additions & 0 deletions scenarios/mqtt-pravega-bridge/src/main/dist/conf/logger.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logFile.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- daily rollover -->
<fileNamePattern>workspace.%d{yyyy-MM-dd}.log</fileNamePattern>

<!-- keep 1 days' worth of history -->
<maxHistory>5</maxHistory>
</rollingPolicy>

<encoder>
<pattern>%d{HH:mm:ss.SSS} %-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
</encoder>
</appender>

<root level="info" additivity="false">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE"/>
</root>


</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.pravega.example.mqtt;

import com.google.common.base.Preconditions;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

public class ApplicationArguments {

private final PravegaArgs pravegaArgs = new PravegaArgs();
private final MqttArgs mqttArgs = new MqttArgs();

public ApplicationArguments(String confDir) throws Exception {
loadProperties(confDir);
}

private void loadProperties(String confDir) throws Exception{
Properties prop = new Properties();
try (
InputStream inputStream = new FileInputStream(confDir + File.separator + "bridge.properties");
)
{
prop.load(inputStream);

pravegaArgs.controllerUri = prop.getProperty("controllerUri");
pravegaArgs.scope = prop.getProperty("scope");
pravegaArgs.stream = prop.getProperty("stream");
pravegaArgs.targetRate = Integer.parseInt(prop.getProperty("scaling.targetRate"));
pravegaArgs.scaleFactor = Integer.parseInt(prop.getProperty("scaling.scaleFactor"));
pravegaArgs.minNumSegments = Integer.parseInt(prop.getProperty("scaling.minNumSegments"));

mqttArgs.brokerUri = prop.getProperty("brokerUri");
mqttArgs.topic = prop.getProperty("topic");
mqttArgs.clientId = prop.getProperty("clientId");
mqttArgs.userName = prop.getProperty("userName");
mqttArgs.password = prop.getProperty("password");

Preconditions.checkNotNull(pravegaArgs.controllerUri, "Pravega Controller URI is missing");
Preconditions.checkNotNull(pravegaArgs.scope, "Pravega scope is missing");
Preconditions.checkNotNull(pravegaArgs.stream, "Pravega stream is missing");

Preconditions.checkNotNull(mqttArgs.brokerUri, "MQTT Broker URI is missing");
Preconditions.checkNotNull(mqttArgs.topic, "MQTT topic is missing");
Preconditions.checkNotNull(mqttArgs.clientId, "MQTT clientId is missing");
Preconditions.checkNotNull(mqttArgs.userName, "MQTT userName is missing");
Preconditions.checkNotNull(mqttArgs.password, "MQTT password is missing");
}
}

public PravegaArgs getPravegaArgs() {
return pravegaArgs;
}

public MqttArgs getMqttArgs() {
return mqttArgs;
}

public static class PravegaArgs {
protected String controllerUri;
protected String scope;
protected String stream;
protected int targetRate;
protected int scaleFactor;
protected int minNumSegments;
}

public static class MqttArgs {
protected String brokerUri;
protected String topic;
protected String clientId;
protected String userName;
protected String password;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.pravega.example.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;

public class ApplicationMain {

private static Logger log = LoggerFactory.getLogger( ApplicationMain.class );

public static void main(String ... args) {

if (args.length != 1) {
log.error("Missing required arguments. Usage: java io.pravega.example.mqtt.ApplicationMain <CONF_DIR_PATH>");
return;
}

String confDir = args[0];
log.info("loading configurations from {}", confDir);

final CountDownLatch latch = new CountDownLatch(1);

try {
ApplicationArguments applicationArguments = new ApplicationArguments(confDir);
MqttListener listener = new MqttListener(applicationArguments.getPravegaArgs());

MqttConnectionBuilder builder = new MqttConnectionBuilder();
builder.brokerUri(applicationArguments.getMqttArgs().brokerUri);
builder.topic(applicationArguments.getMqttArgs().topic);
builder.clientId(applicationArguments.getMqttArgs().clientId);
builder.userName(applicationArguments.getMqttArgs().userName);
builder.password(applicationArguments.getMqttArgs().password);
builder.bridge(listener);

MqttClient mqttClient = builder.connect();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("Going to close the application");
if (mqttClient != null) {
try {
mqttClient.close();
} catch (MqttException e) {
log.error("Exception Occurred while closing MQTT client", e);
}
}
latch.countDown();
}));
} catch (Exception e) {
log.error("Exception Occurred", e);
}

}
}
Loading

0 comments on commit 0c6deca

Please sign in to comment.