File tree 1 file changed +45
-0
lines changed
1 file changed +45
-0
lines changed Original file line number Diff line number Diff line change
1
+ 四、Flume对接Kafka
2
+ ---
3
+ &emsp ; 一般实际应用中,会通过Flume+Kafka来对产生的数据进行采集,也可以在Kafka中对数据进行一个初步的处理,用于后续Spark或MapReduce的使用,
4
+ 这个案例主要是实现Flume和Kafka的对接,
5
+ #### 1、配置flume(flume-kafka.conf)
6
+ ``` xml
7
+ # define
8
+ a1.sources = r1
9
+ a1.sinks = k1
10
+ a1.channels = c1
11
+
12
+ # source
13
+ a1.sources.r1.type = exec
14
+ a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
15
+ a1.sources.r1.shell = /bin/bash -c
16
+
17
+ # sink
18
+ a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
19
+ a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
20
+ a1.sinks.k1.kafka.topic = first
21
+ a1.sinks.k1.kafka.flumeBatchSize = 20
22
+ a1.sinks.k1.kafka.producer.acks = 1
23
+ a1.sinks.k1.kafka.producer.linger.ms = 1
24
+
25
+ # channel
26
+ a1.channels.c1.type = memory
27
+ a1.channels.c1.capacity = 1000
28
+ a1.channels.c1.transactionCapacity = 100
29
+
30
+ # bind
31
+ a1.sources.r1.channels = c1
32
+ a1.sinks.k1.channel = c1
33
+ ```
34
+
35
+ #### 2、启动kafkaIDEA消费者(上一篇文章有)
36
+ #### 3、进入flume根目录下,启动flume
37
+ ``` xml
38
+ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
39
+ ```
40
+ #### 4、向/opt/module/datas/flume.log里追加数据,查看kafka消费者消费情况
41
+ ``` xml
42
+ echo hello >> /opt/module/datas/flume.log
43
+ ```
44
+
45
+
You can’t perform that action at this time.
0 commit comments