Skip to content

Latest commit

 

History

History
80 lines (62 loc) · 3.96 KB

File metadata and controls

80 lines (62 loc) · 3.96 KB

8.3.2 使用 KafkaTemplate 发送消息

在许多方面,KafkaTemplate 与 JMS 和 RabbitMQ 类似。与此同时,它也是不同的,尤其是在我们考虑它发送消息的方法时:

ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

注意到的第一件事是没有 convertAndSend() 方法。这是因为 KafkaTemplate 是用的泛型,同时能够在发送消息时直接处理域类型。在某种程度上,所有的 send() 方法都在做 convertAndSend() 的工作。

再者 send() 和 sendDefault() 的参数,它们与 JMS 和 Rabbit 中使用的参数完全不同。当使用 Kafka 发送消息时,可以指定以下参数来指导如何发送消息:

  • 发送消息的 topic(send() 方法必要的参数)
  • 写入 topic 的分区(可选)
  • 发送记录的键(可选)
  • 时间戳(可选;默认为 System.currentTimeMillis())
  • payload(必须)

topic 和 payload 是两个最重要的参数。分区和键对如何使用 KafkaTemplate 几乎没有影响,除了作为 send() 和 sendDefault() 的参数用于提供额外信息。出于我们的目的,我们将把重点放在将消息有效负载发送到给定主题上,而不考虑分区和键。

对于 send() 方法,还可以选择发送一个 ProducerRecord,它与在单个对象中捕获所有上述参数的类型差不多。也可以发送 Message 对象,但是这样做需要将域对象转换为 Message。通常,使用其他方法比创建和发送 ProducerRecord 或 Message 对象更容易。

使用 KafkaTemplate 及其 send() 方法,可以编写一个基于 kafka 的 OrderMessagingService 实现。下面的程序清单显示了这样一个实现。

{% code title="程序清单 8.8 使用 KafkaTemplate 发送订单" %}

package tacos.messaging;
​
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
​
@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
    private KafkaTemplate<String, Order> kafkaTemplate;
    
    @Autowired
    public KafkaOrderMessagingService(KafkaTemplate<String, Order> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    @Override
    public void sendOrder(Order order) {
        kafkaTemplate.send("tacocloud.orders.topic", order);
    }
}

{% endcode %}

在 OrderMessagingService 的这个实现中,sendOrder() 方法使用注入的 KafkaTemplate 的 send() 方法向名为tacocloud.orders.topic 的主题发送 Order。代码中除了使用 "Kafka" 这个名称外,这与为 JMS 和 Rabbit 编写的代码没有太大的不同。

如果设置了默认主题,可以稍微简化 sendOrder() 方法。首先,通过设置 spring.kafka.template.default-topic 属性,将默认主题设置为 tacocloud.orders.topic:

spring:
  kafka:
    template:
      default-topic: tacocloud.orders.topic

然后,在 sendOrder() 方法中,可以调用 sendDefault() 而不是 send(),并且不指定主题名称:

@Override
public void sendOrder(Order order) {
    kafkaTemplate.sendDefault(order);
}

现在已经编写了消息发送代码了,让我们将注意力转向编写从 Kafka 接收这些消息的代码。