11.1 使用Spring WebFlux

传统的基于Servlet的Web框架,如Spring MVC,在本质上都是阻塞和多线程的,每个连接都会使用一个线程。在请求处理的时候,会在线程池中拉取一个工作者(worker)线程来对请求进行处理。同时,请求线程是阻塞的,直到工作者线程提示它已经完成为止。

这样带来的后果就是阻塞式Web框架在大量请求下无法有效地扩展。缓慢的工作者线程所带来的延迟会使情况变得更糟,因为它将花费更长的时间才能将工作者线程送回池中,准备处理另一个请求。在某些场景中,这种设计完全可以接受。事实上,在很大程度上这就是十多年来大多数Web应用程序的开发方式,但是时代在改变。

这些Web应用程序的客户端以前是偶尔浏览网站的人们,而现在这些人会频繁消费内容而且会使用与HTTP API协作的应用程序。如今,物联网(甚至不需要人类)产生了汽车、喷气式发动机和其他非传统的客户端,它们会持续地和Web API交换数据。随着消费Web应用的客户端越来越多,可扩展性比以往任何时候都更加重要。

异步的Web框架能够以更少的线程获得更高的可扩展性,通常它们只需要与CPU核心数量相同的线程。通过使用所谓的事件轮询(event looping)机制(如图11.1所示),这些框架能够用一个线程处理很多请求,这样每次连接的成本会更低。

image-20211016212143634

图11.1 异步Web框架借助事件轮询机制能够以更少的线程处理更多的请求

在事件轮询中,所有事情都是以事件的方式来进行处理的,包括请求以及密集型操作(如数据库和网络操作)的回调。当需要执行成本高昂的操作时,事件轮询会为该操作注册一个回调,这样操作可以并行执行,而事件轮询则会继续处理其他的事件。

当操作完成时,事件轮询机制会将其作为一个事件,这一点与请求是相同的。这样达到的效果就是,在面临大量负载的时候,异步Web框架能够以更少的线程实现更好的可扩展性,这样会减少线程管理的开销。

Spring 5引入了一个非阻塞、异步的Web框架,该框架在很大程度上是基于Reactor项目的,能够解决Web应用和API中对更好的可扩展性的需求。接下来我们看一下Spring WebFlux:面向Spring的反应式Web框架。

11.1.1 Spring WebFlux简介

当Spring团队思考如何向Web层添加反应式编程模型时,如果不在Spring MVC中做大量工作,显然很难实现这一点。这会在代码中产生分支以决定是否要以反应式的方式来处理请求。如果这样做,本质上就是将两个Web框架打包成一个,依靠if语句来区分反应式和非反应式。

与其将反应式编程模型硬塞进Spring MVC中,还不如创建一个单独的反应式Web框架,并尽可能多地借鉴Spring MVC。这样,Spring WebFlux就应运而生了。Spring 5定义的完整Web开发技术栈如图11.2所示。

image-20211016212213773

图11.2 Spring 5通过名为WebFlux的新Web框架来支持反应式Web应用

在图11.2的左侧,我们会看到Spring MVC技术栈,这是Spring框架2.5版本就引入的。Spring MVC(在第2章和第6章已经进行了讨论)建立在Java Servlet API之上,因此需要Servlet容器(比如Tomcat)才能执行。

与之不同,Spring WebFlux(在图11.2的右侧,和Spring MVC系出同门,并且很多核心组件都是公用的)并不会绑定Servlet API,所以它构建在Reactive HTTPAPI之上,这个API与Servlet API具有相同的功能,只不过是采用了反应式的方式。因为Spring WebFlux没有与Servlet API耦合,所以它的运行并不需要Servlet容器。它可以运行在任意非阻塞Web容器中,包括Netty、Undertow、Tomcat、Jetty或任意Servlet 3.1及以上的容器。

在图11.2中,最值得注意的是左上角,它代表了Spring MVC和Spring WebFlux公用的组件,主要用来定义控制器的注解。因为Spring MVC和Spring WebFlux会使用相同的注解,所以Spring WebFlux与Spring MVC在很多方面并没有区别。

右上角的方框表示另一种编程模型,它使用函数式编程范式来定义控制器,而不是使用注解。在11.2节中,我们将更多地讨论Spring的函数式Web编程模型。

Spring MVC和Spring WebFlux之间最显著的区别在于,我们要将哪个依赖项添加到构建文件中。在使用Spring WebFlux时,我们需要添加Spring Boot WebFluxstarter依赖项,而不是标准的Web starter(例如,spring-boot-starter-web)。在项目的pom.xml文件中,如下所示:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
注意:与Spring Boot的大多数starter依赖类似,这个starter也可以在Initializr中通过选中Reactive Web复选框添加到项目中。

使用WebFlux有一个很有意思的副作用,即WebFlux的默认嵌入式服务器是Netty而不是Tomcat。Netty是一个异步、事件驱动的服务器,非常适合SpringWebFlux这样的反应式Web框架。

除了使用不同的starter依赖之外,Spring WebFlux的控制器方法要接受和返回反应式类型,如Mono和Flux,而不是领域类型和集合。Spring WebFlux控制器也能处理RxJava类型,如Observable、Single和Completable。

反应式Spring MVC

尽管Spring WebFlux控制器通常会返回Mono和Flux,但是这并不意味着SpringMVC无法体验反应式类型的乐趣。如果你愿意,那么Spring MVC也可以返回Mono和Flux。

这里的区别在于,这些类型会如何被使用。Spring WebFlux是真正的反应式Web框架,允许在事件轮询中处理请求;而Spring MVC是基于Servlet的,依赖于多线程来处理多个请求。

接下来,我们让Spring WebFlux运行起来,借助Spring WebFlux重新编写TacoCloud的API控制器。

11.1.2 编写反应式控制器

你可能还记得在第6章中我们为Taco Cloud的REST API创建了一些控制器,这些控制器中包含请求处理方法,这些方法会以领域类型(如Order和Taco)或领域类型集合的方式处理输入和输出。作为提醒,我们看一下在第6章所编写的DesignTacoController片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RestController
@RequestMapping(path="/design",
produces="application/json")
@CrossOrigin(origins="*")
public class DesignTacoController {
...
@GetMapping("/recent")
public Iterable<Taco> recentTacos() {
PageRequest page = PageRequest.of(
0, 12, Sort.by("createdAt").descending());
return tacoRepo.findAll(page).getContent();
}
...
}

按照以上的编写形式,recentTacos()控制器会处理对“/design/recent”的HTTPGET请求,返回最近创建的taco列表。具体来讲,它会返回Taco类型的Iterable。这主要是因为repository的findAll()方法返回的就是该类型,或者更准确地说,这个结果来自findAll()方法所返回的Page对象的getContent()方法。

这样能够很好地运行,但是Iterable并不是反应式类型。我们不能对它使用任何反应式操作,也不能让框架将它视为反应式类型,从而将工作切分到多个线程中。我们希望recentTacos()方法能够返回Flux<Taco>

这里有一个简单但效果有限的方案:重写recentTacos(),将Iterable转换为Flux。而且,在重写的时候,我们可以去掉分页代码,将其替换为调用Flux的take():

1
2
3
4
@GetMapping("/recent")
public Flux<Taco> recentTacos() {
return Flux.fromIterable(tacoRepo.findAll()).take(12);
}

借助Flux.fromIterable(),我们可以将Iterable<Taco>转换为Flux<Taco>。既然我们可以使用Flux了,那么就能使用take操作将Flux返回的值限制为最多12个Taco对象。不仅代码更加简洁,而且我们能够处理反应式的Flux,而不是简单的Iterable。

到目前为止,我们编写反应式代码一切都很顺利。如果repository一开始就给我们一个Flux那就更好了,就没有必要进行转换了。如果能够实现这一点,那么recentTacos()将会写成如下形式:

1
2
3
4
@GetMapping("/recent")
public Flux<Taco> recentTacos() {
return tacoRepo.findAll().take(12);
}

这样就更好了!在理想情况下,反应式控制器将会位于反应式端到端栈的顶部,这个栈包括了控制器、repository、数据库以及在它们之间可能还会包含的服务。这样的端到端反应式栈如图11.3所示。

image-20211016212402176

图11.3 它应该成为完整的端到端反应式栈的一部分(为了最大化反应式Web框架的收益)

这样的端到端技术栈要求repository返回Flux,而不是Iterable。在第12章中,我们将会详细研究如何编写反应式repository,但是反应式TacoRepository大致会如下所示:

1
2
3
public interface TacoRepository
extends ReactiveCrudRepository<Taco, Long> {
}

此时,最需要注意的事情在于除了使用Flux来替换Iterable以及如何获取Flux之外,定义反应式WebFlux控制器的编程模型与非反应式Spring MVC控制器并没有什么差异。它们都使用了@RestController注解并且都在类级别使用了@RequestMapping。它们都有在方法级别使用@GetMapping注解的请求处理函数。真正重要的是处理器方法返回了什么类型。

另外值得注意的是,尽管我们从repository得到了Flux<Taco>,但是我们直接将它返回了,并没有调用subscribe()。框架将会为我们调用subscribe()。这意味着当处理“/design/recent”请求的时候,recentTacos()方法会被调用,在数据真正从数据库取出之前它就能立即返回。

返回单个值

作为另外一个样例,我们思考一下在第6章中编写的DesignTacoController的tacoById()方法:

1
2
3
4
5
6
7
8
@GetMapping("/{id}")
public Taco tacoById(@PathVariable("id") Long id) {
Optional<Taco> optTaco = tacoRepo.findById(id);
if (optTaco.isPresent()) {
return optTaco.get();
}
return null;
}

在这里,该方法处理对“/design/{id}”的GET请求并返回单个Taco对象。因为repository的findById()返回的是Optional,所以我们必须编写一些烦琐的代码处理它。如果findById()返回的是Mono<Taco>,而不是Optional<Taco>,那么我们可以按照如下的方式重写控制器的tacoById():

1
2
3
4
@GetMapping("/{id}")
public Mono<Taco> tacoById(@PathVariable("id") Long id) {
return tacoRepo.findById(id);
}

这样看上去简单多了。更重要的是,通过返回Mono<Taco>来替代Taco,我们能够让Spring WebFlux以反应式的方式处理响应。这样做的结果就是在面临高负载的时候我们的API能够更好地进行扩展。

使用RxJava类型

