11.2 定义函数式请求处理器

Spring MVC基于注解的编程模型从Spring 2.5就存在了,而且这种模型非常流行,但是它也有一些缺点。

首先,所有基于注解的编程方式都会存在注解该做什么以及注解如何做之间的割裂。注解本身定义了该做什么,而具体如何去做则是在框架代码的其他部分定义的。如果想要进行自定义或扩展,编程模型就会变得很复杂,因为这样的变更需要修改注解之外的代码。除此之外,这种代码的调试也是比较麻烦的,因为我们无法在注解上设置断点。

其次,随着Spring变得越来越流行,很多熟悉其他语言和框架的Spring新手会觉得基于注解的Spring MVC(和WebFlux)与他们之前掌握的知识有很大的差异。作为注解式WebFlux的一种替代方案,Spring 5引入了一个新的函数式编程模型,用来定义反应式API。

这个新的编程模型使用起来更像一个库,而不是一个框架,能够让我们在不使用注解的情况下将请求映射到处理器代码中。使用Spring的函数式编程模型编写API会涉及4个主要的类型:

  • RequestPredicate:声明要处理的请求类型。
  • RouterFunction:声明如何将请求路由到处理器代码中。
  • ServerRequest:代表一个HTTP请求,包括对请求头和请求体的访问。
  • ServerResponse:代表一个HTTP响应,包括响应头和响应体信息。

下面是一个将所有类型组合在一起的Hello World样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package demo;
import static org.springframework.web.
reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.
reactive.function.server.RouterFunctions.route;
import static org.springframework.web.
reactive.function.server.ServerResponse.ok;
import static reactor.core.publisher.Mono.just;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
@Configuration
public class RouterFunctionConfig {
@Bean
public RouterFunction<?> helloRouterFunction() {
return route(GET("/hello"),
request -> ok().body(just("Hello World!"), String.class));
}
}

我们需要注意的第一件事情就是,在这里静态导入了一些辅助类,可以使用它们来创建前文所述的函数式类型。我们还以静态方式导入了Mono,从而能够让剩余的代码更易于阅读和理解。

在这个@Configuration类中,我们有一个类型为RouterFunction<?>的@Bean方法。按照前文所述,RouterFunction能够声明一个或多个RequestPredicate对象和处理与之匹配的请求的函数之间的映射关系。

RouterFunctions的route()方法接受两个参数:RequestPredicate以及处理与之匹配的请求的函数。在本例中,RequestPredicates的GET()方法声明一个RequestPredicate,后者会匹配针对“/hello”的HTTP GET请求。

至于处理器函数,它写成了lambda表达式的形式,当然它也可以使用方法引用。尽管这里没有显式声明,但是处理器lambda表达式会接受一个ServerRequest作为参数。它通过ServerResponse的ok()方法和BodyBuilder的body()方法返回了一个ServerResponse。BodyBuilder对象是由ok()所返回的。这样的话,就会创建出状态码为HTTP 200 (OK)并且响应体载荷为“Hello World!”的响应。

按照这种编写形式,helloRouterFunction()方法所声明的RouterFunction只能处理一种类型的请求。如果想要处理不同类型的请求,那么我们没有必要编写另外一个@Bean(当然你也可以这样做),仅需调用andRoute()来声明另一个RequestPredicate到函数的映射。例如,为“/bye”的GET请求添加一个处理器:

1
2
3
4
5
6
7
@Bean
public RouterFunction<?> helloRouterFunction() {
return route(GET("/hello"),
request -> ok().body(just("Hello World!"), String.class))
.andRoute(GET("/bye"),
request -> ok().body(just("See ya!"), String.class));
}

Hello World这种级别的样例只能用来简单体验一些新东西。接下来,我们进一步看一下如何使用Spring的函数式Web编程模型处理接近真实场景的请求。

为了阐述如何在真实应用中使用函数式编程模型,我们会使用函数式风格重新实现DesignTacoController的功能。如下的配置类是DesignTacoController的函数式实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
public class RouterFunctionConfig {
@Autowired
private TacoRepository tacoRepo;
@Bean
public RouterFunction<?> routerFunction() {
return route(GET("/design/taco"), this::recents)
.andRoute(POST("/design"), this::postTaco);
}
public Mono<ServerResponse> recents(ServerRequest request) {
return ServerResponse.ok()
.body(tacoRepo.findAll().take(12), Taco.class);
}
public Mono<ServerResponse> postTaco(ServerRequest request) {
Mono<Taco> taco = request.bodyToMono(Taco.class);
Mono<Taco> savedTaco = tacoRepo.save(taco);
return ServerResponse
.created(URI.create(
"http://localhost:8080/design/taco/" +
savedTaco.getId()))
.body(savedTaco, Taco.class);
}
}

我们可以看到,routerFunction()方法声明了一个RouterFunction<?> bean,这与Hello World样例类似。但是,它们之间的差异在于要处理什么类型的请求以及如何处理。在本例中,我们创建的RouterFunction处理针对“/design/taco”的GET请求以及针对“/design”的POST请求。

更明显的差异在于,路由是由方法引用处理的。如果RouterFunction背后的行为相对简单和简洁,那么lambda是很不错的选择。在很多场景下,最好将功能抽取到一个单独的方法中(甚至抽取到一个独立类的方法中),以便于保持代码的可读性。

就我们的需求而言,针对“/design/taco”的GET请求将由recents()方法来处理。它使用注入的TacoRepository得到一个Flux<Taco>,然后从中得到12个条目。针对“/design”的POST请求会由postTaco()方法来处理,它会从传入的ServerRequest中抽取Mono<Taco>。postTaco()使用TacoRepository方法来进行保存,随后使用save()返回的Mono<Taco>作为响应。

11.3 测试反应式控制器

在反应式控制器的测试方面,Spring 5并没有置我们于不顾。实际上,Spring 5引入了WebTestClient。这是一个新的测试工具类,让Spring WebFlux编写的反应式控制器的测试变得非常容易。为了了解如何使用WebTestClient编写测试,我们首先使用它测试11.1.2小节中编写的DesignTacoController中的recentTacos()方法。

11.3.1 测试GET请求

对于recentTacos()方法,我们想断言如果针对“/design/recent”路径发送HTTPGET请求,那么将会得到JSON载荷的响应并且taco的数量不会超过12个。程序清单11.1中的测试类将会是一个很好的起点。

程序清单11.1 使用WebTestClient测试DesignTacoController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package tacos;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import tacos.Ingredient.Type;
import tacos.data.TacoRepository;
import tacos.web.api.DesignTacoController;
public class DesignTacoControllerTest {
@Test
public void shouldReturnRecentTacos() {
Taco[] tacos = {
testTaco(1L), testTaco(2L),
testTaco(3L), testTaco(4L), ⇽--- 创建测试数据
testTaco(5L), testTaco(6L),
testTaco(7L), testTaco(8L),
testTaco(9L), testTaco(10L),
testTaco(11L), testTaco(12L),
testTaco(13L), testTaco(14L),
testTaco(15L), testTaco(16L)};
Flux<Taco> tacoFlux = Flux.just(tacos);
TacoRepository tacoRepo = Mockito.mock(TacoRepository.class);
when(tacoRepo.findAll()).thenReturn(tacoFlux); ⇽--- Mock TacoRepository
WebTestClient testClient = WebTestClient.bindToController(
new DesignTacoController(tacoRepo))
.build(); ⇽--- 创建WebTestClient
testClient.get().uri("/design/recent")
.exchange() ⇽--- 请求最近的taco
.expectStatus().isOk() ⇽--- 检验预期的响应
.expectBody()
.jsonPath("$").isArray()
.jsonPath("$").isNotEmpty()
.jsonPath("$[^0].id").isEqualTo(tacos[^0].getId().toString())
.jsonPath("$[^0].name").isEqualTo("Taco 1").jsonPath("$[^1].id")
.isEqualTo(tacos[^1].getId().toString()).jsonPath("$[^1].name")
.isEqualTo("Taco 2").jsonPath("$[^11].id")
.isEqualTo(tacos[^11].getId().toString())
...
.jsonPath("$[^11].name").isEqualTo("Taco 12").jsonPath("$[^12]")
.doesNotExist();
.jsonPath("$[^12]").doesNotExist();
}
...
}

shouldReturnRecentTacos()方法做的第一件事情就是以Flux<Taco>的形式创建了一些测试数据。这个Flux随后作为mock TacoRepository的findAll()方法的返回值。

Flux发布的Taco对象是由一个名为testTaco()的方法创建的。这个方法会根据一个数字生成一个Taco,其ID和名称都是基于该数字生成的。testTaco()方法的实现如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
private Taco testTaco(Long number) {
Taco taco = new Taco();
taco.setId(UUID.randomUUID());
taco.setName("Taco " + number);
List<IngredientUDT> ingredients = new ArrayList<>();
ingredients.add(
new IngredientUDT("INGA", "Ingredient A", Type.WRAP));
ingredients.add(
new IngredientUDT("INGB", "Ingredient B", Type.PROTEIN));
taco.setIngredients(ingredients);
return taco;
}

简单起见,所有的测试taco都具有两种相同的配料,但是它们的ID和名称是根据传入的数字确定的。

另外,回到shouldReturnRecentTacos()方法,我们实例化了一个DesignTacoController并将mock TacoRepository注入到了构造器中。这个控制器传递给了WebTestClient. bindToController()方法,以便于生成WebTestClient实例。

所有的环境搭建工作完成后,我们可以使用WebTestClient提交GET请求至“/design/recent”并校验响应符合我们的预期。对get().uri(“/design/recent”)的调用描述了我们想要发送的请求。随后,调用exchange()会提交请求,这个请求将会由WebTestClient绑定的控制器(DesignTacoController)来进行处理。

最后,我们可以确认响应符合预期。通过调用expectStatus(),我们可以断言响应具有HTTP 200 (OK)状态码。然后,我们多次调用jsonPath()断言响应体中的JSON包含它应该具有的值。最后一个断言检查第12个元素(在基于零开始计数的数组中)是否真的不存在,以此判断结果不超过12个元素。

如果返回的JSON比较复杂,比如有大量的数据或多层嵌套的数据,那么使用jsonPath()会变得非常烦琐。实际上,为了节省空间,在程序清单11.1中,我省略了很多对jsonPath()的调用。在这种情况下,使用jsonPath()会变得非常枯燥烦琐,WebTestClient提供了json()方法。这个方法可以传入一个String参数(包含响应要对比的JSON)。

举例来说,假设我们在名为recent-tacos.json的文件中创建了完整的响应JSON并将它放到了类路径的“/tacos”路径下,那么我们可以按照如下的方式重写WebTestClient断言:

1
2
3
4
5
6
7
8
9
10
ClassPathResource recentsResource =
new ClassPathResource("/tacos/recent-tacos.json");
String recentsJson = StreamUtils.copyToString(
recentsResource.getInputStream(), Charset.defaultCharset());
testClient.get().uri("/design/recent")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().isOk()
.expectBody()
.json(recentsJson);

因为json()接受的是一个String,所以我们必须先将类路径资源加载为String。借助Spring中StreamUtils的copyToString()方法,这一点很容易实现。copyToString()方法返回的String就是我们的请求所预期的响应JSON内容。将其传递给json()方法,我们就能确保控制器会生成正确的输出。

WebTestClient提供的另外一种可选方案就是它允许将响应体与一个值的列表进行对比。expectBodyList()方法会接受一个代表列表中元素类型的Class或ParameterizedTypeReference,并且会返回ListBodySpec对象,随后可以基于该对象进行断言。借助expectBodyList(),我们可以重写测试类,使用创建mockTacoRepository时的测试数据的子集来进行验证:

1
2
3
4
5
6
testClient.get().uri("/design/recent")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().isOk()
.expectBodyList(Taco.class)
.contains(Arrays.copyOf(tacos, 12));

在这里,我们断言响应体包含了在测试方法开头处所创建的原始Taco数组的前12个元素。

11.3.2 测试POST请求

WebTestClient不仅能对控制器发送GET请求,还能用来测试各种HTTP方法,包括GET、POST、PUT、PATCH、DELETE和HEAD方法。表11.1将HTTP方法与WebTestClient的方法进行了映射。

表11.1 WebTestClient能够测试针对Spring WebFlux控制器的各种请求

epub_29101559_89

