9.1 声明一个简单的集成流

9.1 声明一个简单的集成流

通常来讲,在使用Spring Integration创建集成流时,是通过声明一个应用程序能够接收或发送哪些数据到应用程序之外的资源来实现的。应用程序可能集成的资源之一就是文件系统。因此,Spring Integration的很多组件都有读入和写入文件的通道适配器(channel adapter)。

为了熟悉Spring Integration,我们将会创建一个集成流,这个流会写入数据到文件系统中。首先,我们需要添加Spring Integration到项目的构建文件中。对于Maven构建来讲,必要的依赖如下所示:

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>

第一项依赖是Spring Integration的Spring Boot starter。不管我们与哪种流进行交互,对于Spring Integration流的开发来讲,这个依赖都是必需的。与所有的Spring Boot starter一样,在Initializr表单中,这个依赖也可以通过复选框进行选择。

第二项依赖是Spring Integration的文件端点模块。这个模块是与外部系统集成的二十多个模块之一。我们会在9.2.9小节中更加详细地讨论端点模块。对于现在来讲,我们只需要知道文件端点模块提供了将文件从文件系统导入集成流和/或将流中的数据写入文件系统的能力即可。

接下来,我们需要为应用创建一种方法,让它能够发送数据到集成流中,这样它才能写入到文件中。为了实现这一点,我们需要创建一个网关接口,这样的网关接口如程序清单9.1所示。

程序清单9.1 将方法调用转换成消息的消息网关接口

1
2
3
4
5
6
7
8
9
10
package sia5;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel="textInChannel") ⇽--- 声明消息网关
public interface FileWriterGateway {
void writeToFile(
@Header(FileHeaders.FILENAME) String filename, ⇽--- 写入文件
String data);
}

尽管这只是一个很简单的Java接口,但是FileWriterGateway有很多东西需要介绍。我们首先看到的是,它使用了@MessagingGateway注解。这个注解会告诉Spring Integration要在运行时生成该接口的实现,这与Spring Data在运行时生成repository接口的实现非常类似。其他地方的代码在希望写入文件的时候将会调用它。

@MessagingGateway的defaultRequestChannel属性表明接口方法调用时所返回的消息要发送至给定的消息通道(message channel)。在本例中,我们声明调用writeToFile()所形成的消息应该发送至名为textInChannel的通道中。

对于writeToFile()方法来说,它以String类型的形式接受一个文件名,另外一个String包含了要写入文件中的文本。关于这个方法的签名,还需要注意filename参数上带有@Header。在本例中,@Header注解表明传递给filename的值应该包含在消息头信息中(通过FileHeaders.FILENAME声明,它将会被解析成file_name),而不是放到消息载荷(payload)中。

现在,我们已经有了消息网关,接下来就需要配置集成流了。尽管我们往构建文件中添加的Spring Integration starter依赖能够启用Spring Integration的自动配置功能,但是满足应用需求的流定义则需要我们自行编写额外的配置。在声明集成流方面,我们有3种配置方案可供选择:

  • XML配置;
  • Java配置;
  • 使用DSL的Java配置。

我们会依次看一下Spring Integration的这3种配置风格,首先从较为老式的XML配置开始。

9.1.1 使用XML定义集成流

尽管在本书中,我尽量避免使用XML配置,但是Spring Integration有使用XML定义集成流的漫长历史。所以,我认为至少展现一个XML定义集成流的样例还是很有价值的。程序清单9.2展现了如何使用XML配置示例集成流。

程序清单9.2 使用Spring XML配置定义集成流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-
file.xsd">
<int:channel id="textInChannel" /> ⇽--- 声明textInChannel
<int:transformer id="upperCase"
input-channel="textInChannel"
output-channel="fileWriterChannel"
expression="payload.toUpperCase()" /> ⇽--- 转换文本
<int:channel id="fileWriterChannel" /> ⇽--- 声明fileWriterChannel
<int-file:outbound-channel-adapter id="writer"
channel="fileWriterChannel"
directory="/tmp/sia5/files"
mode="APPEND"
append-new-line="true" /> ⇽--- 将文本写入到文件中
</beans>