值得一提的是,在使用Spring WebFlux时,虽然Flux和Mono是自然而然的选择,但是我们也可以使用像Observable和Single这样的RxJava类型。例如,假设在DesignTacoController和后端repository之间有一个服务,处理的是RxJava类型,那么recentTacos()方法可以编写为:

1
2
3
4
@GetMapping("/recent")
public Observable<Taco> recentTacos() {
return tacoService.getRecentTacos();
}

类似的,tacoById()方法可以编写成处理RxJava Single类型,而不是Mono类型:

1
2
3
4
@GetMapping("/{id}")
public Single<Taco> tacoById(@PathVariable("id") Long id) {
return tacoService.lookupTaco(id);
}

除此之外,Spring WebFlux控制器方法还可以返回RxJava的Completable,后者等价于Reactor中的Mono<Void>。WebFlux也可以返回Flowable,替换Observable或Reactor的Flux。

实现输入的反应式

到目前为止,我们只关心了控制器方法返回什么样的反应式类型。但是,借助Spring WebFlux,我们还可以接受Mono或Flux作为处理器方法的输入。为了阐述这一点,请思考DesignTacoController中原始的postTaco()实现:

1
2
3
4
5
@PostMapping(consumes="application/json")
@ResponseStatus(HttpStatus.CREATED)
public Taco postTaco(@RequestBody Taco taco) {
return tacoRepo.save(taco);
}

按照原始的编写方式,postTaco()不仅会返回一个简单的Taco对象,还会接受一个Taco,这个对象绑定了请求体中的内容。这意味着在请求载荷完成解析并初始化为Taco对象之前,postTaco()方法是不会被调用的。这同时也意味着,在对repository的save()方法的阻塞调用返回之前,postTaco()是不能返回的。简而言之,这个请求阻塞了两次:在进入postTaco()的时候以及在postTaco()调用的过程中。通过为postTaco()添加一些反应式代码,我们能够将它变成完全非阻塞的请求处理方法:

1
2
3
4
5
@PostMapping(consumes="application/json")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Taco> postTaco(@RequestBody Mono<Taco> tacoMono) {
return tacoRepo.saveAll(tacoMono).next();
}

在这里,postTaco()接受一个Mono<Taco>并调用了repository的saveAll()方法。我们将会在第12章看到这个repository能够接受反应式流Publisher的任意实现,包括Mono或Flux。saveAll()方法返回了一个Flux<Taco>,但我们想要的是Mono。我们知道该Flux最多只能发布一个Taco,所以调用next()方法获取postTaco()方法要返回的Mono<Taco>

通过接受Mono<Taco>作为输入,方法会立即调用,不用等待从请求体中解析生成Taco。另外,repository也是反应式的,它接受一个Mono并立即返回Flux<Taco>,所以我们调用Flux的next()来获取最终的Mono<Taco>。方法在请求真正处理之前就能返回。

Spring WebFlux是一个非常棒的Spring MVC替代方案,提供了与Spring MVC相同的开发模型来编写反应式Web应用。其实Spring 5还有另外一项技巧,下面让我们看看如何使用Spring 5的新函数式编程风格创建反应式API。

11.2 定义函数式请求处理器

Spring MVC基于注解的编程模型从Spring 2.5就存在了,而且这种模型非常流行,但是它也有一些缺点。

首先,所有基于注解的编程方式都会存在注解该做什么以及注解如何做之间的割裂。注解本身定义了该做什么,而具体如何去做则是在框架代码的其他部分定义的。如果想要进行自定义或扩展,编程模型就会变得很复杂,因为这样的变更需要修改注解之外的代码。除此之外,这种代码的调试也是比较麻烦的,因为我们无法在注解上设置断点。

其次,随着Spring变得越来越流行,很多熟悉其他语言和框架的Spring新手会觉得基于注解的Spring MVC(和WebFlux)与他们之前掌握的知识有很大的差异。作为注解式WebFlux的一种替代方案,Spring 5引入了一个新的函数式编程模型,用来定义反应式API。

这个新的编程模型使用起来更像一个库,而不是一个框架,能够让我们在不使用注解的情况下将请求映射到处理器代码中。使用Spring的函数式编程模型编写API会涉及4个主要的类型:

  • RequestPredicate:声明要处理的请求类型。
  • RouterFunction:声明如何将请求路由到处理器代码中。
  • ServerRequest:代表一个HTTP请求,包括对请求头和请求体的访问。
  • ServerResponse:代表一个HTTP响应,包括响应头和响应体信息。

下面是一个将所有类型组合在一起的Hello World样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package demo;
import static org.springframework.web.
reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.
reactive.function.server.RouterFunctions.route;
import static org.springframework.web.
reactive.function.server.ServerResponse.ok;
import static reactor.core.publisher.Mono.just;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
@Configuration
public class RouterFunctionConfig {
@Bean
public RouterFunction<?> helloRouterFunction() {
return route(GET("/hello"),
request -> ok().body(just("Hello World!"), String.class));
}
}

我们需要注意的第一件事情就是,在这里静态导入了一些辅助类,可以使用它们来创建前文所述的函数式类型。我们还以静态方式导入了Mono,从而能够让剩余的代码更易于阅读和理解。

在这个@Configuration类中,我们有一个类型为RouterFunction<?>的@Bean方法。按照前文所述,RouterFunction能够声明一个或多个RequestPredicate对象和处理与之匹配的请求的函数之间的映射关系。

RouterFunctions的route()方法接受两个参数:RequestPredicate以及处理与之匹配的请求的函数。在本例中,RequestPredicates的GET()方法声明一个RequestPredicate,后者会匹配针对“/hello”的HTTP GET请求。

至于处理器函数,它写成了lambda表达式的形式,当然它也可以使用方法引用。尽管这里没有显式声明,但是处理器lambda表达式会接受一个ServerRequest作为参数。它通过ServerResponse的ok()方法和BodyBuilder的body()方法返回了一个ServerResponse。BodyBuilder对象是由ok()所返回的。这样的话,就会创建出状态码为HTTP 200 (OK)并且响应体载荷为“Hello World!”的响应。

按照这种编写形式,helloRouterFunction()方法所声明的RouterFunction只能处理一种类型的请求。如果想要处理不同类型的请求,那么我们没有必要编写另外一个@Bean(当然你也可以这样做),仅需调用andRoute()来声明另一个RequestPredicate到函数的映射。例如,为“/bye”的GET请求添加一个处理器:

1
2
3
4
5
6
7
@Bean
public RouterFunction<?> helloRouterFunction() {
return route(GET("/hello"),
request -> ok().body(just("Hello World!"), String.class))
.andRoute(GET("/bye"),
request -> ok().body(just("See ya!"), String.class));
}

Hello World这种级别的样例只能用来简单体验一些新东西。接下来,我们进一步看一下如何使用Spring的函数式Web编程模型处理接近真实场景的请求。

为了阐述如何在真实应用中使用函数式编程模型,我们会使用函数式风格重新实现DesignTacoController的功能。如下的配置类是DesignTacoController的函数式实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
public class RouterFunctionConfig {
@Autowired
private TacoRepository tacoRepo;
@Bean
public RouterFunction<?> routerFunction() {
return route(GET("/design/taco"), this::recents)
.andRoute(POST("/design"), this::postTaco);
}
public Mono<ServerResponse> recents(ServerRequest request) {
return ServerResponse.ok()
.body(tacoRepo.findAll().take(12), Taco.class);
}
public Mono<ServerResponse> postTaco(ServerRequest request) {
Mono<Taco> taco = request.bodyToMono(Taco.class);
Mono<Taco> savedTaco = tacoRepo.save(taco);
return ServerResponse
.created(URI.create(
"http://localhost:8080/design/taco/" +
savedTaco.getId()))
.body(savedTaco, Taco.class);
}
}

我们可以看到,routerFunction()方法声明了一个RouterFunction<?> bean,这与Hello World样例类似。但是,它们之间的差异在于要处理什么类型的请求以及如何处理。在本例中,我们创建的RouterFunction处理针对“/design/taco”的GET请求以及针对“/design”的POST请求。

更明显的差异在于,路由是由方法引用处理的。如果RouterFunction背后的行为相对简单和简洁,那么lambda是很不错的选择。在很多场景下,最好将功能抽取到一个单独的方法中(甚至抽取到一个独立类的方法中),以便于保持代码的可读性。

就我们的需求而言,针对“/design/taco”的GET请求将由recents()方法来处理。它使用注入的TacoRepository得到一个Flux<Taco>,然后从中得到12个条目。针对“/design”的POST请求会由postTaco()方法来处理,它会从传入的ServerRequest中抽取Mono<Taco>。postTaco()使用TacoRepository方法来进行保存,随后使用save()返回的Mono<Taco>作为响应。

11.3 测试反应式控制器

在反应式控制器的测试方面,Spring 5并没有置我们于不顾。实际上,Spring 5引入了WebTestClient。这是一个新的测试工具类,让Spring WebFlux编写的反应式控制器的测试变得非常容易。为了了解如何使用WebTestClient编写测试,我们首先使用它测试11.1.2小节中编写的DesignTacoController中的recentTacos()方法。

11.3.1 测试GET请求

对于recentTacos()方法,我们想断言如果针对“/design/recent”路径发送HTTPGET请求,那么将会得到JSON载荷的响应并且taco的数量不会超过12个。程序清单11.1中的测试类将会是一个很好的起点。

程序清单11.1 使用WebTestClient测试DesignTacoController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package tacos;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import tacos.Ingredient.Type;
import tacos.data.TacoRepository;
import tacos.web.api.DesignTacoController;
public class DesignTacoControllerTest {
@Test
public void shouldReturnRecentTacos() {
Taco[] tacos = {
testTaco(1L), testTaco(2L),
testTaco(3L), testTaco(4L), ⇽--- 创建测试数据
testTaco(5L), testTaco(6L),
testTaco(7L), testTaco(8L),
testTaco(9L), testTaco(10L),
testTaco(11L), testTaco(12L),
testTaco(13L), testTaco(14L),
testTaco(15L), testTaco(16L)};
Flux<Taco> tacoFlux = Flux.just(tacos);
TacoRepository tacoRepo = Mockito.mock(TacoRepository.class);
when(tacoRepo.findAll()).thenReturn(tacoFlux); ⇽--- Mock TacoRepository
WebTestClient testClient = WebTestClient.bindToController(
new DesignTacoController(tacoRepo))
.build(); ⇽--- 创建WebTestClient
testClient.get().uri("/design/recent")
.exchange() ⇽--- 请求最近的taco
.expectStatus().isOk() ⇽--- 检验预期的响应
.expectBody()
.jsonPath("$").isArray()
.jsonPath("$").isNotEmpty()
.jsonPath("$[^0].id").isEqualTo(tacos[^0].getId().toString())
.jsonPath("$[^0].name").isEqualTo("Taco 1").jsonPath("$[^1].id")
.isEqualTo(tacos[^1].getId().toString()).jsonPath("$[^1].name")
.isEqualTo("Taco 2").jsonPath("$[^11].id")
.isEqualTo(tacos[^11].getId().toString())
...
.jsonPath("$[^11].name").isEqualTo("Taco 12").jsonPath("$[^12]")
.doesNotExist();
.jsonPath("$[^12]").doesNotExist();
}
...
}