作为测试Spring WebFlux控制器其他HTTP请求方法的样例,我们看一下针对DesignTacoController的另一个测试。这一次,我们会编写一个对taco创建API的测试,也就是提交POST请求到“/design”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void shouldSaveATaco() {
TacoRepository tacoRepo = Mockito.mock(
TacoRepository.class); ⇽--- 搭建测试数据
Mono<Taco> unsavedTacoMono = Mono.just(testTaco(null));
Taco savedTaco = testTaco(null);
savedTaco.setId(1L);
Mono<Taco> savedTacoMono = Mono.just(savedTaco);
when(tacoRepo.save(any())).thenReturn(savedTacoMono); ⇽--- mock TacoRepository
WebTestClient testClient = WebTestClient.bindToController( ⇽--- 创建WebTestClient
new DesignTacoController(tacoRepo)).build();
testClient.post() ⇽--- POST taco
.uri("/design")
.contentType(MediaType.APPLICATION_JSON)
.body(unsavedTacoMono, Taco.class)
.exchange()
.expectStatus().isCreated() ⇽--- 校验响应
.expectBody(Taco.class)
.isEqualTo(savedTaco);
}

与上面的测试方法类似,shouldSaveATaco()首先会创建一些测试数据和mockTacoRepository,并且创建了一个WebTestClient并绑定到控制器上。随后,它使用WebTestClient提交POST请求到“/design”,并且将请求体声明为application/json类型,请求载荷为Taco的JSON序列化形式,放到未保存的Mono中。在执行exchange()之后,测试断言响应状态为HTTP 201 (CREATED)并且响应体中的载荷与已保存的Taco对象相同。

11.3.3 使用实时服务器进行测试

到目前为止,我们所编写的测试都依赖于Spring WebFlux的mock实现,所以并不需要真正的服务器。但是,我们可能需要在服务器(如Netty或Tomcat)环境中测试WebFlux控制器,也许还会需要repository或其他的依赖。换句话说,我们有可能要编写集成测试。

要编写WebTestClient的集成测试,与其他的Spring Boot集成测试类似,我们首先要为测试类添加@RunWith@SpringBootTest

1
2
3
4
5
6
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment=WebEnvironment.RANDOM_PORT)
public class DesignTacoControllerWebTest {
@Autowired
private WebTestClient testClient;
}

通过将webEnvironment属性设置为WebEnvironment.RANDOM_PORT,我们要求Spring启动一个运行时服务器并监听任意选择的端口
^1

你可能也注意到,我们将WebTestClient自动织入到了测试类中。这不仅意味着我们不用在测试的方法中创建它了,而且在发送请求的时候也不需要指定完整的URL了。这是因为WebTestClient能够知道测试服务器在哪个端口上运行。现在,我们可以使用自动织入的WebTestClient将shouldReturnRecentTacos()重写为集成测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void shouldReturnRecentTacos() throws IOException {
testClient.get().uri("/design/recent")
.accept(MediaType.APPLICATION_JSON).exchange()
.expectStatus().isOk()
.expectBody()
.jsonPath("$[?(@.id == 'TACO1')].name")
.isEqualTo("Carnivore")
.jsonPath("$[?(@.id == 'TACO2')].name")
.isEqualTo("Bovine Bounty")
.jsonPath("$[?(@.id == 'TACO3')].name")
.isEqualTo("Veg-Out");
}

我们发现,这个新版本的shouldReturnRecentTacos()代码要少得多。我们不再需要创建WebTestClient,因为可以使用自动织入的实例。另外,也不需要mockTacoRepository,因为Spring将会创建DesignTacoController实例并将一个真正的TacoRepository注入进来。在新版本的测试方法中,我们使用JSONPath表达式来校验数据库提供的值。

WebTestClient在测试的时候非常有用,此时我们会消费WebFlux控制器所暴露的API。但是,如果我们的应用本身要消费某个API,又该怎样处理呢?接下来,我们将注意力转向Spring反应式Web的客户端,看一下WebClient如何通过REST客户端来处理反应式类型,如Mono和Flux。

9.2 Spring Integration功能概览

Spring Integration涵盖了大量的集成场景。如果想将所有的内容放到一章中,就像把一头大象装到信封里一样不现实。在这里,我只会向你展示SpringIntegration这头大象的照片,而不是对Spring Integration进行面面俱到的讲解,目的就是让你能够了解它是如何运行的。随后,我们将会再创建一个集成流,为Taco Cloud应用添加新的功能。

集成流是由一个或多个如下介绍的组件组成的。在继续编写代码之前,我们先看一下这些组件在集成流中所扮演的角色。

  • 通道(channel):将消息从一个元素传递到另一个元素。
  • 过滤器(filter):基于某些断言,条件化地允许某些消息通过流。
  • 转换器(transformer):改变消息的值和/或将消息载荷从一种类型转换成另一种类型。
  • 路由器(router):将消息路由至一个或多个通道,通常会基于消息的头信息进行路由。
  • 切分器(splitter):将传入的消息切割成两个或更多的消息,然后将每个消息发送至不同的通道;
  • 聚合器(aggregator):切分器的反向操作,将来自不同通道的多个消息合并成一个消息。
  • 服务激活器(service activator):将消息传递给某个Java方法来进行处理,并将返回值发布到输出通道上。
  • 通道适配器(channel adapter):将通道连接到某些外部系统或传输方式,可以接受输入,也可以写出到外部系统。
  • 网关(gateway):通过接口将数据传递到集成流中。

在定义文件写入集成流的时候,我们已经看到过其中的一些组件了。FileWriterGateway是一个网关,通过它,应用可以提交要写入文件的文本。我们还定义了一个转换器,将给定的文本转换成大写的形式,随后,我们定义一个出站通道适配器,它执行将文本写入文件的任务。这个流有两个通道,textInChannel和fileWriterChannel,它们将应用中的其他组件连接在了一起。现在,我们按照承诺快速看一下这些集成流组件。

9.2.1 消息通道

消息通道是消息穿行集成通道的一种方式(参见图9.2)。它们是连接SpringIntegration其他组成部分的管道。

image-20211015211632575

图9.2 消息通道是集成流中数据在其他组件之间流动的管道

Spring Integration提供了多种通道实现。

  • PublishSubscribeChannel:发送到PublishSubscribeChannel的消息会被传递到一个或多个消费者中。如果有多个消费者,它们都会接收到消息。
  • QueueChannel:发送到QueueChannel的消息会存储到一个队列中,会按照先进先出(First In First Out,FIFO)的方式被拉取出来。如果有多个消费者,只有其中的一个消费者会接收到消息。
  • PriorityChannel:与QueueChannel类似,但它不是FIFO的方式,而是会基于消息的priority头信息被消费者拉取出来。
  • RendezvousChannel:与QueueChannel类似,但是发送者会一直阻塞通道,直到消费者接收到消息为止,实际上会同步发送者和消费者。
  • DirectChannel:与PublishSubscribeChannel类似,但是消息只会发送至一个消费者,它会在与发送者相同的线程中调用消费者。这种方式允许事务跨通道。
  • ExecutorChannel:类似于DirectChannel,但是消息分发是通过TaskExecutor实现的,这样会在与发送者独立的线程中执行。这种通道类型不支持事务跨通道。
  • FluxMessageChannel:反应式流的发布者消息通道,基于Reactor项目的Flux。(我们将会在第10章讨论反应式流、Reactor和Flux。)

在Java配置和Java DSL中,输入通道都是自动创建的,默认使用的是DirectChannel。但是,如果想要使用不同的通道实现,就需要将通道声明为bean并在集成流中引用它。例如,要声明PublishSubscribeChannel,我们需要声明如下的@Bean方法:

1
2
3
4
@Bean
public MessageChannel orderChannel() {
return new PublishSubscribeChannel();
}

随后,我们可以在集成流定义中根据通道名称引用它。例如,这个通道要被一个服务激活器bean所消费,那么我们可以在@ServiceActivator注解的inputChannel属性中引用它:

1
@ServiceActivator(inputChannel="orderChannel")

或者,使用Java DSL配置风格,我们可以通过调用channel()来引用它:

1
2
3
4
5
6
7
8
@Bean
public IntegrationFlow orderFlow() {
return IntegrationFlows
...
.channel("orderChannel")
...
.get();
}

很重要的一点需要注意,如果使用QueueChannel,消费者必须配置一个poller。例如,声明一个QueueChannel bean:

1
2
3
4
@Bean
public MessageChannel orderChannel() {
return new QueueChannel();
}

那么,我们需要确保消费者配置成轮询该通道的消息。如果是服务激活器,@ServiceActivator注解可能会如下所示:

1
2
@ServiceActivator(inputChannel="orderChannel",
poller=@Poller(fixedRate="1000"))

在本例中,服务激活器每秒(或者说每1000毫秒)都会轮询名为orderChannel的通道。

9.2.2 过滤器

过滤器放置于集成管道的中间,它能够根据断言允许或拒绝消息进入流程的下一步(见图9.3)。

image-20211015211913017

图9.3 过滤器会基于某个断言允许或拒绝消息在管道中进行处理

例如,假设消息包含了整型的值,它们要通过名为numberChannel的通道进行发布,但是我们只想让偶数进入名为evenNumberChannel的通道。在这种情况下,我们可以使用@Filter注解定义一个过滤器:

1
2
3
4
5
@Filter(inputChannel="numberChannel",
outputChannel="evenNumberChannel")
public boolean evenNumberFilter(Integer number) {
return number % 2 == 0;
}

作为替代方案,如果使用Java DSL配置风格来定义集成流,那么我们可以按照如下的方式来调用filter():

1
2
3
4
5
6
7
8
@Bean
public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
return IntegrationFlows
...
.<Integer>filter((p) -> p % 2 == 0)
...
.get();
}

在本例中,我们使用lambda表达式来实现过滤器。实际上,filter()方法接受GenericSelector作为参数。这意味着,如果我们的过滤器过于复杂,不适合放到一个简单的lambda表达式中,那么我们可以实现GenericSelector接口作为替代方案。

9.2.3 转换器

转换器会对消息执行一些操作,一般会形成不同的消息,有可能还会产生不同的载荷类型(见图9.4)。转换过程可能非常简单,比如执行数字的数学运算或者操作String值。转换也可能会很复杂,比如根据代表ISBN的String值查询并返回对应图书的详细信息。

image-20211016093244978

图9.4 转换器会改变流经集成流的消息

例如,假设整型值会通过名为numberChannel的通道进行发布,我们希望将这些数字转换成它们的罗马数字形式,以String类型来表示。在这种情况下,我们可以声明一个GenericTransformer类型的bean并为其添加@Transformer注解,如下所示:

1
2
3
4
5
6
@Bean
@Transformer(inputChannel="numberChannel",
outputChannel="romanNumberChannel")
public GenericTransformer<Integer, String> romanNumTransformer() {
return RomanNumbers::toRoman;
}

@Transformer注解可以将这个bean声明为转换器bean,它会从名为numberChannel的通道接收Integer值,然后使用静态方法toRoman()进行转换。(toRoman()是静态方法,定义在名为RomanNumbers的类中,这里通过方法引用来使用它。)转换后的结果会发布到名为romanNumberChannel的通道中。

在Java DSL配置风格中,调用transform()会更加简单,我们只需将对toRoman()的方法引用传递进来就可以了:

1
2
3
4
5
6
7
8
@Bean
public IntegrationFlow transformerFlow() {
return IntegrationFlows
...
.transform(RomanNumbers::toRoman)
...
.get();
}

尽管在这两个转换器代码中我们都使用了方法引用,但是转换器也可以使用lambda表达式来进行声明。或者,如果转换器足够复杂,需要使用一个单独的类,那么我们可以将其作为一个bean注入流定义中,并将引用传递给transform()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Bean
public RomanNumberTransformer romanNumberTransformer() {
return new RomanNumberTransformer();
}
@Bean
public IntegrationFlow transformerFlow(
RomanNumberTransformer romanNumberTransformer) {
return IntegrationFlows
...
.transform(romanNumberTransformer)
...
.get();
}

在这里,我们声明了RomanNumberTransformer类型的bean,它本身是SpringIntegration Transformer或GenericTransformer接口的实现。这个bean注入到了transformerFlow()方法中,并且在定义集成流的时候传递给了transform()方法。

9.2.4 路由器

路由器能够基于某个路由断言,实现集成流的分支,从而将消息发送至不同的通道上(见图9.5)。

image-20211016093412464

图9.5 路由器会根据应用于消息的断言将消息定向至不同的通道

例如,假设我们有一个名为numberChannel的通道,它会传输整型值。我们想要将带有偶数的消息定向到名为evenChannel的通道,将带有奇数的消息定向到名为oddChannel的通道。要在集成流中创建这样一个路由器,我们可以声明一个AbstractMessageRouter类型的bean,并为其添加@Router注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Bean
@Router(inputChannel="numberChannel")
public AbstractMessageRouter evenOddRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel>
determineTargetChannels(Message<?> message) {
Integer number = (Integer) message.getPayload();
if (number % 2 == 0) {
return Collections.singleton(evenChannel());
}
return Collections.singleton(oddChannel());
}
};
}
@Bean
public MessageChannel evenChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel oddChannel() {
return new DirectChannel();
}

