8.3 使用Kafka的消息
8.3 使用Kafka的消息
Apache Kafka是我们在本章研究的最新的消息方案。乍看上去,Kafka是与ActiveMQ、Artemis或Rabbit类似的消息代理,其实Kafka有一些独特的技巧。
Kafka设计为集群运行,从而能够实现很强的可扩展性。通过将主题在集群的所有实例上进行分区(partition),它能够具有更强的弹性。RabbitMQ主要处理Exchange中的队列,而Kafka仅使用主题实现消息的发布/订阅。
Kafka主题会复制到集群的所有代理上。集群中的每个节点都会担任一个或多个主题的首领(leader),负责该主题的数据并将其复制到集群中的其他节点上。
更进一步来讲,每个主题可以划分为多个分区。在这种情况下,集群中的每个节点是某个主题一个或多个分区的首领,但并不是整个主题的首领。主题的责任会在所有节点间进行拆分。图8.2阐述了它是如何运行的。
关于Kafka的独特架构,我建议你阅读Dylan Scott编写的Kafka in Action(Manning,2017)。就我们来讲,我们将会关注如何通过Spring发送和接收Kafka的消息。
8.3.1 为Spring搭建支持Kafka消息的环境
为了搭建Kafka的消息环境,我们需要添加对应的依赖到构建文件中。但是,与JMS和RabbitMQ方案不同,并没有针对Kafka的Spring Boot starter。不过,不用担心,我们只需要添加一项依赖:
1 | <dependency> |
这项依赖会为我们的项目引入Kafka所需的所有内容。另外,它的出现会触发Spring Boot对Kafka的自动配置,除了其他功能之外,它会在Spring应用上下文中创建一个KafkaTemplate。我们所需要做的就是注入KafkaTemplate并使用它来发布和接收消息。
但是,在发送和接收消息之前,我们还需要注意使用Kafka时的一些属性。具体来讲,KafkaTemplate默认会使用localhost上监听9092端口的Kafka代理。在开发应用的时候,在本地启动Kafka代理没有问题,但是在投入生产的时候,我们需要配置不同的主机和端口。
spring.kafka.bootstrap-servers属性能够设置一个或多个Kafka服务器的地址,系统将会使用它来建立到Kafka集群的初始连接。例如,集群中的某个服务器运行在kafka.tacocloud.com上并监听9092端口,那么我们可以按照如下的方式在YAML中配置它的位置:
1 | spring: |
但是需要注意spring.kafka.bootstrap-servers是复数形式,它能接受一个列表。所以,我们可以提供集群中的多个Kafka服务器:
1 | spring: |
Kafka在项目中准备就绪之后,我们就可以发送和接收消息了。我们首先使用KafkaTemplate发送Order对象到Kafka中。
8.3.2 通过KafkaTemplate发送消息
在很多方面,KafkaTemplate与JMS和RabbitMQ对应的模板非常相似。但同时,它也有很大的差异。在发送消息的时候,这一点非常明显:
1 | ListenableFuture<SendResult<K, V>> send(String topic, V data); |
我们首先可能会发现,这里没有convertAndSend()方法了。这是因为,KafkaTemplate是通过泛型类型化的,在发送消息的时候,它能够直接处理领域类型。这样的话,所有的send()方法都完成了convertAndSend()的任务。
你可能也会发现,send()和sendDefault()的参数与JMS和Rabbit有很大的差异。在使用Kafka发送消息的时候,我们可以使用如下参数设置消息该如何进行发送:
- 消息要发送到的主题(send()方法的必选参数);
- 主题要写入的分区(可选);
- 记录上要发送的key(可选);
- 时间戳(可选,默认为System.currentTimeMillis());
- 载荷(必选)。
主题和载荷是其中最重要的两个参数。分区和key对于如何使用KafkaTemplate几乎没有影响,只是作为额外的信息提供给send()和sendDefault()。对于我们的场景来说,我们只关心将消息载荷发送到给定的主题,不用担心分区和key的问题。
对于send()方法来说,我们还可以选择发送一个ProducerRecord对象,它只是一个简单类型,将上述的参数放到了一个对象中。我们还可以发送Message对象,但是需要将领域对象转换成Message对象。相对创建和发送ProducerRecord和Message对象,使用其他的方法会更简单一些。
借助KafkaTemplate及其send()方法,我们可以编写一个基于Kafka实现的OrderMessagingService实现。程序清单8.8展现了该实现类。
程序清单8.8 使用KafkaTemplate发送订单
1 | package tacos.messaging; |
在这个OrderMessagingService的新实现中,sendOrder()用到了注入的KafkaTemplate对象的send()方法,将Order发送到名为tacocloud.orders.topic的主题中。除了代码中随处可见的“Kafka”之外,它其实与为JMS和Rabbit编写的代码并没有太大的差异。
如果你想要设置默认主题,那么可以稍微简化一下sendOrder()。首先,通过spring.kafka.template.default-topic属性,我们可以将默认主题设置为tacocloud.orders.topic:
1 | spring: |
然后,在sendOrder()方法中,我们就可以调用sendDefault()而不是send()了,这样可以不用指定主题的名称:
1 |
|
现在,我们已经编写完发送消息的代码了。接下来,我们转移一下注意力,编写从Kafka中接收消息的代码。
8.3.3 编写Kafka监听器
除了send()和sendDefault()特有的方法签名之外,KafkaTemplate与JmsTemplate和RabbitTemplate另一个不同之处在于它没有提供接收消息的方法。这意味着在Spring中想要消费来自Kafka主题的消息只有一种办法,就是编写消息监听器。
对于Kafka消息来说,消息监听器是通过带有@KafkaListener
注解的方法来实现的。@KafkaListener
大致对应于@JmsListener
和@RabbitListener
,并且使用方式也基本相同。如下的程序清单展示了为Kafka编写的基于监听器的订单接收器。
程序清单8.9 使用@KafkaListener
接收订单
1 | package tacos.kitchen.messaging.kafka.listener; |
handle()方法使用了@KafkaListener
注解,表明当有消息抵达名为tacocloud.orders.topic的主题时,该方法将会被调用。如程序清单8.9所示,我们只将Order(载荷)对象传递给了handle()。如果你想要获取消息中其他的元数据,我们也可以接受ConsumerRecord或Message对象。
例如,如下的handle()实现接受一个ConsumerRecord,这样我们就能在日志中将消息的分区和时间戳记录下来:
1 |
|
类似地,我们还可以用一个Message对象来替代ConsumerRecord,并且能够达到相同的目的:
1 |
|
值得一提的是,消息载荷也可以通过ConsumerRecord.value()或Message.getPayload()获取到。这意味着我们可以通过这些对象获取Order,而不必直接将其作为handle()的参数。