shouldReturnRecentTacos()方法做的第一件事情就是以Flux<Taco>的形式创建了一些测试数据。这个Flux随后作为mock TacoRepository的findAll()方法的返回值。

Flux发布的Taco对象是由一个名为testTaco()的方法创建的。这个方法会根据一个数字生成一个Taco,其ID和名称都是基于该数字生成的。testTaco()方法的实现如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
private Taco testTaco(Long number) {
Taco taco = new Taco();
taco.setId(UUID.randomUUID());
taco.setName("Taco " + number);
List<IngredientUDT> ingredients = new ArrayList<>();
ingredients.add(
new IngredientUDT("INGA", "Ingredient A", Type.WRAP));
ingredients.add(
new IngredientUDT("INGB", "Ingredient B", Type.PROTEIN));
taco.setIngredients(ingredients);
return taco;
}

简单起见,所有的测试taco都具有两种相同的配料,但是它们的ID和名称是根据传入的数字确定的。

另外,回到shouldReturnRecentTacos()方法,我们实例化了一个DesignTacoController并将mock TacoRepository注入到了构造器中。这个控制器传递给了WebTestClient. bindToController()方法,以便于生成WebTestClient实例。

所有的环境搭建工作完成后,我们可以使用WebTestClient提交GET请求至“/design/recent”并校验响应符合我们的预期。对get().uri(“/design/recent”)的调用描述了我们想要发送的请求。随后,调用exchange()会提交请求,这个请求将会由WebTestClient绑定的控制器(DesignTacoController)来进行处理。

最后,我们可以确认响应符合预期。通过调用expectStatus(),我们可以断言响应具有HTTP 200 (OK)状态码。然后,我们多次调用jsonPath()断言响应体中的JSON包含它应该具有的值。最后一个断言检查第12个元素(在基于零开始计数的数组中)是否真的不存在,以此判断结果不超过12个元素。

如果返回的JSON比较复杂,比如有大量的数据或多层嵌套的数据,那么使用jsonPath()会变得非常烦琐。实际上,为了节省空间,在程序清单11.1中,我省略了很多对jsonPath()的调用。在这种情况下,使用jsonPath()会变得非常枯燥烦琐,WebTestClient提供了json()方法。这个方法可以传入一个String参数(包含响应要对比的JSON)。

举例来说,假设我们在名为recent-tacos.json的文件中创建了完整的响应JSON并将它放到了类路径的“/tacos”路径下,那么我们可以按照如下的方式重写WebTestClient断言:

1
2
3
4
5
6
7
8
9
10
ClassPathResource recentsResource =
new ClassPathResource("/tacos/recent-tacos.json");
String recentsJson = StreamUtils.copyToString(
recentsResource.getInputStream(), Charset.defaultCharset());
testClient.get().uri("/design/recent")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().isOk()
.expectBody()
.json(recentsJson);

因为json()接受的是一个String,所以我们必须先将类路径资源加载为String。借助Spring中StreamUtils的copyToString()方法,这一点很容易实现。copyToString()方法返回的String就是我们的请求所预期的响应JSON内容。将其传递给json()方法,我们就能确保控制器会生成正确的输出。

WebTestClient提供的另外一种可选方案就是它允许将响应体与一个值的列表进行对比。expectBodyList()方法会接受一个代表列表中元素类型的Class或ParameterizedTypeReference,并且会返回ListBodySpec对象,随后可以基于该对象进行断言。借助expectBodyList(),我们可以重写测试类,使用创建mockTacoRepository时的测试数据的子集来进行验证:

1
2
3
4
5
6
testClient.get().uri("/design/recent")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().isOk()
.expectBodyList(Taco.class)
.contains(Arrays.copyOf(tacos, 12));

在这里,我们断言响应体包含了在测试方法开头处所创建的原始Taco数组的前12个元素。

11.3.2 测试POST请求

WebTestClient不仅能对控制器发送GET请求,还能用来测试各种HTTP方法,包括GET、POST、PUT、PATCH、DELETE和HEAD方法。表11.1将HTTP方法与WebTestClient的方法进行了映射。

表11.1 WebTestClient能够测试针对Spring WebFlux控制器的各种请求

epub_29101559_89

作为测试Spring WebFlux控制器其他HTTP请求方法的样例,我们看一下针对DesignTacoController的另一个测试。这一次,我们会编写一个对taco创建API的测试,也就是提交POST请求到“/design”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void shouldSaveATaco() {
TacoRepository tacoRepo = Mockito.mock(
TacoRepository.class); ⇽--- 搭建测试数据
Mono<Taco> unsavedTacoMono = Mono.just(testTaco(null));
Taco savedTaco = testTaco(null);
savedTaco.setId(1L);
Mono<Taco> savedTacoMono = Mono.just(savedTaco);
when(tacoRepo.save(any())).thenReturn(savedTacoMono); ⇽--- mock TacoRepository
WebTestClient testClient = WebTestClient.bindToController( ⇽--- 创建WebTestClient
new DesignTacoController(tacoRepo)).build();
testClient.post() ⇽--- POST taco
.uri("/design")
.contentType(MediaType.APPLICATION_JSON)
.body(unsavedTacoMono, Taco.class)
.exchange()
.expectStatus().isCreated() ⇽--- 校验响应
.expectBody(Taco.class)
.isEqualTo(savedTaco);
}

与上面的测试方法类似,shouldSaveATaco()首先会创建一些测试数据和mockTacoRepository,并且创建了一个WebTestClient并绑定到控制器上。随后,它使用WebTestClient提交POST请求到“/design”,并且将请求体声明为application/json类型,请求载荷为Taco的JSON序列化形式,放到未保存的Mono中。在执行exchange()之后,测试断言响应状态为HTTP 201 (CREATED)并且响应体中的载荷与已保存的Taco对象相同。

11.3.3 使用实时服务器进行测试

到目前为止,我们所编写的测试都依赖于Spring WebFlux的mock实现,所以并不需要真正的服务器。但是,我们可能需要在服务器(如Netty或Tomcat)环境中测试WebFlux控制器,也许还会需要repository或其他的依赖。换句话说,我们有可能要编写集成测试。

要编写WebTestClient的集成测试,与其他的Spring Boot集成测试类似,我们首先要为测试类添加@RunWith@SpringBootTest

1
2
3
4
5
6
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment=WebEnvironment.RANDOM_PORT)
public class DesignTacoControllerWebTest {
@Autowired
private WebTestClient testClient;
}

通过将webEnvironment属性设置为WebEnvironment.RANDOM_PORT,我们要求Spring启动一个运行时服务器并监听任意选择的端口
^1

你可能也注意到,我们将WebTestClient自动织入到了测试类中。这不仅意味着我们不用在测试的方法中创建它了,而且在发送请求的时候也不需要指定完整的URL了。这是因为WebTestClient能够知道测试服务器在哪个端口上运行。现在,我们可以使用自动织入的WebTestClient将shouldReturnRecentTacos()重写为集成测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void shouldReturnRecentTacos() throws IOException {
testClient.get().uri("/design/recent")
.accept(MediaType.APPLICATION_JSON).exchange()
.expectStatus().isOk()
.expectBody()
.jsonPath("$[?(@.id == 'TACO1')].name")
.isEqualTo("Carnivore")
.jsonPath("$[?(@.id == 'TACO2')].name")
.isEqualTo("Bovine Bounty")
.jsonPath("$[?(@.id == 'TACO3')].name")
.isEqualTo("Veg-Out");
}

我们发现,这个新版本的shouldReturnRecentTacos()代码要少得多。我们不再需要创建WebTestClient,因为可以使用自动织入的实例。另外,也不需要mockTacoRepository,因为Spring将会创建DesignTacoController实例并将一个真正的TacoRepository注入进来。在新版本的测试方法中,我们使用JSONPath表达式来校验数据库提供的值。

WebTestClient在测试的时候非常有用,此时我们会消费WebFlux控制器所暴露的API。但是,如果我们的应用本身要消费某个API,又该怎样处理呢?接下来,我们将注意力转向Spring反应式Web的客户端,看一下WebClient如何通过REST客户端来处理反应式类型,如Mono和Flux。

8.4 小结

  • 异步消息在要通信的应用程序之间提供了一个中间层,这样能够实现更松散的耦合和更强的可扩展性。
  • Spring支持使用JMS、RabbitMQ或Apache Kafka实现异步消息。
  • 应用程序可以使用基于模板的客户端(JmsTemplate、RabbitTemplate或KafkaTemplate)向消息代理发送消息。
  • 接收消息的应用程序可以借助相同的基于模板的客户端以拉取模式消费消息。
  • 通过使用消息监听器注解(@JmsListener@RabbitListener@KafkaListener),消息也可以推送至消费者的bean方法中。

8.3 使用Kafka的消息

Apache Kafka是我们在本章研究的最新的消息方案。乍看上去,Kafka是与ActiveMQ、Artemis或Rabbit类似的消息代理,其实Kafka有一些独特的技巧。

Kafka设计为集群运行,从而能够实现很强的可扩展性。通过将主题在集群的所有实例上进行分区(partition),它能够具有更强的弹性。RabbitMQ主要处理Exchange中的队列,而Kafka仅使用主题实现消息的发布/订阅。

Kafka主题会复制到集群的所有代理上。集群中的每个节点都会担任一个或多个主题的首领(leader),负责该主题的数据并将其复制到集群中的其他节点上。

更进一步来讲,每个主题可以划分为多个分区。在这种情况下,集群中的每个节点是某个主题一个或多个分区的首领,但并不是整个主题的首领。主题的责任会在所有节点间进行拆分。图8.2阐述了它是如何运行的。

image-20211015203522617

图8.2 Kafka集群是由多个代理组成的,每个代理作为主题分区的首领

