8.2 使用RabbitMQ和AMQP

8.2 使用RabbitMQ和AMQP

RabbitMQ可以说是AMQP最杰出的实现,它提供了比JMS更高级的消息路由策略。JMS消息使用目的地名称来寻址,接收者要从这里检索消息,而AMQP消息使用Exchange和routing key来寻址,这样消息就与接收者要监听的队列解耦了。Exchange和队列的关系如图8.1所示。

image-20211015123622109

图8.1 发送到RabbitMQ Exchange的消息会基于routing key和binding被路由到一个或多个队列上

当消息抵达RabbitMQ代理的时候,它会进入为其设置的Exchange上。Exchange负责将它路由到一个或多个队列中,这个过程会根据Exchange的类型、Exchange和队列之间的binding以及消息的routing key进行路由。

这方面有多个不同类型的Exchange,包括以下内容。

  • Default:这是代理创建的特殊Exchange。它会将消息路由至名字与消息routing key相同的队列。所有的队列都会自动绑定至Default Exchange。
  • Direct:如果消息的routing key与队列的binding key相同,那么消息将会路由到该队列上。
  • Topic:如果消息的routing key与队列binding key(可能会包含通配符)匹配,那么消息将会路由到一个或多个这样的队列上。
  • Fanout:不管routing key和binding key是什么,消息都将会路由到所有绑定队列上。
  • Headers:与Topic Exchange类似,只不过要基于消息的头信息进行路由,而不是routing key。
  • Dead letter:捕获所有无法投递(也就是它们无法匹配所有已定义的Exchange和队列的binding关系)的消息。

最简单的Exchange形式是Default和Fanout,因为它们大致对应了JMS中的队列和主题,但是其他的Exchange允许我们定义更加灵活的路由模式。

这里最重要的是要明白消息会通过routing key发送至Exchange,而消息要在队列中被消费。它们如何从Exchange路由至队列取决于binding的定义以及哪种方式最适合我们的使用场景。

至于使用哪种Exchange类型以及如何定义从Exchange到队列的binding,这本身与如何在Spring应用中发送和接收消息关系不大。因此,我们更加关心如何编写使用Rabbit发送和接收消息的代码。

注意:关于如何绑定队列到Exchange的更详细讨论,请参考Alvaro Videla和Jason J.W. Williams编写的RabbitMQ in Action (Manning,2012)。

8.2.1 添加RabbitMQ到Spring中

在使用Spring发送和接收RabbitMQ消息之前,我们需要将Spring Boot的AMQPstarter依赖添加到构建文件中,替换上文中Artemis或ActiveMQ starter的位置:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加AMQP starter到构建文件中之后,将会触发自动配置功能,这样会为我们创建一个AMQP连接工厂和RabbitTemplate bean,以及其他的一些支撑组件。我们要使用Spring发送和接收RabbitMQ代理的消息,只需要添加这项依赖就可以了。但是,这里还有一些我们需要掌握的有用属性,如表8.4所示。

表8.4 配置RabbitMQ代理位置和凭证的属性

epub_29101559_49

对于开发来说,我们可能会使用不需要认证的RabbitMQ代理,它运行在本地机器上并监听5672端口。在开发阶段,这些属性可能没有太大的用处,但是当应用程序投入生产环境时,它们无疑是非常有用的。

假设我们要将应用投入生产环境,RabbitMQ代理位于名为rabbit.tacocloud.com服务器上,监听5673端口并且需要认证。在这种情况下,当prod profile处于激活状态时,application.yml文件中的如下配置将会设置这些属性:

1
2
3
4
5
6
7
spring:
profiles: prod
rabbitmq:
host: rabbit.tacocloud.com
port: 5673
username: tacoweb
password: l3tm31n

在我们的应用中,RabbitMQ已经配置好了,接下来就可以使用RabbitTemplate发送消息了。

8.2.2 通过RabbitTemplate发送消息

Spring对RabbitMQ消息支持的核心是RabbitTemplate。RabbitTemplate与JmsTemplate类似,提供了一组相似的方法。但是,我们将会看到,这里有一些细微的差异,这是与RabbitMQ独特的运行方式有关的。