我们将程序清单9.2中的XML拆分讲解一下。

  • 我们首先配置了一个名为textInChannel的通道。你会发现,它就是FileWriterGateway的请求通道。当FileWriterGateway的writeToFile()方法被调用的时候,结果形成的消息将会发布到这个通道上。
  • 我们还配置了一个转换器(transformer),它会从textInChannel接收消息。它使用Spring表达式语言(Spring Expression Language,SpEL)为消息载荷调用了toUpperCase()方法。进行大写操作之后的结果会发布到fileWriterChannel上。
  • 随后,我们配置了名为fileWriterChannel的通道。这个通道会作为一个导线,将转换器与出站通道适配器(outbound channel adapter)连接在一起。
  • 最后,我们使用int-file命名空间配置了出站通道适配器。这个XML命名空间是由Spring Integration的文件模块提供的,实现文件写入的功能。按照我们的配置,它从fileWriterChannel接收消息,并将消息的载荷写入到一个文件中,这个文件的名称是由消息头信息中的file_name属性指定的,而存入的目录则是由这里的directory属性指定的。如果文件已经存在,那么文件内容会以新行的方式进行追加,而不是覆盖该文件。

图9.1使用《企业集成模式》中的图形元素样式阐述了这个流。

image-20211015210914944

图9.1 文件写入器的集成流

如果想要在Spring Boot应用中使用XML配置,那么我们需要将XML作为资源导入到Spring应用中。最简单的实现方式就是在应用的某个Java配置类上使用Spring的@ImportResource注解:

1
2
3
@Configuration
@ImportResource("classpath:/filewriter-config.xml")
public class FileWriterIntegrationConfig { ... }

尽管基于XML的配置能够很好地用于Spring Integration,但是大多数的开发人员对于XML的使用越来越谨慎。(正如我所言,在本书中,我会尽量避免使用XML配置。)现在,我们抛开这些尖括号,看一下Spring Integration的Java配置风格。

9.1.2 使用Java配置集成流

大多数的现代Spring应用程序都会避免使用XML配置,而更加青睐Java配置。实际上,在Spring Boot应用中,Java配置是自动化配置功能更自然的补充形式。因此,如果我们要为Spring Boot应用添加集成流,最好使用Java来定义流程。

程序清单9.3展示了使用Java配置编写集成流的一个样例。这里的代码依然是功能相同的文件写入集成流,但是这次我们选择使用Java来实现。

程序清单9.3 使用Java配置来定义集成流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package sia5;
import java.io.File;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.transformer.GenericTransformer;
@Configuration
public class FileWriterIntegrationConfig {
@Bean
@Transformer(inputChannel="textInChannel", ⇽--- 声明转换器
outputChannel="fileWriterChannel")
public GenericTransformer<String, String> upperCaseTransformer() {
return text -> text.toUpperCase();
}
@Bean
@ServiceActivator(inputChannel="fileWriterChannel")
public FileWritingMessageHandler fileWriter() { ⇽--- 声明文件写入器
FileWritingMessageHandler handler =
new FileWritingMessageHandler(new File("/tmp/sia5/files"));
handler.setExpectReply(false);
handler.setFileExistsMode(FileExistsMode.APPEND);
handler.setAppendNewLine(true);
return handler;
}
}

在Java配置中,我们声明了两个bean:一个转换器,还有一个文件写入的消息处理器。这里的转换器是GenericTransformer。因为GenericTransformer是一个函数式接口,所以我们可以使用lambda表达式为其提供实现,这里调用了消息文本的toUpperCase()方法。转换器bean使用了@Transformer注解,这样会将其声明成集成流中的一个转换器,它接受来自textInChannel通道的消息,然后将消息写入到名为fileWriterChannel的通道中。

负责文件写入的bean则使用了@ServiceActivator注解,表明它会接受来自fileWriter Channel的消息,并且会将消息传递给FileWritingMessageHandler实例所定义的服务。FileWritingMessageHandler是一个消息处理器,它会将消息的载荷写入特定目录的一个文件中,而文件的名称是通过消息的file_name头信息指定的。与XML样例类似,FileWritingMessageHandler也配置为以新行的方式为文件追加内容。