关于Kafka的独特架构,我建议你阅读Dylan Scott编写的Kafka in Action(Manning,2017)。就我们来讲,我们将会关注如何通过Spring发送和接收Kafka的消息。

8.3.1 为Spring搭建支持Kafka消息的环境

为了搭建Kafka的消息环境,我们需要添加对应的依赖到构建文件中。但是,与JMS和RabbitMQ方案不同,并没有针对Kafka的Spring Boot starter。不过,不用担心,我们只需要添加一项依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

这项依赖会为我们的项目引入Kafka所需的所有内容。另外,它的出现会触发Spring Boot对Kafka的自动配置,除了其他功能之外,它会在Spring应用上下文中创建一个KafkaTemplate。我们所需要做的就是注入KafkaTemplate并使用它来发布和接收消息。

但是,在发送和接收消息之前,我们还需要注意使用Kafka时的一些属性。具体来讲,KafkaTemplate默认会使用localhost上监听9092端口的Kafka代理。在开发应用的时候,在本地启动Kafka代理没有问题,但是在投入生产的时候,我们需要配置不同的主机和端口。

spring.kafka.bootstrap-servers属性能够设置一个或多个Kafka服务器的地址,系统将会使用它来建立到Kafka集群的初始连接。例如,集群中的某个服务器运行在kafka.tacocloud.com上并监听9092端口,那么我们可以按照如下的方式在YAML中配置它的位置:

1
2
3
4
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092

但是需要注意spring.kafka.bootstrap-servers是复数形式,它能接受一个列表。所以,我们可以提供集群中的多个Kafka服务器:

1
2
3
4
5
6
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
- kafka.tacocloud.com:9093
- kafka.tacocloud.com:9094

Kafka在项目中准备就绪之后,我们就可以发送和接收消息了。我们首先使用KafkaTemplate发送Order对象到Kafka中。

8.3.2 通过KafkaTemplate发送消息

在很多方面,KafkaTemplate与JMS和RabbitMQ对应的模板非常相似。但同时,它也有很大的差异。在发送消息的时候,这一点非常明显:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
Long timestamp, K key, V data);

我们首先可能会发现,这里没有convertAndSend()方法了。这是因为,KafkaTemplate是通过泛型类型化的,在发送消息的时候,它能够直接处理领域类型。这样的话,所有的send()方法都完成了convertAndSend()的任务。

你可能也会发现,send()和sendDefault()的参数与JMS和Rabbit有很大的差异。在使用Kafka发送消息的时候,我们可以使用如下参数设置消息该如何进行发送:

  • 消息要发送到的主题(send()方法的必选参数);
  • 主题要写入的分区(可选);
  • 记录上要发送的key(可选);
  • 时间戳(可选,默认为System.currentTimeMillis());
  • 载荷(必选)。

主题和载荷是其中最重要的两个参数。分区和key对于如何使用KafkaTemplate几乎没有影响,只是作为额外的信息提供给send()和sendDefault()。对于我们的场景来说,我们只关心将消息载荷发送到给定的主题,不用担心分区和key的问题。

对于send()方法来说,我们还可以选择发送一个ProducerRecord对象,它只是一个简单类型,将上述的参数放到了一个对象中。我们还可以发送Message对象,但是需要将领域对象转换成Message对象。相对创建和发送ProducerRecord和Message对象,使用其他的方法会更简单一些。

借助KafkaTemplate及其send()方法,我们可以编写一个基于Kafka实现的OrderMessagingService实现。程序清单8.8展现了该实现类。

程序清单8.8 使用KafkaTemplate发送订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package tacos.messaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaOrderMessagingService
implements OrderMessagingService {
private KafkaTemplate<String, Order> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(
KafkaTemplate<String, Order> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(Order order) {
kafkaTemplate.send("tacocloud.orders.topic", order);
}
}

在这个OrderMessagingService的新实现中,sendOrder()用到了注入的KafkaTemplate对象的send()方法,将Order发送到名为tacocloud.orders.topic的主题中。除了代码中随处可见的“Kafka”之外,它其实与为JMS和Rabbit编写的代码并没有太大的差异。

如果你想要设置默认主题,那么可以稍微简化一下sendOrder()。首先,通过spring.kafka.template.default-topic属性,我们可以将默认主题设置为tacocloud.orders.topic:

1
2
3
4
spring:
kafka:
template:
default-topic: tacocloud.orders.topic

然后,在sendOrder()方法中,我们就可以调用sendDefault()而不是send()了,这样可以不用指定主题的名称:

1
2
3
4
@Override
public void sendOrder(Order order) {
kafkaTemplate.sendDefault(order);
}

现在,我们已经编写完发送消息的代码了。接下来,我们转移一下注意力,编写从Kafka中接收消息的代码。

8.3.3 编写Kafka监听器

除了send()和sendDefault()特有的方法签名之外,KafkaTemplate与JmsTemplate和RabbitTemplate另一个不同之处在于它没有提供接收消息的方法。这意味着在Spring中想要消费来自Kafka主题的消息只有一种办法,就是编写消息监听器。

对于Kafka消息来说,消息监听器是通过带有@KafkaListener注解的方法来实现的。@KafkaListener大致对应于@JmsListener@RabbitListener,并且使用方式也基本相同。如下的程序清单展示了为Kafka编写的基于监听器的订单接收器。

程序清单8.9 使用@KafkaListener接收订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package tacos.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order) {
ui.displayOrder(order);
}
}

handle()方法使用了@KafkaListener注解,表明当有消息抵达名为tacocloud.orders.topic的主题时,该方法将会被调用。如程序清单8.9所示,我们只将Order(载荷)对象传递给了handle()。如果你想要获取消息中其他的元数据,我们也可以接受ConsumerRecord或Message对象。

例如,如下的handle()实现接受一个ConsumerRecord,这样我们就能在日志中将消息的分区和时间戳记录下来:

1
2
3
4
5
6
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
ui.displayOrder(order);
}

类似地,我们还可以用一个Message对象来替代ConsumerRecord,并且能够达到相同的目的:

1
2
3
4
5
6
7
8
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
MessageHeaders headers = message.getHeaders();
log.info("Received from partition {} with timestamp {}",
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
ui.displayOrder(order);
}

值得一提的是,消息载荷也可以通过ConsumerRecord.value()或Message.getPayload()获取到。这意味着我们可以通过这些对象获取Order,而不必直接将其作为handle()的参数。

8.2 使用RabbitMQ和AMQP

RabbitMQ可以说是AMQP最杰出的实现,它提供了比JMS更高级的消息路由策略。JMS消息使用目的地名称来寻址,接收者要从这里检索消息,而AMQP消息使用Exchange和routing key来寻址,这样消息就与接收者要监听的队列解耦了。Exchange和队列的关系如图8.1所示。

image-20211015123622109

图8.1 发送到RabbitMQ Exchange的消息会基于routing key和binding被路由到一个或多个队列上

当消息抵达RabbitMQ代理的时候,它会进入为其设置的Exchange上。Exchange负责将它路由到一个或多个队列中,这个过程会根据Exchange的类型、Exchange和队列之间的binding以及消息的routing key进行路由。

这方面有多个不同类型的Exchange,包括以下内容。

  • Default:这是代理创建的特殊Exchange。它会将消息路由至名字与消息routing key相同的队列。所有的队列都会自动绑定至Default Exchange。
  • Direct:如果消息的routing key与队列的binding key相同,那么消息将会路由到该队列上。
  • Topic:如果消息的routing key与队列binding key(可能会包含通配符)匹配,那么消息将会路由到一个或多个这样的队列上。
  • Fanout:不管routing key和binding key是什么,消息都将会路由到所有绑定队列上。
  • Headers:与Topic Exchange类似,只不过要基于消息的头信息进行路由,而不是routing key。
  • Dead letter:捕获所有无法投递(也就是它们无法匹配所有已定义的Exchange和队列的binding关系)的消息。

最简单的Exchange形式是Default和Fanout,因为它们大致对应了JMS中的队列和主题,但是其他的Exchange允许我们定义更加灵活的路由模式。

这里最重要的是要明白消息会通过routing key发送至Exchange,而消息要在队列中被消费。它们如何从Exchange路由至队列取决于binding的定义以及哪种方式最适合我们的使用场景。

至于使用哪种Exchange类型以及如何定义从Exchange到队列的binding,这本身与如何在Spring应用中发送和接收消息关系不大。因此,我们更加关心如何编写使用Rabbit发送和接收消息的代码。

注意:关于如何绑定队列到Exchange的更详细讨论,请参考Alvaro Videla和Jason J.W. Williams编写的RabbitMQ in Action (Manning,2012)。

8.2.1 添加RabbitMQ到Spring中

在使用Spring发送和接收RabbitMQ消息之前,我们需要将Spring Boot的AMQPstarter依赖添加到构建文件中,替换上文中Artemis或ActiveMQ starter的位置:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加AMQP starter到构建文件中之后,将会触发自动配置功能,这样会为我们创建一个AMQP连接工厂和RabbitTemplate bean,以及其他的一些支撑组件。我们要使用Spring发送和接收RabbitMQ代理的消息,只需要添加这项依赖就可以了。但是,这里还有一些我们需要掌握的有用属性,如表8.4所示。

表8.4 配置RabbitMQ代理位置和凭证的属性

epub_29101559_49

对于开发来说,我们可能会使用不需要认证的RabbitMQ代理,它运行在本地机器上并监听5672端口。在开发阶段,这些属性可能没有太大的用处,但是当应用程序投入生产环境时,它们无疑是非常有用的。

假设我们要将应用投入生产环境,RabbitMQ代理位于名为rabbit.tacocloud.com服务器上,监听5673端口并且需要认证。在这种情况下,当prod profile处于激活状态时,application.yml文件中的如下配置将会设置这些属性:

1
2
3
4
5
6
7
spring:
profiles: prod
rabbitmq:
host: rabbit.tacocloud.com
port: 5673
username: tacoweb
password: l3tm31n

在我们的应用中,RabbitMQ已经配置好了,接下来就可以使用RabbitTemplate发送消息了。

8.2.2 通过RabbitTemplate发送消息

Spring对RabbitMQ消息支持的核心是RabbitTemplate。RabbitTemplate与JmsTemplate类似,提供了一组相似的方法。但是,我们将会看到,这里有一些细微的差异,这是与RabbitMQ独特的运行方式有关的。

