8.3 使用Kafka的消息

8.3 使用Kafka的消息

Apache Kafka是我们在本章研究的最新的消息方案。乍看上去,Kafka是与ActiveMQ、Artemis或Rabbit类似的消息代理,其实Kafka有一些独特的技巧。

Kafka设计为集群运行,从而能够实现很强的可扩展性。通过将主题在集群的所有实例上进行分区(partition),它能够具有更强的弹性。RabbitMQ主要处理Exchange中的队列,而Kafka仅使用主题实现消息的发布/订阅。

Kafka主题会复制到集群的所有代理上。集群中的每个节点都会担任一个或多个主题的首领(leader),负责该主题的数据并将其复制到集群中的其他节点上。

更进一步来讲,每个主题可以划分为多个分区。在这种情况下,集群中的每个节点是某个主题一个或多个分区的首领,但并不是整个主题的首领。主题的责任会在所有节点间进行拆分。图8.2阐述了它是如何运行的。

image-20211015203522617

图8.2 Kafka集群是由多个代理组成的,每个代理作为主题分区的首领

关于Kafka的独特架构,我建议你阅读Dylan Scott编写的Kafka in Action(Manning,2017)。就我们来讲,我们将会关注如何通过Spring发送和接收Kafka的消息。

8.3.1 为Spring搭建支持Kafka消息的环境

为了搭建Kafka的消息环境,我们需要添加对应的依赖到构建文件中。但是,与JMS和RabbitMQ方案不同,并没有针对Kafka的Spring Boot starter。不过,不用担心,我们只需要添加一项依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

这项依赖会为我们的项目引入Kafka所需的所有内容。另外,它的出现会触发Spring Boot对Kafka的自动配置,除了其他功能之外,它会在Spring应用上下文中创建一个KafkaTemplate。我们所需要做的就是注入KafkaTemplate并使用它来发布和接收消息。

但是,在发送和接收消息之前,我们还需要注意使用Kafka时的一些属性。具体来讲,KafkaTemplate默认会使用localhost上监听9092端口的Kafka代理。在开发应用的时候,在本地启动Kafka代理没有问题,但是在投入生产的时候,我们需要配置不同的主机和端口。

spring.kafka.bootstrap-servers属性能够设置一个或多个Kafka服务器的地址,系统将会使用它来建立到Kafka集群的初始连接。例如,集群中的某个服务器运行在kafka.tacocloud.com上并监听9092端口,那么我们可以按照如下的方式在YAML中配置它的位置:

1
2
3
4
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092

但是需要注意spring.kafka.bootstrap-servers是复数形式,它能接受一个列表。所以,我们可以提供集群中的多个Kafka服务器:

1
2
3
4
5
6
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
- kafka.tacocloud.com:9093
- kafka.tacocloud.com:9094

Kafka在项目中准备就绪之后,我们就可以发送和接收消息了。我们首先使用KafkaTemplate发送Order对象到Kafka中。

8.3.2 通过KafkaTemplate发送消息

在很多方面,KafkaTemplate与JMS和RabbitMQ对应的模板非常相似。但同时,它也有很大的差异。在发送消息的时候,这一点非常明显:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
Long timestamp, K key, V data);

我们首先可能会发现,这里没有convertAndSend()方法了。这是因为,KafkaTemplate是通过泛型类型化的,在发送消息的时候,它能够直接处理领域类型。这样的话,所有的send()方法都完成了convertAndSend()的任务。

你可能也会发现,send()和sendDefault()的参数与JMS和Rabbit有很大的差异。在使用Kafka发送消息的时候,我们可以使用如下参数设置消息该如何进行发送:

  • 消息要发送到的主题(send()方法的必选参数);
  • 主题要写入的分区(可选);
  • 记录上要发送的key(可选);
  • 时间戳(可选,默认为System.currentTimeMillis());
  • 载荷(必选)。

主题和载荷是其中最重要的两个参数。分区和key对于如何使用KafkaTemplate几乎没有影响,只是作为额外的信息提供给send()和sendDefault()。对于我们的场景来说,我们只关心将消息载荷发送到给定的主题,不用担心分区和key的问题。

对于send()方法来说,我们还可以选择发送一个ProducerRecord对象,它只是一个简单类型,将上述的参数放到了一个对象中。我们还可以发送Message对象,但是需要将领域对象转换成Message对象。相对创建和发送ProducerRecord和Message对象,使用其他的方法会更简单一些。

借助KafkaTemplate及其send()方法,我们可以编写一个基于Kafka实现的OrderMessagingService实现。程序清单8.8展现了该实现类。

程序清单8.8 使用KafkaTemplate发送订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package tacos.messaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaOrderMessagingService
implements OrderMessagingService {
private KafkaTemplate<String, Order> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(
KafkaTemplate<String, Order> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(Order order) {
kafkaTemplate.send("tacocloud.orders.topic", order);
}
}

在这个OrderMessagingService的新实现中,sendOrder()用到了注入的KafkaTemplate对象的send()方法,将Order发送到名为tacocloud.orders.topic的主题中。除了代码中随处可见的“Kafka”之外,它其实与为JMS和Rabbit编写的代码并没有太大的差异。

如果你想要设置默认主题,那么可以稍微简化一下sendOrder()。首先,通过spring.kafka.template.default-topic属性,我们可以将默认主题设置为tacocloud.orders.topic:

1
2
3
4
spring:
kafka:
template:
default-topic: tacocloud.orders.topic

然后,在sendOrder()方法中,我们就可以调用sendDefault()而不是send()了,这样可以不用指定主题的名称:

1
2
3
4
@Override
public void sendOrder(Order order) {
kafkaTemplate.sendDefault(order);
}

现在,我们已经编写完发送消息的代码了。接下来,我们转移一下注意力,编写从Kafka中接收消息的代码。

8.3.3 编写Kafka监听器

除了send()和sendDefault()特有的方法签名之外,KafkaTemplate与JmsTemplate和RabbitTemplate另一个不同之处在于它没有提供接收消息的方法。这意味着在Spring中想要消费来自Kafka主题的消息只有一种办法,就是编写消息监听器。

对于Kafka消息来说,消息监听器是通过带有@KafkaListener注解的方法来实现的。@KafkaListener大致对应于@JmsListener@RabbitListener,并且使用方式也基本相同。如下的程序清单展示了为Kafka编写的基于监听器的订单接收器。

程序清单8.9 使用@KafkaListener接收订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package tacos.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order) {
ui.displayOrder(order);
}
}

handle()方法使用了@KafkaListener注解,表明当有消息抵达名为tacocloud.orders.topic的主题时,该方法将会被调用。如程序清单8.9所示,我们只将Order(载荷)对象传递给了handle()。如果你想要获取消息中其他的元数据,我们也可以接受ConsumerRecord或Message对象。

例如,如下的handle()实现接受一个ConsumerRecord,这样我们就能在日志中将消息的分区和时间戳记录下来:

1
2
3
4
5
6
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
ui.displayOrder(order);
}

类似地,我们还可以用一个Message对象来替代ConsumerRecord,并且能够达到相同的目的:

1
2
3
4
5
6
7
8
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
MessageHeaders headers = message.getHeaders();
log.info("Received from partition {} with timestamp {}",
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
ui.displayOrder(order);
}

值得一提的是,消息载荷也可以通过ConsumerRecord.value()或Message.getPayload()获取到。这意味着我们可以通过这些对象获取Order,而不必直接将其作为handle()的参数。