在使用RabbitTemplate发送消息方面,我们可以使用与JmsTemplate中同名的send()和convertAndSend()方法。但是,与JmsTemplate的方法只是将消息路由至队列或主题不同,RabbitTemplate会按照Exchanges和routing key来发送消息。下面列出关于使用RabbitTemplate发送消息比较重要的一些方法^1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 发送原始的消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message)
throws AmqpException;
// 发送根据对象转换而成的消息
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message)
throws AmqpException;
void convertAndSend(String exchange, String routingKey,
Object message) throws AmqpException;
// 发送根据对象转换而成的消息并且带有后期处理的功能
void convertAndSend(Object message, MessagePostProcessor mPP)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String exchange, String routingKey,
Object message,
MessagePostProcessor messagePostProcessor)
throws AmqpException;

我们可以看到,这些方法与JmsTemplate中对应的方法遵循了相同的模式。前3个send()方法都是发送原始的Message对象。接下来的3个convertAndSend()方法会接受一个对象,这个对象会在发送之前在幕后转换成Message。最后的3个convertAndSend()方法与前面的3个方法类似,但是它们还会接受一个MessagePostProcessor对象,这个对象能够在Message发送至代理之前对其进行操作。

这些方法与JmsTemplate对应方法的不同之处在于,它们会接受String类型的值以指定Exchange和routing key,而不像JmsTemplate那样接受目的地名称(或Destination)。没有接受Exchange参数的方法会将消息发送至DefaultExchange。与之类似,没有指定routing key的方法会把消息路由至默认的routing key。

接下来,我们看一下如何使用RabbitTemplate发送taco订单。有一种方式是使用send()方法,如程序清单8.5所示。但是,在调用send()之前,我们需要将Order对象转换为Message。RabbitTemplate能够通过getMessageConverter()方法获取消息转换器,否则,这项工作会非常乏味。

程序清单8.5 使用RabbitTemplate.send()发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package tacos.messaging;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tacos.Order;
@Service
public class RabbitOrderMessagingService
implements OrderMessagingService {
private RabbitTemplate rabbit;
@Autowired
public RabbitOrderMessagingService(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}
}

有了MessageConverter之后,将Order转换成Message就是非常简单的任务了。我们必须通过MessageProperties来提供消息属性,但是如果我们不需要设置任何这样的属性,使用默认的MessageProperties实例就可以了。随后,剩下的就是调用send()了,并将Exchange和routing key(这两者都是可选的)连同消息一起传递过去。在本例中,我们只指定了routing key(tacocloud.order)和消息本身,所以会使用默认的Exchange。

这里提到了默认的Exchange,它的名字是“”(空的String),对应RabbitMQ代理自动生成的Default Exchange。与之相似,默认的routing key也是“”(它的路由将会取决于Exchange以及相应的binding)。我们可以通过设置spring.rabbitmq.template.exchange和spring.rabbitmq.template.routing-key属性重写这些默认值:

1
2
3
4
5
spring:
rabbitmq:
template:
exchange: tacocloud.orders
routing-key: kitchens.central

在本例中,所有未指明Exchange的消息都将会自动发送至名为tacocloud.orders的Exchange。如果在调用send()或convertAndSend()的时候也没有指定routingkey,那么消息将会使用值为kitchens.central的routing key。

通过消息转换器创建Message对象是非常简单的,但是使用convertAndSend()让RabbitTemplate处理所有的转换操作则会更加简单:

1
2
3
public void sendOrder(Order order) {
rabbit.convertAndSend("tacocloud.order", order);
}

配置消息转换器

默认情况下,消息转换是通过SimpleMessageConverter来实现的,它能够将简单类型(如String)和Serializable对象转换成Message对象。但是,Spring为RabbitTemplate提供了多个消息转换器,包括下面内容。

  • Jackson2JsonMessageConverter:使用Jackson 2 JSON实现对象和JSON的相互转换。
  • MarshallingMessageConverter:使用Spring的Marshaller和Unmarshaller进行转换。
  • SerializerMessageConverter:使用Spring的Serializer和Deserializer转换String和任意种类的原生对象。
  • SimpleMessageConverter:转换String、字节数组和Serializable类型。
  • ContentTypeDelegatingMessageConverter:基于contentType头信息,将转换功能委托给另外一个MessageConverter。
  • MessagingMessageConverter:将消息转换功能委托给另外一个MessageConverter,并将头信息的转换委托给AmqpHeaderConverter。

如果需要变更消息转换器,就要配置一个类型为MessageConverter的bean。例如,对于基于JSON的转换,我们可以按照如下的方式来配置Jackson2JsonMessageConverter:

1
2
3
4
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

Spring Boot的自动配置功能会发现这个bean,并将它注入RabbitTemplate中,替换默认的消息转换器。

设置消息属性

与在JMS中一样,我们可能需要在发送的消息中添加一些头信息。例如,假设我们需要为所有通过Taco Cloud Web站点提交的订单添加一个X_ORDER_SOURCE信息。在自行创建Message的时候,我们可以通过MessageProperties实例设置头信息,随后将这个对象传递给消息转换器。回到程序清单8.5的sendOrder()方法,我们需要做的就是添加一行设置头信息的代码:

1
2
3
4
5
6
7
public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}

但是,在使用convertAndSend()的时候,我们无法快速访问MessageProperties对象。不过,此时MessagePostProcessor可以帮助我们:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void sendOrder(Order order) {
rabbit.convertAndSend("tacocloud.order.queue", order,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)
throws AmqpException {
MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}

在这里,我们为convertAndSend()提供了MessagePostProcessor接口的匿名内部类实现。在postProcessMessage()中,我们从Message中拉取MessageProperties对象,然后通过setHeader()方法设置X_ORDER_SOURCE头信息。

现在,我们已经看到了如何通过RabbitTemplate发送消息,接下来我们转换视角看一下如何接收来自RabbitMQ队列的消息。

8.2.3 接收来自RabbitMQ的消息

我们看到使用RabbitTemplate发送消息与使用JmsTemplate发送消息并没有太大差别。实际上,接收来自RabbitMQ队列的消息也与JMS没有太大差别。与JMS类似,我们有两个可选方案。

  • 使用RabbitTemplate从队列拉取消息。
  • 将消息推送至带有@RabbitListener注解的方法。

我们首先看一下基于拉取的RabbitTemplate.receive()方法。

RabbitTemplate提供了多个从队列拉取消息的方法。其中,最有用的方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 接收消息
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// 接收由消息转换而成的对象
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws
AmqpException;
// 接收由消息转换而成的类型安全的对象
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws
AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type)
throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T>
type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis,
ParameterizedTypeReference<T> type)
throws AmqpException;

这些方法对应于前文所述的send()和convertAndSend()方法。send()用于发送原始的Message对象,而receive()则会接收来自队列的原始Message对象。与之类似,receiveAndConvert()接收消息并且在返回之前使用一个消息转换器将它们转换为领域对象。

但是,在方法签名上有一些明显的不同。首先,这些方法都不会接收Exchange和routing key作为参数。这是因为Exchange和routing key是用来将消息路由至队列的,在消息位于队列中之后,它们的目的地是将它们从队列中拉取下来的消费者。消费消息的应用本身并不需要关心Exchange和routing key。消费消息的应用只需要知道队列信息就可以了。

你可能会注意到,很多方法都接收一个long类型的参数,用来指定接收消息的超时时间。默认情况下,接收消息的超时时间是0毫秒。也就是说,调用receive()会立即返回,如果没有可用消息,那么返回值是null。这是与JmsTemplate的receive()的一个显著差异。通过传入一个超时时间的值,我们就可以让receive()和receiveAndConvert()阻塞,直到消息抵达或者超时时间过期。但是,即便我们设置了非零的超时时间,在代码中依然要处理null返回值的场景。

接下来,我们看一下如何实际使用它们。程序清单8.6展现了一个新的基于Rabbit的OrderReceiver实现,它使用RabbitTemplate来接收订单。

程序清单8.6 通过RabbitTemplate从RabbitMQ中拉取订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package tacos.kitchen.messaging.rabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitOrderReceiver {
private RabbitTemplate rabbit;
private MessageConverter converter;
@Autowired
public RabbitOrderReceiver(RabbitTemplate rabbit) {
this.rabbit = rabbit;
this.converter = rabbit.getMessageConverter();
}
public Order receiveOrder() {
Message message = rabbit.receive("tacocloud.orders");
return message != null
? (Order) converter.fromMessage(message)
: null;
}
}

所有的操作都发生在receiveOrder()方法中。它调用了注入的RabbitTemplate对象的receive()方法,从名为tacocloud.orders的队列中拉取一个订单。它并没有提供超时值,所以我们只能假定这个调用会马上返回,要么得到Message对象,要么返回null。如果返回Message对象,就使用RabbitTemplate中的MessageConverter将Message转换成一个Order对象。如果receive()方法返回null,我们就将null作为返回值。