在使用RabbitTemplate发送消息方面,我们可以使用与JmsTemplate中同名的send()和convertAndSend()方法。但是,与JmsTemplate的方法只是将消息路由至队列或主题不同,RabbitTemplate会按照Exchanges和routing key来发送消息。下面列出关于使用RabbitTemplate发送消息比较重要的一些方法^1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 发送原始的消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message)
throws AmqpException;
// 发送根据对象转换而成的消息
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message)
throws AmqpException;
void convertAndSend(String exchange, String routingKey,
Object message) throws AmqpException;
// 发送根据对象转换而成的消息并且带有后期处理的功能
void convertAndSend(Object message, MessagePostProcessor mPP)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String exchange, String routingKey,
Object message,
MessagePostProcessor messagePostProcessor)
throws AmqpException;

我们可以看到,这些方法与JmsTemplate中对应的方法遵循了相同的模式。前3个send()方法都是发送原始的Message对象。接下来的3个convertAndSend()方法会接受一个对象,这个对象会在发送之前在幕后转换成Message。最后的3个convertAndSend()方法与前面的3个方法类似,但是它们还会接受一个MessagePostProcessor对象,这个对象能够在Message发送至代理之前对其进行操作。

这些方法与JmsTemplate对应方法的不同之处在于,它们会接受String类型的值以指定Exchange和routing key,而不像JmsTemplate那样接受目的地名称(或Destination)。没有接受Exchange参数的方法会将消息发送至DefaultExchange。与之类似,没有指定routing key的方法会把消息路由至默认的routing key。

接下来,我们看一下如何使用RabbitTemplate发送taco订单。有一种方式是使用send()方法,如程序清单8.5所示。但是,在调用send()之前,我们需要将Order对象转换为Message。RabbitTemplate能够通过getMessageConverter()方法获取消息转换器,否则,这项工作会非常乏味。

程序清单8.5 使用RabbitTemplate.send()发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package tacos.messaging;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tacos.Order;
@Service
public class RabbitOrderMessagingService
implements OrderMessagingService {
private RabbitTemplate rabbit;
@Autowired
public RabbitOrderMessagingService(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}
}

有了MessageConverter之后,将Order转换成Message就是非常简单的任务了。我们必须通过MessageProperties来提供消息属性,但是如果我们不需要设置任何这样的属性,使用默认的MessageProperties实例就可以了。随后,剩下的就是调用send()了,并将Exchange和routing key(这两者都是可选的)连同消息一起传递过去。在本例中,我们只指定了routing key(tacocloud.order)和消息本身,所以会使用默认的Exchange。

这里提到了默认的Exchange,它的名字是“”(空的String),对应RabbitMQ代理自动生成的Default Exchange。与之相似,默认的routing key也是“”(它的路由将会取决于Exchange以及相应的binding)。我们可以通过设置spring.rabbitmq.template.exchange和spring.rabbitmq.template.routing-key属性重写这些默认值:

1
2
3
4
5
spring:
rabbitmq:
template:
exchange: tacocloud.orders
routing-key: kitchens.central

在本例中,所有未指明Exchange的消息都将会自动发送至名为tacocloud.orders的Exchange。如果在调用send()或convertAndSend()的时候也没有指定routingkey,那么消息将会使用值为kitchens.central的routing key。

通过消息转换器创建Message对象是非常简单的,但是使用convertAndSend()让RabbitTemplate处理所有的转换操作则会更加简单:

1
2
3
public void sendOrder(Order order) {
rabbit.convertAndSend("tacocloud.order", order);
}

配置消息转换器

默认情况下,消息转换是通过SimpleMessageConverter来实现的,它能够将简单类型(如String)和Serializable对象转换成Message对象。但是,Spring为RabbitTemplate提供了多个消息转换器,包括下面内容。

  • Jackson2JsonMessageConverter:使用Jackson 2 JSON实现对象和JSON的相互转换。
  • MarshallingMessageConverter:使用Spring的Marshaller和Unmarshaller进行转换。
  • SerializerMessageConverter:使用Spring的Serializer和Deserializer转换String和任意种类的原生对象。
  • SimpleMessageConverter:转换String、字节数组和Serializable类型。
  • ContentTypeDelegatingMessageConverter:基于contentType头信息,将转换功能委托给另外一个MessageConverter。
  • MessagingMessageConverter:将消息转换功能委托给另外一个MessageConverter,并将头信息的转换委托给AmqpHeaderConverter。

如果需要变更消息转换器,就要配置一个类型为MessageConverter的bean。例如,对于基于JSON的转换,我们可以按照如下的方式来配置Jackson2JsonMessageConverter:

1
2
3
4
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

Spring Boot的自动配置功能会发现这个bean,并将它注入RabbitTemplate中,替换默认的消息转换器。

设置消息属性

与在JMS中一样,我们可能需要在发送的消息中添加一些头信息。例如,假设我们需要为所有通过Taco Cloud Web站点提交的订单添加一个X_ORDER_SOURCE信息。在自行创建Message的时候,我们可以通过MessageProperties实例设置头信息,随后将这个对象传递给消息转换器。回到程序清单8.5的sendOrder()方法,我们需要做的就是添加一行设置头信息的代码:

1
2
3
4
5
6
7
public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}

但是,在使用convertAndSend()的时候,我们无法快速访问MessageProperties对象。不过,此时MessagePostProcessor可以帮助我们:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void sendOrder(Order order) {
rabbit.convertAndSend("tacocloud.order.queue", order,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)
throws AmqpException {
MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}

在这里,我们为convertAndSend()提供了MessagePostProcessor接口的匿名内部类实现。在postProcessMessage()中,我们从Message中拉取MessageProperties对象,然后通过setHeader()方法设置X_ORDER_SOURCE头信息。

现在,我们已经看到了如何通过RabbitTemplate发送消息,接下来我们转换视角看一下如何接收来自RabbitMQ队列的消息。

8.2.3 接收来自RabbitMQ的消息

我们看到使用RabbitTemplate发送消息与使用JmsTemplate发送消息并没有太大差别。实际上,接收来自RabbitMQ队列的消息也与JMS没有太大差别。与JMS类似,我们有两个可选方案。

  • 使用RabbitTemplate从队列拉取消息。
  • 将消息推送至带有@RabbitListener注解的方法。

我们首先看一下基于拉取的RabbitTemplate.receive()方法。

RabbitTemplate提供了多个从队列拉取消息的方法。其中,最有用的方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 接收消息
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// 接收由消息转换而成的对象
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws
AmqpException;
// 接收由消息转换而成的类型安全的对象
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws
AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type)
throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T>
type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis,
ParameterizedTypeReference<T> type)
throws AmqpException;

这些方法对应于前文所述的send()和convertAndSend()方法。send()用于发送原始的Message对象,而receive()则会接收来自队列的原始Message对象。与之类似,receiveAndConvert()接收消息并且在返回之前使用一个消息转换器将它们转换为领域对象。

但是,在方法签名上有一些明显的不同。首先,这些方法都不会接收Exchange和routing key作为参数。这是因为Exchange和routing key是用来将消息路由至队列的,在消息位于队列中之后,它们的目的地是将它们从队列中拉取下来的消费者。消费消息的应用本身并不需要关心Exchange和routing key。消费消息的应用只需要知道队列信息就可以了。

你可能会注意到,很多方法都接收一个long类型的参数,用来指定接收消息的超时时间。默认情况下,接收消息的超时时间是0毫秒。也就是说,调用receive()会立即返回,如果没有可用消息,那么返回值是null。这是与JmsTemplate的receive()的一个显著差异。通过传入一个超时时间的值,我们就可以让receive()和receiveAndConvert()阻塞,直到消息抵达或者超时时间过期。但是,即便我们设置了非零的超时时间,在代码中依然要处理null返回值的场景。

接下来,我们看一下如何实际使用它们。程序清单8.6展现了一个新的基于Rabbit的OrderReceiver实现,它使用RabbitTemplate来接收订单。

程序清单8.6 通过RabbitTemplate从RabbitMQ中拉取订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package tacos.kitchen.messaging.rabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitOrderReceiver {
private RabbitTemplate rabbit;
private MessageConverter converter;
@Autowired
public RabbitOrderReceiver(RabbitTemplate rabbit) {
this.rabbit = rabbit;
this.converter = rabbit.getMessageConverter();
}
public Order receiveOrder() {
Message message = rabbit.receive("tacocloud.orders");
return message != null
? (Order) converter.fromMessage(message)
: null;
}
}

所有的操作都发生在receiveOrder()方法中。它调用了注入的RabbitTemplate对象的receive()方法,从名为tacocloud.orders的队列中拉取一个订单。它并没有提供超时值,所以我们只能假定这个调用会马上返回,要么得到Message对象,要么返回null。如果返回Message对象,就使用RabbitTemplate中的MessageConverter将Message转换成一个Order对象。如果receive()方法返回null,我们就将null作为返回值。

根据使用场景,我们也许能够容忍一定的延迟。例如,在Taco Cloud厨房悬挂的显示器中,如果没有订单,我们可以稍等一会儿。假设在放弃之前,我们决定等待30秒钟,那么receiveOrder()方法可以修改为传递30 000毫秒的延迟给receive()方法:

1
2
3
4
5
6
public Order receiveOrder() {
Message message = rabbit.receive("tacocloud.order.queue", 30000);
return message != null
? (Order) converter.fromMessage(message)
: null;
}

如果你像我一样,觉得使用这样一个硬编码的数字会让人觉得不舒服,那么你可能会想,创建一个带有@ConfigurationProperties注解的类并使用Spring Boot的配置属性来设置超时时间,可能会是更好的方案。在一点上,我的想法和你一样,只不过Spring Boot已经为我们提供了一个这样的配置属性。如果你想要通过配置来设置超时时间,只需要在调用receive()的时候移除超时值,并将超时时间设置为spring.rabbitmq.template.receive-timeout属性即可:

1
2
3
4
spring:
rabbitmq:
template:
receive-timeout: 30000

回到receiveOrder()方法,我们必须要使用RabbitTemplate中的消息转换器才能将传入的Message对象转换成Order对象。但是,既然RabbitTemplate已经携带了消息转换器,它为什么不能自动为我们进行转换呢?这就是receiveAndConvert()方法所做的事情。借助receiveAndConvert(),我们可以将receiveOrder()重写为:

1
2
3
public Order receiveOrder() {
return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
}

看起来简单了许多,对吧。唯一让我觉得麻烦的就是从Object到Order的类型转换。不过,这种转换还有另外一种实现方式。我们可以传递一个ParameterizedTypeReference引用给receiveAndConvert(),这样我们就可以直接得到Order对象了:

