-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Issue 187: Add MQTT to Pravega bridge sample #188
base: dev
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# MQTT to Pravega Bridge | ||
|
||
This sample application reads events from MQTT and writes them to a Pravega stream. | ||
|
||
## Usage | ||
|
||
- Install Mosquitto MQTT broker and clients. | ||
``` | ||
sudo apt-get install mosquitto mosquitto-clients | ||
``` | ||
|
||
- If not automatically started, start Mosquitto broker. | ||
``` | ||
mosquitto | ||
``` | ||
|
||
- Edit the file src/main/dist/conf/bridge.properties | ||
to specify your Pravega controller URI (controllerUri) as | ||
`tcp://HOST_IP:9090`. | ||
|
||
- Run the application: | ||
``` | ||
../../gradlew run | ||
``` | ||
|
||
- Alternatively, you may run in IntelliJ. | ||
Run the class ApplicationMain with the following parameters: | ||
``` | ||
scenarios/mqtt-pravega-bridge/src/main/dist/conf | ||
``` | ||
|
||
- Publish a sample MQTT message. | ||
Note that the topic must be formatted as "topic/car_id" as shown below. | ||
``` | ||
mosquitto_pub -t center/0001 -m "12,34,56.78" | ||
``` | ||
|
||
- You should see the following application output: | ||
``` | ||
[MQTT Call: CanDataReader] io.pravega.example.mqtt.MqttListener: Writing Data Packet: CarID: 0001 Timestamp: 1551671403118 Payload: [B@2813d92f annotation: null | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
plugins { | ||
id 'com.github.johnrengelman.shadow' version '1.2.4' | ||
} | ||
|
||
apply plugin: 'java' | ||
apply plugin: "distribution" | ||
apply plugin: 'application' | ||
|
||
version = samplesVersion | ||
sourceCompatibility = 1.8 | ||
targetCompatibility = 1.8 | ||
mainClassName = 'io.pravega.example.mqtt.ApplicationMain' | ||
applicationDefaultJvmArgs = ["-Dlog4j.configuration=file:conf/log4j.properties"] | ||
archivesBaseName = 'pravega-mqtt-bridge' | ||
|
||
repositories { | ||
mavenCentral() | ||
maven { | ||
url "https://repository.apache.org/snapshots" | ||
} | ||
maven { | ||
url "https://oss.sonatype.org/content/repositories/snapshots" | ||
} | ||
} | ||
|
||
dependencies { | ||
compile "org.eclipse.paho:org.eclipse.paho.client.mqttv3:${pahoClientMqttv3Version}" | ||
compile "org.slf4j:slf4j-api:${slf4jApiVersion}" | ||
compile "ch.qos.logback:logback-classic:${qosLogbackVersion}" | ||
compile "com.google.guava:guava:${guavaVersion}" | ||
compile "io.pravega:pravega-client:${pravegaVersion}" | ||
testCompile "junit:junit:${junitVersion}" | ||
} | ||
|
||
shadowJar { | ||
dependencies { | ||
include dependency("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0") | ||
} | ||
} | ||
|
||
distributions { | ||
main { | ||
baseName = archivesBaseName | ||
contents { | ||
into('lib') { | ||
from shadowJar | ||
from(project.configurations.shadow) | ||
} | ||
} | ||
} | ||
} | ||
|
||
run { | ||
args = ["src/main/dist/conf"] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# Pravega Properties | ||
controllerUri=tcp://127.0.0.1:9090 | ||
scope=examples | ||
stream=mqtt-example | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For a sample, writing to a single stream is ok, but in general, isn't it possible that we have use cases in which the same bridge writes to different streams, perhaps inferring the stream from the event data? |
||
scaling.targetRate=100 | ||
scaling.scaleFactor=3 | ||
scaling.minNumSegments=3 | ||
|
||
# MQTT Properties | ||
brokerUri=tcp://127.0.0.1:1883 | ||
topic=center/# | ||
clientId=CanDataReader | ||
userName=admin | ||
password=password |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
log4j.rootLogger=INFO, stdout, file | ||
|
||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender | ||
log4j.appender.stdout.Target=System.out | ||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout | ||
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n | ||
|
||
log4j.appender.file=org.apache.log4j.RollingFileAppender | ||
log4j.appender.file.File=workspace.log | ||
log4j.appender.file.MaxFileSize=10MB | ||
log4j.appender.file.MaxBackupIndex=10 | ||
log4j.appender.file.layout=org.apache.log4j.PatternLayout | ||
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
|
||
Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove year from copyright statement. |
||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
--> | ||
<configuration> | ||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> | ||
<target>System.out</target> | ||
<encoder> | ||
<pattern>%-5level [%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %logger{36}: %msg%n</pattern> | ||
</encoder> | ||
</appender> | ||
|
||
<root level="INFO"> | ||
<appender-ref ref="STDOUT" /> | ||
</root> | ||
</configuration> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
<configuration> | ||
|
||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> | ||
<!-- encoders are assigned the type | ||
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> | ||
<encoder> | ||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> | ||
</encoder> | ||
</appender> | ||
|
||
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> | ||
<file>logFile.log</file> | ||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> | ||
<!-- daily rollover --> | ||
<fileNamePattern>workspace.%d{yyyy-MM-dd}.log</fileNamePattern> | ||
|
||
<!-- keep 1 days' worth of history --> | ||
<maxHistory>5</maxHistory> | ||
</rollingPolicy> | ||
|
||
<encoder> | ||
<pattern>%d{HH:mm:ss.SSS} %-4relative [%thread] %-5level %logger{35} - %msg%n</pattern> | ||
</encoder> | ||
</appender> | ||
|
||
<root level="info" additivity="false"> | ||
<appender-ref ref="STDOUT"/> | ||
<appender-ref ref="FILE"/> | ||
</root> | ||
|
||
|
||
</configuration> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
package io.pravega.example.mqtt; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing license header. |
||
|
||
import com.google.common.base.Preconditions; | ||
|
||
import java.io.File; | ||
import java.io.FileInputStream; | ||
import java.io.InputStream; | ||
import java.util.Properties; | ||
|
||
public class ApplicationArguments { | ||
|
||
private final PravegaArgs pravegaArgs = new PravegaArgs(); | ||
private final MqttArgs mqttArgs = new MqttArgs(); | ||
|
||
public ApplicationArguments(String confDir) throws Exception { | ||
loadProperties(confDir); | ||
} | ||
|
||
private void loadProperties(String confDir) throws Exception{ | ||
Properties prop = new Properties(); | ||
try ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: In Pravega core, the style of these statements is in one line. |
||
InputStream inputStream = new FileInputStream(confDir + File.separator + "bridge.properties"); | ||
) | ||
{ | ||
prop.load(inputStream); | ||
|
||
pravegaArgs.controllerUri = prop.getProperty("controllerUri"); | ||
pravegaArgs.scope = prop.getProperty("scope"); | ||
pravegaArgs.stream = prop.getProperty("stream"); | ||
pravegaArgs.targetRate = Integer.parseInt(prop.getProperty("scaling.targetRate")); | ||
pravegaArgs.scaleFactor = Integer.parseInt(prop.getProperty("scaling.scaleFactor")); | ||
pravegaArgs.minNumSegments = Integer.parseInt(prop.getProperty("scaling.minNumSegments")); | ||
|
||
mqttArgs.brokerUri = prop.getProperty("brokerUri"); | ||
mqttArgs.topic = prop.getProperty("topic"); | ||
mqttArgs.clientId = prop.getProperty("clientId"); | ||
mqttArgs.userName = prop.getProperty("userName"); | ||
mqttArgs.password = prop.getProperty("password"); | ||
|
||
Preconditions.checkNotNull(pravegaArgs.controllerUri, "Pravega Controller URI is missing"); | ||
Preconditions.checkNotNull(pravegaArgs.scope, "Pravega scope is missing"); | ||
Preconditions.checkNotNull(pravegaArgs.stream, "Pravega stream is missing"); | ||
|
||
Preconditions.checkNotNull(mqttArgs.brokerUri, "MQTT Broker URI is missing"); | ||
Preconditions.checkNotNull(mqttArgs.topic, "MQTT topic is missing"); | ||
Preconditions.checkNotNull(mqttArgs.clientId, "MQTT clientId is missing"); | ||
Preconditions.checkNotNull(mqttArgs.userName, "MQTT userName is missing"); | ||
Preconditions.checkNotNull(mqttArgs.password, "MQTT password is missing"); | ||
} | ||
} | ||
|
||
public PravegaArgs getPravegaArgs() { | ||
return pravegaArgs; | ||
} | ||
|
||
public MqttArgs getMqttArgs() { | ||
return mqttArgs; | ||
} | ||
|
||
public static class PravegaArgs { | ||
protected String controllerUri; | ||
protected String scope; | ||
protected String stream; | ||
protected int targetRate; | ||
protected int scaleFactor; | ||
protected int minNumSegments; | ||
} | ||
|
||
public static class MqttArgs { | ||
protected String brokerUri; | ||
protected String topic; | ||
protected String clientId; | ||
protected String userName; | ||
protected String password; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package io.pravega.example.mqtt; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing license header. |
||
|
||
import org.eclipse.paho.client.mqttv3.MqttClient; | ||
import org.eclipse.paho.client.mqttv3.MqttException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.concurrent.CountDownLatch; | ||
|
||
public class ApplicationMain { | ||
|
||
private static Logger log = LoggerFactory.getLogger( ApplicationMain.class ); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: extra spaces within parenthesis. |
||
|
||
public static void main(String ... args) { | ||
|
||
if (args.length != 1) { | ||
log.error("Missing required arguments. Usage: java io.pravega.example.mqtt.ApplicationMain <CONF_DIR_PATH>"); | ||
return; | ||
} | ||
|
||
String confDir = args[0]; | ||
log.info("loading configurations from {}", confDir); | ||
|
||
final CountDownLatch latch = new CountDownLatch(1); | ||
|
||
try { | ||
ApplicationArguments applicationArguments = new ApplicationArguments(confDir); | ||
MqttListener listener = new MqttListener(applicationArguments.getPravegaArgs()); | ||
|
||
MqttConnectionBuilder builder = new MqttConnectionBuilder(); | ||
builder.brokerUri(applicationArguments.getMqttArgs().brokerUri); | ||
builder.topic(applicationArguments.getMqttArgs().topic); | ||
builder.clientId(applicationArguments.getMqttArgs().clientId); | ||
builder.userName(applicationArguments.getMqttArgs().userName); | ||
builder.password(applicationArguments.getMqttArgs().password); | ||
builder.bridge(listener); | ||
|
||
MqttClient mqttClient = builder.connect(); | ||
|
||
Runtime.getRuntime().addShutdownHook(new Thread(() -> { | ||
log.info("Going to close the application"); | ||
if (mqttClient != null) { | ||
try { | ||
mqttClient.close(); | ||
} catch (MqttException e) { | ||
log.error("Exception Occurred while closing MQTT client", e); | ||
} | ||
} | ||
latch.countDown(); | ||
})); | ||
} catch (Exception e) { | ||
log.error("Exception Occurred", e); | ||
} | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are adding license headers to such gradle files too, see pravega/pravega.