+ org.slf4j
+ slf4j-log4j12
+ org.scala-tools
+ maven-scala-plugin
+ 2.15.2
+ compile
+ testCompile
+ maven-compiler-plugin
+ 3.6.0
+ 1.8
+ 1.8
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.19
+ true
+package com.smarthane.spark.scalalearn.s01
+ * Created with by smarthane-cloud-microservice.
+ *
+ * @author: smarthane
+ * @Date: 2018/6/27 14:29
+ * @Description: 在函数式编程语言中,函数是“头等公民”,它可以像任何其他数据类型一样被传递和操作
+ */
+object MethodAndFunctionDemo {
+ //定义一个方法
+ //方法m2参数要求是一个函数,函数的参数必须是两个Int类型
+ //返回值类型也是Int类型
+ def m1(f:(Int,Int) => Int) : Int = {
+ f(2,6)
+ }
+ //定义一个函数f1,参数是两个Int类型,返回值是一个Int类型
+ val f1 = (x:Int,y:Int) => x + y
+ //再定义一个函数f2
+ val f2 = (m:Int,n:Int) => m * n
+ def ttt(f:Int => Int):Unit = {
+ val r = f(10)
+ println(r)
+ }
+ val f0 = (x : Int) => x * x
+ //定义了一个方法
+ def m0(x:Int) : Int = {
+ //传递进来的参数乘以10
+ x * 10
+ }
+ //将方法转换成函数,利用了神奇的下滑线
+ val f11 = m0 _
+ //main方法
+ def main(args: Array[String]): Unit = {
+ //调用m1方法,并传入f1函数
+ val r1 = m1(f1)
+ println(r1)
+ //调用m1方法,并传入f2函数
+ val r2 = m1(f2)
+ println(r2)
+ println("---------------------")
+ ttt(f0)
+ //通过m0 _将方法转化成函数
+ ttt(m0 _);
+ //如果直接传递的是方法名称,scala相当于是把方法转成了函数
+ ttt(m0)
+ //通过x => m0(x)的方式将方法转化成函数,这个函数是一个匿名函数,等价:(x:Int) => m0(x)
+ ttt(x => m0(x))
+ }
+package com.smarthane.spark.scalalearn.s01
+ * Created with by smarthane-cloud-microservice.
+ *
+ * @author: smarthane
+ * @Date: 2018/6/27 17:45
+ * @Description:
+ */
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+object SparkTest {
+ val conf: SparkConf = new SparkConf().setAppName("xtq").setMaster("local[2]")
+ val sc: SparkContext = new SparkContext(conf)
+ /**
+ * 创建数据的方式--从内存里构造数据(基础)
+ */
+ def createDataMethod(): Unit = {
+ /* 使用makeRDD创建RDD */
+ /* List */
+ val rdd01 = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
+ val r01 = rdd01.map { x => x * x }
+ println("===================createDataMethod:makeRDD:List=====================")
+ println(r01.collect().mkString(","))
+ println("===================createDataMethod:makeRDD:List=====================")
+ /* Array */
+ val rdd02 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6))
+ val r02 = rdd02.filter { x => x < 5 }
+ println("===================createDataMethod:makeRDD:Array=====================")
+ println(r02.collect().mkString(","))
+ println("===================createDataMethod:makeRDD:Array=====================")
+ /* 使用parallelize创建RDD */
+ /* List */
+ val rdd03 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 1)
+ val r03 = rdd03.map { x => x + 1 }
+ println("===================createDataMethod:parallelize:List=====================")
+ println(r03.collect().mkString(","))
+ println("===================createDataMethod:parallelize:List=====================")
+ /* Array */
+ val rdd04 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 1)
+ val r04 = rdd04.filter { x => x > 3 }
+ println("===================createDataMethod:parallelize:Array=====================")
+ println(r04.collect().mkString(","))
+ println("===================createDataMethod:parallelize:Array=====================")
+ }
+ /**
+ * 创建Pair Map
+ */
+ def createPairRDD(): Unit = {
+ val rdd: RDD[(String, Int)] = sc.makeRDD(List(("key01", 1), ("key02", 2), ("key03", 3)))
+ val r: RDD[String] = rdd.keys
+ println("===========================createPairRDD=================================")
+ println(r.collect().mkString(","))
+ println("===========================createPairRDD=================================")
+ }
+ /**
+ * 通过文件创建RDD
+ * 文件数据:
+ * key01,1,2.3
+ * key02,5,3.7
+ * key03,23,4.8
+ * key04,12,3.9
+ * key05,7,1.3
+ */
+ def createDataFromFile(path: String): Unit = {
+ val rdd: RDD[String] = sc.textFile(path, 1)
+ val r: RDD[String] = rdd.flatMap { x => x.split(",") }
+ println("=========================createDataFromFile==================================")
+ println(r.collect().mkString(","))
+ println("=========================createDataFromFile==================================")
+ }
+ /**
+ * 基本的RDD操作
+ */
+ def basicTransformRDD(path: String): Unit = {
+ val rddInt: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 2, 5, 1))
+ val rddStr: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "b", "a"), 1)
+ val rddFile: RDD[String] = sc.textFile(path, 1)
+ val rdd01: RDD[Int] = sc.makeRDD(List(1, 3, 5, 3))
+ val rdd02: RDD[Int] = sc.makeRDD(List(2, 4, 5, 1))
+ /* map操作 */
+ println("======map操作======")
+ println(rddInt.map(x => x + 1).collect().mkString(","))
+ println("======map操作======")
+ /* filter操作 */
+ println("======filter操作======")
+ println(rddInt.filter(x => x > 4).collect().mkString(","))
+ println("======filter操作======")
+ /* flatMap操作 */
+ println("======flatMap操作======")
+ println(rddFile.flatMap { x => x.split(",") }.first())
+ println("======flatMap操作======")
+ /* distinct去重操作 */
+ println("======distinct去重======")
+ println(rddInt.distinct().collect().mkString(","))
+ println(rddStr.distinct().collect().mkString(","))
+ println("======distinct去重======")
+ /* union操作 */
+ println("======union操作======")
+ println(rdd01.union(rdd02).collect().mkString(","))
+ println("======union操作======")
+ /* intersection操作 */
+ println("======intersection操作======")
+ println(rdd01.intersection(rdd02).collect().mkString(","))
+ println("======intersection操作======")
+ /* subtract操作 */
+ println("======subtract操作======")
+ println(rdd01.subtract(rdd02).collect().mkString(","))
+ println("======subtract操作======")
+ /* cartesian操作 */
+ println("======cartesian操作======")
+ println(rdd01.cartesian(rdd02).collect().mkString(","))
+ println("======cartesian操作======")
+ }
+ /**
+ * 基本的RDD行动操作
+ */
+ def basicActionRDD(): Unit = {
+ val rddInt: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 2, 5, 1))
+ val rddStr: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "b", "a"), 1)
+ /* count操作 */
+ println("======count操作======")
+ println(rddInt.count())
+ println("======count操作======")
+ /* countByValue操作 */
+ println("======countByValue操作======")
+ println(rddInt.countByValue())
+ println("======countByValue操作======")
+ /* reduce操作 */
+ println("======countByValue操作======")
+ println(rddInt.reduce((x, y) => x + y))
+ println("======countByValue操作======")
+ /* fold操作 */
+ println("======fold操作======")
+ println(rddInt.fold(0)((x, y) => x + y))
+ println("======fold操作======")
+ /* aggregate操作 */
+ println("======aggregate操作======")
+ val res: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => (x._1 + x._2, y), (x, y) => (x._1 + x._2, y._1 + y._2))
+ println(res._1 + "," + res._2)
+ println("======aggregate操作======")
+ /* foeach操作 */
+ println("======foeach操作======")
+ println(rddStr.foreach { x => println(x) })
+ println("======foeach操作======")
+ }
+ def main(args: Array[String]): Unit = {
+ println(System.getenv("HADOOP_HOME"))
+ createDataMethod()
+ createPairRDD()
+ //createDataFromFile("file:///D:/sparkdata.txt")
+ //basicTransformRDD("file:///D:/sparkdata.txt")
+ //basicActionRDD()
+ /*打印结果*/
+ /*D://hadoop
+Map(5 -> 2, 1 -> 2, 6 -> 1, 2 -> 2, 3 -> 1, 4 -> 1)
+ }
+package com.smarthane.spark.scalalearn.s01
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+ * Created with by smarthane-cloud-microservice.
+ *
+ * @author: smarthane
+ * @Date: 2018/6/27 18:04
+ * @Description:
+ */
+object SpartTest2 {
+ def main(args: Array[String]): Unit = {
+ Logger.getLogger("org").setLevel(Level.OFF)
+ Logger.getLogger("com").setLevel(Level.OFF)
+ System.setProperty("spark.ui.showConsoleProgress", "false")
+ Logger.getRootLogger().setLevel(Level.OFF)
+ val conf: SparkConf = new SparkConf().setAppName("SpartTest2").setMaster("local[2]")
+ val sc: SparkContext = new SparkContext(conf)
+ val rdd01 = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
+ val r01 = rdd01.map { x => x * x }
+ println(r01.collect().mkString(","))
+ }
package com.smarthane.spark.scalalearn.s01
+import scala.reflect.internal.util.StringOps
+import scala.math._
* Created with by smarthane-cloud-microservice.
@@ -9,8 +13,32 @@ package com.smarthane.spark.scalalearn.s01
object Test {
+ def abs(x: Double) = if (x >= 0) x else -x
+ def fac(n: Int) = {
+ var r = 1
+ for (i <- 1 to n) r = r * i
+ r
+ }
+ def fac2(n: Int): Int = if (n <= 0) 1 else n * fac2(n - 1)
def main(args: Array[String]): Unit = {
- println("Hello world ")
+ val distance = {
+ val dx = 9
+ val dy = 8
+ sqrt(dx * dx + dy * dy)
+ }
+ val abs1 = abs(-9)
+ val fac1 = fac2(5)
+ println(s"Hello world $distance --- $abs1 --- $fac1")
-package com.smarthane;
-import static org.junit.Assert.assertTrue;
-import org.junit.Test;
- * Unit test for simple App.
- */
-public class AppTest
- /**
- * Rigorous Test :-)
- */
- @Test
- public void shouldAnswerWithTrue()
- {
- assertTrue( true );
- }
+ mudfrog-modules
+ com.smarthane
+ 1.0.0
+ 4.0.0
+ mudfrog-dubbo-api
+ 1.0.0
+ mudfrog-dubbo-api
diff --git a/mudfrog-modules/mudfrog-dubbo-api/src/main/java/com/smarthane/dubbo/api/DemoService.java b/mudfrog-modules/mudfrog-dubbo-api/src/main/java/com/smarthane/dubbo/api/DemoService.java
new file mode 100644
index 0000000..1862d36
--- /dev/null
+++ b/mudfrog-modules/mudfrog-dubbo-api/src/main/java/com/smarthane/dubbo/api/DemoService.java
@@ -0,0 +1,19 @@
+ * File Name: DemoService
+ * Created by leeyh on 2018/7/10 14:09.
+ * Copyright:Copyright © 1985-2017 ZKTeco Inc.All right reserved.
+ */
+package com.smarthane.dubbo.api;
+ * @author smarthane
+ * @version v1.0.0
+ * @date 2018/7/10 14:09
+ */
+public interface DemoService {
+ String sayHello(String name);
+ mudfrog-modules
+ com.smarthane
+ 1.0.0
+ 4.0.0
+ mudfrog-dubbo-biz
+ 1.0.0
+ mudfrog-dubbo-biz
+ org.springframework.boot
+ spring-boot-starter-web
+ org.springframework.boot
+ spring-boot-starter-actuator
+ com.alibaba.boot
+ dubbo-spring-boot-starter
+ 0.2.0
+ com.alibaba.boot
+ dubbo-spring-boot-actuator
+ 0.2.0
+ com.smarthane
+ mudfrog-dubbo-api
+ 1.0.0
+ org.springframework.boot
+ spring-boot-maven-plugin
+ ${spring-boot.version}
+ repackage
+package com.smarthane.dubbo.biz;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+ * Created with by smarthane-cloud-microservice.
+ *
+ * @author: smarthane
+ * @Date: 2018/7/10 14:33
+ * @Description:
+ */
+public class DubboBizApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(DubboBizApplication.class);
+ }
+package com.smarthane.dubbo.biz.controller;
+import com.alibaba.dubbo.config.annotation.Reference;
+import com.smarthane.dubbo.api.DemoService;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+ * Created with by smarthane-cloud-microservice.
+ *
+ * @author: smarthane
+ * @Date: 2018/7/10 14:28
+ * @Description:
+ */
+public class DemoController {
+ @Reference(version = "${demo.service.version}", application = "${dubbo.application.id}", url = "dubbo://localhost:12345")
+ private DemoService demoService;
+ @RequestMapping("/sayHello")
+ public String sayHello(@RequestParam String name) {
+ return demoService.sayHello(name);
+ }
+package com.smarthane.dubbo.biz.service;
+import com.alibaba.dubbo.config.annotation.Service;
+import com.smarthane.dubbo.api.DemoService;
+ * Created with by smarthane-cloud-microservice.
+ *
+ * @author: smarthane
+ * @Date: 2018/7/10 14:37
+ * @Description:
+ */
+ version = "${demo.service.version}",
+ application = "${dubbo.application.id}",
+ protocol = "${dubbo.protocol.id}",
+ registry = "${dubbo.registry.id}"
+public class DefaultDemoService implements DemoService {
+ @Override
+ public String sayHello(String name) {
+ return "Hello, " + name + " (from Spring Boot)";
+ }
+ profiles:
+ active: mudfrog-dubbo-provider
+## --- 为多环境配置分隔
+ profiles: mudfrog-dubbo-provider
+ application:
+ name: mudfrog-dubbo-provider
+ port: 9090
+ service:
+ version: 1.0.0
+ application:
+ id: dubbo-provider-demo
+ name: dubbo-provider-demo
+ qos-port: 22222
+ qos-enable: true
+ protocol:
+ id: dubbo
+ name: dubbo
+ port: 12345
+ status: server
+ registry:
+ id: my-registry
+ address: N/A
+ scan:
+ base-packages: com.smarthane.dubbo.biz.service
+ endpoint:
+ dubbo:
+ enabled: true
+ dubbo-shutdown:
+ enabled: true
+ dubbo-configs:
+ enabled: true
+ dubbo-services:
+ enabled: true
+ dubbo-references:
+ enabled: true
+ dubbo-properties:
+ enabled: true
+ health:
+ dubbo:
+ status:
+ defaults: memory
+ extras: load,threadpool
+ profiles: mudfrog-dubbo-consumer
+ application:
+ name: mudfrog-dubbo-consumer
+ port: 8080
+ service:
+ version: 1.0.0
+ application:
+ id: dubbo-consumer-demo
+ name: dubbo-consumer-demo
+ qos-port: 22223
+ protocol:
+ id: dubbo
+ name: dubbo
+ port: 12345
+ dubbo:
+ enabled: true
+ server:
+ port: 8081
+ endpoint:
+ dubbo:
+ enabled: true
+ dubbo-shutdown:
+ enabled: true
+ dubbo-configs:
+ enabled: true
+ dubbo-services:
+ enabled: true
+ dubbo-references:
+ enabled: true
+ dubbo-properties:
+ enabled: true
+ health:
+ dubbo:
+ status:
+ defaults: memory
+ extras: load,threadpool
+ mudfrog-dubbo-api
+ mudfrog-dubbo-biz
+ org.fusesource.mqtt-client
+ mqtt-client
+ 1.14
+ com.alibaba
+ fastjson
+ 1.2.47
+ com.esotericsoftware
+ kryo
+ 4.0.2
+ de.ruedigermoeller
+ fst
+ 2.57
+package com.smarthane.mudfrog.sapmles.mqtt;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.mqtt.client.Callback;
+import org.fusesource.mqtt.client.ExtendedListener;
+public class MyLs implements ExtendedListener {
+ public void onPublish(UTF8Buffer topic, Buffer body, Callback> ack) {
+ System.out.println(new String(body.toByteArray()));
+ ack.onSuccess(new Callback(){
+ public void onSuccess(Object value) {
+ }
+ public void onFailure(Throwable value) {
+ }
+ });
+ }
+ public void onConnected() {
+ System.out.println("onConnected");
+ }
+ public void onDisconnected() {
+ System.out.println("onDisconnected");
+ }
+ public void onFailure(Throwable value) {
+ System.out.println("onFailure");
+ }
+ public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) {
+ }
+package com.smarthane.mudfrog.sapmles.mqtt;
+import com.alibaba.fastjson.JSONObject;
+import com.smarthane.mudfrog.sapmles.mqtt.utils.PublishClient;
+import java.net.URISyntaxException;
+public class PublishTest {
+ public static void main(String[] args) throws URISyntaxException {
+ PublishClient bb = new PublishClient("tcp://", "iot/update/dev_id/+/app_id/+/forcedType/+", "bbaa002", "admin", "admin");
+ JSONObject obj = new JSONObject();
+ obj.put("deviceId", 5);
+ obj.put("appid", 5);
+ bb.run();
+ bb.PublishMessage(obj.toJSONString());
+// PublishClient aa = new PublishClient("tcp://", "$SYS/brokers/disconnected/clients/aa", "001", "admin", "admin");
+// JSONObject obj = new JSONObject();
+// obj.put("deviceId", 5);
+// obj.put("appid", 5);
+// aa.run();
+// int i = 0;
+// aa.PublishMessage(obj.toJSONString());
+// while (true){
+// aa.PublishMessage(obj.toJSONString());
+// SleepUtils.second(1);
+// i++;
+// System.out.println(i);
+// }
+ //aa.disconnect();
+ }
+package com.smarthane.mudfrog.sapmles.mqtt;
+import com.smarthane.mudfrog.sapmles.mqtt.utils.SleepUtils;
+import com.smarthane.mudfrog.sapmles.mqtt.utils.SubscribeClient;
+import java.net.URISyntaxException;
+public class SubscribeTest {
+ public static void main(String[] args) throws URISyntaxException {
+ SubscribeClient bb = new SubscribeClient("tcp://", new String[]{"iot/update/dev_id/+/app_id/+/forcedType/+","iot/update/dev_type/+/app_id/+/version/#"}, "bbaa002", "admin", "admin");
+ bb.run(new MyLs());
+ while (true){
+ SleepUtils.second(1);
+ }
+ }
+package com.smarthane.mudfrog.sapmles.mqtt.utils;
+import org.fusesource.mqtt.client.Callback;
+import org.fusesource.mqtt.client.CallbackConnection;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import java.util.ArrayList;
+import java.util.List;
+public class CallbackHandle implements Callback {
+ private CallbackConnection cbC = null;
+ private String[] topics;
+ private String status="";
+ CallbackHandle(CallbackConnection cc, String[] topics){
+ this.cbC = cc;
+ this.topics = topics;
+ }
+ // 连接成功
+ public void onSuccess(Object o) {
+ //订阅主题
+ List ltopics = new ArrayList();
+ for (String topic:topics){
+ ltopics.add(new Topic(topic, QoS.AT_LEAST_ONCE));
+ }
+ Topic[] array =new Topic[ltopics.size()];
+ cbC.subscribe(ltopics.toArray(array), new Callback() {
+ //订阅主题成功
+ public void onSuccess(byte[] bytes) {
+ status="subscribe onSuccess";
+ }
+ //订阅主题失败
+ public void onFailure(Throwable throwable) {
+ status="subscribe onFailure";
+ cbC.disconnect(null);
+ }
+ });
+ }
+ //连接失败
+ public void onFailure(Throwable throwable) {
+ status="connect Failure";
+ cbC.disconnect(null);
+ }
+ public String getStatus() {
+ return status;
+ }
+package com.smarthane.mudfrog.sapmles.mqtt.utils;
+import org.fusesource.mqtt.client.CallbackConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import java.net.URISyntaxException;
+public class Initialization {
+ private final static boolean CLEAN_START = false;
+ private final static short KEEP_ALIVE = 30;
+ public final static long RECONNECTION_ATTEMPT_MAX=-1;
+ public final static long RECONNECTION_DELAY=-1;
+ public final static int SEND_BUFFER_SIZE=2*1024*1024;
+ public static CallbackConnection GetCallbackConnection(String mqttBrokerUrl, String clientId, String userName, String passWord) throws URISyntaxException {
+ MQTT mqtt = new MQTT();
+ mqtt.setHost(mqttBrokerUrl);
+ mqtt.setClientId(clientId);
+ mqtt.setCleanSession(CLEAN_START);
+ mqtt.setKeepAlive(KEEP_ALIVE);
+ mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
+ mqtt.setReconnectDelay(RECONNECTION_DELAY);
+ mqtt.setUserName(userName);
+ mqtt.setPassword(passWord);
+ mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
+ mqtt.setVersion("3.1.1");
+ return mqtt.callbackConnection();
+ }
+package com.smarthane.mudfrog.sapmles.mqtt.utils;
+import java.net.URISyntaxException;
+import org.fusesource.mqtt.client.Callback;
+import org.fusesource.mqtt.client.CallbackConnection;
+import org.fusesource.mqtt.client.QoS;
+public class PublishClient {
+ private String MqttBrokerUrl;
+ private String topic;
+ private String ClientId;
+ private String UserName;
+ private String PassWord;
+ private CallbackConnection callbackConnection = null;
+ public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M
+ private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
+ public PublishClient(String mqttBrokerUrl, String topic, String clientId, String userName, String passWord) {
+ this.MqttBrokerUrl=mqttBrokerUrl;
+ this.topic=topic;
+ this.ClientId=clientId;
+ this.UserName=userName;
+ this.PassWord=passWord;
+ }
+ public PublishClient(String mqttBrokerUrl, String clientId, String userName, String passWord) {
+ this.MqttBrokerUrl=mqttBrokerUrl;
+ this.ClientId=clientId;
+ this.UserName=userName;
+ this.PassWord=passWord;
+ }
+ public String getTopic() {
+ return topic;
+ }
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+ /**
+ * 初始化发布消息客户端
+ * @return
+ */
+ public void run() throws URISyntaxException {
+ callbackConnection=Initialization.GetCallbackConnection(MqttBrokerUrl,ClientId,UserName,PassWord);
+ callbackConnection.connect(new Callback() {
+ public void onFailure(Throwable arg0) {
+ System.out.println("connect failure");
+ }
+ public void onSuccess(Object arg0) {
+ System.out.println("connect success");
+ }
+ });
+ }
+ public void PublishMessage(String msgContent){
+ callbackConnection.publish(topic, msgContent.getBytes(), QoS.AT_LEAST_ONCE, true, new Callback(){
+ public void onSuccess(Void aVoid) {
+ System.out.println("send message ok");
+ }
+ public void onFailure(Throwable throwable){
+ System.out.println("send message failure");
+ }
+ }
+ );
+ }
+ public void disconnect(){
+ callbackConnection.disconnect(null);
+ }
+package com.smarthane.mudfrog.sapmles.mqtt.utils;
+import java.util.concurrent.TimeUnit;
+public class SleepUtils {
+ public static final void second (long seconds){
+ try {
+ TimeUnit.SECONDS.sleep(seconds);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ public static final void millisecond (long millisecond){
+ try {
+ TimeUnit.MILLISECONDS.sleep(millisecond);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+package com.smarthane.mudfrog.sapmles.mqtt.utils;
+import java.net.URISyntaxException;
+import org.fusesource.mqtt.client.*;
+public class SubscribeClient {
+ private String MqttBrokerUrl;
+ private String[] topic;
+ private String ClientId;
+ private String UserName;
+ private String PassWord;
+ private CallbackConnection callbackConnection= null;
+ public SubscribeClient(String mqttBrokerUrl, String[] topic, String clientId, String userName, String passWord) {
+ this.MqttBrokerUrl=mqttBrokerUrl;
+ this.topic=topic;
+ this.ClientId=clientId;
+ this.UserName=userName;
+ this.PassWord=passWord;
+ }
+ public void run(ExtendedListener myListener) throws URISyntaxException {
+ callbackConnection = Initialization.GetCallbackConnection(MqttBrokerUrl,ClientId,UserName,PassWord);
+ callbackConnection.listener(myListener);
+ CallbackHandle callbackHandle = new CallbackHandle(callbackConnection,topic);
+ callbackConnection.connect(callbackHandle);
+ }
+ /**
+ * 断开连接
+ */
+ public void disconnect(){
+ callbackConnection.disconnect(null);
+ }
+package com.smarthane.mudfrog.sapmles.serializer;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.nustaq.serialization.FSTConfiguration;
+import java.io.*;
+ * Created with by smarthane-cloud-microservice.
+ *
+ * @author: smarthane
+ * @Date: 2018/7/20 10:40
+ * @Description:
+ */
+public class SerializerUtil {
+ private static FSTConfiguration fstConfiguration = FSTConfiguration.createStructConfiguration();
+ private static Kryo kryo = new Kryo();
+ public static byte[] fstSerialize(Object obj) {
+ return fstConfiguration.asByteArray(obj);
+ }
+ public static Object fstUnserialize(byte[] sec) {
+ return fstConfiguration.asObject(sec);
+ }
+ public static byte[] kryoSerizlize(Object obj) {
+ try {
+ Kryo kryo = new Kryo();
+ byte[] buffer = new byte[2048];
+ Output output = new Output(buffer);
+ kryo.writeClassAndObject(output, obj);
+ return output.toBytes();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+ public static Object kryoUnSerizlize(byte[] src) {
+ try {
+ Input input = new Input(src);
+ return kryo.readClassAndObject(input);
+ }catch (Exception e) {
+ e.printStackTrace();
+ }
+ return kryo;
+ }
+ // jdk原生序列换方案
+ public static byte[] jdkserialize(Object obj) {
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(obj);
+ return baos.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ public static Object jdkdeserialize(byte[] bits) {
+ try {
+ ByteArrayInputStream bais = new ByteArrayInputStream(bits);
+ ObjectInputStream ois = new ObjectInputStream(bais);
+ return ois.readObject();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+package com.smarthane.mudfrog.sapmles.serializer;
+import java.io.Serializable;
+ * Created with by smarthane-cloud-microservice.
+ *
+ * @author: smarthane
+ * @Date: 2018/7/20 10:46
+ * @Description:
+ */
+public class Test {
+ public static void main(String[] args) {
+ User bean = new User();
+ bean.setUsername("xxxxx");
+ bean.setPassword("123456");
+ bean.setAge(1000000);
+ System.out.println("序列化 , 反序列化 对比测试:");
+ long size = 0;
+ long time1 = System.currentTimeMillis();
+ for (int i = 0; i < 10000; i++) {
+ byte[] jdkserialize = SerializerUtil.jdkserialize(bean);
+ size += jdkserialize.length;
+ SerializerUtil.jdkdeserialize(jdkserialize);
+ }
+ System.out.println("原生序列化方案[序列化10000次]耗时:"
+ + (System.currentTimeMillis() - time1) + "ms size:=" + size);
+ size = 0;
+ long time2 = System.currentTimeMillis();
+ for (int i = 0; i < 10000; i++) {
+ byte[] serialize = SerializerUtil.fstSerialize(bean);
+ size += serialize.length;
+ User u = (User) SerializerUtil.fstUnserialize(serialize);
+ }
+ System.out.println("fst序列化方案[序列化10000次]耗时:"
+ + (System.currentTimeMillis() - time2) + "ms size:=" + size);
+ size = 0;
+ long time3 = System.currentTimeMillis();
+ for (int i = 0; i < 10000; i++) {
+ byte[] serialize = SerializerUtil.kryoSerizlize(bean);
+ size += serialize.length;
+ User u = (User) SerializerUtil.kryoUnSerizlize(serialize);
+ }
+ System.out.println("kryo序列化方案[序列化10000次]耗时:"
+ + (System.currentTimeMillis() - time3) + "ms size:=" + size);
+ }
+ static class User implements Serializable {
+ private String username;
+ private int age;
+ private String password;
+ public String getUsername() {
+ return username;
+ }
+ public void setUsername(String username) {
+ this.username = username;
+ }
+ public int getAge() {
+ return age;
+ }
+ public void setAge(int age) {
+ this.age = age;
+ }
+ public String getPassword() {
+ return password;
+ }
+ public void setPassword(String password) {
+ this.password = password;
+ }
+ }