-
Notifications
You must be signed in to change notification settings - Fork 62
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Issue 187: Added MQTT to Pravega bridge sample
Signed-off-by: Claudio Fahey <[email protected]>
- Loading branch information
Claudio Fahey
committed
Aug 13, 2019
1 parent
13661b4
commit 7b2d38e
Showing
16 changed files
with
576 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# MQTT to Pravega Bridge | ||
|
||
This sample application reads events from MQTT and writes them to a Pravega stream. | ||
|
||
## Usage | ||
|
||
- Install Mosquitto MQTT broker and clients. | ||
``` | ||
sudo apt-get install mosquitto mosquitto-clients | ||
``` | ||
|
||
- If not automatically started, start Mosquitto broker. | ||
``` | ||
mosquitto | ||
``` | ||
|
||
- Edit the file src/main/dist/conf/bridge.properties | ||
to specify your Pravega controller URI (controllerUri) as | ||
`tcp://HOST_IP:9090`. | ||
|
||
- Run the application: | ||
``` | ||
../../gradlew run | ||
``` | ||
|
||
- Alternatively, you may run in IntelliJ. | ||
Run the class ApplicationMain with the following parameters: | ||
``` | ||
scenarios/mqtt-pravega-bridge/src/main/dist/conf | ||
``` | ||
|
||
- Publish a sample MQTT message. | ||
Note that the topic must be formatted as "topic/car_id" as shown below. | ||
``` | ||
mosquitto_pub -t center/0001 -m "12,34,56.78" | ||
``` | ||
|
||
- You should see the following application output: | ||
``` | ||
[MQTT Call: CanDataReader] io.pravega.example.mqtt.MqttListener: Writing Data Packet: CarID: 0001 Timestamp: 1551671403118 Payload: [B@2813d92f annotation: null | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
plugins { | ||
id 'com.github.johnrengelman.shadow' version '1.2.4' | ||
} | ||
|
||
apply plugin: 'java' | ||
apply plugin: "distribution" | ||
apply plugin: 'application' | ||
|
||
version = samplesVersion | ||
sourceCompatibility = 1.8 | ||
targetCompatibility = 1.8 | ||
mainClassName = 'io.pravega.example.mqtt.ApplicationMain' | ||
applicationDefaultJvmArgs = ["-Dlog4j.configuration=file:conf/log4j.properties"] | ||
archivesBaseName = 'pravega-mqtt-bridge' | ||
|
||
repositories { | ||
mavenCentral() | ||
maven { | ||
url "https://repository.apache.org/snapshots" | ||
} | ||
maven { | ||
url "https://oss.sonatype.org/content/repositories/snapshots" | ||
} | ||
} | ||
|
||
dependencies { | ||
compile "org.eclipse.paho:org.eclipse.paho.client.mqttv3:${pahoClientMqttv3Version}" | ||
compile "org.slf4j:slf4j-api:${slf4jApiVersion}" | ||
compile "ch.qos.logback:logback-classic:${qosLogbackVersion}" | ||
compile "com.google.guava:guava:${guavaVersion}" | ||
compile "io.pravega:pravega-client:${pravegaVersion}" | ||
testCompile "junit:junit:${junitVersion}" | ||
} | ||
|
||
shadowJar { | ||
dependencies { | ||
include dependency("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0") | ||
} | ||
} | ||
|
||
distributions { | ||
main { | ||
baseName = archivesBaseName | ||
contents { | ||
into('lib') { | ||
from shadowJar | ||
from(project.configurations.shadow) | ||
} | ||
} | ||
} | ||
} | ||
|
||
run { | ||
args = ["src/main/dist/conf"] | ||
} |
14 changes: 14 additions & 0 deletions
14
scenarios/mqtt-pravega-bridge/src/main/dist/conf/bridge.properties
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# Pravega Properties | ||
controllerUri=tcp://127.0.0.1:9090 | ||
scope=examples | ||
stream=mqtt-example | ||
scaling.targetRate=100 | ||
scaling.scaleFactor=3 | ||
scaling.minNumSegments=3 | ||
|
||
# MQTT Properties | ||
brokerUri=tcp://127.0.0.1:1883 | ||
topic=center/# | ||
clientId=CanDataReader | ||
userName=admin | ||
password=password |
13 changes: 13 additions & 0 deletions
13
scenarios/mqtt-pravega-bridge/src/main/dist/conf/log4j.properties
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
log4j.rootLogger=INFO, stdout, file | ||
|
||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender | ||
log4j.appender.stdout.Target=System.out | ||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout | ||
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n | ||
|
||
log4j.appender.file=org.apache.log4j.RollingFileAppender | ||
log4j.appender.file.File=workspace.log | ||
log4j.appender.file.MaxFileSize=10MB | ||
log4j.appender.file.MaxBackupIndex=10 | ||
log4j.appender.file.layout=org.apache.log4j.PatternLayout | ||
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n |
24 changes: 24 additions & 0 deletions
24
scenarios/mqtt-pravega-bridge/src/main/dist/conf/logback.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
--> | ||
<configuration> | ||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> | ||
<target>System.out</target> | ||
<encoder> | ||
<pattern>%-5level [%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %logger{36}: %msg%n</pattern> | ||
</encoder> | ||
</appender> | ||
|
||
<root level="INFO"> | ||
<appender-ref ref="STDOUT" /> | ||
</root> | ||
</configuration> |
32 changes: 32 additions & 0 deletions
32
scenarios/mqtt-pravega-bridge/src/main/dist/conf/logger.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
<configuration> | ||
|
||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> | ||
<!-- encoders are assigned the type | ||
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> | ||
<encoder> | ||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> | ||
</encoder> | ||
</appender> | ||
|
||
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> | ||
<file>logFile.log</file> | ||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> | ||
<!-- daily rollover --> | ||
<fileNamePattern>workspace.%d{yyyy-MM-dd}.log</fileNamePattern> | ||
|
||
<!-- keep 1 days' worth of history --> | ||
<maxHistory>5</maxHistory> | ||
</rollingPolicy> | ||
|
||
<encoder> | ||
<pattern>%d{HH:mm:ss.SSS} %-4relative [%thread] %-5level %logger{35} - %msg%n</pattern> | ||
</encoder> | ||
</appender> | ||
|
||
<root level="info" additivity="false"> | ||
<appender-ref ref="STDOUT"/> | ||
<appender-ref ref="FILE"/> | ||
</root> | ||
|
||
|
||
</configuration> |
76 changes: 76 additions & 0 deletions
76
...arios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/ApplicationArguments.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
package io.pravega.example.mqtt; | ||
|
||
import com.google.common.base.Preconditions; | ||
|
||
import java.io.File; | ||
import java.io.FileInputStream; | ||
import java.io.InputStream; | ||
import java.util.Properties; | ||
|
||
public class ApplicationArguments { | ||
|
||
private final PravegaArgs pravegaArgs = new PravegaArgs(); | ||
private final MqttArgs mqttArgs = new MqttArgs(); | ||
|
||
public ApplicationArguments(String confDir) throws Exception { | ||
loadProperties(confDir); | ||
} | ||
|
||
private void loadProperties(String confDir) throws Exception{ | ||
Properties prop = new Properties(); | ||
try ( | ||
InputStream inputStream = new FileInputStream(confDir + File.separator + "bridge.properties"); | ||
) | ||
{ | ||
prop.load(inputStream); | ||
|
||
pravegaArgs.controllerUri = prop.getProperty("controllerUri"); | ||
pravegaArgs.scope = prop.getProperty("scope"); | ||
pravegaArgs.stream = prop.getProperty("stream"); | ||
pravegaArgs.targetRate = Integer.parseInt(prop.getProperty("scaling.targetRate")); | ||
pravegaArgs.scaleFactor = Integer.parseInt(prop.getProperty("scaling.scaleFactor")); | ||
pravegaArgs.minNumSegments = Integer.parseInt(prop.getProperty("scaling.minNumSegments")); | ||
|
||
mqttArgs.brokerUri = prop.getProperty("brokerUri"); | ||
mqttArgs.topic = prop.getProperty("topic"); | ||
mqttArgs.clientId = prop.getProperty("clientId"); | ||
mqttArgs.userName = prop.getProperty("userName"); | ||
mqttArgs.password = prop.getProperty("password"); | ||
|
||
Preconditions.checkNotNull(pravegaArgs.controllerUri, "Pravega Controller URI is missing"); | ||
Preconditions.checkNotNull(pravegaArgs.scope, "Pravega scope is missing"); | ||
Preconditions.checkNotNull(pravegaArgs.stream, "Pravega stream is missing"); | ||
|
||
Preconditions.checkNotNull(mqttArgs.brokerUri, "MQTT Broker URI is missing"); | ||
Preconditions.checkNotNull(mqttArgs.topic, "MQTT topic is missing"); | ||
Preconditions.checkNotNull(mqttArgs.clientId, "MQTT clientId is missing"); | ||
Preconditions.checkNotNull(mqttArgs.userName, "MQTT userName is missing"); | ||
Preconditions.checkNotNull(mqttArgs.password, "MQTT password is missing"); | ||
} | ||
} | ||
|
||
public PravegaArgs getPravegaArgs() { | ||
return pravegaArgs; | ||
} | ||
|
||
public MqttArgs getMqttArgs() { | ||
return mqttArgs; | ||
} | ||
|
||
public static class PravegaArgs { | ||
protected String controllerUri; | ||
protected String scope; | ||
protected String stream; | ||
protected int targetRate; | ||
protected int scaleFactor; | ||
protected int minNumSegments; | ||
} | ||
|
||
public static class MqttArgs { | ||
protected String brokerUri; | ||
protected String topic; | ||
protected String clientId; | ||
protected String userName; | ||
protected String password; | ||
} | ||
} |
56 changes: 56 additions & 0 deletions
56
scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/ApplicationMain.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package io.pravega.example.mqtt; | ||
|
||
import org.eclipse.paho.client.mqttv3.MqttClient; | ||
import org.eclipse.paho.client.mqttv3.MqttException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.concurrent.CountDownLatch; | ||
|
||
public class ApplicationMain { | ||
|
||
private static Logger log = LoggerFactory.getLogger( ApplicationMain.class ); | ||
|
||
public static void main(String ... args) { | ||
|
||
if (args.length != 1) { | ||
log.error("Missing required arguments. Usage: java io.pravega.example.mqtt.ApplicationMain <CONF_DIR_PATH>"); | ||
return; | ||
} | ||
|
||
String confDir = args[0]; | ||
log.info("loading configurations from {}", confDir); | ||
|
||
final CountDownLatch latch = new CountDownLatch(1); | ||
|
||
try { | ||
ApplicationArguments applicationArguments = new ApplicationArguments(confDir); | ||
MqttListener listener = new MqttListener(applicationArguments.getPravegaArgs()); | ||
|
||
MqttConnectionBuilder builder = new MqttConnectionBuilder(); | ||
builder.brokerUri(applicationArguments.getMqttArgs().brokerUri); | ||
builder.topic(applicationArguments.getMqttArgs().topic); | ||
builder.clientId(applicationArguments.getMqttArgs().clientId); | ||
builder.userName(applicationArguments.getMqttArgs().userName); | ||
builder.password(applicationArguments.getMqttArgs().password); | ||
builder.bridge(listener); | ||
|
||
MqttClient mqttClient = builder.connect(); | ||
|
||
Runtime.getRuntime().addShutdownHook(new Thread(() -> { | ||
log.info("Going to close the application"); | ||
if (mqttClient != null) { | ||
try { | ||
mqttClient.close(); | ||
} catch (MqttException e) { | ||
log.error("Exception Occurred while closing MQTT client", e); | ||
} | ||
} | ||
latch.countDown(); | ||
})); | ||
} catch (Exception e) { | ||
log.error("Exception Occurred", e); | ||
} | ||
|
||
} | ||
} |
Oops, something went wrong.