9.2 Spring Integration功能概览
9.2 Spring Integration功能概览
Spring Integration涵盖了大量的集成场景。如果想将所有的内容放到一章中,就像把一头大象装到信封里一样不现实。在这里,我只会向你展示SpringIntegration这头大象的照片,而不是对Spring Integration进行面面俱到的讲解,目的就是让你能够了解它是如何运行的。随后,我们将会再创建一个集成流,为Taco Cloud应用添加新的功能。
集成流是由一个或多个如下介绍的组件组成的。在继续编写代码之前,我们先看一下这些组件在集成流中所扮演的角色。
- 通道(channel):将消息从一个元素传递到另一个元素。
- 过滤器(filter):基于某些断言,条件化地允许某些消息通过流。
- 转换器(transformer):改变消息的值和/或将消息载荷从一种类型转换成另一种类型。
- 路由器(router):将消息路由至一个或多个通道,通常会基于消息的头信息进行路由。
- 切分器(splitter):将传入的消息切割成两个或更多的消息,然后将每个消息发送至不同的通道;
- 聚合器(aggregator):切分器的反向操作,将来自不同通道的多个消息合并成一个消息。
- 服务激活器(service activator):将消息传递给某个Java方法来进行处理,并将返回值发布到输出通道上。
- 通道适配器(channel adapter):将通道连接到某些外部系统或传输方式,可以接受输入,也可以写出到外部系统。
- 网关(gateway):通过接口将数据传递到集成流中。
在定义文件写入集成流的时候,我们已经看到过其中的一些组件了。FileWriterGateway是一个网关,通过它,应用可以提交要写入文件的文本。我们还定义了一个转换器,将给定的文本转换成大写的形式,随后,我们定义一个出站通道适配器,它执行将文本写入文件的任务。这个流有两个通道,textInChannel和fileWriterChannel,它们将应用中的其他组件连接在了一起。现在,我们按照承诺快速看一下这些集成流组件。
9.2.1 消息通道
消息通道是消息穿行集成通道的一种方式(参见图9.2)。它们是连接SpringIntegration其他组成部分的管道。
Spring Integration提供了多种通道实现。
- PublishSubscribeChannel:发送到PublishSubscribeChannel的消息会被传递到一个或多个消费者中。如果有多个消费者,它们都会接收到消息。
- QueueChannel:发送到QueueChannel的消息会存储到一个队列中,会按照先进先出(First In First Out,FIFO)的方式被拉取出来。如果有多个消费者,只有其中的一个消费者会接收到消息。
- PriorityChannel:与QueueChannel类似,但它不是FIFO的方式,而是会基于消息的priority头信息被消费者拉取出来。
- RendezvousChannel:与QueueChannel类似,但是发送者会一直阻塞通道,直到消费者接收到消息为止,实际上会同步发送者和消费者。
- DirectChannel:与PublishSubscribeChannel类似,但是消息只会发送至一个消费者,它会在与发送者相同的线程中调用消费者。这种方式允许事务跨通道。
- ExecutorChannel:类似于DirectChannel,但是消息分发是通过TaskExecutor实现的,这样会在与发送者独立的线程中执行。这种通道类型不支持事务跨通道。
- FluxMessageChannel:反应式流的发布者消息通道,基于Reactor项目的Flux。(我们将会在第10章讨论反应式流、Reactor和Flux。)
在Java配置和Java DSL中,输入通道都是自动创建的,默认使用的是DirectChannel。但是,如果想要使用不同的通道实现,就需要将通道声明为bean并在集成流中引用它。例如,要声明PublishSubscribeChannel,我们需要声明如下的@Bean
方法:
1 |
|
随后,我们可以在集成流定义中根据通道名称引用它。例如,这个通道要被一个服务激活器bean所消费,那么我们可以在@ServiceActivator
注解的inputChannel属性中引用它:
1 |
或者,使用Java DSL配置风格,我们可以通过调用channel()来引用它:
1 |
|
很重要的一点需要注意,如果使用QueueChannel,消费者必须配置一个poller。例如,声明一个QueueChannel bean:
1 |
|
那么,我们需要确保消费者配置成轮询该通道的消息。如果是服务激活器,@ServiceActivator
注解可能会如下所示:
1 |
在本例中,服务激活器每秒(或者说每1000毫秒)都会轮询名为orderChannel的通道。
9.2.2 过滤器
过滤器放置于集成管道的中间,它能够根据断言允许或拒绝消息进入流程的下一步(见图9.3)。
例如,假设消息包含了整型的值,它们要通过名为numberChannel的通道进行发布,但是我们只想让偶数进入名为evenNumberChannel的通道。在这种情况下,我们可以使用@Filter
注解定义一个过滤器:
1 |
|
作为替代方案,如果使用Java DSL配置风格来定义集成流,那么我们可以按照如下的方式来调用filter():
1 |
|
在本例中,我们使用lambda表达式来实现过滤器。实际上,filter()方法接受GenericSelector作为参数。这意味着,如果我们的过滤器过于复杂,不适合放到一个简单的lambda表达式中,那么我们可以实现GenericSelector接口作为替代方案。
9.2.3 转换器
转换器会对消息执行一些操作,一般会形成不同的消息,有可能还会产生不同的载荷类型(见图9.4)。转换过程可能非常简单,比如执行数字的数学运算或者操作String值。转换也可能会很复杂,比如根据代表ISBN的String值查询并返回对应图书的详细信息。
例如,假设整型值会通过名为numberChannel的通道进行发布,我们希望将这些数字转换成它们的罗马数字形式,以String类型来表示。在这种情况下,我们可以声明一个GenericTransformer类型的bean并为其添加@Transformer
注解,如下所示:
1 |
|
@Transformer
注解可以将这个bean声明为转换器bean,它会从名为numberChannel的通道接收Integer值,然后使用静态方法toRoman()进行转换。(toRoman()是静态方法,定义在名为RomanNumbers的类中,这里通过方法引用来使用它。)转换后的结果会发布到名为romanNumberChannel的通道中。
在Java DSL配置风格中,调用transform()会更加简单,我们只需将对toRoman()的方法引用传递进来就可以了:
1 |
|
尽管在这两个转换器代码中我们都使用了方法引用,但是转换器也可以使用lambda表达式来进行声明。或者,如果转换器足够复杂,需要使用一个单独的类,那么我们可以将其作为一个bean注入流定义中,并将引用传递给transform()方法:
1 |
|
在这里,我们声明了RomanNumberTransformer类型的bean,它本身是SpringIntegration Transformer或GenericTransformer接口的实现。这个bean注入到了transformerFlow()方法中,并且在定义集成流的时候传递给了transform()方法。
9.2.4 路由器
路由器能够基于某个路由断言,实现集成流的分支,从而将消息发送至不同的通道上(见图9.5)。
例如,假设我们有一个名为numberChannel的通道,它会传输整型值。我们想要将带有偶数的消息定向到名为evenChannel的通道,将带有奇数的消息定向到名为oddChannel的通道。要在集成流中创建这样一个路由器,我们可以声明一个AbstractMessageRouter类型的bean,并为其添加@Router
注解:
1 |
|
这里定义的AbstractMessageRouter接收名为numberChannel的输入通道的消息,以匿名内部类的形式检查消息的载荷:如果是偶数,就返回名为evenChannel的通道(在路由器bean之后同样以bean的方式进行了声明);否则,通道载荷中的数字必然是奇数,将会返回名为oddChannel的通道(同样以bean方法的方式进行了声明)。
在Java DSL风格中,路由器是通过在流定义中调用route()方法来声明的,如下所示:
1 |
|
尽管我们依然可以定义AbstractMessageRouter并将其传递到route(),但是在这个样例中使用了lambda来确定消息载荷是偶数还是奇数。如果是偶数,就会返回值为EVEN的字符串;如果是奇数,就会返回值为ODD的字符串。然后这些值会用来确定该使用哪个子映射处理消息。
9.2.5 切分器
在集成流中,有时候将一个消息切分为多个消息独立处理可能会非常有用。切分器将会负责切分并处理这些消息,如图9.6所示。
在很多场景中,切分器都非常有用,但是有两种基本的使用场景我们可以使用切分器。
- 消息载荷中包含了相同类型条目的一个列表,我们希望将它们作为单独的消息载荷来进行处理。例如,消息中携带了一个商品列表,它们可以切分为多个消息,每个消息的载荷分别对应一件商品。
- 消息载荷所携带的信息尽管有所关联,但是可以拆分为两个或更多不同类型的消息。例如,一个购买订单可能会包含投递信息、账单以及商品项的信息。投递细节可以通过某个子流来处理,账单由另一个子流来处理,而商品项由其他的子流来处理。在这种情况下,切分器后面通常会紧跟着一个路由器,它根据消息的载荷类型进行路由,确保数据都由正确的子流来进行处理。
在我们将消息载荷切分为两个或更多不同类型的消息时,通常定义一个POJO就足够了,它提取传入消息不同的组成部分,并以元素集合的形式返回。
例如,假设我们想要将带有购买订单的消息切分为两个消息:其中一个携带账单信息,另一个携带商品项的信息。如下的OrderSplitter就可以完成该任务:
1 | public class OrderSplitter { |
接下来,我们声明一个OrderSplitter bean,并通过@Splitter
注解将其作为集成流的一部分:
1 |
|
在这里,购买订单会到达名为poChannel的通道,它们会被OrderSplitter切分。然后,所返回集合中的每个条目都会作为集成流中独立的消息,它们会发布到名为splitOrderChannel的通道上。此时,我们可以在流中声明一个PayloadTypeRouter,将账单信息和商品项分别路由至它们自己的子流上:
1 |
|
顾名思义,PayloadTypeRouter会根据消息的载荷将它们路由至不同的通道。按照这里的配置,载荷为BillingInfo类型的消息将会被路由至名为billingInfoChannel的通道,供后续进行处理。对于商品项来说,它们会放到一个java.util.List集合中,因此,我们将List类型的载荷映射到名为lineItemsChannel的通道中。
按照目前的状况,流将会被切分成两个子流:一个BillingInfo对象的流,另外一个则是List<LineItem>
的流。如果我们想要进一步进行拆分,比如不想处理LineItems的列表,而是想要分别处理每个LineItem,又该怎么办呢?要将商品列表拆分为多个消息,其中每个消息包含一个条目,我们只需要编写一个方法(而不是一个bean)即可。这个方法带有@Splitter
注解并且要返回LineItem的集合,如下所示:
1 |
|
当带有List<LineItem>
载荷的消息抵达名为lineItemsChannel的通道时,消息会进入lineItemSplitter()。按照切分器的规则,这个方法必须要返回切分后条目的集合。在本例中,我们已经有了LineItem的集合,所以我们直接返回这个集合就可以了。这样做的结果就是,集合中的每个LineItem都将会发布到一个消息中,这些消息会被发送到名为lineItemChannel的通道中。
如果想要使用Java DSL声明相同的splitter/router配置,那么我们可以通过调用split()和route()来实现:
1 | return IntegrationFlows |
DSL所组成的流定义相当简洁,但是可能会有点难以理解。它使用与Java配置样例相同的OrderSplitter来切分订单。在订单切分之后,它根据类型将其路由至两个独立的子流。
9.2.6 服务激活器
服务激活器接收来自输入通道的消息并将这些消息发送至一个MessageHandler的实现,如图9.7所示。
Spring Integration提供了多个开箱即用的MessageHandler(PayloadTypeRouter甚至就是MessageHandler的一个实现),但是我们通常会需要提供一些自定义的实现作为服务激活器。作为样例,如下的代码展现了如何声明MessageHandler bean并将其配置为服务激活器:
1 |
|
这个bean使用了@ServiceActivator
注解,表明它会作为一个服务激活器处理来自someChannel通道的消息。对于MessageHandler本身来讲,它是通过一个lambda表达式实现的。这是一个简单的MessageHandler,当得到消息之后,它会将消息的载荷打印至标准输出流。
另外,我们还可以声明一个服务激活器,让它在返回新载荷之前处理输入消息中的数据。在这种情况下,bean应该是一个GenericHandler,而不是MessageHandler:
1 |
|
在本例中,服务激活器是一个GenericHandler,会接收载荷为Order类型的消息。当订单抵达时,我们会通过一个repository将它保存起来,并返回保存之后的Order,这个Order随后被发送至名为completeChannel的输出通道。
你可能已经注意到了,GenericHandler不仅能够得到载荷,还能得到消息头(虽然我们这个样例根本没有用到这些头信息)。我们还可以在Java DSL配置风格中使用服务激活器,此时,只需要将MessageHandler或GenericHandler传递到流定义的handle()方法中即可:
1 | public IntegrationFlow someFlow() { |
在本例中,MessageHandler会得到一个lambda表达式,但是我们也可以为其提供一个方法引用,甚至是实现了MessageHandler接口的类实例。如果我们为其提供lambda表达式或方法引用,就需要记住它们均接受消息作为其参数。
类似的,如果服务激活器不想成为流的终点,那么handle()还可以接受GenericHandler。如果要将前面提到的订单保存服务激活器添加进来,我们可以按照如下的形式使用Java DSL配置流:
1 | public IntegrationFlow orderFlow(OrderRepository orderRepo) { |
在使用GenericHandler的时候,lambda表达式或方法引用会接受消息负载和头信息作为参数。如果你选择使用GenericHandler作为流的终点,就需要返回null;否则,出现错误,提示没有指定输出通道。
9.2.7 网关
通过网关,应用可以提交数据到集成流中,并且能够可选地接收流的结果作为响应。网关会声明为接口,借助Spring Integration的实现,应用可以调用它来发送消息到集成流中(见图9.8)。
我们已经见过了消息网关的样例,也就是FileWriterGateway。FileWriterGateway是一个单向的网关,它有一个接受String类型的方法,该方法会将文本写入到文件中,返回void。编写双向的网关同样简单。在编写网关接口的时候,只需确保方法要返回某个值,以便于推送到集成流中。
作为样例,假设有个网关,它面对的是一个简单的集成流,这个流会接受一个String并将给定的String转换成全大写的形式。这个网关接口大致如下所示:
1 | package com.example.demo; |
最让人开心的是,这个接口不需要实现。Spring Integration会自动在运行时提供一个实现,它会通过特定的通道发送和接收消息。
当uppercase()被调用的时候,给定的String会发布到集成流中,进入名为inChannel的通道。不管流是如何定义的或者它都干了些什么,当数据进入名为outChannel的通道时,它将会从uppercase()方法返回。
对于我们这个转换成大写格式的集成流来说,它是一个非常简单的流,只需要一个将String转换成大写格式的步骤就可以。它可以通过Java DSL配置声明如下:
1 |
|
按照定义,这个流会从进入inChannel通道的数据开始。消息载荷会由转换器进行处理,也就是执行大写操作(通过lambda表达式来定义)。结果形成的消息会被发送到名为outChannel的通道,也就是我们在UpperCaseGateway中声明的答复通道。
9.2.8 通道适配器
通道适配器代表了集成流的入口和出口。数据通过入站通道适配器(inboundchannel adapter)进入一个集成流,通过出站通道适配器离开一个集成流,如图9.9所示。
根据要将什么数据源引入集成流,入站通道适配器可以有很多种形式。例如,我们可以声明一个入站通道适配器,将来自AtomicInteger不断递增的数字引入到流中。如果使用Java配置,如下所示:
1 |
|
这个@Bean
方法通过@InboundChannelAdapter
注解声明了一个入站通道适配器,根据注入的AtomicInteger每隔一秒(也就是1000毫秒)就提交一个数字给名为numberChannel的通道。
在使用Java配置时,我们可以通过@InboundChannelAdapter
注解声明入站通道适配器,而在使用Java DSL定义集成流的时候,我们需要使用from()方法完成同样的事情。如下的流定义程序清单展现了类似的入站通道适配器,它是使用Java DSL定义的:
1 |
|
通常,通道适配器是由Spring Integration的众多端点模块提供的。假设,我们需要一个入站通道适配器,它会监控一个特定的目录并将写入该目录的文件以消息的形式提交到file-channel通道中。如下的Java配置使用来自Spring Integration的file端点模块实现该功能:
1 |
|
如果使用Java DSL编写同等功能的入站通道适配器,那么我们可以使用Files类的inboundAdapter()。出站通道适配器是集成流的终点,会将最终的消息传递给应用或其他外部系统:
1 |
|
我们通常会将服务激活器实现为消息处理器,让它作为出站通道适配器,在数据需要传递给应用本身的时候更是如此。我们已经讨论过服务激活器,所以没有必要重复讨论了。
但是,需要注意,Spring Integration端点模块为多个通用场景提供了消息处理器。在程序清单9.3中,我们已经见到过一个这种出站通道适配器的样例,即FileWriting MessageHandler。提到Spring Integration端点模块,我们看一下都有哪些直接可用的集成端点模块。
9.2.9 端点模块
Spring Integration允许我们创建自己的通道适配器,这一点非常好,但是更棒的是Spring Integration提供了二十多个包含通道适配器(同时包括入站和出站的适配器)的端点模块(见表9.1),用于和各种常见的外部系统实现集成。
表9.1 Spring Integration提供的二十多个端点模块
从表9.1我们可以清楚地看到,Spring Integration提供了用途广泛的一组组件,它们能够满足非常多的集成需求。大多数应用程序所使用的只是Spring Integration所提供功能的九牛一毛。需要的话,我们最好还是要知道Spring Integration已经提供了相关的功能。
另外,我们不可能在一章的篇幅中介绍表9.1中的所有通道适配器。我们已经看到了如何使用文件系统模块写入文件的样例,随后将会看到如何使用Email模块来读取Email。
对于每个端点模块的通道适配器,我们可以在Java配置中将其声明为bean,也可以在Java DSL配置中通过静态方法的方式引用。我建议你探索一下自己感兴趣的其他端点模块。你会发现它们在使用方式上是非常一致的。现在,我们关注一下Email端点模块,看一下如何将它用到Taco Cloud应用中。