1
2
3
4
public Order receiveOrder() {
return rabbit.receiveAndConvert("tacocloud.order.queue",
new ParameterizedTypeReference<Order>() {});
}

关于这种方式是否真的比类型转换更好,依然还有争论,但是它确实能够更加确保类型安全。唯一需要注意的是,要在receiveAndConvert()中使用ParameterizedTypeReference,消息转换器必须要实现SmartMessageConverter,目前Jackson2JsonMessageConverter是唯一一个可选的内置实现。

RabbitTemplate提供的拉取模式适用于很多使用场景,但是有时候监听消息并在消息抵达的时候对其进行处理会更好一些。接下来,我们看一下如何编写消息驱动的bean,让它对RabbitMQ消息做出回应。

使用监听器处理RabbitMQ的消息

Spring提供了RabbitListener实现消息驱动的RabbitMQ bean,对应于JmsListener。为了声明当消息抵达RabbitMQ队列时某个方法应该被调用,我们可以为bean的方法添加@RabbitListener注解。

例如,程序清单8.7展现了OrderReceiver的RabbitMQ实现,它通过注解声明要监听订单消息,而不是使用RabbitTemplate进行轮询。

程序清单8.7 将方法声明为RabbitMQ的消息监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package tacos.kitchen.messaging.rabbit.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@RabbitListener(queues = "tacocloud.order.queue")
public void receiveOrder(Order order) {
ui.displayOrder(order);
}
}

你肯定会发现它与程序清单8.4的代码非常相似。确实,唯一的变更就是监听器的注解,从@JmsListener换成了@RabbitListener。尽管@RabbitListener注解非常棒,但是几乎重复的代码让我无法找出@RabbitListener具有什么在@JmsListener中没有提到的功能。当消息从各自的代理推送过来的时候,这两个注解都非常适合用来编写对应的代码,其中@JmsListener对应的是JMS代理,而@RabbitListener对应的是RabbitMQ代理。

在前面的段落中,你可能会对@RabbitListener感到索然无趣,但是这并非我的本意。实际上,@RabbitListener和@JmsListener的运行方式非常相似是一件令人兴奋的事情。这意味着当我们使用RabbitMQ替代Artemis或ActiveMQ的时候,不需要学习全新的编程模型。同样令人兴奋的是,RabbitTemplate和JmsTemplate之间也具有这样的相似性。

让我们暂且保持一下这种兴奋,在本章结束之前,我们看一下Spring支持的另外一个消息方案:Apache Kafka。

8.1 使用JMS发送消息

JMS是一个Java标准,定义了使用消息代理(message broker)的通用API,最早于2001年提出。长期以来,JMS一直是实现异步消息的首选方案。在JMS出现之前,每个消息代理都有私有的API,这就使得不同代理之间的消息代码很难通用。但是借助JMS,所有遵从规范的实现都使用通用的接口,这就好像JDBC为数据库操作提供了通用的接口一样。

Spring通过基于模板的抽象为JMS功能提供了支持,这个模板就是JmsTemplate。借助JmsTemplate,我们能够非常容易地在消息生产方发送队列和主题消息,在消费消息的那一方,也能够非常容易地接收这些消息。Spring还提供了消息驱动POJO的理念:这是一个简单的Java对象,它能够以异步的方式响应队列或主题上到达的消息。

我们将会讨论Spring对JMS的支持,包括JmsTemplate和消息驱动POJO。但是在发送和接收消息之前,我们首先需要一个消息代理(broker),它能够在消息的生产者和消费者之间传递消息。对Spring JMS的探索就从在Spring中搭建消息代理开始吧。

8.1.1 搭建JMS环境

在使用JMS之前,我们必须要将JMS客户端添加到项目的构建文件中。借助SpringBoot,这再简单不过了。我们所需要做的就是添加一个starter依赖到构建文件中。但是,首先,我们需要决定该使用Apache ActiveMQ还是更新的ApacheActiveMQ Artemis代理。

如果选择使用Apache ActiveMQ,那么我们需要添加如下的依赖到项目的pom.xml文件中:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

如果选择使用ActiveMQ Artemis,那么starter依赖将会如下所示:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>

Artemis是重新实现的下一代ActiveMQ,使ActiveMQ变成了遗留方案。因此,Taco Cloud会选择使用Artemis。但是在编写发送和接收消息的代码方面,选择哪种方案几乎没有什么影响。唯一需要注意的重要差异就是如何配置Spring创建到代理的连接。

默认情况下,Spring会假定Artemis代理在localhost的61616端口运行。对于开发来说,这样是没有问题的,但是一旦要将应用部署到生产环境,我们就需要设置一些属性来告诉Spring该如何访问代理。最有用的属性如表8.1所示。

表8.1 配置Artemis代理的位置和凭证信息的属性

epub_29101559_45

例如,我们看看如下的application.yml文件条目,它可能会用于一个非开发的环境:

1
2
3
4
5
6
spring:
artemis:
host: artemis.tacocloud.com
port: 61617
user: tacoweb
password: l3tm31n

这会让Spring创建到Artemis代理的连接,该Artemis代理监听artemis.tacocloud.com的61617端口。它还为应用设置了与代理交互的凭证信息。凭证信息是可选的,但是对于生产环境来说,我们推荐使用它们。

如果你选择使用ActiveMQ而不是Artemis,那么需要使用ActiveMQ特定的属性,如表8.2所示。

表8.2 配置ActiveMQ代理的位置和凭证信息的属性

epub_29101559_46

需要注意,ActiveMQ代理不是分别设置代理的主机和端口,而是使用了一个名为spring.activemq.broker-url的属性来指定代理的地址。URL应该是“tcp://”协议的地址,如下面的YMAL片段所示:

1
2
3
4
5
spring:
activemq:
broker-url: tcp://activemq.tacocloud.com
user: tacoweb
password: l3tm31n

不管你是选择Artemis还是ActiveMQ,如果是在本地开发运行,那么你都不需要配置这些属性。

但是,如果你选择使用ActiveMQ,需要将spring.activemq.in-memory属性设置为false,防止Spring启动内存中运行的代理。内存中运行的代理看起来很有用,但是只有同一个应用发布和消费消息时才能使用它(这限制了它的用途)。

在继续下面的内容之前,我们要安装并启动一个Artemis(或ActiveMQ)代理,而不是选择使用嵌入式的代理。我在这里不再重复讲述安装过程,推荐你参考Artemis和ActiveMQ的文档来了解详细内容。

现在,我们已经在构建文件中添加了对JMS starter的依赖,代理也已经准备好将消息从一个应用传递到另一个应用。接下来,我们就可以开始发送消息了。

8.1.2 使用JmsTemplate发送消息

将JMS starter依赖(不管是Artemis还是ActiveMQ)添加到构建文件之后,Spring Boot会自动配置一个JmsTemplate(以及其他内容),我们可以将它注入到其他bean中,并使用它来发送和接收消息。

JmsTemplate是Spring对JMS集成支持功能的核心。与Spring其他面向模板的组件类似,JmsTemplate消除了大量传统使用JMS时所需的样板代码。如果没有JmsTemplate的话,那么我们需要编写代码来创建到消息代理的连接和会话,还要编写更多的代码来处理发送消息过程中可能出现的异常。JmsTemplate能够让我们关注真正要做的事情:发送消息。

JmsTemplate有多个用来发送消息的方法,包括:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 发送原始的消息
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator)
throws JmsException;
void send(String destinationName, MessageCreator messageCreator)
throws JmsException;
// 发送根据对象转换而成的消息
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message)
throws JmsException;
void convertAndSend(String destinationName, Object message)
throws JmsException;
// 发送根据对象转换而成的消息并且带有后期处理的功能
void convertAndSend(Object message,
MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message,
MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message,
MessagePostProcessor postProcessor) throws JmsException;

我们可以看到,实际上只有两个方法,也就是send()和convertAndSend(),每个方法都有重载形式以支持不同的参数。如果我们仔细观察一下,就会发现convertAndSend()的各种形式又可以分成两个子类型。在考虑这些方法作用的时候,我们对它们进行以下细分。

  • 3个send()方法都需要MessageCreator来生成Message对象。
  • 3个convertAndSend()方法会接受Object对象,并且会在幕后自动将Object转换为Message。
  • 3个convertAndSend()会自动将Object转换为Message,但同时还能接受一个MessagePostProcessor对象,用来在发送之前对Message进行自定义。

这3种方法分类都分别包含3个重载方法,它们的区别在于如何指定JMS的目的地(队列或主题)。

  • 有1个方法不接受目的地参数,它会将消息发送至默认的目的地。
  • 有1个方法接受Destination对象,该对象指定了消息的目的地。
  • 有1个方法接受String,它通过名字的形式指定了消息的目的地。

要让这些方法真正发挥作用,我们看一下程序清单8.1中的JmsOrderMessagingService,它使用了形式最简单的send()方法。

程序清单8.1 使用.send()方法将订单发送至默认的目的地

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package tacos.messaging;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
@Service
public class JmsOrderMessagingService implements OrderMessagingService {
private JmsTemplate jms;
@Autowired
public JmsOrderMessagingService(JmsTemplate jms) {
this.jms = jms;
}
@Override
public void sendOrder(Order order) {
jms.send(new MessageCreator() {
@Override
public Message createMessage(Session session)
throws JMSException {
return session.createObjectMessage(order);
}
}
);
}
}

sendOrder()方法调用了jms.send(),并传递了MessageCreator接口的一个匿名内部实现。这个实现类重写了createMessage()方法,从而能够通过给定的Order对象创建新的对象消息。

我不知道你的感觉如何,但是我认为程序清单8.1虽然比较直接,但还是有点啰唆。声明匿名内部类的过程使得原本很简单的方法调用变得很复杂。我们发现MessageCreator是一个函数式接口,所以我们可以通过lambda表达式简化一下sendOrder()方法:

1
2
3
4
@Override
public void sendOrder(Order order) {
jms.send(session -> session.createObjectMessage(order));
}

但是,需要注意对jms.send()的调用并没有指定目的地。为了让它能够运行,我们需要通过名为spring.jms.template.default-destination的属性声明一个默认的目的地名称。例如,我们可以在application.yml文件中这样设置该属性:

1
2
3
4
spring:
jms:
template:
default-destination: tacocloud.order.queue

在很多场景下,使用默认的目的地是最简单的可选方案。借助它,我们只声明一次目的地名称就可以了,代码只关心发送消息,而无须关心消息会发到哪里。但是,如果我们要将消息发送至默认目的地之外的其他地方,那么我们需要通过为send()设置参数来进行指定。