根据使用场景,我们也许能够容忍一定的延迟。例如,在Taco Cloud厨房悬挂的显示器中,如果没有订单,我们可以稍等一会儿。假设在放弃之前,我们决定等待30秒钟,那么receiveOrder()方法可以修改为传递30 000毫秒的延迟给receive()方法:

1
2
3
4
5
6
public Order receiveOrder() {
Message message = rabbit.receive("tacocloud.order.queue", 30000);
return message != null
? (Order) converter.fromMessage(message)
: null;
}

如果你像我一样,觉得使用这样一个硬编码的数字会让人觉得不舒服,那么你可能会想,创建一个带有@ConfigurationProperties注解的类并使用Spring Boot的配置属性来设置超时时间,可能会是更好的方案。在一点上,我的想法和你一样,只不过Spring Boot已经为我们提供了一个这样的配置属性。如果你想要通过配置来设置超时时间,只需要在调用receive()的时候移除超时值,并将超时时间设置为spring.rabbitmq.template.receive-timeout属性即可:

1
2
3
4
spring:
rabbitmq:
template:
receive-timeout: 30000

回到receiveOrder()方法,我们必须要使用RabbitTemplate中的消息转换器才能将传入的Message对象转换成Order对象。但是,既然RabbitTemplate已经携带了消息转换器,它为什么不能自动为我们进行转换呢?这就是receiveAndConvert()方法所做的事情。借助receiveAndConvert(),我们可以将receiveOrder()重写为:

1
2
3
public Order receiveOrder() {
return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
}

看起来简单了许多,对吧。唯一让我觉得麻烦的就是从Object到Order的类型转换。不过,这种转换还有另外一种实现方式。我们可以传递一个ParameterizedTypeReference引用给receiveAndConvert(),这样我们就可以直接得到Order对象了:

1
2
3
4
public Order receiveOrder() {
return rabbit.receiveAndConvert("tacocloud.order.queue",
new ParameterizedTypeReference<Order>() {});
}

关于这种方式是否真的比类型转换更好,依然还有争论,但是它确实能够更加确保类型安全。唯一需要注意的是,要在receiveAndConvert()中使用ParameterizedTypeReference,消息转换器必须要实现SmartMessageConverter,目前Jackson2JsonMessageConverter是唯一一个可选的内置实现。

RabbitTemplate提供的拉取模式适用于很多使用场景,但是有时候监听消息并在消息抵达的时候对其进行处理会更好一些。接下来,我们看一下如何编写消息驱动的bean,让它对RabbitMQ消息做出回应。

使用监听器处理RabbitMQ的消息

Spring提供了RabbitListener实现消息驱动的RabbitMQ bean,对应于JmsListener。为了声明当消息抵达RabbitMQ队列时某个方法应该被调用,我们可以为bean的方法添加@RabbitListener注解。

例如,程序清单8.7展现了OrderReceiver的RabbitMQ实现,它通过注解声明要监听订单消息,而不是使用RabbitTemplate进行轮询。

程序清单8.7 将方法声明为RabbitMQ的消息监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package tacos.kitchen.messaging.rabbit.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@RabbitListener(queues = "tacocloud.order.queue")
public void receiveOrder(Order order) {
ui.displayOrder(order);
}
}

你肯定会发现它与程序清单8.4的代码非常相似。确实,唯一的变更就是监听器的注解,从@JmsListener换成了@RabbitListener。尽管@RabbitListener注解非常棒,但是几乎重复的代码让我无法找出@RabbitListener具有什么在@JmsListener中没有提到的功能。当消息从各自的代理推送过来的时候,这两个注解都非常适合用来编写对应的代码,其中@JmsListener对应的是JMS代理,而@RabbitListener对应的是RabbitMQ代理。

在前面的段落中,你可能会对@RabbitListener感到索然无趣,但是这并非我的本意。实际上,@RabbitListener和@JmsListener的运行方式非常相似是一件令人兴奋的事情。这意味着当我们使用RabbitMQ替代Artemis或ActiveMQ的时候,不需要学习全新的编程模型。同样令人兴奋的是,RabbitTemplate和JmsTemplate之间也具有这样的相似性。

让我们暂且保持一下这种兴奋,在本章结束之前,我们看一下Spring支持的另外一个消息方案:Apache Kafka。