- 轻量级部署:可以单独部署,也支持集群部署
- 多种类型的数据输入以及输出,source 支持 rocketmq , sink 支持db, rocketmq 等
-
整体架构: 介绍RocketMQ-streams总体构成;
-
构建拓扑图: 介绍如何通过DataStream将各个算子添加进入计算拓扑;
-
启动:介绍计算拓扑图如何启动,消费数据流入之前需要做那些准备工作;
-
数据的流转:介绍数据进入计算拓扑的流转过程,以及各种系统消息的作用原理;
-
Window算子解析: 介绍有状态算子window实例化、数据处理、窗口触发过程;
import org.apache.rocketmq.streams.client.transform.DataStream;
DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
source
.fromFile("~/admin/data/text.txt",false)
.map(message->message)
.toPrint(1)
.start();
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-clients</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
rocketmq-stream 实现了一系列高级的API,可以让用户很方便的编写流计算的程序,实现自己的业务需求;
StreamBuilder 用于构建流任务的源;
- dataStream(nameSpaceName,pipelineName) 返回DataStreamSource实例,用于分段编程实现流计算任务;
DataStreamSource 是分段式编程的源头类,用于对接各种数据源, 从各大消息队列中获取数据;
-
fromFile
从文件中读取数据, 该方法包含俩个参数filePath
文件路径,必填参数isJsonData
是否json数据, 非必填参数, 默认为true
-
fromRocketmq
从rocketmq中获取数据,包含四个参数topic
rocketmq消息队列的topic名称,必填参数groupName
消费者组的名称,必填参数isJson
是否json格式,非必填参数tags
rocketmq消费的tags值,用于过滤消息,非必填参数
-
fromMqtt
从满足MQTT协议的终端读取数据, 满足边缘计算的场景,其中包含9个参数url
mqtt broker的地址,必填参数clientId
客户端ID, 必填参数,相同的clientId有负载的作用topic
topic信息, 必填参数username
用户名, 非必填,在mqtt端添加鉴权机制时使用password
密码,非必填参数,在mqtt端添加鉴权机制时使用cleanSession
是否清理session信息, 非必填,默认为trueconnectionTimeout
连接超时信息, 非必填,默认是10saliveInterval
判断连接是否活跃的间隔时间,非必填,默认是60sautomaticReconnect
连接断开后自动重连机制,非必填,默认是true
-
from
自定义的数据源, 通过实现ISource接口实现自己的数据源
transform 允许在流计算过程中对输入源的数据进行修改,进行下一步的操作;DataStream API中包括DataStream
,JoinStream
, SplitStream
,WindowStream
等多个transform类;
DataStream实现了一系列常见的流计算算子
map
通过将源的每个记录传递给函数func来返回一个新的DataStreamflatmap
与map类似,一个输入项对应0个或者多个输出项filter
只选择func返回true的源DStream的记录来返回一个新的DStreamforEach
对每个记录执行一次函数func, 返回一个新的DataStreamselectFields
对每个记录返回对应的字段值,返回一个新的DataStreamoperate
对每个记录执行一次自定义的函数,返回一个新的DataStreamscript
针对每个记录的字段执行一段脚本,返回新的字段,生成一个新的DataStreamtoPrint
将结果在控制台打印,生成新的DataStream实例toFile
将结果保存为文件,生成一个新的DataStream实例toMqtt
将结果输出到满足mqtt协议的设备中,生成一个新的DataStream实例toDB
将结果保存到数据库toRocketmq
将结果输出到rocketmqto
将结果经过自定义的ISink接口输出到指定的存储window
在窗口内进行相关的统计分析,一般会与groupBy
连用,window()
用来定义窗口的大小,groupBy()
用来定义统计分析的主key,可以指定多个count
在窗口内计数min
获取窗口内统计值的最小值max
获取窗口内统计值得最大值avg
获取窗口内统计值的平均值sum
获取窗口内统计值的加和值reduce
在窗口内进行自定义的汇总运算
join
根据条件将俩个流进行内关联leftJoin
根据条件将俩个流的数据进行左关联dimJoin
根据条件将流与维表进行内关联,维表的数据可以来自于文件,也可以来自于数据库dimLeftJoin
根据条件将流与维表进行左关联,维表的数据可以来自于文件,也可以来自于数据库union
将俩个流进行合并split
将一个数据流按照标签进行拆分,分为不同的数据流供下游进行分析计算with
with算子用来指定计算过程中的相关策略,包括checkpoint的存储策略,state的存储策略等
策略机制主要用来控制计算引擎运行过程中的底层逻辑,如checkpoint,state的存储方式等,后续还会增加对窗口、双流join等的控制;所有的控制策略通过with
算子传入,可以同时传入多个策略类型;
//指定checkpoint的存储策略
source
.fromRocketmq("TSG_META_INFO","")
.map(message->message+"--")
.toPrint(1)
.with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
.start();
Rocketmq-Streams 作为典型的java应用,既可以集成在业务系统里运行,也可以作为一个独立的jar包来运行;
首先对应用的源码进行编译
mvn -Prelease-all -DskipTests clean install -U
然后直接通过java指令来运行
java -jar jarName mainClass
更多详细的案例可以看这里