这里定义的AbstractMessageRouter接收名为numberChannel的输入通道的消息,以匿名内部类的形式检查消息的载荷:如果是偶数,就返回名为evenChannel的通道(在路由器bean之后同样以bean的方式进行了声明);否则,通道载荷中的数字必然是奇数,将会返回名为oddChannel的通道(同样以bean方法的方式进行了声明)。

在Java DSL风格中,路由器是通过在流定义中调用route()方法来声明的,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
return IntegrationFlows
...
.<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping -> mapping
.subFlowMapping("EVEN", sf -> sf
.<Integer, Integer>transform(n -> n * 10)
.handle((i,h) -> { ... })
)
.subFlowMapping("ODD", sf -> sf
.transform(RomanNumbers::toRoman)
.handle((i,h) -> { ... })
)
)
.get();
}

尽管我们依然可以定义AbstractMessageRouter并将其传递到route(),但是在这个样例中使用了lambda来确定消息载荷是偶数还是奇数。如果是偶数,就会返回值为EVEN的字符串;如果是奇数,就会返回值为ODD的字符串。然后这些值会用来确定该使用哪个子映射处理消息。

9.2.5 切分器

在集成流中,有时候将一个消息切分为多个消息独立处理可能会非常有用。切分器将会负责切分并处理这些消息,如图9.6所示。

image-20211016093512699

图9.6 切分器会将消息拆分为两个或更多独立的消息,它们可以由独立的子流分别进行处理

在很多场景中,切分器都非常有用,但是有两种基本的使用场景我们可以使用切分器。

  • 消息载荷中包含了相同类型条目的一个列表,我们希望将它们作为单独的消息载荷来进行处理。例如,消息中携带了一个商品列表,它们可以切分为多个消息,每个消息的载荷分别对应一件商品。
  • 消息载荷所携带的信息尽管有所关联,但是可以拆分为两个或更多不同类型的消息。例如,一个购买订单可能会包含投递信息、账单以及商品项的信息。投递细节可以通过某个子流来处理,账单由另一个子流来处理,而商品项由其他的子流来处理。在这种情况下,切分器后面通常会紧跟着一个路由器,它根据消息的载荷类型进行路由,确保数据都由正确的子流来进行处理。

在我们将消息载荷切分为两个或更多不同类型的消息时,通常定义一个POJO就足够了,它提取传入消息不同的组成部分,并以元素集合的形式返回。

例如,假设我们想要将带有购买订单的消息切分为两个消息:其中一个携带账单信息,另一个携带商品项的信息。如下的OrderSplitter就可以完成该任务:

1
2
3
4
5
6
7
8
public class OrderSplitter {
public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
ArrayList<Object> parts = new ArrayList<>();
parts.add(po.getBillingInfo());
parts.add(po.getLineItems());
return parts;
}
}

接下来,我们声明一个OrderSplitter bean,并通过@Splitter注解将其作为集成流的一部分:

1
2
3
4
5
6
@Bean
@Splitter(inputChannel="poChannel",
outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
return new OrderSplitter();
}

在这里,购买订单会到达名为poChannel的通道,它们会被OrderSplitter切分。然后,所返回集合中的每个条目都会作为集成流中独立的消息,它们会发布到名为splitOrderChannel的通道上。此时,我们可以在流中声明一个PayloadTypeRouter,将账单信息和商品项分别路由至它们自己的子流上:

1
2
3
4
5
6
7
8
9
10
@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(
BillingInfo.class.getName(), "billingInfoChannel");
router.setChannelMapping(
List.class.getName(), "lineItemsChannel");
return router;
}

顾名思义,PayloadTypeRouter会根据消息的载荷将它们路由至不同的通道。按照这里的配置,载荷为BillingInfo类型的消息将会被路由至名为billingInfoChannel的通道,供后续进行处理。对于商品项来说,它们会放到一个java.util.List集合中,因此,我们将List类型的载荷映射到名为lineItemsChannel的通道中。

按照目前的状况,流将会被切分成两个子流:一个BillingInfo对象的流,另外一个则是List<LineItem>的流。如果我们想要进一步进行拆分,比如不想处理LineItems的列表,而是想要分别处理每个LineItem,又该怎么办呢?要将商品列表拆分为多个消息,其中每个消息包含一个条目,我们只需要编写一个方法(而不是一个bean)即可。这个方法带有@Splitter注解并且要返回LineItem的集合,如下所示:

1
2
3
4
@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
return lineItems;
}

当带有List<LineItem>载荷的消息抵达名为lineItemsChannel的通道时,消息会进入lineItemSplitter()。按照切分器的规则,这个方法必须要返回切分后条目的集合。在本例中,我们已经有了LineItem的集合,所以我们直接返回这个集合就可以了。这样做的结果就是,集合中的每个LineItem都将会发布到一个消息中,这些消息会被发送到名为lineItemChannel的通道中。

如果想要使用Java DSL声明相同的splitter/router配置,那么我们可以通过调用split()和route()来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
return IntegrationFlows
...
.split(orderSplitter())
.<Object, String> route(
p -> {
if (p.getClass().isAssignableFrom(BillingInfo.class)) {
return "BILLING_INFO";
} else {
return "LINE_ITEMS";
}
}, mapping -> mapping
.subFlowMapping("BILLING_INFO", sf -> sf
.<BillingInfo> handle((billingInfo, h) -> {
...
}))
.subFlowMapping("LINE_ITEMS", sf -> sf
.split()
.<LineItem> handle((lineItem, h) -> {
...
}))
)
.get();

DSL所组成的流定义相当简洁,但是可能会有点难以理解。它使用与Java配置样例相同的OrderSplitter来切分订单。在订单切分之后,它根据类型将其路由至两个独立的子流。

9.2.6 服务激活器

服务激活器接收来自输入通道的消息并将这些消息发送至一个MessageHandler的实现,如图9.7所示。

image-20211016093710409

图9.7 在接收到消息时,服务激活器会通过MessageHandler调用某个服务

Spring Integration提供了多个开箱即用的MessageHandler(PayloadTypeRouter甚至就是MessageHandler的一个实现),但是我们通常会需要提供一些自定义的实现作为服务激活器。作为样例,如下的代码展现了如何声明MessageHandler bean并将其配置为服务激活器:

1
2
3
4
5
6
7
@Bean
@ServiceActivator(inputChannel="someChannel")
public MessageHandler sysoutHandler() {
return message -> {
System.out.println("Message payload: " + message.getPayload());
};
}

这个bean使用了@ServiceActivator注解,表明它会作为一个服务激活器处理来自someChannel通道的消息。对于MessageHandler本身来讲,它是通过一个lambda表达式实现的。这是一个简单的MessageHandler,当得到消息之后,它会将消息的载荷打印至标准输出流。

另外,我们还可以声明一个服务激活器,让它在返回新载荷之前处理输入消息中的数据。在这种情况下,bean应该是一个GenericHandler,而不是MessageHandler:

1
2
3
4
5
6
7
8
9
@Bean
@ServiceActivator(inputChannel="orderChannel",
outputChannel="completeOrder")
public GenericHandler<Order> orderHandler(
OrderRepository orderRepo) {
return (payload, headers) -> {
return orderRepo.save(payload);
};
}

在本例中,服务激活器是一个GenericHandler,会接收载荷为Order类型的消息。当订单抵达时,我们会通过一个repository将它保存起来,并返回保存之后的Order,这个Order随后被发送至名为completeChannel的输出通道。

你可能已经注意到了,GenericHandler不仅能够得到载荷,还能得到消息头(虽然我们这个样例根本没有用到这些头信息)。我们还可以在Java DSL配置风格中使用服务激活器,此时,只需要将MessageHandler或GenericHandler传递到流定义的handle()方法中即可:

1
2
3
4
5
6
7
8
public IntegrationFlow someFlow() {
return IntegrationFlows
...
.handle(msg -> {
System.out.println("Message payload: " + msg.getPayload());
})
.get();
}

在本例中,MessageHandler会得到一个lambda表达式,但是我们也可以为其提供一个方法引用,甚至是实现了MessageHandler接口的类实例。如果我们为其提供lambda表达式或方法引用,就需要记住它们均接受消息作为其参数。

类似的,如果服务激活器不想成为流的终点,那么handle()还可以接受GenericHandler。如果要将前面提到的订单保存服务激活器添加进来,我们可以按照如下的形式使用Java DSL配置流:

1
2
3
4
5
6
7
8
9
public IntegrationFlow orderFlow(OrderRepository orderRepo) {
return IntegrationFlows
...
.<Order>handle((payload, headers) -> {
return orderRepo.save(payload);
})
...
.get();
}

在使用GenericHandler的时候,lambda表达式或方法引用会接受消息负载和头信息作为参数。如果你选择使用GenericHandler作为流的终点,就需要返回null;否则,出现错误,提示没有指定输出通道。

9.2.7 网关

通过网关,应用可以提交数据到集成流中,并且能够可选地接收流的结果作为响应。网关会声明为接口,借助Spring Integration的实现,应用可以调用它来发送消息到集成流中(见图9.8)。

image-20211016093824376

图9.8 服务网关是接口

我们已经见过了消息网关的样例,也就是FileWriterGateway。FileWriterGateway是一个单向的网关,它有一个接受String类型的方法,该方法会将文本写入到文件中,返回void。编写双向的网关同样简单。在编写网关接口的时候,只需确保方法要返回某个值,以便于推送到集成流中。

作为样例,假设有个网关,它面对的是一个简单的集成流,这个流会接受一个String并将给定的String转换成全大写的形式。这个网关接口大致如下所示:

1
2
3
4
5
6
7
8
9
package com.example.demo;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel="inChannel",
defaultReplyChannel="outChannel")
public interface UpperCaseGateway {
String uppercase(String in);
}

最让人开心的是,这个接口不需要实现。Spring Integration会自动在运行时提供一个实现,它会通过特定的通道发送和接收消息。

当uppercase()被调用的时候,给定的String会发布到集成流中,进入名为inChannel的通道。不管流是如何定义的或者它都干了些什么,当数据进入名为outChannel的通道时,它将会从uppercase()方法返回。

对于我们这个转换成大写格式的集成流来说,它是一个非常简单的流,只需要一个将String转换成大写格式的步骤就可以。它可以通过Java DSL配置声明如下:

1
2
3
4
5
6
7
8
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows
.from("inChannel")
.<String, String> transform(s -> s.toUpperCase())
.channel("outChannel")
.get();
}

按照定义,这个流会从进入inChannel通道的数据开始。消息载荷会由转换器进行处理,也就是执行大写操作(通过lambda表达式来定义)。结果形成的消息会被发送到名为outChannel的通道,也就是我们在UpperCaseGateway中声明的答复通道。

9.2.8 通道适配器

通道适配器代表了集成流的入口和出口。数据通过入站通道适配器(inboundchannel adapter)进入一个集成流,通过出站通道适配器离开一个集成流,如图9.9所示。

image-20211016093919555

图9.9 通道适配器是集成流的入口和出口

根据要将什么数据源引入集成流,入站通道适配器可以有很多种形式。例如,我们可以声明一个入站通道适配器,将来自AtomicInteger不断递增的数字引入到流中。如果使用Java配置,如下所示:

1
2
3
4
5
6
7
8
@Bean
@InboundChannelAdapter(
poller=@Poller(fixedRate="1000"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
return () -> {
return new GenericMessage<>(source.getAndIncrement());
};
}

这个@Bean方法通过@InboundChannelAdapter注解声明了一个入站通道适配器,根据注入的AtomicInteger每隔一秒(也就是1000毫秒)就提交一个数字给名为numberChannel的通道。

在使用Java配置时,我们可以通过@InboundChannelAdapter注解声明入站通道适配器,而在使用Java DSL定义集成流的时候,我们需要使用from()方法完成同样的事情。如下的流定义程序清单展现了类似的入站通道适配器,它是使用Java DSL定义的:

1
2
3
4
5
6
7
8
@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
return IntegrationFlows
.from(integerSource, "getAndIncrement",
c -> c.poller(Pollers.fixedRate(1000)))
...
.get();
}

通常,通道适配器是由Spring Integration的众多端点模块提供的。假设,我们需要一个入站通道适配器,它会监控一个特定的目录并将写入该目录的文件以消息的形式提交到file-channel通道中。如下的Java配置使用来自Spring Integration的file端点模块实现该功能:

1
2
3
4
5
6
7
8
9
@Bean
@InboundChannelAdapter(channel="file-channel",
poller=@Poller(fixedDelay="1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}

如果使用Java DSL编写同等功能的入站通道适配器,那么我们可以使用Files类的inboundAdapter()。出站通道适配器是集成流的终点,会将最终的消息传递给应用或其他外部系统:

1
2
3
4
5
6
7
@Bean
public IntegrationFlow fileReaderFlow() {
return IntegrationFlows
.from(Files.inboundAdapter(new File(INPUT_DIR))
.patternFilter(FILE_PATTERN))
.get();
}

我们通常会将服务激活器实现为消息处理器,让它作为出站通道适配器,在数据需要传递给应用本身的时候更是如此。我们已经讨论过服务激活器,所以没有必要重复讨论了。

但是,需要注意,Spring Integration端点模块为多个通用场景提供了消息处理器。在程序清单9.3中,我们已经见到过一个这种出站通道适配器的样例,即FileWriting MessageHandler。提到Spring Integration端点模块,我们看一下都有哪些直接可用的集成端点模块。

9.2.9 端点模块

Spring Integration允许我们创建自己的通道适配器,这一点非常好,但是更棒的是Spring Integration提供了二十多个包含通道适配器(同时包括入站和出站的适配器)的端点模块(见表9.1),用于和各种常见的外部系统实现集成。

表9.1 Spring Integration提供的二十多个端点模块

epub_29101559_61

从表9.1我们可以清楚地看到,Spring Integration提供了用途广泛的一组组件,它们能够满足非常多的集成需求。大多数应用程序所使用的只是Spring Integration所提供功能的九牛一毛。需要的话,我们最好还是要知道Spring Integration已经提供了相关的功能。

另外,我们不可能在一章的篇幅中介绍表9.1中的所有通道适配器。我们已经看到了如何使用文件系统模块写入文件的样例,随后将会看到如何使用Email模块来读取Email。

对于每个端点模块的通道适配器,我们可以在Java配置中将其声明为bean,也可以在Java DSL配置中通过静态方法的方式引用。我建议你探索一下自己感兴趣的其他端点模块。你会发现它们在使用方式上是非常一致的。现在,我们关注一下Email端点模块,看一下如何将它用到Taco Cloud应用中。

9.1 声明一个简单的集成流

通常来讲,在使用Spring Integration创建集成流时,是通过声明一个应用程序能够接收或发送哪些数据到应用程序之外的资源来实现的。应用程序可能集成的资源之一就是文件系统。因此,Spring Integration的很多组件都有读入和写入文件的通道适配器(channel adapter)。

为了熟悉Spring Integration,我们将会创建一个集成流,这个流会写入数据到文件系统中。首先,我们需要添加Spring Integration到项目的构建文件中。对于Maven构建来讲,必要的依赖如下所示:

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>

第一项依赖是Spring Integration的Spring Boot starter。不管我们与哪种流进行交互,对于Spring Integration流的开发来讲,这个依赖都是必需的。与所有的Spring Boot starter一样,在Initializr表单中,这个依赖也可以通过复选框进行选择。

第二项依赖是Spring Integration的文件端点模块。这个模块是与外部系统集成的二十多个模块之一。我们会在9.2.9小节中更加详细地讨论端点模块。对于现在来讲,我们只需要知道文件端点模块提供了将文件从文件系统导入集成流和/或将流中的数据写入文件系统的能力即可。

接下来,我们需要为应用创建一种方法,让它能够发送数据到集成流中,这样它才能写入到文件中。为了实现这一点,我们需要创建一个网关接口,这样的网关接口如程序清单9.1所示。

程序清单9.1 将方法调用转换成消息的消息网关接口

1
2
3
4
5
6
7
8
9
10
package sia5;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel="textInChannel") ⇽--- 声明消息网关
public interface FileWriterGateway {
void writeToFile(
@Header(FileHeaders.FILENAME) String filename, ⇽--- 写入文件
String data);
}

尽管这只是一个很简单的Java接口,但是FileWriterGateway有很多东西需要介绍。我们首先看到的是,它使用了@MessagingGateway注解。这个注解会告诉Spring Integration要在运行时生成该接口的实现,这与Spring Data在运行时生成repository接口的实现非常类似。其他地方的代码在希望写入文件的时候将会调用它。

@MessagingGateway的defaultRequestChannel属性表明接口方法调用时所返回的消息要发送至给定的消息通道(message channel)。在本例中,我们声明调用writeToFile()所形成的消息应该发送至名为textInChannel的通道中。

对于writeToFile()方法来说,它以String类型的形式接受一个文件名,另外一个String包含了要写入文件中的文本。关于这个方法的签名,还需要注意filename参数上带有@Header。在本例中,@Header注解表明传递给filename的值应该包含在消息头信息中(通过FileHeaders.FILENAME声明,它将会被解析成file_name),而不是放到消息载荷(payload)中。

现在,我们已经有了消息网关,接下来就需要配置集成流了。尽管我们往构建文件中添加的Spring Integration starter依赖能够启用Spring Integration的自动配置功能,但是满足应用需求的流定义则需要我们自行编写额外的配置。在声明集成流方面,我们有3种配置方案可供选择:

  • XML配置;
  • Java配置;
  • 使用DSL的Java配置。

我们会依次看一下Spring Integration的这3种配置风格,首先从较为老式的XML配置开始。

9.1.1 使用XML定义集成流

尽管在本书中,我尽量避免使用XML配置,但是Spring Integration有使用XML定义集成流的漫长历史。所以,我认为至少展现一个XML定义集成流的样例还是很有价值的。程序清单9.2展现了如何使用XML配置示例集成流。

程序清单9.2 使用Spring XML配置定义集成流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-
file.xsd">
<int:channel id="textInChannel" /> ⇽--- 声明textInChannel
<int:transformer id="upperCase"
input-channel="textInChannel"
output-channel="fileWriterChannel"
expression="payload.toUpperCase()" /> ⇽--- 转换文本
<int:channel id="fileWriterChannel" /> ⇽--- 声明fileWriterChannel
<int-file:outbound-channel-adapter id="writer"
channel="fileWriterChannel"
directory="/tmp/sia5/files"
mode="APPEND"
append-new-line="true" /> ⇽--- 将文本写入到文件中
</beans>

我们将程序清单9.2中的XML拆分讲解一下。

  • 我们首先配置了一个名为textInChannel的通道。你会发现,它就是FileWriterGateway的请求通道。当FileWriterGateway的writeToFile()方法被调用的时候,结果形成的消息将会发布到这个通道上。
  • 我们还配置了一个转换器(transformer),它会从textInChannel接收消息。它使用Spring表达式语言(Spring Expression Language,SpEL)为消息载荷调用了toUpperCase()方法。进行大写操作之后的结果会发布到fileWriterChannel上。
  • 随后,我们配置了名为fileWriterChannel的通道。这个通道会作为一个导线,将转换器与出站通道适配器(outbound channel adapter)连接在一起。
  • 最后,我们使用int-file命名空间配置了出站通道适配器。这个XML命名空间是由Spring Integration的文件模块提供的,实现文件写入的功能。按照我们的配置,它从fileWriterChannel接收消息,并将消息的载荷写入到一个文件中,这个文件的名称是由消息头信息中的file_name属性指定的,而存入的目录则是由这里的directory属性指定的。如果文件已经存在,那么文件内容会以新行的方式进行追加,而不是覆盖该文件。

图9.1使用《企业集成模式》中的图形元素样式阐述了这个流。

image-20211015210914944

图9.1 文件写入器的集成流

如果想要在Spring Boot应用中使用XML配置,那么我们需要将XML作为资源导入到Spring应用中。最简单的实现方式就是在应用的某个Java配置类上使用Spring的@ImportResource注解:

1
2
3
@Configuration
@ImportResource("classpath:/filewriter-config.xml")
public class FileWriterIntegrationConfig { ... }

尽管基于XML的配置能够很好地用于Spring Integration,但是大多数的开发人员对于XML的使用越来越谨慎。(正如我所言,在本书中,我会尽量避免使用XML配置。)现在,我们抛开这些尖括号,看一下Spring Integration的Java配置风格。

9.1.2 使用Java配置集成流

大多数的现代Spring应用程序都会避免使用XML配置,而更加青睐Java配置。实际上,在Spring Boot应用中,Java配置是自动化配置功能更自然的补充形式。因此,如果我们要为Spring Boot应用添加集成流,最好使用Java来定义流程。

程序清单9.3展示了使用Java配置编写集成流的一个样例。这里的代码依然是功能相同的文件写入集成流,但是这次我们选择使用Java来实现。

程序清单9.3 使用Java配置来定义集成流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package sia5;
import java.io.File;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.transformer.GenericTransformer;
@Configuration
public class FileWriterIntegrationConfig {
@Bean
@Transformer(inputChannel="textInChannel", ⇽--- 声明转换器
outputChannel="fileWriterChannel")
public GenericTransformer<String, String> upperCaseTransformer() {
return text -> text.toUpperCase();
}
@Bean
@ServiceActivator(inputChannel="fileWriterChannel")
public FileWritingMessageHandler fileWriter() { ⇽--- 声明文件写入器
FileWritingMessageHandler handler =
new FileWritingMessageHandler(new File("/tmp/sia5/files"));
handler.setExpectReply(false);
handler.setFileExistsMode(FileExistsMode.APPEND);
handler.setAppendNewLine(true);
return handler;
}
}

在Java配置中,我们声明了两个bean:一个转换器,还有一个文件写入的消息处理器。这里的转换器是GenericTransformer。因为GenericTransformer是一个函数式接口,所以我们可以使用lambda表达式为其提供实现,这里调用了消息文本的toUpperCase()方法。转换器bean使用了@Transformer注解,这样会将其声明成集成流中的一个转换器,它接受来自textInChannel通道的消息,然后将消息写入到名为fileWriterChannel的通道中。

负责文件写入的bean则使用了@ServiceActivator注解,表明它会接受来自fileWriter Channel的消息,并且会将消息传递给FileWritingMessageHandler实例所定义的服务。FileWritingMessageHandler是一个消息处理器,它会将消息的载荷写入特定目录的一个文件中,而文件的名称是通过消息的file_name头信息指定的。与XML样例类似,FileWritingMessageHandler也配置为以新行的方式为文件追加内容。

FileWritingMessageHandler bean的一个独特之处在于它调用了setExpectReply(false)方法,通过这个方法能够告知服务激活器(serviceactivator)不要期望会有答复通道(reply channel,通过这样的通道,我们可以将某个值返回到流中的上游组件)。如果我们不调用setExpectReply(),文件写入bean的默认值是true。尽管管道的功能和预期一样,但是在日志中会看到一些错误日志,提示我们没有设置答复通道。

我们在这里没有必要显式声明通道。如果名为textInChannel和fileWriterChannel的bean不存在,那么这两个通道将会自动创建。但是,如果你想要更加精确地控制通道配置,那么可以按照如下的方式显式构建这些bean:

1
2
3
4
5
6
7
8
9
@Bean
public MessageChannel textInChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel fileWriterChannel() {
return new DirectChannel();
}

基于Java的配置方案可能会更易于阅读,也更加简洁,而且符合我在本书中倡导的纯Java配置风格。但是,使用Spring Integration的Java DSL(Domain-SpecificLanguage,领域特定语言)配置风格的话,它可以更加流畅。

9.1.3 使用Spring Integration的DSL配置

我们再次尝试一下文件写入集成流的定义。这一次,我们依然使用Java进行定义,但是会使用Spring Integration的Java DSL。这一次我们不再将流中的每个组件都声明为单独的bean,而是使用一个bean来定义整个流,如程序清单9.4所示。

程序清单9.4 为集成流的设计提供一个流畅的API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package sia5;
import java.io.File;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.support.FileExistsMode;
@Configuration
public class FileWriterIntegrationConfig {
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlows
.from(MessageChannels.direct("textInChannel")) ⇽--- 入站通道
.<String, String>transform(t -> t.toUpperCase()) ⇽--- 声明转换器
.handle(Files ⇽--- 处理文件写入
.outboundAdapter(new File("/tmp/sia5/files"))
.fileExistsMode(FileExistsMode.APPEND)
.appendNewLine(true))
.get();
}
}

这种配置方式在一个bean方法中定义了整个流,已经做到了尽可能简洁。Integration Flows类初始化构建器API,我们可以通过这个API来定义流。

在程序清单9.4中,我们首先从名为textInchannel的通道接收消息,然后进入一个转换器,将消息载荷转换成大写形式。通过转换器之后,消息会由出站通道适配器来进行处理。这个适配器是由Spring Integration file模块的Files类型创建的。最后,通过对get()的调用返回要构建的IntegrationFlow。简而言之,这个bean方法定义了与XML和Java配置样例相同的集成流。

你可能已经发现,与Java配置样例类似,我们不需要显式声明通道bean。我们引用了textInChannel,如果该名字对应的通道不存在,那么Spring Integration会自动创建它。不过,我们也可以显式声明bean。

对于连接转换器和出站通道适配器的通道,我们甚至没有通过名字引用它。如果需要显式配置通道,那么我们可以在流定义的时候通过调用channel()来引用它的名称:

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlows
.from(MessageChannels.direct("textInChannel"))
.<String, String>transform(t -> t.toUpperCase())
.channel(MessageChannels.direct("fileWriterChannel"))
.handle(Files
.outboundAdapter(new File("/tmp/sia5/files"))
.fileExistsMode(FileExistsMode.APPEND)
.appendNewLine(true))
.get();
}

