-
Notifications
You must be signed in to change notification settings - Fork 62
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Issue 187: Add MQTT to Pravega bridge sample
Signed-off-by: Claudio Fahey <[email protected]> # Conflicts: # README.md # settings.gradle
- Loading branch information
Claudio Fahey
committed
Apr 18, 2019
1 parent
e0d152c
commit 306d00b
Showing
15 changed files
with
622 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
# MQTT to Pravega Bridge | ||
|
||
This sample application reads events from MQTT and writes them to a Pravega stream. | ||
|
||
## Components | ||
|
||
- Pravega: Pravega provides a new storage abstraction - a stream - for continuous and unbounded data. | ||
A Pravega stream is a durable, elastic, append-only, unbounded sequence of bytes that has good performance and strong consistency. | ||
|
||
Pravega provides dynamic scaling that can increase and decrease parallelism to automatically respond | ||
to changes in the event rate. | ||
|
||
See <http://pravega.io> for more information. | ||
|
||
- Pravega Video Demo: This is a simple command-line application that demonstrates how to write video files to Pravega. | ||
It also demonstrates how to read the video files from Pravega and decode them. | ||
|
||
- Docker: This demo uses Docker and Docker Compose to greatly simplify the deployment of the various | ||
components on Linux and/or Windows servers, desktops, or even laptops. | ||
For more information, see <https://en.wikipedia.org/wiki/Docker_(software)>. | ||
|
||
## Building and Running the Demo | ||
|
||
### Install Java 8 | ||
|
||
``` | ||
apt-get install openjdk-8-jdk | ||
``` | ||
|
||
### Install IntelliJ | ||
|
||
Install from <https://www.jetbrains.com/idea>. | ||
Enable the Lombok plugin. | ||
Enable Annotations (settings -> build, execution, deployment, -> compiler -> annotation processors). | ||
|
||
### Install Docker and Docker Compose | ||
|
||
See <https://docs.docker.com/install/linux/docker-ce/ubuntu/> | ||
and <https://docs.docker.com/compose/install/>. | ||
|
||
### Run Pravega | ||
|
||
This will run a development instance of Pravega locally. | ||
In the command below, replace x.x.x.x with the IP address of a local network interface such as eth0. | ||
|
||
``` | ||
cd | ||
git clone https://github.com/pravega/pravega | ||
cd pravega/docker/compose | ||
export HOST_IP=x.x.x.x | ||
docker-compose up -d | ||
``` | ||
|
||
You can view the Pravega logs with `docker-compose logs --follow`. | ||
You can view the stream files stored on HDFS with `docker-compose exec hdfs hdfs dfs -ls -h -R /`. | ||
|
||
### Usage | ||
|
||
- Install Mosquitto MQTT broker and clients. | ||
``` | ||
sudo apt-get install mosquitto mosquitto-clients | ||
``` | ||
|
||
- If not automatically started, start Mosquitto broker. | ||
``` | ||
mosquitto | ||
``` | ||
|
||
- Edit the file src/main/dist/conf/bridge.properties | ||
to specify your Pravega controller URI (controllerUri) as | ||
`tcp://HOST_IP:9090`. | ||
|
||
- In IntelliJ, run the class com.dell.mqtt.pravega.ApplicationMain with the following parameters: | ||
``` | ||
scenarios/mqtt-pravega-bridge/src/main/dist/conf | ||
``` | ||
|
||
- Publish a sample MQTT message. | ||
``` | ||
mosquitto_pub -t center/0001 -m "12,34,56.78" | ||
``` | ||
|
||
- You should see the following application output: | ||
``` | ||
[MQTT Call: CanDataReader] com.dell.mqtt.pravega.MqttListener: Writing Data Packet: CarID: 0001 Timestamp: 1551671403118 Payload: [B@2813d92f annotation: null | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
plugins { | ||
id 'com.github.johnrengelman.shadow' version '1.2.4' | ||
} | ||
|
||
group 'com.dell.mqtt' | ||
version = samplesVersion | ||
|
||
task wrapper(type: Wrapper) { | ||
gradleVersion = '3.1' | ||
distributionUrl = "https://services.gradle.org/distributions/gradle-$gradleVersion-all.zip" | ||
} | ||
|
||
apply plugin: 'java' | ||
apply plugin: "distribution" | ||
apply plugin: 'application' | ||
|
||
sourceCompatibility = 1.8 | ||
targetCompatibility = 1.8 | ||
mainClassName = 'com.dell.mqtt.pravega.ApplicationMain' | ||
applicationDefaultJvmArgs = ["-Dlog4j.configuration=file:conf/log4j.properties"] | ||
archivesBaseName = 'pravega-mqtt-bridge' | ||
|
||
repositories { | ||
mavenCentral() | ||
maven { | ||
url "https://repository.apache.org/snapshots" | ||
} | ||
maven { | ||
url "https://oss.sonatype.org/content/repositories/snapshots" | ||
} | ||
} | ||
|
||
dependencies { | ||
testCompile group: 'junit', name: 'junit', version: '4.11' | ||
compile "org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0" | ||
compile "org.slf4j:slf4j-api:1.6.6" | ||
compile "ch.qos.logback:logback-classic:1.0.9" | ||
compile "com.google.guava:guava:20.0" | ||
compile "io.pravega:pravega-client:${pravegaVersion}" | ||
} | ||
|
||
shadowJar { | ||
dependencies { | ||
include dependency("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0") | ||
} | ||
} | ||
|
||
distributions { | ||
main { | ||
baseName = archivesBaseName | ||
contents { | ||
into('lib') { | ||
from shadowJar | ||
from(project.configurations.shadow) | ||
} | ||
} | ||
} | ||
} |
14 changes: 14 additions & 0 deletions
14
scenarios/mqtt-pravega-bridge/src/main/dist/conf/bridge.properties
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# Pravega Properties | ||
controllerUri=tcp://127.0.0.1:9090 | ||
scope=examples | ||
stream=mqtt-example | ||
scaling.targetRate=100 | ||
scaling.scaleFactor=3 | ||
scaling.minNumSegments=3 | ||
|
||
# MQTT Properties | ||
brokerUri=tcp://127.0.0.1:1883 | ||
topic=center/# | ||
clientId=CanDataReader | ||
userName=admin | ||
password=password |
13 changes: 13 additions & 0 deletions
13
scenarios/mqtt-pravega-bridge/src/main/dist/conf/log4j.properties
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
log4j.rootLogger=INFO, stdout, file | ||
|
||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender | ||
log4j.appender.stdout.Target=System.out | ||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout | ||
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n | ||
|
||
log4j.appender.file=org.apache.log4j.RollingFileAppender | ||
log4j.appender.file.File=workspace.log | ||
log4j.appender.file.MaxFileSize=10MB | ||
log4j.appender.file.MaxBackupIndex=10 | ||
log4j.appender.file.layout=org.apache.log4j.PatternLayout | ||
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n |
24 changes: 24 additions & 0 deletions
24
scenarios/mqtt-pravega-bridge/src/main/dist/conf/logback.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
--> | ||
<configuration> | ||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> | ||
<target>System.out</target> | ||
<encoder> | ||
<pattern>%-5level [%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %logger{36}: %msg%n</pattern> | ||
</encoder> | ||
</appender> | ||
|
||
<root level="INFO"> | ||
<appender-ref ref="STDOUT" /> | ||
</root> | ||
</configuration> |
32 changes: 32 additions & 0 deletions
32
scenarios/mqtt-pravega-bridge/src/main/dist/conf/logger.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
<configuration> | ||
|
||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> | ||
<!-- encoders are assigned the type | ||
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> | ||
<encoder> | ||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> | ||
</encoder> | ||
</appender> | ||
|
||
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> | ||
<file>logFile.log</file> | ||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> | ||
<!-- daily rollover --> | ||
<fileNamePattern>workspace.%d{yyyy-MM-dd}.log</fileNamePattern> | ||
|
||
<!-- keep 1 days' worth of history --> | ||
<maxHistory>5</maxHistory> | ||
</rollingPolicy> | ||
|
||
<encoder> | ||
<pattern>%d{HH:mm:ss.SSS} %-4relative [%thread] %-5level %logger{35} - %msg%n</pattern> | ||
</encoder> | ||
</appender> | ||
|
||
<root level="info" additivity="false"> | ||
<appender-ref ref="STDOUT"/> | ||
<appender-ref ref="FILE"/> | ||
</root> | ||
|
||
|
||
</configuration> |
76 changes: 76 additions & 0 deletions
76
scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/ApplicationArguments.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
package com.dell.mqtt.pravega; | ||
|
||
import com.google.common.base.Preconditions; | ||
|
||
import java.io.File; | ||
import java.io.FileInputStream; | ||
import java.io.InputStream; | ||
import java.util.Properties; | ||
|
||
public class ApplicationArguments { | ||
|
||
private final PravegaArgs pravegaArgs = new PravegaArgs(); | ||
private final MqttArgs mqttArgs = new MqttArgs(); | ||
|
||
public ApplicationArguments(String confDir) throws Exception { | ||
loadProperties(confDir); | ||
} | ||
|
||
private void loadProperties(String confDir) throws Exception{ | ||
Properties prop = new Properties(); | ||
try ( | ||
InputStream inputStream = new FileInputStream(confDir + File.separator + "bridge.properties"); | ||
) | ||
{ | ||
prop.load(inputStream); | ||
|
||
pravegaArgs.controllerUri = prop.getProperty("controllerUri"); | ||
pravegaArgs.scope = prop.getProperty("scope"); | ||
pravegaArgs.stream = prop.getProperty("stream"); | ||
pravegaArgs.targetRate = Integer.parseInt(prop.getProperty("scaling.targetRate")); | ||
pravegaArgs.scaleFactor = Integer.parseInt(prop.getProperty("scaling.scaleFactor")); | ||
pravegaArgs.minNumSegments = Integer.parseInt(prop.getProperty("scaling.minNumSegments")); | ||
|
||
mqttArgs.brokerUri = prop.getProperty("brokerUri"); | ||
mqttArgs.topic = prop.getProperty("topic"); | ||
mqttArgs.clientId = prop.getProperty("clientId"); | ||
mqttArgs.userName = prop.getProperty("userName"); | ||
mqttArgs.password = prop.getProperty("password"); | ||
|
||
Preconditions.checkNotNull(pravegaArgs.controllerUri, "Pravega Controller URI is missing"); | ||
Preconditions.checkNotNull(pravegaArgs.scope, "Pravega scope is missing"); | ||
Preconditions.checkNotNull(pravegaArgs.stream, "Pravega stream is missing"); | ||
|
||
Preconditions.checkNotNull(mqttArgs.brokerUri, "MQTT Broker URI is missing"); | ||
Preconditions.checkNotNull(mqttArgs.topic, "MQTT topic is missing"); | ||
Preconditions.checkNotNull(mqttArgs.clientId, "MQTT clientId is missing"); | ||
Preconditions.checkNotNull(mqttArgs.userName, "MQTT userName is missing"); | ||
Preconditions.checkNotNull(mqttArgs.password, "MQTT password is missing"); | ||
} | ||
} | ||
|
||
public PravegaArgs getPravegaArgs() { | ||
return pravegaArgs; | ||
} | ||
|
||
public MqttArgs getMqttArgs() { | ||
return mqttArgs; | ||
} | ||
|
||
public static class PravegaArgs { | ||
protected String controllerUri; | ||
protected String scope; | ||
protected String stream; | ||
protected int targetRate; | ||
protected int scaleFactor; | ||
protected int minNumSegments; | ||
} | ||
|
||
public static class MqttArgs { | ||
protected String brokerUri; | ||
protected String topic; | ||
protected String clientId; | ||
protected String userName; | ||
protected String password; | ||
} | ||
} |
56 changes: 56 additions & 0 deletions
56
scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/ApplicationMain.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package com.dell.mqtt.pravega; | ||
|
||
import org.eclipse.paho.client.mqttv3.MqttClient; | ||
import org.eclipse.paho.client.mqttv3.MqttException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.concurrent.CountDownLatch; | ||
|
||
public class ApplicationMain { | ||
|
||
private static Logger log = LoggerFactory.getLogger( ApplicationMain.class ); | ||
|
||
public static void main(String ... args) { | ||
|
||
if (args.length != 1) { | ||
log.error("Missing required arguments. Usage: java com.dell.mqtt.pravega.ApplicationMain <CONF_DIR_PATH>"); | ||
return; | ||
} | ||
|
||
String confDir = args[0]; | ||
log.info("loading configurations from {}", confDir); | ||
|
||
final CountDownLatch latch = new CountDownLatch(1); | ||
|
||
try { | ||
ApplicationArguments applicationArguments = new ApplicationArguments(confDir); | ||
MqttListener listener = new MqttListener(applicationArguments.getPravegaArgs()); | ||
|
||
MqttConnectionBuilder builder = new MqttConnectionBuilder(); | ||
builder.brokerUri(applicationArguments.getMqttArgs().brokerUri); | ||
builder.topic(applicationArguments.getMqttArgs().topic); | ||
builder.clientId(applicationArguments.getMqttArgs().clientId); | ||
builder.userName(applicationArguments.getMqttArgs().userName); | ||
builder.password(applicationArguments.getMqttArgs().password); | ||
builder.bridge(listener); | ||
|
||
MqttClient mqttClient = builder.connect(); | ||
|
||
Runtime.getRuntime().addShutdownHook(new Thread(() -> { | ||
log.info("Going to close the application"); | ||
if (mqttClient != null) { | ||
try { | ||
mqttClient.close(); | ||
} catch (MqttException e) { | ||
log.error("Exception Occurred while closing MQTT client", e); | ||
} | ||
} | ||
latch.countDown(); | ||
})); | ||
} catch (Exception e) { | ||
log.error("Exception Occurred", e); | ||
} | ||
|
||
} | ||
} |
Oops, something went wrong.