FileWritingMessageHandler bean的一个独特之处在于它调用了setExpectReply(false)方法,通过这个方法能够告知服务激活器(serviceactivator)不要期望会有答复通道(reply channel,通过这样的通道,我们可以将某个值返回到流中的上游组件)。如果我们不调用setExpectReply(),文件写入bean的默认值是true。尽管管道的功能和预期一样,但是在日志中会看到一些错误日志,提示我们没有设置答复通道。

我们在这里没有必要显式声明通道。如果名为textInChannel和fileWriterChannel的bean不存在,那么这两个通道将会自动创建。但是,如果你想要更加精确地控制通道配置,那么可以按照如下的方式显式构建这些bean:

1
2
3
4
5
6
7
8
9
@Bean
public MessageChannel textInChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel fileWriterChannel() {
return new DirectChannel();
}

基于Java的配置方案可能会更易于阅读,也更加简洁,而且符合我在本书中倡导的纯Java配置风格。但是,使用Spring Integration的Java DSL(Domain-SpecificLanguage,领域特定语言)配置风格的话,它可以更加流畅。

9.1.3 使用Spring Integration的DSL配置

我们再次尝试一下文件写入集成流的定义。这一次,我们依然使用Java进行定义,但是会使用Spring Integration的Java DSL。这一次我们不再将流中的每个组件都声明为单独的bean,而是使用一个bean来定义整个流,如程序清单9.4所示。

程序清单9.4 为集成流的设计提供一个流畅的API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package sia5;
import java.io.File;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.support.FileExistsMode;
@Configuration
public class FileWriterIntegrationConfig {
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlows
.from(MessageChannels.direct("textInChannel")) ⇽--- 入站通道
.<String, String>transform(t -> t.toUpperCase()) ⇽--- 声明转换器
.handle(Files ⇽--- 处理文件写入
.outboundAdapter(new File("/tmp/sia5/files"))
.fileExistsMode(FileExistsMode.APPEND)
.appendNewLine(true))
.get();
}
}

这种配置方式在一个bean方法中定义了整个流,已经做到了尽可能简洁。Integration Flows类初始化构建器API,我们可以通过这个API来定义流。

在程序清单9.4中,我们首先从名为textInchannel的通道接收消息,然后进入一个转换器,将消息载荷转换成大写形式。通过转换器之后,消息会由出站通道适配器来进行处理。这个适配器是由Spring Integration file模块的Files类型创建的。最后,通过对get()的调用返回要构建的IntegrationFlow。简而言之,这个bean方法定义了与XML和Java配置样例相同的集成流。

你可能已经发现,与Java配置样例类似,我们不需要显式声明通道bean。我们引用了textInChannel,如果该名字对应的通道不存在,那么Spring Integration会自动创建它。不过,我们也可以显式声明bean。

对于连接转换器和出站通道适配器的通道,我们甚至没有通过名字引用它。如果需要显式配置通道,那么我们可以在流定义的时候通过调用channel()来引用它的名称:

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlows
.from(MessageChannels.direct("textInChannel"))
.<String, String>transform(t -> t.toUpperCase())
.channel(MessageChannels.direct("fileWriterChannel"))
.handle(Files
.outboundAdapter(new File("/tmp/sia5/files"))
.fileExistsMode(FileExistsMode.APPEND)
.appendNewLine(true))
.get();
}

在使用Spring Integration的Java DSL(与其他的fluent API类似)的时候,我们必须要巧妙地使用空格来保持可读性。在这里的样例中,我小心翼翼地使用缩进来保证代码块的可读性。对于更长、更复杂的流,我们甚至可以考虑将流的一部分抽取到单独的方法或子流中,以实现更好的可读性。

现在,我们已经看到了如何使用3种不同的方式来定义一个简单的流。接下来,我们回过头来看一下Spring Integration的全景。