在使用Spring Integration的Java DSL(与其他的fluent API类似)的时候,我们必须要巧妙地使用空格来保持可读性。在这里的样例中,我小心翼翼地使用缩进来保证代码块的可读性。对于更长、更复杂的流,我们甚至可以考虑将流的一部分抽取到单独的方法或子流中,以实现更好的可读性。

现在,我们已经看到了如何使用3种不同的方式来定义一个简单的流。接下来,我们回过头来看一下Spring Integration的全景。

第9章 Spring集成

本章内容:
  • 实时处理数据
  • 定义集成流
  • 使用Spring Integration的Java DSL定义
  • 与Email、文件系统和其他外部系统进行集成

在旅行时,最让我感到沮丧的一件事就是长途飞行时的互联网连接非常差,或者根本就没有。我喜欢利用空中时间完成一些工作,这本书的很多内容就是这样写出来的。但是,如果没有网络连接,恰好我又想获取某个库或者查看一个JavaDoc文档,那么我就无能为力了。因此,我现在会随身带一本书,以便于在这种场合下阅读。

就像我们需要连接互联网才能提高生产效率一样,很多应用都需要连接外部系统才能完成它们的功能。应用程序可能需要读取或发送Email、与外部API交互或者对写入数据库的数据做出反应。而且,由于数据是在外部系统读取或写入的,应用可能需要以某种方式处理这些数据,这样才能转换为应用程序自己的领域类。

在本章中,我们将会看到如何使用Spring Integration实现通用的集成模式。Spring Integration是众多集成模式的现成实现,这些模式在Gregor Hohpe和Bobby Woolf编写的《企业集成模式》(Enterprise Integration Patterns,Addison-Wesley,2003)中进行了归类。每个模式都实现为一个组件,消息会通过该组件在管道中传递数据。借助Spring配置,我们可以将这些组件组装成一个管道,数据可以通过这个管道来流动。我们从定义一个简单的集成流开始,这个流包含了Spring Integration的众多特性和特点。

8.4 小结

  • 异步消息在要通信的应用程序之间提供了一个中间层,这样能够实现更松散的耦合和更强的可扩展性。
  • Spring支持使用JMS、RabbitMQ或Apache Kafka实现异步消息。
  • 应用程序可以使用基于模板的客户端(JmsTemplate、RabbitTemplate或KafkaTemplate)向消息代理发送消息。
  • 接收消息的应用程序可以借助相同的基于模板的客户端以拉取模式消费消息。
  • 通过使用消息监听器注解(@JmsListener@RabbitListener@KafkaListener),消息也可以推送至消费者的bean方法中。

8.3 使用Kafka的消息

Apache Kafka是我们在本章研究的最新的消息方案。乍看上去,Kafka是与ActiveMQ、Artemis或Rabbit类似的消息代理,其实Kafka有一些独特的技巧。

Kafka设计为集群运行,从而能够实现很强的可扩展性。通过将主题在集群的所有实例上进行分区(partition),它能够具有更强的弹性。RabbitMQ主要处理Exchange中的队列,而Kafka仅使用主题实现消息的发布/订阅。

Kafka主题会复制到集群的所有代理上。集群中的每个节点都会担任一个或多个主题的首领(leader),负责该主题的数据并将其复制到集群中的其他节点上。

更进一步来讲,每个主题可以划分为多个分区。在这种情况下,集群中的每个节点是某个主题一个或多个分区的首领,但并不是整个主题的首领。主题的责任会在所有节点间进行拆分。图8.2阐述了它是如何运行的。

image-20211015203522617

图8.2 Kafka集群是由多个代理组成的,每个代理作为主题分区的首领

关于Kafka的独特架构,我建议你阅读Dylan Scott编写的Kafka in Action(Manning,2017)。就我们来讲,我们将会关注如何通过Spring发送和接收Kafka的消息。

8.3.1 为Spring搭建支持Kafka消息的环境

为了搭建Kafka的消息环境,我们需要添加对应的依赖到构建文件中。但是,与JMS和RabbitMQ方案不同,并没有针对Kafka的Spring Boot starter。不过,不用担心,我们只需要添加一项依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

这项依赖会为我们的项目引入Kafka所需的所有内容。另外,它的出现会触发Spring Boot对Kafka的自动配置,除了其他功能之外,它会在Spring应用上下文中创建一个KafkaTemplate。我们所需要做的就是注入KafkaTemplate并使用它来发布和接收消息。

但是,在发送和接收消息之前,我们还需要注意使用Kafka时的一些属性。具体来讲,KafkaTemplate默认会使用localhost上监听9092端口的Kafka代理。在开发应用的时候,在本地启动Kafka代理没有问题,但是在投入生产的时候,我们需要配置不同的主机和端口。

spring.kafka.bootstrap-servers属性能够设置一个或多个Kafka服务器的地址,系统将会使用它来建立到Kafka集群的初始连接。例如,集群中的某个服务器运行在kafka.tacocloud.com上并监听9092端口,那么我们可以按照如下的方式在YAML中配置它的位置:

1
2
3
4
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092

但是需要注意spring.kafka.bootstrap-servers是复数形式,它能接受一个列表。所以,我们可以提供集群中的多个Kafka服务器:

1
2
3
4
5
6
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
- kafka.tacocloud.com:9093
- kafka.tacocloud.com:9094

Kafka在项目中准备就绪之后,我们就可以发送和接收消息了。我们首先使用KafkaTemplate发送Order对象到Kafka中。

8.3.2 通过KafkaTemplate发送消息

在很多方面,KafkaTemplate与JMS和RabbitMQ对应的模板非常相似。但同时,它也有很大的差异。在发送消息的时候,这一点非常明显:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
Long timestamp, K key, V data);

我们首先可能会发现,这里没有convertAndSend()方法了。这是因为,KafkaTemplate是通过泛型类型化的,在发送消息的时候,它能够直接处理领域类型。这样的话,所有的send()方法都完成了convertAndSend()的任务。

你可能也会发现,send()和sendDefault()的参数与JMS和Rabbit有很大的差异。在使用Kafka发送消息的时候,我们可以使用如下参数设置消息该如何进行发送:

  • 消息要发送到的主题(send()方法的必选参数);
  • 主题要写入的分区(可选);
  • 记录上要发送的key(可选);
  • 时间戳(可选,默认为System.currentTimeMillis());
  • 载荷(必选)。

主题和载荷是其中最重要的两个参数。分区和key对于如何使用KafkaTemplate几乎没有影响,只是作为额外的信息提供给send()和sendDefault()。对于我们的场景来说,我们只关心将消息载荷发送到给定的主题,不用担心分区和key的问题。

对于send()方法来说,我们还可以选择发送一个ProducerRecord对象,它只是一个简单类型,将上述的参数放到了一个对象中。我们还可以发送Message对象,但是需要将领域对象转换成Message对象。相对创建和发送ProducerRecord和Message对象,使用其他的方法会更简单一些。

借助KafkaTemplate及其send()方法,我们可以编写一个基于Kafka实现的OrderMessagingService实现。程序清单8.8展现了该实现类。

程序清单8.8 使用KafkaTemplate发送订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package tacos.messaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaOrderMessagingService
implements OrderMessagingService {
private KafkaTemplate<String, Order> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(
KafkaTemplate<String, Order> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(Order order) {
kafkaTemplate.send("tacocloud.orders.topic", order);
}
}

在这个OrderMessagingService的新实现中,sendOrder()用到了注入的KafkaTemplate对象的send()方法,将Order发送到名为tacocloud.orders.topic的主题中。除了代码中随处可见的“Kafka”之外,它其实与为JMS和Rabbit编写的代码并没有太大的差异。

如果你想要设置默认主题,那么可以稍微简化一下sendOrder()。首先,通过spring.kafka.template.default-topic属性,我们可以将默认主题设置为tacocloud.orders.topic:

1
2
3
4
spring:
kafka:
template:
default-topic: tacocloud.orders.topic

然后,在sendOrder()方法中,我们就可以调用sendDefault()而不是send()了,这样可以不用指定主题的名称:

1
2
3
4
@Override
public void sendOrder(Order order) {
kafkaTemplate.sendDefault(order);
}

现在,我们已经编写完发送消息的代码了。接下来,我们转移一下注意力,编写从Kafka中接收消息的代码。

8.3.3 编写Kafka监听器

除了send()和sendDefault()特有的方法签名之外,KafkaTemplate与JmsTemplate和RabbitTemplate另一个不同之处在于它没有提供接收消息的方法。这意味着在Spring中想要消费来自Kafka主题的消息只有一种办法,就是编写消息监听器。

对于Kafka消息来说,消息监听器是通过带有@KafkaListener注解的方法来实现的。@KafkaListener大致对应于@JmsListener@RabbitListener,并且使用方式也基本相同。如下的程序清单展示了为Kafka编写的基于监听器的订单接收器。

程序清单8.9 使用@KafkaListener接收订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package tacos.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order) {
ui.displayOrder(order);
}
}

handle()方法使用了@KafkaListener注解,表明当有消息抵达名为tacocloud.orders.topic的主题时,该方法将会被调用。如程序清单8.9所示,我们只将Order(载荷)对象传递给了handle()。如果你想要获取消息中其他的元数据,我们也可以接受ConsumerRecord或Message对象。

例如,如下的handle()实现接受一个ConsumerRecord,这样我们就能在日志中将消息的分区和时间戳记录下来:

1
2
3
4
5
6
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
ui.displayOrder(order);
}

类似地,我们还可以用一个Message对象来替代ConsumerRecord,并且能够达到相同的目的:

1
2
3
4
5
6
7
8
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
MessageHeaders headers = message.getHeaders();
log.info("Received from partition {} with timestamp {}",
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
ui.displayOrder(order);
}

值得一提的是,消息载荷也可以通过ConsumerRecord.value()或Message.getPayload()获取到。这意味着我们可以通过这些对象获取Order,而不必直接将其作为handle()的参数。

8.2 使用RabbitMQ和AMQP

RabbitMQ可以说是AMQP最杰出的实现,它提供了比JMS更高级的消息路由策略。JMS消息使用目的地名称来寻址,接收者要从这里检索消息,而AMQP消息使用Exchange和routing key来寻址,这样消息就与接收者要监听的队列解耦了。Exchange和队列的关系如图8.1所示。

image-20211015123622109

图8.1 发送到RabbitMQ Exchange的消息会基于routing key和binding被路由到一个或多个队列上

当消息抵达RabbitMQ代理的时候,它会进入为其设置的Exchange上。Exchange负责将它路由到一个或多个队列中,这个过程会根据Exchange的类型、Exchange和队列之间的binding以及消息的routing key进行路由。

这方面有多个不同类型的Exchange,包括以下内容。

  • Default:这是代理创建的特殊Exchange。它会将消息路由至名字与消息routing key相同的队列。所有的队列都会自动绑定至Default Exchange。
  • Direct:如果消息的routing key与队列的binding key相同,那么消息将会路由到该队列上。
  • Topic:如果消息的routing key与队列binding key(可能会包含通配符)匹配,那么消息将会路由到一个或多个这样的队列上。
  • Fanout:不管routing key和binding key是什么,消息都将会路由到所有绑定队列上。
  • Headers:与Topic Exchange类似,只不过要基于消息的头信息进行路由,而不是routing key。
  • Dead letter:捕获所有无法投递(也就是它们无法匹配所有已定义的Exchange和队列的binding关系)的消息。

最简单的Exchange形式是Default和Fanout,因为它们大致对应了JMS中的队列和主题,但是其他的Exchange允许我们定义更加灵活的路由模式。

这里最重要的是要明白消息会通过routing key发送至Exchange,而消息要在队列中被消费。它们如何从Exchange路由至队列取决于binding的定义以及哪种方式最适合我们的使用场景。

至于使用哪种Exchange类型以及如何定义从Exchange到队列的binding,这本身与如何在Spring应用中发送和接收消息关系不大。因此,我们更加关心如何编写使用Rabbit发送和接收消息的代码。

注意:关于如何绑定队列到Exchange的更详细讨论,请参考Alvaro Videla和Jason J.W. Williams编写的RabbitMQ in Action (Manning,2012)。

8.2.1 添加RabbitMQ到Spring中

在使用Spring发送和接收RabbitMQ消息之前,我们需要将Spring Boot的AMQPstarter依赖添加到构建文件中,替换上文中Artemis或ActiveMQ starter的位置:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加AMQP starter到构建文件中之后,将会触发自动配置功能,这样会为我们创建一个AMQP连接工厂和RabbitTemplate bean,以及其他的一些支撑组件。我们要使用Spring发送和接收RabbitMQ代理的消息,只需要添加这项依赖就可以了。但是,这里还有一些我们需要掌握的有用属性,如表8.4所示。