其中一种方式就是传递Destination对象作为send()方法的第一个参数。最简单的方式就是声明一个Destination bean并将其注入处理消息的bean中。例如,如下的bean声明Taco Cloud订单队列的Destination:

1
2
3
4
@Bean
public Destination orderQueue() {
return new ActiveMQQueue("tacocloud.order.queue");
}

很重要的一点需要注意,这里的ActiveMQQueue来源于Artemis(来自org.apache. activemq.artemis.jms.client包)。如果选择使用ActiveMQ(而不是Artemis),那么同样有一个名为ActiveMQQueue的类(来自org.apache.activemq.command包)。

在Destination bean注入到JmsOrderMessagingService之后,调用send()的时候,我们就可以使用它来指定目的地了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Destination orderQueue;
@Autowired
public JmsOrderMessagingService(JmsTemplate jms,
Destination orderQueue) {
this.jms = jms;
this.orderQueue = orderQueue;
}
...
@Override
public void sendOrder(Order order) {
jms.send(
orderQueue,
session -> session.createObjectMessage(order));
}

通过Destination指定目的地的时候,我们其实可以设置Destination的更多属性,而不仅仅是目的地的名称。但是,在实践中,除了目的地名称我们几乎不会设置其他的属性。因此,使用名称作为send()的第一个参数会更加简单:

1
2
3
4
5
6
@Override
public void sendOrder(Order order) {
jms.send(
"tacocloud.order.queue",
session -> session.createObjectMessage(order));
}

尽管send()方法使用起来并不是特别困难(尤其是通过lambda表达式来实现MessageCreator的时候更是如此),但是它要求我们提供MessageCreator还是增加了一点复杂性。如果我们能够只指定要发送的对象(以及可能要用到的目的地),那岂不是更简单?这其实就是convertAndSend()的工作原理。接下来,我们看一下这种方式。

消息发送之前进行转换

JmsTemplates的convertAndSend()方法简化了消息的发布,因为它不再需要MessageCreator。我们将要发送的对象直接传递给convertAndSend(),这个对象在发送之前会被转换成Message。

例如,在如下重新实现的sendOrder()方法中,使用convertAndSend()将Order对象发送到给定名称的目的地:

1
2
3
4
@Override
public void sendOrder(Order order) {
jms.convertAndSend("tacocloud.order.queue", order);
}

与send()方法类似,convertAndSend()将会接受一个Destination对象或String值来确定目的地,我们也可以完全忽略目的地,将消息发送到默认目的地上。

不管使用哪种形式的convertAndSend(),传递给convertAndSend()的Order都会在发送之前转换成Message。在底层,这是通过MessageConverter的实现类来完成的,它替我们做了将对象转换成Message的脏活累活。

配置消息转换器

MessageConverter是Spring定义的接口,只有两个需要实现的方法:

1
2
3
4
5
public interface MessageConverter {
Message toMessage(Object object, Session session)
throws JMSException, MessageConversionException;
Object fromMessage(Message message)
}

尽管这个接口实现起来很简单,但我们通常并没有必要创建自定义的实现。Spring已经提供了多个实现,如表8.3所示。

表8.3 Spring为通用的转换任务提供了多个消息转换器(所有的消息转换器都位于org.springframework.jms.support.converter包中)

epub_29101559_47

默认情况下,将会使用SimpleMessageConverter,但是它需要被发送的对象实现Serializable。这种办法可能也不错,但有时候我们可能想要使用其他的消息转换器来消除这种限制,比如MappingJackson2MessageConverter。

为了使用不同的消息转换器,我们必须要做的事情就是将选中的消息转换器实例声明为一个bean。例如,如下的bean声明将会使用MappingJackson2MessageConverter替代SimpleMessageConverter:

1
2
3
4
5
6
7
@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter =
new MappingJackson2MessageConverter();
messageConverter.setTypeIdPropertyName("_typeId");
return messageConverter;
}

需要注意,在返回之前,我们调用了MappingJackson2MessageConverter的setTypeId PropertyName()方法。这非常重要,因为这样能够让接收者知道传入的消息要转换成什么类型。默认情况下,它将会包含要转换的类型的全限定类名。但是,这样的话会不太灵活,要求接收端也包含相同的类型,并且具有相同的全限定类名。

为了实现更大的灵活性,我们可以通过调用消息转换器的setTypeIdMappings()方法将一个合成类型名映射到实际类型上。举例来说,消息转换器bean方法的如下代码变更会将一个合成的order类型ID映射为Order类:

1
2
3
4
5
6
7
8
9
10
@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter =
new MappingJackson2MessageConverter();
messageConverter.setTypeIdPropertyName("_typeId");
Map<String, Class<?>> typeIdMappings = new HashMap<String, Class<?>>();
typeIdMappings.put("order", Order.class);
messageConverter.setTypeIdMappings(typeIdMappings);
return messageConverter;
}

这样的话,消息的_typeId属性中就不用发送全限定类型了,而是会发送order值。在接收端的应用中,将会配置类似的消息转换器,将order映射为它自己能够理解的订单类型。在接收端的订单可能位于不同的包中、有不同的类名,甚至可以只包含发送者Order属性的一个子集。

对消息进行后期处理

假设除了利润丰厚的Web业务之外,Taco Cloud还决定开几家实体的连锁taco店,鉴于任何一家餐馆都可能成为Web业务的运行中心,在餐馆中他们需要有一种方式告诉厨房订单的来源,这样厨房的工作人员就能为店面里的订单和Web上的订单执行不同的流程。

我们尽可以在Order对象上添加一个新的source属性,让它携带该信息:如果是在线订单,就将其设置为WEB;如果是店面里的订单,就将其设置为STORE。但是,这样我们就需要同时修改Web站点的Order类和厨房应用的Order类,但实际上只有taco的准备人员需要该信息。

有一种更简单的方案,就是为消息添加一个自定义的头部,让它携带订单的来源。如果我们使用send()方法来发送taco订单,那么通过调用Message对象的setStringProperty()方法非常容易实现:

1
2
3
4
5
jms.send("tacocloud.order.queue",
session -> {
Message message = session.createObjectMessage(order);
message.setStringProperty("X_ORDER_SOURCE", "WEB");
});

但是,这里的问题在于我们并没有使用send()。在使用convertAndSend()方法的时候,Message是在底层创建的,我们无法访问到它。

幸好,还有一种方式能够在发送之前修改底层创建的Message对象。我们可以传递一个MessagePostProcessor作为convertAndSend()的最后一个参数,借助它我们可以在Message创建之后做任何想做的事情。如下的代码依然使用了convertAndSend(),但是它能够在消息发送之前使用MessagePostProcessor添加X_ORDER_SOURCE头信息:

1
2
3
4
5
6
7
jms.convertAndSend("tacocloud.order.queue", order, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
});

你可能已经发现MessagePostProcessor是一个函数式接口。这意味着我们可以将匿名内部类替换为lambda,进一步简化它:

1
2
3
4
5
jms.convertAndSend("tacocloud.order.queue", order,
message -> {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
});

尽管在这里我们只是将这个特殊的MessagePostProcessor用到了本次convertAndSend()方法调用中,但是你可能会发现在你的代码中会在不同的地方多次调用convertAndSend(),它们均会用到相同的MessagePostProcessor。在这种情况下,方法引用是比lambda更好的方案,它能避免不必要的代码重复:

1
2
3
4
5
6
7
8
9
10
11
@GetMapping("/convertAndSend/order")
public String convertAndSendOrder() {
Order order = buildOrder();
jms.convertAndSend("tacocloud.order.queue", order,
this::addOrderSource);
return "Convert and sent order";
}
private Message addOrderSource(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}

我们已经看到了多种发送消息的方式,但是如果消息无人接收,那么只发送消息也没什么价值。接下来,我们看一下如何使用Spring和JMS接收消息。

8.1.3 接收JMS消息

在消费消息的时候,我们可以选择拉取模式(pull model)和推送模式(pushmodel),前者会在我们的代码中请求消息并一直等待直到消息到达为止,而后者则会在消息可用的时候自动在你的代码中执行。

JmsTemplate提供了多种方式来接收消息,但它们使用的都是拉取模式。我们可以调用其中的某个方法来请求消息,而线程会一直阻塞到一个消息抵达为止(这可能马上发生,也可能需要等待一会儿)。

另外,我们也可以使用推送模式,在这种情况下,我们会定义一个消息监听器,每当有消息可用时,它就会被调用。

这两种方案能够适用于各种用户场景。人们普遍觉得推送模式是更好的方案,因为它不会阻塞线程;但是,在某些场景下,如果消息抵达的速度太快,那么监听器可能会过载。而拉取模式允许消费者声明它们何时才为接收新消息做好准备。

我们将会看一下这两种方案,首先从JmsTemplate提供的拉取模式开始。

使用JmsTemplate来接收消息

JmsTemplate提供了多个对代理的拉取方法,其中包括:

1
2
3
4
5
6
Message receive() throws JmsException;
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;
Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;

我们可以看到,这6个方法简直就是JmsTemplate中send()和convertAndSend()方法的镜像。receive()方法接收原始的Message,而receiveAndConvert()则会使用一个配置好的消息转换器将消息转换成领域对象。对于其中的每种方法,我们都可以指定Destination或者包含目的地名称的String值,否则,我们将会从默认目的地拉取消息。

为了实际看一下它是如何运行的,我们编写代码从tacocloud.order.queue目的地拉取一个Order。程序清单8.2展现了OrderReceiver,这个服务组件会使用JmsTemplate.receive()来接收订单数据。

程序清单8.2 从队列拉取订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package tacos.kitchen.messaging.jms;
import javax.jms.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
@Component
public class JmsOrderReceiver implements OrderReceiver {
private JmsTemplate jms;
private MessageConverter converter;
@Autowired
public JmsOrderReceiver(JmsTemplate jms, MessageConverter converter) {
this.jms = jms;
this.converter = converter;
}
public Order receiveOrder() {
Message message = jms.receive("tacocloud.order.queue");
return (Order) converter.fromMessage(message);
}
}

这里我们使用String值来指定从哪个目的地拉取订单。receive()返回的是没有经过转换的Message,但是,我们真正需要的是Message中的Order,所以接下来我们要做的事情就是使用被注入的消息转换器对消息进行转换。消息中的type ID属性将会指导转换器将消息转换成Order,但它返回的是Object,所以在最终返回之前要进行类型转换。

