3. Kafka API in Practice
---
### 1. Producer API
1) Message send flow
Kafka producers send messages **asynchronously**. The send path involves **two threads, the main thread and the Sender thread**, plus **one shared buffer, the RecordAccumulator**. The main thread appends messages to the RecordAccumulator, and the Sender thread continually pulls batches from the RecordAccumulator and ships them to the Kafka brokers.
<p align="center">
<img src="https://github.com/Dr11ft/BigDataGuide/blob/master/Pics/Kafka%E6%96%87%E6%A1%A3Pics/Kafka%E6%A1%88%E4%BE%8B/Producer%20API.png"/>
</p>

**`batch.size`**: the Sender does not ship a batch until batch.size bytes of data have accumulated.
**`linger.ms`**: if a batch has not filled up to batch.size in time, the Sender waits at most linger.ms and then sends whatever has accumulated.

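As a small illustration (the class name and the 5 ms value here are just assumptions for the demo), both knobs are ordinary producer properties; using the `ProducerConfig` constants instead of raw string keys guards against typos:
```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchingConfig {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Ship a batch once it holds 16 KB of records...
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // ...or once it has waited 5 ms, whichever happens first.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        // Hand props to new KafkaProducer<>(props) exactly as in the examples below.
    }
}
```
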
2) **Asynchronous send API**
(1) Add the dependency
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
```

(2) Write the code
Classes you will need:
**`KafkaProducer`**: the producer object that actually sends the data
**`ProducerConfig`**: provides the full set of producer configuration parameters
**`ProducerRecord`**: every message must be wrapped in a ProducerRecord object

**1. Sending without a callback**
```java
import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");// Kafka cluster, broker list
        props.put("acks", "all");
        props.put("retries", 1);// number of retries
        props.put("batch.size", 16384);// batch size in bytes
        props.put("linger.ms", 1);// max wait before sending, in ms
        props.put("buffer.memory", 33554432);// total RecordAccumulator buffer size in bytes
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}
```

**2. Sending with a callback**
The callback is invoked asynchronously once the producer receives the ack. It takes two parameters, RecordMetadata and Exception: if Exception is null the send succeeded; if it is non-null the send failed.
**Note**: a failed send is retried automatically by the producer; there is no need to retry manually inside the callback.
```java
import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");// Kafka cluster, broker list
        props.put("acks", "all");
        props.put("retries", 1);// number of retries
        props.put("batch.size", 16384);// batch size in bytes
        props.put("linger.ms", 1);// max wait before sending, in ms
        props.put("buffer.memory", 33554432);// total RecordAccumulator buffer size in bytes
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() {

                // Callback: invoked asynchronously once the producer receives the ack
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success->" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}
```

3) **Synchronous send API**
A synchronous send means that after each message is sent, the current thread blocks until the ack comes back.
Since the send method returns a Future object, we can achieve the same synchronous effect simply by calling the Future's get method.
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");// Kafka cluster, broker list
        props.put("acks", "all");
        props.put("retries", 1);// number of retries
        props.put("batch.size", 16384);// batch size in bytes
        props.put("linger.ms", 1);// max wait before sending, in ms
        props.put("buffer.memory", 33554432);// total RecordAccumulator buffer size in bytes
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get();// get() blocks until the ack arrives
        }
        producer.close();
    }
}
```

### 2. Consumer API
Reliability when consuming is easy to guarantee: the data is persisted in Kafka, so there is no risk of losing it.
However, a consumer can fail mid-consumption (power loss, crash, and so on), and after recovering it must resume from where it left off. The consumer therefore needs to record, as it goes, the offset it has consumed up to, so that it can continue from that position after a failure.
Offset maintenance is thus an unavoidable concern whenever a consumer consumes data.

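As a minimal sketch of what such recovery can look like (the stored offset and its source here are hypothetical; in practice it would come from a database, a file, or Kafka's own committed offsets), a consumer can rewind to a recorded position with seek():
```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ResumeConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Assign the partition directly (no group rebalance) so we control the start position.
        TopicPartition tp = new TopicPartition("first", 0);
        consumer.assign(Collections.singletonList(tp));
        long storedOffset = 42L; // hypothetical: the offset recorded before the failure
        consumer.seek(tp, storedOffset); // resume exactly where the previous run stopped
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
        }
        consumer.close();
    }
}
```
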
1) **Committing offsets manually**
(1) Add the dependency
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
```

(2) Write the code
Classes you will need:
**`KafkaConsumer`**: the consumer object that consumes the data
**`ConsumerConfig`**: provides the full set of consumer configuration parameters
**`ConsumerRecord`**: each record is delivered wrapped in a ConsumerRecord object

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");// consumers sharing the same group.id belong to the same consumer group
        props.put("enable.auto.commit", "false");// disable auto-commit; offsets are committed manually below

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            consumer.commitSync();// blocks until this poll's offsets are committed
        }
    }
}
```

(3) Code analysis:
There are two ways to commit offsets manually: commitSync (synchronous) and commitAsync (asynchronous). Both commit the highest offset of the batch returned by the current poll. The difference is that commitSync blocks and retries until the commit succeeds (it can still fail for unrecoverable reasons), while commitAsync has no retry mechanism, so a commit may fail.

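The example above uses commitSync; a commitAsync variant (a sketch on the same hypothetical hadoop102 setup) differs only in the commit call, whose completion callback is the only place a failure will be reported:
```java
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class AsyncCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            // Returns immediately; a failed commit is reported to the callback but never retried.
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        System.err.println("commit failed for offsets " + offsets);
                    }
                }
            });
        }
    }
}
```
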
(4) The duplicate-consumption problem
If the consumer processes a batch but fails before the offset commit goes through, the same records are delivered again after recovery, so manual commits give at-least-once rather than exactly-once semantics.
<p align="center">
<img src="https://github.com/Dr11ft/BigDataGuide/blob/master/Pics/Kafka%E6%96%87%E6%A1%A3Pics/Kafka%E6%A1%88%E4%BE%8B/%E6%95%B0%E6%8D%AE%E9%87%8D%E5%A4%8D%E6%B6%88%E8%B4%B9%E9%97%AE%E9%A2%98.jpg"/>
</p>

2) **Committing offsets automatically**
So that we can focus on our own business logic, Kafka also provides automatic offset commits.
The relevant parameters:
`enable.auto.commit`: whether automatic offset commits are enabled
`auto.commit.interval.ms`: the interval between automatic offset commits

**Code:**
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
```

### 3. Custom Interceptors
1) How interceptors work
Producer interceptors were introduced in Kafka 0.10 to support custom control logic on the client side.
For the producer, an interceptor gives user code a chance to act on messages before they are sent and before the producer's callback logic runs, for example to **modify the message**. The producer also lets you specify multiple interceptors that act on the same message in order, forming an interceptor chain. The interface to implement is **org.apache.kafka.clients.producer.ProducerInterceptor**, which defines these methods:
(1) configure(configs): called to fetch configuration and perform initialization.
(2) onSend(ProducerRecord):
This method is wrapped into KafkaProducer.send, so it runs on the user's main thread. The producer guarantees it is called before the message is serialized and before its partition is computed. **You may do anything to the message here, but it is best not to modify its topic or partition**, otherwise the target-partition computation is affected.
(3) onAcknowledgement(RecordMetadata, Exception):
**Called after the message has been successfully sent from the RecordAccumulator to the Kafka broker, or when the send fails along the way**, and normally just before the producer's user callback is triggered. onAcknowledgement runs on the producer's I/O thread, so do not put heavy logic in it, or the producer's send throughput will suffer.
(4) close:
**Shuts down the interceptor; mainly used for resource cleanup.**
As noted above, interceptors may run on multiple threads, so the implementation must ensure its own thread safety. Also, **if multiple interceptors are specified, the producer invokes them in the given order**, and it merely catches any exception an interceptor throws and writes it to the error log instead of propagating it up the chain. Keep this in mind when using interceptors.

2) **Interceptor case study**
(1) Requirement:
Build a simple chain of two interceptors. The first prepends a timestamp to the front of the message value before the message is sent; the second, once a send completes, increments a counter of successful or failed sends.
<p align="center">
<img src="https://github.com/Dr11ft/BigDataGuide/blob/master/Pics/Kafka%E6%96%87%E6%A1%A3Pics/Kafka%E6%A1%88%E4%BE%8B/Kafka%E6%8B%A6%E6%88%AA%E5%99%A8.jpg"/>
</p>

(2) Implementation
1. The timestamp interceptor
```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Build a new record with the current timestamp prepended to the value
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
                System.currentTimeMillis() + "," + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }
}
```

2. The counter interceptor: counts successful and failed sends, and prints both counters when the producer is closed
```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Count successes and failures
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // Print the final counts
        System.out.println("Successfully sent: " + successCounter);
        System.out.println("Failed to send: " + errorCounter);
    }
}
```

3. The producer main program
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InterceptorProducer {

    public static void main(String[] args) throws Exception {
        // 1. Producer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Build the interceptor chain
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor");
        interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        String topic = "first";
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 3. Send messages
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
            producer.send(record);
        }

        // 4. The producer must be closed; only then is each interceptor's close() method invoked
        producer.close();
    }
}
```

(3) Testing
Start a console consumer on the Kafka cluster, then run the Java client program. Each consumed value should appear with the send timestamp prepended (for example `<timestamp>,message0`), and the producer should print the two counters when it exits.
```shell
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
```