表8.4 配置RabbitMQ代理位置和凭证的属性

epub_29101559_49

对于开发来说,我们可能会使用不需要认证的RabbitMQ代理,它运行在本地机器上并监听5672端口。在开发阶段,这些属性可能没有太大的用处,但是当应用程序投入生产环境时,它们无疑是非常有用的。

假设我们要将应用投入生产环境,RabbitMQ代理位于名为rabbit.tacocloud.com服务器上,监听5673端口并且需要认证。在这种情况下,当prod profile处于激活状态时,application.yml文件中的如下配置将会设置这些属性:

1
2
3
4
5
6
7
spring:
profiles: prod
rabbitmq:
host: rabbit.tacocloud.com
port: 5673
username: tacoweb
password: l3tm31n

在我们的应用中,RabbitMQ已经配置好了,接下来就可以使用RabbitTemplate发送消息了。

8.2.2 通过RabbitTemplate发送消息

Spring对RabbitMQ消息支持的核心是RabbitTemplate。RabbitTemplate与JmsTemplate类似,提供了一组相似的方法。但是,我们将会看到,这里有一些细微的差异,这是与RabbitMQ独特的运行方式有关的。

在使用RabbitTemplate发送消息方面,我们可以使用与JmsTemplate中同名的send()和convertAndSend()方法。但是,与JmsTemplate的方法只是将消息路由至队列或主题不同,RabbitTemplate会按照Exchanges和routing key来发送消息。下面列出关于使用RabbitTemplate发送消息比较重要的一些方法^1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 发送原始的消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message)
throws AmqpException;
// 发送根据对象转换而成的消息
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message)
throws AmqpException;
void convertAndSend(String exchange, String routingKey,
Object message) throws AmqpException;
// 发送根据对象转换而成的消息并且带有后期处理的功能
void convertAndSend(Object message, MessagePostProcessor mPP)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String exchange, String routingKey,
Object message,
MessagePostProcessor messagePostProcessor)
throws AmqpException;

我们可以看到,这些方法与JmsTemplate中对应的方法遵循了相同的模式。前3个send()方法都是发送原始的Message对象。接下来的3个convertAndSend()方法会接受一个对象,这个对象会在发送之前在幕后转换成Message。最后的3个convertAndSend()方法与前面的3个方法类似,但是它们还会接受一个MessagePostProcessor对象,这个对象能够在Message发送至代理之前对其进行操作。

这些方法与JmsTemplate对应方法的不同之处在于,它们会接受String类型的值以指定Exchange和routing key,而不像JmsTemplate那样接受目的地名称(或Destination)。没有接受Exchange参数的方法会将消息发送至DefaultExchange。与之类似,没有指定routing key的方法会把消息路由至默认的routing key。

接下来,我们看一下如何使用RabbitTemplate发送taco订单。有一种方式是使用send()方法,如程序清单8.5所示。但是,在调用send()之前,我们需要将Order对象转换为Message。RabbitTemplate能够通过getMessageConverter()方法获取消息转换器,否则,这项工作会非常乏味。

程序清单8.5 使用RabbitTemplate.send()发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package tacos.messaging;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tacos.Order;
@Service
public class RabbitOrderMessagingService
implements OrderMessagingService {
private RabbitTemplate rabbit;
@Autowired
public RabbitOrderMessagingService(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}
}

有了MessageConverter之后,将Order转换成Message就是非常简单的任务了。我们必须通过MessageProperties来提供消息属性,但是如果我们不需要设置任何这样的属性,使用默认的MessageProperties实例就可以了。随后,剩下的就是调用send()了,并将Exchange和routing key(这两者都是可选的)连同消息一起传递过去。在本例中,我们只指定了routing key(tacocloud.order)和消息本身,所以会使用默认的Exchange。

这里提到了默认的Exchange,它的名字是“”(空的String),对应RabbitMQ代理自动生成的Default Exchange。与之相似,默认的routing key也是“”(它的路由将会取决于Exchange以及相应的binding)。我们可以通过设置spring.rabbitmq.template.exchange和spring.rabbitmq.template.routing-key属性重写这些默认值:

1
2
3
4
5
spring:
rabbitmq:
template:
exchange: tacocloud.orders
routing-key: kitchens.central

在本例中,所有未指明Exchange的消息都将会自动发送至名为tacocloud.orders的Exchange。如果在调用send()或convertAndSend()的时候也没有指定routingkey,那么消息将会使用值为kitchens.central的routing key。

通过消息转换器创建Message对象是非常简单的,但是使用convertAndSend()让RabbitTemplate处理所有的转换操作则会更加简单:

1
2
3
public void sendOrder(Order order) {
rabbit.convertAndSend("tacocloud.order", order);
}

配置消息转换器

默认情况下,消息转换是通过SimpleMessageConverter来实现的,它能够将简单类型(如String)和Serializable对象转换成Message对象。但是,Spring为RabbitTemplate提供了多个消息转换器,包括下面内容。

  • Jackson2JsonMessageConverter:使用Jackson 2 JSON实现对象和JSON的相互转换。
  • MarshallingMessageConverter:使用Spring的Marshaller和Unmarshaller进行转换。
  • SerializerMessageConverter:使用Spring的Serializer和Deserializer转换String和任意种类的原生对象。
  • SimpleMessageConverter:转换String、字节数组和Serializable类型。
  • ContentTypeDelegatingMessageConverter:基于contentType头信息,将转换功能委托给另外一个MessageConverter。
  • MessagingMessageConverter:将消息转换功能委托给另外一个MessageConverter,并将头信息的转换委托给AmqpHeaderConverter。

如果需要变更消息转换器,就要配置一个类型为MessageConverter的bean。例如,对于基于JSON的转换,我们可以按照如下的方式来配置Jackson2JsonMessageConverter:

1
2
3
4
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

Spring Boot的自动配置功能会发现这个bean,并将它注入RabbitTemplate中,替换默认的消息转换器。

设置消息属性

与在JMS中一样,我们可能需要在发送的消息中添加一些头信息。例如,假设我们需要为所有通过Taco Cloud Web站点提交的订单添加一个X_ORDER_SOURCE信息。在自行创建Message的时候,我们可以通过MessageProperties实例设置头信息,随后将这个对象传递给消息转换器。回到程序清单8.5的sendOrder()方法,我们需要做的就是添加一行设置头信息的代码:

1
2
3
4
5
6
7
public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}

但是,在使用convertAndSend()的时候,我们无法快速访问MessageProperties对象。不过,此时MessagePostProcessor可以帮助我们:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void sendOrder(Order order) {
rabbit.convertAndSend("tacocloud.order.queue", order,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)
throws AmqpException {
MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}

在这里,我们为convertAndSend()提供了MessagePostProcessor接口的匿名内部类实现。在postProcessMessage()中,我们从Message中拉取MessageProperties对象,然后通过setHeader()方法设置X_ORDER_SOURCE头信息。

现在,我们已经看到了如何通过RabbitTemplate发送消息,接下来我们转换视角看一下如何接收来自RabbitMQ队列的消息。

8.2.3 接收来自RabbitMQ的消息

我们看到使用RabbitTemplate发送消息与使用JmsTemplate发送消息并没有太大差别。实际上,接收来自RabbitMQ队列的消息也与JMS没有太大差别。与JMS类似,我们有两个可选方案。

  • 使用RabbitTemplate从队列拉取消息。
  • 将消息推送至带有@RabbitListener注解的方法。

我们首先看一下基于拉取的RabbitTemplate.receive()方法。

RabbitTemplate提供了多个从队列拉取消息的方法。其中,最有用的方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 接收消息
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// 接收由消息转换而成的对象
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws
AmqpException;
// 接收由消息转换而成的类型安全的对象
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws
AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type)
throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T>
type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis,
ParameterizedTypeReference<T> type)
throws AmqpException;

这些方法对应于前文所述的send()和convertAndSend()方法。send()用于发送原始的Message对象,而receive()则会接收来自队列的原始Message对象。与之类似,receiveAndConvert()接收消息并且在返回之前使用一个消息转换器将它们转换为领域对象。

但是,在方法签名上有一些明显的不同。首先,这些方法都不会接收Exchange和routing key作为参数。这是因为Exchange和routing key是用来将消息路由至队列的,在消息位于队列中之后,它们的目的地是将它们从队列中拉取下来的消费者。消费消息的应用本身并不需要关心Exchange和routing key。消费消息的应用只需要知道队列信息就可以了。

你可能会注意到,很多方法都接收一个long类型的参数,用来指定接收消息的超时时间。默认情况下,接收消息的超时时间是0毫秒。也就是说,调用receive()会立即返回,如果没有可用消息,那么返回值是null。这是与JmsTemplate的receive()的一个显著差异。通过传入一个超时时间的值,我们就可以让receive()和receiveAndConvert()阻塞,直到消息抵达或者超时时间过期。但是,即便我们设置了非零的超时时间,在代码中依然要处理null返回值的场景。

接下来,我们看一下如何实际使用它们。程序清单8.6展现了一个新的基于Rabbit的OrderReceiver实现,它使用RabbitTemplate来接收订单。

程序清单8.6 通过RabbitTemplate从RabbitMQ中拉取订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package tacos.kitchen.messaging.rabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitOrderReceiver {
private RabbitTemplate rabbit;
private MessageConverter converter;
@Autowired
public RabbitOrderReceiver(RabbitTemplate rabbit) {
this.rabbit = rabbit;
this.converter = rabbit.getMessageConverter();
}
public Order receiveOrder() {
Message message = rabbit.receive("tacocloud.orders");
return message != null
? (Order) converter.fromMessage(message)
: null;
}
}

所有的操作都发生在receiveOrder()方法中。它调用了注入的RabbitTemplate对象的receive()方法,从名为tacocloud.orders的队列中拉取一个订单。它并没有提供超时值,所以我们只能假定这个调用会马上返回,要么得到Message对象,要么返回null。如果返回Message对象,就使用RabbitTemplate中的MessageConverter将Message转换成一个Order对象。如果receive()方法返回null,我们就将null作为返回值。

根据使用场景,我们也许能够容忍一定的延迟。例如,在Taco Cloud厨房悬挂的显示器中,如果没有订单,我们可以稍等一会儿。假设在放弃之前,我们决定等待30秒钟,那么receiveOrder()方法可以修改为传递30 000毫秒的延迟给receive()方法:

1
2
3
4
5
6
public Order receiveOrder() {
Message message = rabbit.receive("tacocloud.order.queue", 30000);
return message != null
? (Order) converter.fromMessage(message)
: null;
}

如果你像我一样,觉得使用这样一个硬编码的数字会让人觉得不舒服,那么你可能会想,创建一个带有@ConfigurationProperties注解的类并使用Spring Boot的配置属性来设置超时时间,可能会是更好的方案。在一点上,我的想法和你一样,只不过Spring Boot已经为我们提供了一个这样的配置属性。如果你想要通过配置来设置超时时间,只需要在调用receive()的时候移除超时值,并将超时时间设置为spring.rabbitmq.template.receive-timeout属性即可:

1
2
3
4
spring:
rabbitmq:
template:
receive-timeout: 30000

回到receiveOrder()方法,我们必须要使用RabbitTemplate中的消息转换器才能将传入的Message对象转换成Order对象。但是,既然RabbitTemplate已经携带了消息转换器,它为什么不能自动为我们进行转换呢?这就是receiveAndConvert()方法所做的事情。借助receiveAndConvert(),我们可以将receiveOrder()重写为:

1
2
3
public Order receiveOrder() {
return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
}

看起来简单了许多,对吧。唯一让我觉得麻烦的就是从Object到Order的类型转换。不过,这种转换还有另外一种实现方式。我们可以传递一个ParameterizedTypeReference引用给receiveAndConvert(),这样我们就可以直接得到Order对象了:

1
2
3
4
public Order receiveOrder() {
return rabbit.receiveAndConvert("tacocloud.order.queue",
new ParameterizedTypeReference<Order>() {});
}

关于这种方式是否真的比类型转换更好,依然还有争论,但是它确实能够更加确保类型安全。唯一需要注意的是,要在receiveAndConvert()中使用ParameterizedTypeReference,消息转换器必须要实现SmartMessageConverter,目前Jackson2JsonMessageConverter是唯一一个可选的内置实现。

RabbitTemplate提供的拉取模式适用于很多使用场景,但是有时候监听消息并在消息抵达的时候对其进行处理会更好一些。接下来,我们看一下如何编写消息驱动的bean,让它对RabbitMQ消息做出回应。

使用监听器处理RabbitMQ的消息

Spring提供了RabbitListener实现消息驱动的RabbitMQ bean,对应于JmsListener。为了声明当消息抵达RabbitMQ队列时某个方法应该被调用,我们可以为bean的方法添加@RabbitListener注解。

例如,程序清单8.7展现了OrderReceiver的RabbitMQ实现,它通过注解声明要监听订单消息,而不是使用RabbitTemplate进行轮询。