如果我们要探查消息的属性和消息头信息,那么接收原始的Message对象可能会非常有用。但是,通常来讲,我们只需要消息的载荷。将载荷转换成领域对象是一个需要两步操作的过程,而且它需要将消息转换器注入组件中。如果你只关心载荷,那么使用receiveAndConvert()会更简单一些。程序清单8.3展现了如何使用receiveAndConvert()替换receive()来重新实现JmsOrderReceiver。

程序清单8.3 接收已经转换好的Order对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package tacos.kitchen.messaging.jms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class JmsOrderReceiver implements OrderReceiver {
private JmsTemplate jms;
@Autowired
public JmsOrderReceiver(JmsTemplate jms) {
this.jms = jms;
}
public Order receiveOrder() {
return (Order) jms.receiveAndConvert("tacocloud.order.queue");
}
}

这个新版本的JmsOrderReceiver的receieveOrder()被简化到了只有一行代码。同时,我们不再需要将MessageConverter注入进来了,因为所有的操作都会在receiveAndConvert()方法的幕后完成。

在继续学习下面的内容之前,我们考虑一下如何在Taco Cloud厨房应用中使用receiveOrder()。Taco Cloud厨房中的厨师可能会按下一个按钮或者采取其他操作,表明他已经准备好开始做taco了。此时,receiveOrder()会被调用,然后对receive()或receiveAndConvert()的调用将会阻塞。在订单消息抵达之前,这里不会发生任何事情。一旦订单抵达,对receiveOrder()的调用将会把该订单信息返回,订单的详细信息会展现给厨师,这样他就可以开始工作了。对于拉取模式来说,这似乎是一种很自然的选择。

接下来,我们看一下如何通过声明JMS监听器来实现推送模式。

声明消息监听器

拉取模式需要显式调用receive()或receiveAndConvert()才能接收消息,与之不同,消息监听器是一个被动的组件,在消息抵达之前,它会一直处于空闲状态。

要创建能够对JMS消息做出反应的消息监听器,我们需要为组件中的某个方法添加@JmsListener注解。程序清单8.4展示了一个新的OrderListener组件,它会被动地监听消息,而不是主动请求消息。

程序清单8.4 监听订单消息的OrderListener组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package tacos.kitchen.messaging.jms.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@JmsListener(destination = "tacocloud.order.queue")
public void receiveOrder(Order order) {
ui.displayOrder(order);
}
}

receiveOrder()方法使用了JmsListener注解,这样它就会监听tacocloud.order.queue目的地的消息。该方法不需要使用JmsTemplate,也不会被你的应用显式调用。相反,Spring中的框架代码会等待消息抵达指定的目的地,当消息到达时,receiveOrder()方法会被自动调用,并且会将消息中的Order载荷作为参数。

从很多方面来讲,@JmsListener注解都和Spring MVC中的请求映射注解很相似,比如@GetMapping或@PostMapping。在Spring MVC中,带有请求映射注解的方法会响应指定路径的请求。与之类似,使用@JmsListener注解的方法会对到达指定目的地的消息做出响应。

消息监听器通常被视为最佳选择,因为它不会导致阻塞,并且能够快速处理多个消息。但是在Taco Cloud中,它可能并不是最佳的方案。在系统中,厨师是一个重要的瓶颈,他们可能无法在接收到订单的时候立即准备taco。当新订单出现在屏幕上的时候,可能上一个订单刚刚完成一半。厨房用户界面需要在订单到达时进行缓冲,避免给厨房人员带来过重的负载。

这并不是说消息监听器不好。相反,如果消息能够快速得到处理,那么它们是非常适合的方案。但是,如果消息处理器需要根据自己的时间请求更多消息,那么JmsTemplate提供的拉取模式会更加合适。

JMS是由标准Java规范定义的,所以它得到了众多代理实现的支持,在Java中实现消息时它是常见的可选方案。但是JMS有一些缺点,尤其是作为Java规范,它只能用在Java应用中。RabbitMQ和Kafka等较新的消息传递方案克服了这些缺点,可以用于JVM之外的其他语言和平台。让我们把JMS放在一边,看看如何使用RabbitMQ实现taco订单的消息传递。

第8章 发送异步消息

本章内容:
  • 异步化的消息
  • 使用JMS、RabbitMQ和Kafka发送消息
  • 从代理拉取消息
  • 监听消息

在星期五下午4点55分,再有几分钟你就可以开始休假了。现在,你的时间只够开车到机场赶上航班。但是在你离开办公室之前,你需要确定老板和同事了解你目前的工作进展,这样他们就可以在星期一继续完成你留下的工作。不过,你的一些同事已经提前离开过周末去了,而你的老板正在忙于开会。你该怎么办呢?

要想既传达到你的工作状态又能赶上飞机,最有效的方式就是发送一封电子邮件给你的老板和同事,详述工作进展并且承诺给他们寄张明信片。你不知道他们在哪里,也不知道他们什么时候才能真正读到你的邮件。但是你知道,他们终究会回到他们的办公桌旁,阅读你的邮件。而此时,你可能正在赶往机场的路上。

同步通信,比如我们在前面所看到的REST,有它自己的适用场景。不过,对于开发者而言,这种通信方式并不是应用程序之间进行交互的唯一方式。异步消息是一个应用程序向另一个应用程序间接发送消息的一种方式,这种间接性能够为进行通信的应用带来更松散的耦合和更大的可伸缩性。

在本章中,我们将会使用异步消息从Taco Cloud Web站点发送订单信息到一个单独的应用中,这个应用是Taco Cloud的厨房,在这里会烹制taco。我们将会考虑Spring提供的3种异步消息方案:Java消息服务(Java Message Service,JMS)、RabbitMQ和高级消息队列协议(Advanced Message QueueingProtocol)、Apache Kafka。除了基础的发送和接收消息之外,我们还会看一下Spring对消息驱动POJO的支持,它是一种与EJB的消息驱动Bean(Message-Driven Bean,MDB)类似的消息接收方式。

7.2 使用Traverson导航REST API

Traverson来源于Spring Data HATEOAS项目,是Spring应用中开箱即用的消费超媒体API的解决方案。这个基于Java的库灵感来源于同名的JavaScript库。

你可能已经发现Traverson的名字有点类似于“traverse on”,这种叫法其实可以很好地描述它的用法。在本节中,我们将会以遍历API关系名的方式来消费API。

要使用Traverson,首先我们要用API的基础URI来实例化一个Traverson对象:

1
2
Traverson traverson = new Traverson(
URI.create("http://localhost:8080/api"), MediaTypes.HAL_JSON);

在这里,我将Traverson指向了Taco Cloud的基础URL(本地运行)。这是需要给Traverson指定的唯一URL。从这里开始,我们就可以根据链接的关系名来遍历API。我们同时还指定了API将会生成JSON格式的响应,并且具有HAL风格的超链接,这样Traverson就能知道怎样解析传入的资源数据了。与RestTemplate类似,你可以选择在使用Traverson对象之前实例化它,也可以将其声明为一个bean并在需要的地方注入进来。

有了Traverson对象之后,我们就可以通过以下链接使用API。例如,假设我们想检索所有配料的列表。从6.3.1小节可以知道,配料链接有一个href属性,它会链接到配料资源。我们需要跟踪这个链接:

1
2
3
4
5
6
7
ParameterizedTypeReference<Resources<Ingredient>> ingredientType =
new ParameterizedTypeReference<Resources<Ingredient>>() {};
Resources<Ingredient> ingredientRes =
traverson
.follow("ingredients")
.toObject(ingredientType);
Collection<Ingredient> ingredients = ingredientRes.getContent();

通过调用Traverson对象的follow()方法,我们就可以导航至链接关系名为ingredients的资源。现在,客户端已经导航至ingredients,我们需要通过调用toObject()来提取资源的内容。

我们需要告诉toObject()方法要将数据读入到哪种对象之中。考虑到我们需要以Resources<Ingredient>对象的形式来读入,而且Java类型擦除使得为泛型提供类型信息变得非常困难,所以这可能会有些棘手。但是,创建ParameterizedTypeReference能够帮助我们解决这个问题。

打个比方,假设这不是REST API,而是Web站点上的主页;这也不是REST客户端代码,而是我们正在浏览器中查看主页。在页面中,我们看到了一个关于Ingredients的链接,点击进入该链接。在进入下一个页面的时候,我们需要读取该页面,类似于Traverson将内容提取为Resources<Ingredient>对象。

现在,我们考虑一个更有趣的场景——假设我们想要获取最新创建的taco。从主资源开始,我们可以按照如下方式导航至最近的taco资源:

1
2
3
4
5
6
7
8
ParameterizedTypeReference<Resources<Taco>> tacoType =
new ParameterizedTypeReference<Resources<Taco>>() {};
Resources<Taco> tacoRes =
traverson
.follow("tacos")
.follow("recents")
.toObject(tacoType);
Collection<Taco> tacos = tacoRes.getContent();

在这里,我们跟踪tacos链接,然后从这里开始,跟踪recents链接。通过这种方式,我们得到了感兴趣的资源,所以基于对应的ParameterizedTypeReference调用toObject()方法,我们就得到了想要的内容。我们可以通过列出关系名称列表的形式来简化“.follow()”方法:

1
2
3
4
Resources<Taco> tacoRes =
traverson
.follow("tacos", "recents")
.toObject(tacoType);

正如我们所看到的,Traverson能够很容易地导航HATEOAS的API并消费其资源。但是,它并没有提供通过这些API写入或删除资源的方法。相反,RestTemplate能够写入和删除资源,但是在导航API方面支持得并不太好。

当你既要导航API又要更新或删除资源时,你需要组合使用RestTemplate和Traverson。Traverson仍然可以导航至创建新资源的链接。然后,可以将这个链接传递给RestTemplate来执行POST、PUT、DELETE或任何其他需要的HTTP请求。

例如,我们想要为Taco Cloud菜单添加一个新的Ingredient,如下的addIngredient ()方法将Traverson和RestTemplate组合起来,向API提交了一个新的Ingredient:

1
2
3
4
5
6
7
8
9
private Ingredient addIngredient(Ingredient ingredient) {
String ingredientsUrl = traverson
.follow("ingredients")
.asLink()
.getHref();
return rest.postForObject(ingredientsUrl,
ingredient,
Ingredient.class);
}

在跟踪完Ingredients之后,我们通过调用asLink()方法得到链接本身。基于该链接,我们调用getHref()得到链接的URL。有了URL之后,我们就具备了使用RestTemplate调用postForObject()并创建新配料所需的一切。