程序清单8.7 将方法声明为RabbitMQ的消息监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package tacos.kitchen.messaging.rabbit.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@RabbitListener(queues = "tacocloud.order.queue")
public void receiveOrder(Order order) {
ui.displayOrder(order);
}
}

你肯定会发现它与程序清单8.4的代码非常相似。确实,唯一的变更就是监听器的注解,从@JmsListener换成了@RabbitListener。尽管@RabbitListener注解非常棒,但是几乎重复的代码让我无法找出@RabbitListener具有什么在@JmsListener中没有提到的功能。当消息从各自的代理推送过来的时候,这两个注解都非常适合用来编写对应的代码,其中@JmsListener对应的是JMS代理,而@RabbitListener对应的是RabbitMQ代理。

在前面的段落中,你可能会对@RabbitListener感到索然无趣,但是这并非我的本意。实际上,@RabbitListener和@JmsListener的运行方式非常相似是一件令人兴奋的事情。这意味着当我们使用RabbitMQ替代Artemis或ActiveMQ的时候,不需要学习全新的编程模型。同样令人兴奋的是,RabbitTemplate和JmsTemplate之间也具有这样的相似性。

让我们暂且保持一下这种兴奋,在本章结束之前,我们看一下Spring支持的另外一个消息方案:Apache Kafka。

8.1 使用JMS发送消息

JMS是一个Java标准,定义了使用消息代理(message broker)的通用API,最早于2001年提出。长期以来,JMS一直是实现异步消息的首选方案。在JMS出现之前,每个消息代理都有私有的API,这就使得不同代理之间的消息代码很难通用。但是借助JMS,所有遵从规范的实现都使用通用的接口,这就好像JDBC为数据库操作提供了通用的接口一样。

Spring通过基于模板的抽象为JMS功能提供了支持,这个模板就是JmsTemplate。借助JmsTemplate,我们能够非常容易地在消息生产方发送队列和主题消息,在消费消息的那一方,也能够非常容易地接收这些消息。Spring还提供了消息驱动POJO的理念:这是一个简单的Java对象,它能够以异步的方式响应队列或主题上到达的消息。

我们将会讨论Spring对JMS的支持,包括JmsTemplate和消息驱动POJO。但是在发送和接收消息之前,我们首先需要一个消息代理(broker),它能够在消息的生产者和消费者之间传递消息。对Spring JMS的探索就从在Spring中搭建消息代理开始吧。

8.1.1 搭建JMS环境

在使用JMS之前,我们必须要将JMS客户端添加到项目的构建文件中。借助SpringBoot,这再简单不过了。我们所需要做的就是添加一个starter依赖到构建文件中。但是,首先,我们需要决定该使用Apache ActiveMQ还是更新的ApacheActiveMQ Artemis代理。

如果选择使用Apache ActiveMQ,那么我们需要添加如下的依赖到项目的pom.xml文件中:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

如果选择使用ActiveMQ Artemis,那么starter依赖将会如下所示:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>

Artemis是重新实现的下一代ActiveMQ,使ActiveMQ变成了遗留方案。因此,Taco Cloud会选择使用Artemis。但是在编写发送和接收消息的代码方面,选择哪种方案几乎没有什么影响。唯一需要注意的重要差异就是如何配置Spring创建到代理的连接。

默认情况下,Spring会假定Artemis代理在localhost的61616端口运行。对于开发来说,这样是没有问题的,但是一旦要将应用部署到生产环境,我们就需要设置一些属性来告诉Spring该如何访问代理。最有用的属性如表8.1所示。

表8.1 配置Artemis代理的位置和凭证信息的属性

epub_29101559_45

例如,我们看看如下的application.yml文件条目,它可能会用于一个非开发的环境:

1
2
3
4
5
6
spring:
artemis:
host: artemis.tacocloud.com
port: 61617
user: tacoweb
password: l3tm31n

这会让Spring创建到Artemis代理的连接,该Artemis代理监听artemis.tacocloud.com的61617端口。它还为应用设置了与代理交互的凭证信息。凭证信息是可选的,但是对于生产环境来说,我们推荐使用它们。

如果你选择使用ActiveMQ而不是Artemis,那么需要使用ActiveMQ特定的属性,如表8.2所示。

表8.2 配置ActiveMQ代理的位置和凭证信息的属性

epub_29101559_46

需要注意,ActiveMQ代理不是分别设置代理的主机和端口,而是使用了一个名为spring.activemq.broker-url的属性来指定代理的地址。URL应该是“tcp://”协议的地址,如下面的YMAL片段所示:

1
2
3
4
5
spring:
activemq:
broker-url: tcp://activemq.tacocloud.com
user: tacoweb
password: l3tm31n

不管你是选择Artemis还是ActiveMQ,如果是在本地开发运行,那么你都不需要配置这些属性。

但是,如果你选择使用ActiveMQ,需要将spring.activemq.in-memory属性设置为false,防止Spring启动内存中运行的代理。内存中运行的代理看起来很有用,但是只有同一个应用发布和消费消息时才能使用它(这限制了它的用途)。

在继续下面的内容之前,我们要安装并启动一个Artemis(或ActiveMQ)代理,而不是选择使用嵌入式的代理。我在这里不再重复讲述安装过程,推荐你参考Artemis和ActiveMQ的文档来了解详细内容。

现在,我们已经在构建文件中添加了对JMS starter的依赖,代理也已经准备好将消息从一个应用传递到另一个应用。接下来,我们就可以开始发送消息了。

8.1.2 使用JmsTemplate发送消息

将JMS starter依赖(不管是Artemis还是ActiveMQ)添加到构建文件之后,Spring Boot会自动配置一个JmsTemplate(以及其他内容),我们可以将它注入到其他bean中,并使用它来发送和接收消息。

JmsTemplate是Spring对JMS集成支持功能的核心。与Spring其他面向模板的组件类似,JmsTemplate消除了大量传统使用JMS时所需的样板代码。如果没有JmsTemplate的话,那么我们需要编写代码来创建到消息代理的连接和会话,还要编写更多的代码来处理发送消息过程中可能出现的异常。JmsTemplate能够让我们关注真正要做的事情:发送消息。

JmsTemplate有多个用来发送消息的方法,包括:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 发送原始的消息
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator)
throws JmsException;
void send(String destinationName, MessageCreator messageCreator)
throws JmsException;
// 发送根据对象转换而成的消息
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message)
throws JmsException;
void convertAndSend(String destinationName, Object message)
throws JmsException;
// 发送根据对象转换而成的消息并且带有后期处理的功能
void convertAndSend(Object message,
MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message,
MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message,
MessagePostProcessor postProcessor) throws JmsException;

我们可以看到,实际上只有两个方法,也就是send()和convertAndSend(),每个方法都有重载形式以支持不同的参数。如果我们仔细观察一下,就会发现convertAndSend()的各种形式又可以分成两个子类型。在考虑这些方法作用的时候,我们对它们进行以下细分。

  • 3个send()方法都需要MessageCreator来生成Message对象。
  • 3个convertAndSend()方法会接受Object对象,并且会在幕后自动将Object转换为Message。
  • 3个convertAndSend()会自动将Object转换为Message,但同时还能接受一个MessagePostProcessor对象,用来在发送之前对Message进行自定义。

这3种方法分类都分别包含3个重载方法,它们的区别在于如何指定JMS的目的地(队列或主题)。

  • 有1个方法不接受目的地参数,它会将消息发送至默认的目的地。
  • 有1个方法接受Destination对象,该对象指定了消息的目的地。
  • 有1个方法接受String,它通过名字的形式指定了消息的目的地。

要让这些方法真正发挥作用,我们看一下程序清单8.1中的JmsOrderMessagingService,它使用了形式最简单的send()方法。

程序清单8.1 使用.send()方法将订单发送至默认的目的地

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package tacos.messaging;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
@Service
public class JmsOrderMessagingService implements OrderMessagingService {
private JmsTemplate jms;
@Autowired
public JmsOrderMessagingService(JmsTemplate jms) {
this.jms = jms;
}
@Override
public void sendOrder(Order order) {
jms.send(new MessageCreator() {
@Override
public Message createMessage(Session session)
throws JMSException {
return session.createObjectMessage(order);
}
}
);
}
}

sendOrder()方法调用了jms.send(),并传递了MessageCreator接口的一个匿名内部实现。这个实现类重写了createMessage()方法,从而能够通过给定的Order对象创建新的对象消息。

我不知道你的感觉如何,但是我认为程序清单8.1虽然比较直接,但还是有点啰唆。声明匿名内部类的过程使得原本很简单的方法调用变得很复杂。我们发现MessageCreator是一个函数式接口,所以我们可以通过lambda表达式简化一下sendOrder()方法:

1
2
3
4
@Override
public void sendOrder(Order order) {
jms.send(session -> session.createObjectMessage(order));
}

但是,需要注意对jms.send()的调用并没有指定目的地。为了让它能够运行,我们需要通过名为spring.jms.template.default-destination的属性声明一个默认的目的地名称。例如,我们可以在application.yml文件中这样设置该属性:

1
2
3
4
spring:
jms:
template:
default-destination: tacocloud.order.queue

在很多场景下,使用默认的目的地是最简单的可选方案。借助它,我们只声明一次目的地名称就可以了,代码只关心发送消息,而无须关心消息会发到哪里。但是,如果我们要将消息发送至默认目的地之外的其他地方,那么我们需要通过为send()设置参数来进行指定。

其中一种方式就是传递Destination对象作为send()方法的第一个参数。最简单的方式就是声明一个Destination bean并将其注入处理消息的bean中。例如,如下的bean声明Taco Cloud订单队列的Destination:

1
2
3
4
@Bean
public Destination orderQueue() {
return new ActiveMQQueue("tacocloud.order.queue");
}

很重要的一点需要注意,这里的ActiveMQQueue来源于Artemis(来自org.apache. activemq.artemis.jms.client包)。如果选择使用ActiveMQ(而不是Artemis),那么同样有一个名为ActiveMQQueue的类(来自org.apache.activemq.command包)。

在Destination bean注入到JmsOrderMessagingService之后,调用send()的时候,我们就可以使用它来指定目的地了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Destination orderQueue;
@Autowired
public JmsOrderMessagingService(JmsTemplate jms,
Destination orderQueue) {
this.jms = jms;
this.orderQueue = orderQueue;
}
...
@Override
public void sendOrder(Order order) {
jms.send(
orderQueue,
session -> session.createObjectMessage(order));
}

通过Destination指定目的地的时候,我们其实可以设置Destination的更多属性,而不仅仅是目的地的名称。但是,在实践中,除了目的地名称我们几乎不会设置其他的属性。因此,使用名称作为send()的第一个参数会更加简单:

1
2
3
4
5
6
@Override
public void sendOrder(Order order) {
jms.send(
"tacocloud.order.queue",
session -> session.createObjectMessage(order));
}

尽管send()方法使用起来并不是特别困难(尤其是通过lambda表达式来实现MessageCreator的时候更是如此),但是它要求我们提供MessageCreator还是增加了一点复杂性。如果我们能够只指定要发送的对象(以及可能要用到的目的地),那岂不是更简单?这其实就是convertAndSend()的工作原理。接下来,我们看一下这种方式。

消息发送之前进行转换

JmsTemplates的convertAndSend()方法简化了消息的发布,因为它不再需要MessageCreator。我们将要发送的对象直接传递给convertAndSend(),这个对象在发送之前会被转换成Message。

例如,在如下重新实现的sendOrder()方法中,使用convertAndSend()将Order对象发送到给定名称的目的地:

1
2
3
4
@Override
public void sendOrder(Order order) {
jms.convertAndSend("tacocloud.order.queue", order);
}

与send()方法类似,convertAndSend()将会接受一个Destination对象或String值来确定目的地,我们也可以完全忽略目的地,将消息发送到默认目的地上。

不管使用哪种形式的convertAndSend(),传递给convertAndSend()的Order都会在发送之前转换成Message。在底层,这是通过MessageConverter的实现类来完成的,它替我们做了将对象转换成Message的脏活累活。

配置消息转换器

MessageConverter是Spring定义的接口,只有两个需要实现的方法:

1
2
3
4
5
public interface MessageConverter {
Message toMessage(Object object, Session session)
throws JMSException, MessageConversionException;
Object fromMessage(Message message)
}

尽管这个接口实现起来很简单,但我们通常并没有必要创建自定义的实现。Spring已经提供了多个实现,如表8.3所示。

表8.3 Spring为通用的转换任务提供了多个消息转换器(所有的消息转换器都位于org.springframework.jms.support.converter包中)

epub_29101559_47

默认情况下,将会使用SimpleMessageConverter,但是它需要被发送的对象实现Serializable。这种办法可能也不错,但有时候我们可能想要使用其他的消息转换器来消除这种限制,比如MappingJackson2MessageConverter。

为了使用不同的消息转换器,我们必须要做的事情就是将选中的消息转换器实例声明为一个bean。例如,如下的bean声明将会使用MappingJackson2MessageConverter替代SimpleMessageConverter:

1
2
3
4
5
6
7
@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter =
new MappingJackson2MessageConverter();
messageConverter.setTypeIdPropertyName("_typeId");
return messageConverter;
}

需要注意,在返回之前,我们调用了MappingJackson2MessageConverter的setTypeId PropertyName()方法。这非常重要,因为这样能够让接收者知道传入的消息要转换成什么类型。默认情况下,它将会包含要转换的类型的全限定类名。但是,这样的话会不太灵活,要求接收端也包含相同的类型,并且具有相同的全限定类名。

为了实现更大的灵活性,我们可以通过调用消息转换器的setTypeIdMappings()方法将一个合成类型名映射到实际类型上。举例来说,消息转换器bean方法的如下代码变更会将一个合成的order类型ID映射为Order类:

1
2
3
4
5
6
7
8
9
10
@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter =
new MappingJackson2MessageConverter();
messageConverter.setTypeIdPropertyName("_typeId");
Map<String, Class<?>> typeIdMappings = new HashMap<String, Class<?>>();
typeIdMappings.put("order", Order.class);
messageConverter.setTypeIdMappings(typeIdMappings);
return messageConverter;
}

这样的话,消息的_typeId属性中就不用发送全限定类型了,而是会发送order值。在接收端的应用中,将会配置类似的消息转换器,将order映射为它自己能够理解的订单类型。在接收端的订单可能位于不同的包中、有不同的类名,甚至可以只包含发送者Order属性的一个子集。

对消息进行后期处理

假设除了利润丰厚的Web业务之外,Taco Cloud还决定开几家实体的连锁taco店,鉴于任何一家餐馆都可能成为Web业务的运行中心,在餐馆中他们需要有一种方式告诉厨房订单的来源,这样厨房的工作人员就能为店面里的订单和Web上的订单执行不同的流程。

我们尽可以在Order对象上添加一个新的source属性,让它携带该信息:如果是在线订单,就将其设置为WEB;如果是店面里的订单,就将其设置为STORE。但是,这样我们就需要同时修改Web站点的Order类和厨房应用的Order类,但实际上只有taco的准备人员需要该信息。

有一种更简单的方案,就是为消息添加一个自定义的头部,让它携带订单的来源。如果我们使用send()方法来发送taco订单,那么通过调用Message对象的setStringProperty()方法非常容易实现:

1
2
3
4
5
jms.send("tacocloud.order.queue",
session -> {
Message message = session.createObjectMessage(order);
message.setStringProperty("X_ORDER_SOURCE", "WEB");
});

但是,这里的问题在于我们并没有使用send()。在使用convertAndSend()方法的时候,Message是在底层创建的,我们无法访问到它。

幸好,还有一种方式能够在发送之前修改底层创建的Message对象。我们可以传递一个MessagePostProcessor作为convertAndSend()的最后一个参数,借助它我们可以在Message创建之后做任何想做的事情。如下的代码依然使用了convertAndSend(),但是它能够在消息发送之前使用MessagePostProcessor添加X_ORDER_SOURCE头信息:

1
2
3
4
5
6
7
jms.convertAndSend("tacocloud.order.queue", order, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
});

你可能已经发现MessagePostProcessor是一个函数式接口。这意味着我们可以将匿名内部类替换为lambda,进一步简化它:

1
2
3
4
5
jms.convertAndSend("tacocloud.order.queue", order,
message -> {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
});

尽管在这里我们只是将这个特殊的MessagePostProcessor用到了本次convertAndSend()方法调用中,但是你可能会发现在你的代码中会在不同的地方多次调用convertAndSend(),它们均会用到相同的MessagePostProcessor。在这种情况下,方法引用是比lambda更好的方案,它能避免不必要的代码重复:

1
2
3
4
5
6
7
8
9
10
11
@GetMapping("/convertAndSend/order")
public String convertAndSendOrder() {
Order order = buildOrder();
jms.convertAndSend("tacocloud.order.queue", order,
this::addOrderSource);
return "Convert and sent order";
}
private Message addOrderSource(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}

我们已经看到了多种发送消息的方式,但是如果消息无人接收,那么只发送消息也没什么价值。接下来,我们看一下如何使用Spring和JMS接收消息。

8.1.3 接收JMS消息

在消费消息的时候,我们可以选择拉取模式(pull model)和推送模式(pushmodel),前者会在我们的代码中请求消息并一直等待直到消息到达为止,而后者则会在消息可用的时候自动在你的代码中执行。

JmsTemplate提供了多种方式来接收消息,但它们使用的都是拉取模式。我们可以调用其中的某个方法来请求消息,而线程会一直阻塞到一个消息抵达为止(这可能马上发生,也可能需要等待一会儿)。

另外,我们也可以使用推送模式,在这种情况下,我们会定义一个消息监听器,每当有消息可用时,它就会被调用。

这两种方案能够适用于各种用户场景。人们普遍觉得推送模式是更好的方案,因为它不会阻塞线程;但是,在某些场景下,如果消息抵达的速度太快,那么监听器可能会过载。而拉取模式允许消费者声明它们何时才为接收新消息做好准备。

我们将会看一下这两种方案,首先从JmsTemplate提供的拉取模式开始。

使用JmsTemplate来接收消息

JmsTemplate提供了多个对代理的拉取方法,其中包括:

1
2
3
4
5
6
Message receive() throws JmsException;
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;
Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;

我们可以看到,这6个方法简直就是JmsTemplate中send()和convertAndSend()方法的镜像。receive()方法接收原始的Message,而receiveAndConvert()则会使用一个配置好的消息转换器将消息转换成领域对象。对于其中的每种方法,我们都可以指定Destination或者包含目的地名称的String值,否则,我们将会从默认目的地拉取消息。

为了实际看一下它是如何运行的,我们编写代码从tacocloud.order.queue目的地拉取一个Order。程序清单8.2展现了OrderReceiver,这个服务组件会使用JmsTemplate.receive()来接收订单数据。

程序清单8.2 从队列拉取订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package tacos.kitchen.messaging.jms;
import javax.jms.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
@Component
public class JmsOrderReceiver implements OrderReceiver {
private JmsTemplate jms;
private MessageConverter converter;
@Autowired
public JmsOrderReceiver(JmsTemplate jms, MessageConverter converter) {
this.jms = jms;
this.converter = converter;
}
public Order receiveOrder() {
Message message = jms.receive("tacocloud.order.queue");
return (Order) converter.fromMessage(message);
}
}

这里我们使用String值来指定从哪个目的地拉取订单。receive()返回的是没有经过转换的Message,但是,我们真正需要的是Message中的Order,所以接下来我们要做的事情就是使用被注入的消息转换器对消息进行转换。消息中的type ID属性将会指导转换器将消息转换成Order,但它返回的是Object,所以在最终返回之前要进行类型转换。

如果我们要探查消息的属性和消息头信息,那么接收原始的Message对象可能会非常有用。但是,通常来讲,我们只需要消息的载荷。将载荷转换成领域对象是一个需要两步操作的过程,而且它需要将消息转换器注入组件中。如果你只关心载荷,那么使用receiveAndConvert()会更简单一些。程序清单8.3展现了如何使用receiveAndConvert()替换receive()来重新实现JmsOrderReceiver。

程序清单8.3 接收已经转换好的Order对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package tacos.kitchen.messaging.jms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class JmsOrderReceiver implements OrderReceiver {
private JmsTemplate jms;
@Autowired
public JmsOrderReceiver(JmsTemplate jms) {
this.jms = jms;
}
public Order receiveOrder() {
return (Order) jms.receiveAndConvert("tacocloud.order.queue");
}
}

这个新版本的JmsOrderReceiver的receieveOrder()被简化到了只有一行代码。同时,我们不再需要将MessageConverter注入进来了,因为所有的操作都会在receiveAndConvert()方法的幕后完成。

在继续学习下面的内容之前,我们考虑一下如何在Taco Cloud厨房应用中使用receiveOrder()。Taco Cloud厨房中的厨师可能会按下一个按钮或者采取其他操作,表明他已经准备好开始做taco了。此时,receiveOrder()会被调用,然后对receive()或receiveAndConvert()的调用将会阻塞。在订单消息抵达之前,这里不会发生任何事情。一旦订单抵达,对receiveOrder()的调用将会把该订单信息返回,订单的详细信息会展现给厨师,这样他就可以开始工作了。对于拉取模式来说,这似乎是一种很自然的选择。

接下来,我们看一下如何通过声明JMS监听器来实现推送模式。

声明消息监听器

拉取模式需要显式调用receive()或receiveAndConvert()才能接收消息,与之不同,消息监听器是一个被动的组件,在消息抵达之前,它会一直处于空闲状态。

要创建能够对JMS消息做出反应的消息监听器,我们需要为组件中的某个方法添加@JmsListener注解。程序清单8.4展示了一个新的OrderListener组件,它会被动地监听消息,而不是主动请求消息。

程序清单8.4 监听订单消息的OrderListener组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package tacos.kitchen.messaging.jms.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@JmsListener(destination = "tacocloud.order.queue")
public void receiveOrder(Order order) {
ui.displayOrder(order);
}
}

receiveOrder()方法使用了JmsListener注解,这样它就会监听tacocloud.order.queue目的地的消息。该方法不需要使用JmsTemplate,也不会被你的应用显式调用。相反,Spring中的框架代码会等待消息抵达指定的目的地,当消息到达时,receiveOrder()方法会被自动调用,并且会将消息中的Order载荷作为参数。

从很多方面来讲,@JmsListener注解都和Spring MVC中的请求映射注解很相似,比如@GetMapping或@PostMapping。在Spring MVC中,带有请求映射注解的方法会响应指定路径的请求。与之类似,使用@JmsListener注解的方法会对到达指定目的地的消息做出响应。

消息监听器通常被视为最佳选择,因为它不会导致阻塞,并且能够快速处理多个消息。但是在Taco Cloud中,它可能并不是最佳的方案。在系统中,厨师是一个重要的瓶颈,他们可能无法在接收到订单的时候立即准备taco。当新订单出现在屏幕上的时候,可能上一个订单刚刚完成一半。厨房用户界面需要在订单到达时进行缓冲,避免给厨房人员带来过重的负载。

这并不是说消息监听器不好。相反,如果消息能够快速得到处理,那么它们是非常适合的方案。但是,如果消息处理器需要根据自己的时间请求更多消息,那么JmsTemplate提供的拉取模式会更加合适。

JMS是由标准Java规范定义的,所以它得到了众多代理实现的支持,在Java中实现消息时它是常见的可选方案。但是JMS有一些缺点,尤其是作为Java规范,它只能用在Java应用中。RabbitMQ和Kafka等较新的消息传递方案克服了这些缺点,可以用于JVM之外的其他语言和平台。让我们把JMS放在一边,看看如何使用RabbitMQ实现taco订单的消息传递。

第8章 发送异步消息

本章内容:
  • 异步化的消息
  • 使用JMS、RabbitMQ和Kafka发送消息
  • 从代理拉取消息
  • 监听消息

在星期五下午4点55分,再有几分钟你就可以开始休假了。现在,你的时间只够开车到机场赶上航班。但是在你离开办公室之前,你需要确定老板和同事了解你目前的工作进展,这样他们就可以在星期一继续完成你留下的工作。不过,你的一些同事已经提前离开过周末去了,而你的老板正在忙于开会。你该怎么办呢?

要想既传达到你的工作状态又能赶上飞机,最有效的方式就是发送一封电子邮件给你的老板和同事,详述工作进展并且承诺给他们寄张明信片。你不知道他们在哪里,也不知道他们什么时候才能真正读到你的邮件。但是你知道,他们终究会回到他们的办公桌旁,阅读你的邮件。而此时,你可能正在赶往机场的路上。

同步通信,比如我们在前面所看到的REST,有它自己的适用场景。不过,对于开发者而言,这种通信方式并不是应用程序之间进行交互的唯一方式。异步消息是一个应用程序向另一个应用程序间接发送消息的一种方式,这种间接性能够为进行通信的应用带来更松散的耦合和更大的可伸缩性。

在本章中,我们将会使用异步消息从Taco Cloud Web站点发送订单信息到一个单独的应用中,这个应用是Taco Cloud的厨房,在这里会烹制taco。我们将会考虑Spring提供的3种异步消息方案:Java消息服务(Java Message Service,JMS)、RabbitMQ和高级消息队列协议(Advanced Message QueueingProtocol)、Apache Kafka。除了基础的发送和接收消息之外,我们还会看一下Spring对消息驱动POJO的支持,它是一种与EJB的消息驱动Bean(Message-Driven Bean,MDB)类似的消息接收方式。