10.4 小结

  • 反应式编程会涉及创建数据流经的处理管道。
  • 反应式流规范定义了4种类型:Publisher、Subscriber、Subscription和Processor(它是Publisher和Subscriber的组合)。
  • Reactor项目实现了反应式流规范,将反应式流的定义抽象为两个主要的类型,即Flux和Mono,并为每种类型都提供数百个操作。
  • Spring 5利用Reactor提供了反应式控制器、repository、REST客户端以及其他反应式框架的支持。

10.3 使用常见的反应式操作

Flux和Mono是Reactor提供的最基础的构建块,而这两种反应式类型所提供的操作符则是组合使用它们以构建数据流动管线的黏合剂。Flux和Mono共有500多个操作,这些操作都可以大致归类为:

  • 创建操作;
  • 组合操作;
  • 转换操作;
  • 逻辑操作。

虽然对这500多个操作一一探讨会非常有趣,但是本章的篇幅有限,所以我在本节中选择一些有用的操作来进行说明。下面让我们从创建操作开始吧。

注意:Mono的例子呢?因为Mono和Flux的很多操作是相同的,所以没有必要针对Mono和Flux重复进行介绍。此外,虽然Mono的操作也很有用,但是相比而言,Flux上的操作更有趣。我们的大多数示例会使用Flux。读者只需要知道Mono上通常具有相同的名称的操作即可。

10.3.1 创建反应式类型

在Spring中使用反应式类型时,我们通常将会从repository或者service中获取Flux或Mono,并不需要我们自行创建。偶尔,我们可能需要创建一个新的反应式Publisher。

Reactor提供了多种创建Flux和Mono的操作。这里,我们将介绍其中一些有用的创建操作。

根据对象创建

如果你有一个或多个对象,并想据此创建Flux或Mono,那么可以使用Flux或Mono上的静态just()方法来创建一个反应式类型,它们的数据会由这些对象来驱动。例如,下面的测试方法将从5个String对象中创建一个Flux:

1
2
3
4
5
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux
.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
}

此时我们已经创建了Flux,但是它还没有订阅者。如果没有任何的订阅者,那么数据将不会流动。回想一下花园软管的比喻,假设我们已经将花园软管连接到了水龙头上,另一侧是水厂的水——但是在你打开水龙头之前,水不会流动。订阅反应式类型就如同你打开数据流的水龙头。

要添加一个订阅者,我们可以在Flux上调用subscribe()方法:

1
2
3
fruitFlux.subscribe(
f -> System.out.println("Here's some fruit: " + f)
);

这里传递给subscribe()方法的lambda表达式实际上是一个java.util.Consumer,用来创建反应式流的Subscriber。在调用subscribe()之后,数据会开始流动。在这个例子中,没有中间操作,所以数据从Flux直接流向订阅者。

将来自Flux或Mono的数据项打印到控制台是观察反应式类型运行方式的好方法,实际测试Flux或Mono更好的方法是使用Reactor提供的StepVerifier。对于给定的Flux或Mono,StepVerifier将会订阅该反应式类型,在数据流过时对数据应用断言,并在最后验证反应式流是否按预期完成。

例如,要验证预定义的数据是否流经了fruitFlux,我们可以编写如下所示的测试代码:

1
2
3
4
5
6
7
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();

在这个例子中,StepVerifier订阅了fruitFlux,然后断言Flux中的每个数据项是否与预期的水果名称相匹配。最后,它验证Flux在发布完“Strawberry”之后,整个fruitFlux正常完成。

对于本章的其他例子,你可以使用StepVerifier来编写测试,验证Flux或者Mono行为,研究相应的工作原理,从而帮助你学习和了解Reactor中最有用的操作。

根据集合创建

我们还可以根据数组、Iterable或者Java Stream创建Flux。图10.3使用弹珠图展示如何使用这种方式进行创建。

image-20211016185450377

图10.3 可以根据数组、Iterable或者Java Stream创建Flux

要根据数组创建Flux,可以调用Flux上的静态方法fromArray(),并传入一个源数组:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void createAFlux_fromArray() {
String[] fruits = new String[] {
"Apple", "Orange", "Grape", "Banana", "Strawberry" };
Flux<String> fruitFlux = Flux.fromArray(fruits);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}

因为该源数组包含了之前从对象列表创建Flux时所使用的相同的水果名称,所以该Flux发布的数据会有相同的值,可以使用和之前相同的StepVerifier来验证。

如果我们需要根据java.util.List、java.util.Set或者其他任意java.lang.Iterable的实现来创建Flux,那么可以将其传递给静态的fromIterable()方法:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void createAFlux_fromIterable() {
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
Flux<String> fruitFlux = Flux.fromIterable(fruitList);
// ... verify steps
}

或者,我们有一个Java Stream,并且希望将其用作Flux的源,那么可以调用fromStream()方法:

1
2
3
4
5
6
7
@Test
public void createAFlux_fromStream() {
Stream<String> fruitStream =
Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
Flux<String> fruitFlux = Flux.fromStream(fruitStream);
// ... verify steps
}

同样,我们可以使用和之前一样的StepVerifier来验证该Flux发布的数据。

生成Flux的数据

有时候我们根本没有可用的数据,而只是想要一个作为计数器的Flux,它会在每次发送新值时增加1。要创建一个计数器Flux,我们可以使用静态方法range()。图10.4说明了range()方法的工作原理。

image-20211016185716613

图10.4 从区间创建的Flux会以类似计数器的方式发布消息

下面的测试方法展示了如何创建一个区间Flux:

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void createAFlux_range() {
Flux<Integer> intervalFlux =
Flux.range(1, 5);
StepVerifier.create(intervalFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete();
}

在这个例子中,我们创建了一个区间Flux,起始值为1,结束值为5。StepVerifier证明了它将发布5个条目,即整数1到5。

另一个与range()方法类似的Flux创建方法是interval()。与range()方法一样,interval()方法会创建一个发布递增值的Flux。interval()的特殊之处在于,我们不是给它设置一个起始值和结束值,而是指定一个应该每隔多长时间发出值的间隔时间。图10.5展示了interval()方法创建Flux的弹珠图。

image-20211016185806209

图10.5 根据指定间隔创建的Flux会周期性地发布条目

例如,要创建一个每秒发布一个值的Flux,你可以使用Flux上的静态interval() 方法,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void createAFlux_interval() {
Flux<Long> intervalFlux =
Flux.interval(Duration.ofSeconds(1))
.take(5);
StepVerifier.create(intervalFlux)
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.verifyComplete();
}

需要注意的是,通过interval()方法创建的Flux会从0开始发布值,并且后续的条目依次递增。此外,因为interval()方法没有指定最大值,所以它可能会永远运行。我们也可以使用take()方法将结果限制为前5个条目。我们将在下一节中详细讨论take()方法。

10.3.2 组合反应式类型

有时候,我们会需要操作两种反应式类型,并以某种方式将它们合并在一起。或者,在其他情况下,我们可能需要将Flux拆分为多种反应式类型。在本节中,我们将研究组合以及拆分Reactor的Flux和Mono的操作。

合并反应式类型

假设我们有两个Flux流,并且需要据此创建一个结果Flux,这个形成的Flux会在任意上游Flux流有数据时产生数据。要将一个Flux与另一个Flux合并,可以使用mergeWith()方法,如图10.6中的弹珠图所示。

image-20211016185959408

图10.6 合并两个Flux流(它们的消息将会交错合并为一个新的Flux)

例如,假设有一个值是电视和电影角色名称的Flux,还有另一个值是这些角色喜欢吃的食物的名称的Flux。下面的测试方法展示了如何使用mergeWith()方法合并两个Flux对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void mergeFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500));
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500));
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
StepVerifier.create(mergedFlux)
.expectNext("Garfield")
.expectNext("Lasagna")
.expectNext("Kojak")
.expectNext("Lollipops")
.expectNext("Barbossa")
.expectNext("Apples")
.verifyComplete();
}

通常,Flux会尽可能快地发布数据。因此,我们在创建的两个Flux流上使用delayElements()方法来减慢它们的速度——每500毫秒发布一个条目。此外,为了使食物Flux在角色名称Flux之后再开始流式传输,我们调用了食物Flux上的delaySubscription()方法,以便它在订阅后再经过250毫秒后才开始发布数据。

在合并了两个Flux对象后,将会创建一个新的合并过后的Flux。当StepVerifier订阅这个合并过后的Flux时,它将依次订阅两个源Flux流并启动数据流。

这个合并过后的Flux数据项发布顺序与源Flux的发布时间一致。因为两个Flux对象都设置为以常规速率进行发布,所以这些值在合并后的Flux中会交错在一起,结果是:一个角色、一个食物、另一个角色、另一个食物,以此类推。如果任何一个Flux的计时发生变化,那么你可能会看到接连发布了两个角色或者两个食物。

因为mergeWith()方法不能完美地保证源Flux之间的先后顺序,所以我们可以考虑使用zip()方法。当两个Flux对象压缩在一起的时候,它将会产生一个新的发布元组的Flux,其中每个元组中都包含了来自每个源Flux的数据项。图10.7说明了如何将两个Flux对象压缩在一起。

image-20211016190043625

图10.7 通过zip()方法合并两个Flux流

要查看zip()操作实际如何运行,可以考虑如下所示的测试方法,它将角色Flux和食物Flux合并在一起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void zipFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(characterFlux, foodFlux);
StepVerifier.create(zippedFlux)
.expectNextMatches(p ->
p.getT1().equals("Garfield") &&
p.getT2().equals("Lasagna"))
.expectNextMatches(p ->
p.getT1().equals("Kojak") &&
p.getT2().equals("Lollipops"))
.expectNextMatches(p ->
p.getT1().equals("Barbossa") &&
p.getT2().equals("Apples"))
.verifyComplete();
}

需要注意的是,与mergeWith()方法不同,zip()方法是一个静态的创建操作。创建出来的Flux在角色和他们喜欢的食物之间会完美对齐。从这个合并后的Flux发出的每个条目都是一个Tuple2(一个容纳两个其他对象的容器对象)的实例,其中包含了来自每个源Flux的数据项,并保持着它们发布的顺序。

如果你不想使用Tuple2,而想要使用其他类型,就可以为zip()方法提供一个合并函数来生成你想要的任何对象,合并函数会传入这两个数据项(如图10.8中的弹珠图所示)。

image-20211016190146793

图10.8 zip操作的另一种形式(从每个传入Flux中各取一个元素,然后创建消息对象,并产生这些消息组成的Flux)

例如,下面的测试方法会将角色Flux与食物Flux合并在一起,以便生成一个包含String对象的Flux:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void zipFluxesToObject() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
StepVerifier.create(zippedFlux)
.expectNext("Garfield eats Lasagna")
.expectNext("Kojak eats Lollipops")
.expectNext("Barbossa eats Apples")
.verifyComplete();
}

传递给zip()方法(在这里是一个lambda)的函数只是简单地将两个数据项组装成一个句子,然后通过该合并后的Flux发布出去。

选择第一个反应式类型进行发布

假设我们有两个Flux对象,此时我们不想将它们合并在一起,而是想要创建一个新的Flux,让这个新的Flux从第一个产生值的Flux中发布值。如图10.9所示,first()操作会在两个Flux对象中选择第一个发布值的Flux,并再次发布它的值。

image-20211016192744713

图10.9 first操作将会选择第一个发布消息的Flux并只发布该Flux的值

下面的测试方法创建了一个快速的Flux和一个“缓慢”的Flux(其中“慢”意味着它在被订阅后100毫秒才会发布数据项)。使用first()方法,它将会创建一个新的Flux,这个Flux只会获取第一个源Flux发布的值,并再次发布:

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void firstFlux() {
Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
.delaySubscription(Duration.ofMillis(100));
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
StepVerifier.create(firstFlux)
.expectNext("hare")
.expectNext("cheetah")
.expectNext("squirrel")
.verifyComplete();
}

在这种情况下,因为慢速Flux会在快速Flux开始发布之后的100毫秒才发布值,所以新创建的Flux将会简单地忽略慢的Flux,并仅发布来自快速Flux的值。

10.3.3 转换和过滤反应式流

在数据流经一个流时,我们通常需要过滤掉某些值并对其他的值进行处理。在这一节,我们将介绍转换和过滤流经反应式流的数据的操作。

从反应式类型中过滤数据

数据在从Flux流出时,进行过滤的最基本方法之一是简单地忽略第一批指定数目的数据项。skip操作(如图10.10所示)就能完成这样的工作。

image-20211016192837833

图10.10 skip操作跳过指定数目的消息并将剩下的消息继续在结果Flux上进行传递

针对具有多个数据项的Flux,skip操作将创建一个新的Flux,它会首先跳过指定数量的数据项,然后从源Flux中发布剩余的数据项。下面的测试方法展示如何使用skip()方法:

1
2
3
4
5
6
7
8
9
@Test
public void skipAFew() {
Flux<String> skipFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.skip(3);
StepVerifier.create(skipFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}

在这个场景下,我们有一个具有5个String数据项的Flux。在这个Flux上调用skip(3)方法后会产生一个新的Flux,它会跳过前3个数据项,只发布最后2个数据项。

但是,你可能并不想跳过特定数量的条目,而是想要在一段时间之内跳过所有的第一批数据。这是skip()操作的另一种形式,将会产生一个新Flux,在发布来自源Flux的数据项之前等待指定的一段时间,如图10.11所示。

image-20211016192910234

图10.11 skip操作的另一种形式

下面的测试方法使用skip操作创建了一个在发布值之前会等待4秒的Flux。因为Flux是基于一个在发布数据项之间有1秒延迟的Flux创建的(使用了delayElements()操作),所以它只会发布出最后两个数据项:

1
2
3
4
5
6
7
8
9
10
@Test
public void skipAFewSeconds() {
Flux<String> skipFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.delayElements(Duration.ofSeconds(1))
.skip(Duration.ofSeconds(4));
StepVerifier.create(skipFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}

我们已经看过skip操作的示例,根据对skip操作的描述来看,take可以认为是与skip相反的操作。skip操作会跳过前面几个数据项,而take操作只发布第一批指定数量的数据项,然后将取消订阅(如图10.12中的弹珠图所示)。

1
2
3
4
5
6
7
8
9
10
@Test
public void take() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.take(3);
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}

image-20211016193007459

图10.12 take操作只发布传入Flux中前面指定数目的数据项

与skip()方法一样,take()方法也有另一种替代形式,基于间隔时间而不是数据项个数。它将接受并发布与源Flux一样多的数据项,直到某段时间结束,之后Flux将会完成,如图10.13所示。

image-20211016193028191

图10.13 take操作的另一种形式(在指定的时间过期之前,一直将消息传递给结果Flux)

下面的测试方法使用take()方法的另一种形式,将会在订阅之后的前3.5秒发布数据条目。

1
2
3
4
5
6
7
8
9
10
11
@Test
public void take() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}

skip操作和take操作都可以被认为是过滤操作,其过滤条件是基于计数或者持续时间的,而Flux值的更通用过滤则是filter操作。

我们需要指定一个Predicate,用于决定数据项是否能通过Flux,filter操作允许我们根据任何条件进行选择性地发布。图10.14中的弹珠图显示了filter操作的工作原理。

image-20211016193305271

图10.14 可以对传入的Flux进行过滤,这样结果Flux将只会发布满足指定Predicate的消息

要查看filter()的实际效果,可以参考下面的测试方法:

1
2
3
4
5
6
7
8
9
10
@Test
public void filter() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.filter(np -> !np.contains(" "));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Zion")
.verifyComplete();
}

这里我们将一个只接受不包含空格的字符串的Predicate作为lambda传给了filter()方法,因此在结果Flux中”Grand Canyon”和”Grand Teton”被过滤掉了。

我们还可能想要过滤掉已经接收过的数据条目,可以采用distinct操作(如图10.15所示),形成的Flux将只会发布源Flux中尚未发布过的数据项。

image-20211016193416475

图10.15 distinct操作将会过滤掉重复的消息

在下面的测试中,调用distinct()方法产生的Flux只会发布不同的String值:

1
2
3
4
5
6
7
8
9
@Test
public void distinct() {
Flux<String> animalFlux = Flux.just(
"dog", "cat", "bird", "dog", "bird", "anteater")
.distinct();
StepVerifier.create(animalFlux)
.expectNext("dog", "cat", "bird", "anteater")
.verifyComplete();
}

虽然”dog”和”bird”从源Flux中都发布了两次,但是在调用distinct()方法产生的结果Flux中,它们只被发布了一次。

映射反应式数据

在Flux或Mono上最常见的操作之一就是将已发布的数据项转换为其他的形式或类型。Reactor的反应式类型(Flux和Mono)为此提供了map和flatMap操作。

map操作会创建一个新的Flux,只是在重新发布它所接收的每个对象之前会执行给定Function指定的转换。map操作的工作原理如图10.16所示。

image-20211016193528270

图10.16 map操作将传入的消息转换为结果流上的新消息

在下面的test()方法中,包含代表篮球运动员名字的String值的Flux被转换为一个包含Player对象的新Flux。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void map() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.map(n -> {
String[] split = n.split("\\s");
return new Player(split[0], split[1]);
});
StepVerifier.create(playerFlux)
.expectNext(new Player("Michael", "Jordan"))
.expectNext(new Player("Scottie", "Pippen"))
.expectNext(new Player("Steve", "Kerr"))
.verifyComplete();
}

以lambda形式传递给map()方法的函数会将传入的String值按照空格进行拆分,并使用生成的String数组来创建Player对象。使用just()方法创建的Flux包含了String对象,但是map()方法产生的Flux包含了Player对象。

其中重要的一点是:在每个数据项被源Flux发布时,map操作是同步执行的,如果你想要异步地转换过程,那么你应该考虑使用flatMap操作。

对于flatMap操作,我们可能需要一些思考和练习才能完全掌握。如图10.17所示,flatMap并不像map操作那样简单地将一个对象转换到另一个对象,而是将对象转换为新的Mono或Flux。结果形成的Mono或Flux会扁平化为新的Flux。当与subscribeOn()方法结合使用时,flatMap操作可以释放Reactor反应式的异步能力。

image-20211016193621512

图10.17 flatMap操作使用一个中间的Flux来实现异步转换

下面的测试方法展示如何使用flatMap()方法和subscribeOn()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void flatMap() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.flatMap(n -> Mono.just(n)
.map(p -> {
String[] split = p.split("\\s");
return new Player(split[0], split[1]);
})
.subscribeOn(Schedulers.parallel())
);
List<Player> playerList = Arrays.asList(
new Player("Michael", "Jordan"),
new Player("Scottie", "Pippen"),
new Player("Steve", "Kerr"));
StepVerifier.create(playerFlux)
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.verifyComplete();
}

需要注意的是,我们为flatMap()方法指定了一个lambda形式的函数,传入的String将会转换为一个Mono类型的String,然后在这个Mono上通过map()方法将字符串转换为一个Player。

如果到此为止,那么产生的Flux将同样包含Player对象,与使用map()方法的例子相同,顺序同步地生成。但是我们对Mono做的最后一个动作就是调用subscribeOn()方法,它声明每个订阅都应该在并行线程中进行,因此可以异步并行地执行多个String对象的转换操作。

尽管subscribeOn()方法的命名与subscribe()方法类似,但是它们的含义却完全不同。 subscribe()方法是一个动词,订阅并驱动反应式流;而subscribeOn()方法则更具描述性,指定了如何并发地处理订阅。Reactor本身并不强制使用特定的并发模型,通过subscribeOn()方法,我们可以使用Schedulers中的任意一个静态方法来指定并发模型。在这个例子中,我们使用了parallel()方法,使用来自固定线程池(大小与CPU核心数量相同)的工作线程。Schedulers支持多种并发模型,如表10.1所示。

表10.1 Schedulers支持的并发模型

epub_29101559_80

使用flatMap()和subscribeOn()的好处是:我们可以在多个并行线程之间拆分工作,从而增加流的吞吐量。因为工作是并行完成的,无法保证哪项工作首先完成,所以结果Flux中数据项的发布顺序是未知的。因此,StepVerifier只能验证发出的每个数据项是否存在于预期Player对象列表中,并且在Flux完成之前会有3个这样的数据项。

在反应式流上缓存数据

在处理流经Flux的数据时,你可能会发现将数据流拆分为小块会带来一定的收益。如图10.18所示的buffer操作可以帮助你解决这个问题。

image-20211016193756362

图10.18 buffer操作会产生一个新的包含列表Flux(具备最大长度限制的列表,包含从传入的Flux中收集来的数据)

我们给定一个包含多个String值的Flux,其中每个值代表一种水果的名称,我们可以创建一个新的包含List集合的Flux,其中每个List只有不超过指定数量的元素:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void buffer() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);
StepVerifier
.create(bufferedFlux)
.expectNext(Arrays.asList("apple", "orange", "banana"))
.expectNext(Arrays.asList("kiwi", "strawberry"))
.verifyComplete();
}

在这种情况下,String元素的Flux被缓冲到一个新的包含List集合的Flux中,其中每个集合不超过3个条目。因此,发出5个String值的原始Flux将会被转换为一个新的Flux,它会发出两个List集合,其中一个包含3个水果,而另一个包含2个水果。

这有什么意义呢?将反应式的Flux缓冲到非反应式的Flux中看起来适得其反,但是,当组合使用buffer()方法和flatMap()方法时,我们可以对每个List集合进行并行处理。

1
2
3
4
5
6
7
8
9
Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry")
.buffer(3)
.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
.log()
).subscribe();

在这个新例子中,我们仍然将5个String值的Flux缓冲到一个新的包含List的Flux中,但是这次将flatMap()应用于包含List集合的Flux。这将获取每个List缓冲区,并为其中的元素创建一个新的Flux,然后对其应用map操作。因此,每个List缓冲区都会在各个线程中执行进一步并行处理。

为了观察实际的效果,在代码中还包括了一个log()操作,它用于每个子Flux。log()操作记录了所有的反应式事件,以便于观察实际发生了什么事情。在日志中将会记录如下的条目(为简洁起见,删除了时间戳的部分):

1
2
3
4
5
6
7
8
9
10
11
12
13
[main] INFO reactor.Flux.SubscribeOn.1 -
onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.1 - request(32)
[main] INFO reactor.Flux.SubscribeOn.2 -
onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.2 - request(32)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(APPLE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(KIWI)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(ORANGE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(STRAWBERRY)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(BANANA)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onComplete()
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onComplete()

如同日志记录所清晰展示的,第一个缓冲区(apple、orange和banana)中的水果在parallel-1线程中处理;与此同时,第二个缓冲区(kiwi和strawberry)中的水果在parallel-2线程中处理。从缓冲区的日志记录交织在一起的事实可以明显地看出,对两个缓冲区的处理是并行执行的。

如果由于某些原因需要将Flux发布的所有数据项都收集到一个List中,那么可以使用不带参数的buffer()方法:

1
Flux<List<String>> bufferedFlux = fruitFlux.buffer();

这将会产生一个新的Flux。这个Flux将会发布一个List,其中包含源Flux发布的所有数据项。我们可以使用collectList操作实现相同的功能,如图10.19中的弹珠图所示。

image-20211016195110114

图10.19 collectList操作将产生一个包含传入Flux发布的所有消息的Mono

collectList()方法会产生一个发布List的Mono,而不是发布List的Flux。下面的测试方法展示了它的用法:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void collectList() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Mono<List<String>> fruitListMono = fruitFlux.collectList();
StepVerifier
.create(fruitListMono)
.expectNext(Arrays.asList(
"apple", "orange", "banana", "kiwi", "strawberry"))
.verifyComplete();
}

一种更加有趣的收集Flux发出的数据项的方法是将它们收集到Map中。如图10.20所示,collectMap操作将会产生一个发布Map的Mono,这个Map中填充了由给定Function计算key值所生成的条目。

image-20211016195145928

图10.20 collectMap操作将会产生一个Mono(包含了由传入Flux所发出的消息产生的Map,这个Map的key是从传入消息的某些特征衍生而来的)

要查看collectMap()的效果,请参考下面的测试方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void collectMap() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> animalMapMono =
animalFlux.collectMap(a -> a.charAt(0));
StepVerifier
.create(animalMapMono)
.expectNextMatches(map -> {
return
map.size() == 3 &&
map.get('a').equals("aardvark") &&
map.get('e').equals("eagle") &&
map.get('k').equals("kangaroo");
})
.verifyComplete();
}

源Flux会发布一些动物的名字。基于这个Flux,我们使用collectMap操作创建了一个发布Map的新Mono,其中key由动物名称的首字母确定,而值则为动物名称本身。如果两个动物名称以相同的字母开头(如elephant和eagle,或者koala和kangaroo),那么流经该流的最后一个条目将会覆盖先前的条目。

10.3.4 在反应式类型上执行逻辑操作

有时候我们想要知道由Mono或者Flux发布的条目是否满足某些条件,那么all()和any()方法可以实现这样的逻辑。图10.21和图10.22展示了all()和any()的工作方式。

image-20211016195234658

图10.21 可以使用all()方法来确保Flux中的所有消息都满足某些条件

image-20211016195251014

图10.22 可以使用any()方法来确保Flux中至少有一个消息满足某些条件

假设我们想知道Flux发布的每个String中是否都包含了字母a或字母k,那么下面的测试将使用all()方法来检查这个条件:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void all() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
StepVerifier.create(hasKMono)
.expectNext(false)
.verifyComplete();
}

在第一个StepVerifier中,我们检查了字母a。all()方法应用于源Flux,会产生布尔类型的Mono。在本例中,所有动物名称都包含了字母a,所以从生成的Mono中会发布true。但是在第二个StepVerifier中,产生的Mono将会发出false,因为并非所有动物名称都包含字母k。

如果至少有一个元素匹配条件即可,而不是要求所有元素均满足条件,那么any()就是我们所需的方法。下面这个新的测试用例使用any()来检查字母t和z:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void any() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("t"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
StepVerifier.create(hasZMono)
.expectNext(false)
.verifyComplete();
}

在第一个StepVerifier中,我们会看到生成的Mono发布了true,因为至少有一种动物的名称含有字母t(尤其是elephant)。而在第二种情况下,生成的Mono发布了false,因为没有任何一种动物的名称包含字母z。

第3部分 反应式Spring

在第3部分,我们将探索Spring对反应式编程提供的全新支持。第10章讨论使用Reactor项目进行反应式编程的基础知识。Reactor项目是支撑Spring 5反应式特性的反应式编程库。然后,我们将会介绍Reactor中一些常用的反应式操作。在第11章中,我们会重新探讨REST API的开发,介绍全新的Web框架Spring WebFlex。该框架借用了很多Spring MVC的理念,为Web开发提供了新的反应式模型。第12章总结第3部分,介绍如何通过Spring Data对Cassandra和Mongo数据库进行读写,实现反应式数据持久化。

9.3 创建Email集成流

我们决定Taco Cloud应该允许客户通过Email提交taco设计和创建订单。我们发放传单并在报纸上刊登外卖广告,邀请每个人通过Email发送taco订单。这非常成功!但是,令人遗憾的是,它过于成功了。有太多的Email涌了进来,我们不得不申请临时帮助,让别人阅读所有的Email并将订单提交到订单系统中。

在本节中,我们将会实现一个集成流,轮询Taco Cloud的taco订单Email的收件箱、解析Email中的订单细节并将订单提交给Taco Cloud来进行处理。简而言之,在我们所创建的集成流中,入站通道适配器将会使用Email端点模块摄取TacoCloud收件箱中的Email到集成流中。

集成流的下一步会将Email解析为订单对象,这些订单对象会被传递给另一个处理器,从而将订单提交至Taco Cloud的REST API中,在这里我们会像其他订单那样处理它们。首先,我们定义一个简单的配置属性类,它会捕获处理Taco CloudEmail的特定信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Data
@ConfigurationProperties(prefix="tacocloud.email")
@Component
public class EmailProperties {
private String username;
private String password;
private String host;
private String mailbox;
private long pollRate = 30000;
public String getImapUrl() {
return String.format("imaps://%s:%s@%s/%s",
this.username, this.password, this.host, this.mailbox);
}
}

我们可以看到,EmailProperties会捕获生成IMAP URL的属性。这个流会使用这个URL连接Taco Cloud Email服务器并轮询Email。在捕获的属性中包括Email用户的用户名和密码以及IMAP服务器的主机、要轮询的邮箱以及邮箱轮询的频率(默认为30秒)。

EmailProperties在类级别使用了@ConfigurationProperties注解,并将prefix属性设置为tacocloud.email。这意味着,我们可以在application.yml文件中按照下述方式配置消费Email的详细信息:

1
2
3
4
5
6
7
tacocloud:
email:
host: imap.tacocloud.com
mailbox: INBOX
username: taco-in-flow
password: 1L0v3T4c0s
poll-rate: 10000

现在,我们使用EmailProperties来配置集成流。我们想要创建的流大致如图9.10所示。

image-20211016100608960

图9.10 通过Email接受taco订单的集成流

我们有两种方案来定义这个流。

  • 在Taco Cloud应用中进行定义:在流的结束点,服务激活器要调用我们之前定义的创建订单的repository。
  • 在单独的应用中进行定义:在流的结束点,服务激活器要发送POST请求到Taco Cloud API以提交taco订单。

不管选择哪种方式,除了服务激活器的实现方式之外,对流的本身影响并不大。但是,因为我们需要一些表示taco、订单和配料的类型,它们与Taco Cloud主应用可能会略微有所差异,所以我们会在一个单独的应用中定义集成流,避免与已有的领域类型相混淆。

我们还可以选择使用XML配置、Java配置或者Java DSL来定义流。我更喜欢DSL的优雅,所以在这里将会使用这种方案。如果你想要一些额外的挑战,也可以选择其他配置风格编写流的定义。现在,我们看一下taco Email订单流的Java DSL配置,如程序清单9.5所示。

程序清单9.5 定义接收Email并将其提交为订单的集成流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package tacos.email;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
@Configuration
public class TacoOrderEmailIntegrationConfig {
@Bean
public IntegrationFlow tacoOrderEmailFlow(
EmailProperties emailProps,
EmailToOrderTransformer emailToOrderTransformer,
OrderSubmitMessageHandler orderSubmitHandler) {
return IntegrationFlows
.from(Mail.imapInboundAdapter(emailProps.getImapUrl()),
e -> e.poller(
Pollers.fixedDelay(emailProps.getPollRate())))
.transform(emailToOrderTransformer)
.handle(orderSubmitHandler)
.get();
}
}

根据tacoOrderEmailFlow()方法的定义,taco Email订单流由3个不同的组件组成。

  • MAP Email入站通道适配器:使用IMP URL创建,而URL是根据EmailProperties的getImapUrl()方法创建的,并且会根据EmailProperties中设置的pollRate属性进行轮询。传入的Email会传递给一个通道,然后连接到转换器。
  • 将Email转换成订单对象的转换器:转换器是通过EmailToOrderTransformer实现的,它会注入tacoOrderEmailFlow()方法中。转换所形成的订单会通过另外一个通道传递给最后一个组件。
  • 处理器(作为出站通道适配器):处理器接受订单对象并将其提交至TacoCloud的REST API。

我们只有将Email端点模块作为依赖项添加到项目构建文件中,才能调用Mail.imap InboundAdapter()。Maven依赖如下所示:

1
2
3
4
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>

EmailToOrderTransformer是Spring Integration Transformer接口的实现,扩展了AbstractMailMessageTransformer(如程序清单9.6所示)。

程序清单9.6 使用集成转换器将传入的Email转换为taco订单

1
2
3
4
5
6
7
8
9
10
11
@Component
public class EmailToOrderTransformer
extends AbstractMailMessageTransformer<Order> {
@Override
protected AbstractIntegrationMessageBuilder<Order>
doTransform(Message mailMessage) throws Exception {
Order tacoOrder = processPayload(mailMessage);
return MessageBuilder.withPayload(tacoOrder);
}
...
}

AbstractMailMessageTransformer是一个很便利的基类,适用于载荷为Email的消息。它会抽取传入消息Email的信息,并将它放到一个Message对象中,传递给doTransform()方法。在doTransform()方法中,我们将Message对象传递给一个名为processPayload()的private方法,将Email解析为Order对象。这个Order对象尽管和主Taco Cloud应用中的Order对象有些相似,但是并不完全相同,这里更加简单一些:

1
2
3
4
5
6
7
8
9
10
11
12
package tacos.email;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
@Data
public class Order {
private final String email;
private List<Taco> tacos = new ArrayList<>();
public void addTaco(Taco taco) {
this.tacos.add(taco);
}
}

这个Order类不包含客户完整的投递信息和账单信息,而是只携带了客户的Email地址(通过传入的Email获取的)。

将Email解析成订单是一项非常重要的任务。实际上,即便最简单的实现也需要几十行代码。这些代码对于进一步讨论Spring Integration和如何实现转换器并没有任何助益。所以,为了节省空间,我在这里省略了processPayload()方法的细节。

EmailToOrderTransformer做的最后一件事情就是返回一个MessageBuilder,让消息的载荷中包含Order对象。MessageBuilder所生成的消息会发送至集成流的最后一个组件:将订单提交至Taco Cloud API的消息处理器。OrderSubmitMessageHandler实现了Spring Integration的GenericHandler,它会处理带有Order载荷的消息,如程序清单9.7所示。

程序清单9.7 通过消息处理器将订单提交至Taco Cloud API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package tacos.email;
import java.util.Map;
import org.springframework.integration.handler.GenericHandler;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
@Component
public class OrderSubmitMessageHandler
implements GenericHandler<Order> {
private RestTemplate rest;
private ApiProperties apiProps;
public OrderSubmitMessageHandler(
ApiProperties apiProps, RestTemplate rest) {
this.apiProps = apiProps;
this.rest = rest;
}
@Override
public Object handle(Order order, Map<String, Object> headers) {
rest.postForObject(apiProps.getUrl(), order, String.class);
return null;
}
}

为了满足GenericHandler接口的要求,OrderSubmitMessageHandler重写了handle()方法,这个方法接收传入的Order对象,并使用注入的RestTemplate利用POST请求将Order提交至ApiProperties对象指定的URL。最后,handle()方法返回null,表明这个处理器是流的终点。

这里使用ApiProperties避免在postForObject()时硬编码URL。它是一个配置属性类,如下所示:

1
2
3
4
5
6
@Data
@ConfigurationProperties(prefix="tacocloud.api")
@Component
public class ApiProperties {
private String url;
}

在application.yml中,Taco Cloud API的URL可能会配置如下:

1
2
3
tacocloud:
api:
url: http://api.tacocloud.com

为了让这个应用能够使用RestTemplate,并自动注入OrderSubmitMessageHandler中,我们需要在项目的构建文件中添加SpringBoot web starter依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

这不仅会将RestTemplate添加到类路径中,还会触发Spring MVC的自动配置功能。作为独立的Spring Integration流,这个应用并不需要Spring MVC,更不需要自动配置所提供的嵌入式Tomcat。所以,我们可以在application.yml中通过如下的配置条目禁用Spring MVC的自动配置:

1
2
3
spring:
main:
web-application-type: none

spring.main.web-application-type属性可以设置为servlet、reactive或none。当Spring MVC位于类路径之中时,自动配置功能会将其设置为servlet。我们在这里将其重写为none,所以Spring MVC和Tomcat将不会进行自动配置(我们将会在第11章介绍反应式Web应用是什么样子的)。

9.4 小结

  • 借助Spring Integration能够定义流,在进入和离开应用的时候可以对数据进行处理。
  • 集成流可以使用XML、Java或简洁的Java DSL配置风格来进行定义。
  • 消息网关和通道适配器会作为集成流的入口和出口。
  • 在流动的过程中,消息可以进行转换、切分、聚合、路由,也可以由服务激活器对其进行处理。
  • 消息通道连接集成流中的各个组件。

9.2 Spring Integration功能概览

Spring Integration涵盖了大量的集成场景。如果想将所有的内容放到一章中,就像把一头大象装到信封里一样不现实。在这里,我只会向你展示SpringIntegration这头大象的照片,而不是对Spring Integration进行面面俱到的讲解,目的就是让你能够了解它是如何运行的。随后,我们将会再创建一个集成流,为Taco Cloud应用添加新的功能。

集成流是由一个或多个如下介绍的组件组成的。在继续编写代码之前,我们先看一下这些组件在集成流中所扮演的角色。

  • 通道(channel):将消息从一个元素传递到另一个元素。
  • 过滤器(filter):基于某些断言,条件化地允许某些消息通过流。
  • 转换器(transformer):改变消息的值和/或将消息载荷从一种类型转换成另一种类型。
  • 路由器(router):将消息路由至一个或多个通道,通常会基于消息的头信息进行路由。
  • 切分器(splitter):将传入的消息切割成两个或更多的消息,然后将每个消息发送至不同的通道;
  • 聚合器(aggregator):切分器的反向操作,将来自不同通道的多个消息合并成一个消息。
  • 服务激活器(service activator):将消息传递给某个Java方法来进行处理,并将返回值发布到输出通道上。
  • 通道适配器(channel adapter):将通道连接到某些外部系统或传输方式,可以接受输入,也可以写出到外部系统。
  • 网关(gateway):通过接口将数据传递到集成流中。

在定义文件写入集成流的时候,我们已经看到过其中的一些组件了。FileWriterGateway是一个网关,通过它,应用可以提交要写入文件的文本。我们还定义了一个转换器,将给定的文本转换成大写的形式,随后,我们定义一个出站通道适配器,它执行将文本写入文件的任务。这个流有两个通道,textInChannel和fileWriterChannel,它们将应用中的其他组件连接在了一起。现在,我们按照承诺快速看一下这些集成流组件。

9.2.1 消息通道

消息通道是消息穿行集成通道的一种方式(参见图9.2)。它们是连接SpringIntegration其他组成部分的管道。

image-20211015211632575

图9.2 消息通道是集成流中数据在其他组件之间流动的管道

Spring Integration提供了多种通道实现。

  • PublishSubscribeChannel:发送到PublishSubscribeChannel的消息会被传递到一个或多个消费者中。如果有多个消费者,它们都会接收到消息。
  • QueueChannel:发送到QueueChannel的消息会存储到一个队列中,会按照先进先出(First In First Out,FIFO)的方式被拉取出来。如果有多个消费者,只有其中的一个消费者会接收到消息。
  • PriorityChannel:与QueueChannel类似,但它不是FIFO的方式,而是会基于消息的priority头信息被消费者拉取出来。
  • RendezvousChannel:与QueueChannel类似,但是发送者会一直阻塞通道,直到消费者接收到消息为止,实际上会同步发送者和消费者。
  • DirectChannel:与PublishSubscribeChannel类似,但是消息只会发送至一个消费者,它会在与发送者相同的线程中调用消费者。这种方式允许事务跨通道。
  • ExecutorChannel:类似于DirectChannel,但是消息分发是通过TaskExecutor实现的,这样会在与发送者独立的线程中执行。这种通道类型不支持事务跨通道。
  • FluxMessageChannel:反应式流的发布者消息通道,基于Reactor项目的Flux。(我们将会在第10章讨论反应式流、Reactor和Flux。)

在Java配置和Java DSL中,输入通道都是自动创建的,默认使用的是DirectChannel。但是,如果想要使用不同的通道实现,就需要将通道声明为bean并在集成流中引用它。例如,要声明PublishSubscribeChannel,我们需要声明如下的@Bean方法:

1
2
3
4
@Bean
public MessageChannel orderChannel() {
return new PublishSubscribeChannel();
}

随后,我们可以在集成流定义中根据通道名称引用它。例如,这个通道要被一个服务激活器bean所消费,那么我们可以在@ServiceActivator注解的inputChannel属性中引用它:

1
@ServiceActivator(inputChannel="orderChannel")

或者,使用Java DSL配置风格,我们可以通过调用channel()来引用它:

1
2
3
4
5
6
7
8
@Bean
public IntegrationFlow orderFlow() {
return IntegrationFlows
...
.channel("orderChannel")
...
.get();
}

很重要的一点需要注意,如果使用QueueChannel,消费者必须配置一个poller。例如,声明一个QueueChannel bean:

1
2
3
4
@Bean
public MessageChannel orderChannel() {
return new QueueChannel();
}

那么,我们需要确保消费者配置成轮询该通道的消息。如果是服务激活器,@ServiceActivator注解可能会如下所示:

1
2
@ServiceActivator(inputChannel="orderChannel",
poller=@Poller(fixedRate="1000"))

在本例中,服务激活器每秒(或者说每1000毫秒)都会轮询名为orderChannel的通道。

9.2.2 过滤器

过滤器放置于集成管道的中间,它能够根据断言允许或拒绝消息进入流程的下一步(见图9.3)。

image-20211015211913017

图9.3 过滤器会基于某个断言允许或拒绝消息在管道中进行处理

例如,假设消息包含了整型的值,它们要通过名为numberChannel的通道进行发布,但是我们只想让偶数进入名为evenNumberChannel的通道。在这种情况下,我们可以使用@Filter注解定义一个过滤器:

1
2
3
4
5
@Filter(inputChannel="numberChannel",
outputChannel="evenNumberChannel")
public boolean evenNumberFilter(Integer number) {
return number % 2 == 0;
}

作为替代方案,如果使用Java DSL配置风格来定义集成流,那么我们可以按照如下的方式来调用filter():

1
2
3
4
5
6
7
8
@Bean
public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
return IntegrationFlows
...
.<Integer>filter((p) -> p % 2 == 0)
...
.get();
}

在本例中,我们使用lambda表达式来实现过滤器。实际上,filter()方法接受GenericSelector作为参数。这意味着,如果我们的过滤器过于复杂,不适合放到一个简单的lambda表达式中,那么我们可以实现GenericSelector接口作为替代方案。

9.2.3 转换器

转换器会对消息执行一些操作,一般会形成不同的消息,有可能还会产生不同的载荷类型(见图9.4)。转换过程可能非常简单,比如执行数字的数学运算或者操作String值。转换也可能会很复杂,比如根据代表ISBN的String值查询并返回对应图书的详细信息。

image-20211016093244978

图9.4 转换器会改变流经集成流的消息

例如,假设整型值会通过名为numberChannel的通道进行发布,我们希望将这些数字转换成它们的罗马数字形式,以String类型来表示。在这种情况下,我们可以声明一个GenericTransformer类型的bean并为其添加@Transformer注解,如下所示:

1
2
3
4
5
6
@Bean
@Transformer(inputChannel="numberChannel",
outputChannel="romanNumberChannel")
public GenericTransformer<Integer, String> romanNumTransformer() {
return RomanNumbers::toRoman;
}

@Transformer注解可以将这个bean声明为转换器bean,它会从名为numberChannel的通道接收Integer值,然后使用静态方法toRoman()进行转换。(toRoman()是静态方法,定义在名为RomanNumbers的类中,这里通过方法引用来使用它。)转换后的结果会发布到名为romanNumberChannel的通道中。

在Java DSL配置风格中,调用transform()会更加简单,我们只需将对toRoman()的方法引用传递进来就可以了:

1
2
3
4
5
6
7
8
@Bean
public IntegrationFlow transformerFlow() {
return IntegrationFlows
...
.transform(RomanNumbers::toRoman)
...
.get();
}

尽管在这两个转换器代码中我们都使用了方法引用,但是转换器也可以使用lambda表达式来进行声明。或者,如果转换器足够复杂,需要使用一个单独的类,那么我们可以将其作为一个bean注入流定义中,并将引用传递给transform()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Bean
public RomanNumberTransformer romanNumberTransformer() {
return new RomanNumberTransformer();
}
@Bean
public IntegrationFlow transformerFlow(
RomanNumberTransformer romanNumberTransformer) {
return IntegrationFlows
...
.transform(romanNumberTransformer)
...
.get();
}

在这里,我们声明了RomanNumberTransformer类型的bean,它本身是SpringIntegration Transformer或GenericTransformer接口的实现。这个bean注入到了transformerFlow()方法中,并且在定义集成流的时候传递给了transform()方法。

9.2.4 路由器

路由器能够基于某个路由断言,实现集成流的分支,从而将消息发送至不同的通道上(见图9.5)。

image-20211016093412464

图9.5 路由器会根据应用于消息的断言将消息定向至不同的通道

例如,假设我们有一个名为numberChannel的通道,它会传输整型值。我们想要将带有偶数的消息定向到名为evenChannel的通道,将带有奇数的消息定向到名为oddChannel的通道。要在集成流中创建这样一个路由器,我们可以声明一个AbstractMessageRouter类型的bean,并为其添加@Router注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Bean
@Router(inputChannel="numberChannel")
public AbstractMessageRouter evenOddRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel>
determineTargetChannels(Message<?> message) {
Integer number = (Integer) message.getPayload();
if (number % 2 == 0) {
return Collections.singleton(evenChannel());
}
return Collections.singleton(oddChannel());
}
};
}
@Bean
public MessageChannel evenChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel oddChannel() {
return new DirectChannel();
}

这里定义的AbstractMessageRouter接收名为numberChannel的输入通道的消息,以匿名内部类的形式检查消息的载荷:如果是偶数,就返回名为evenChannel的通道(在路由器bean之后同样以bean的方式进行了声明);否则,通道载荷中的数字必然是奇数,将会返回名为oddChannel的通道(同样以bean方法的方式进行了声明)。

在Java DSL风格中,路由器是通过在流定义中调用route()方法来声明的,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
return IntegrationFlows
...
.<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping -> mapping
.subFlowMapping("EVEN", sf -> sf
.<Integer, Integer>transform(n -> n * 10)
.handle((i,h) -> { ... })
)
.subFlowMapping("ODD", sf -> sf
.transform(RomanNumbers::toRoman)
.handle((i,h) -> { ... })
)
)
.get();
}

尽管我们依然可以定义AbstractMessageRouter并将其传递到route(),但是在这个样例中使用了lambda来确定消息载荷是偶数还是奇数。如果是偶数,就会返回值为EVEN的字符串;如果是奇数,就会返回值为ODD的字符串。然后这些值会用来确定该使用哪个子映射处理消息。

9.2.5 切分器

在集成流中,有时候将一个消息切分为多个消息独立处理可能会非常有用。切分器将会负责切分并处理这些消息,如图9.6所示。

image-20211016093512699

图9.6 切分器会将消息拆分为两个或更多独立的消息,它们可以由独立的子流分别进行处理

在很多场景中,切分器都非常有用,但是有两种基本的使用场景我们可以使用切分器。

  • 消息载荷中包含了相同类型条目的一个列表,我们希望将它们作为单独的消息载荷来进行处理。例如,消息中携带了一个商品列表,它们可以切分为多个消息,每个消息的载荷分别对应一件商品。
  • 消息载荷所携带的信息尽管有所关联,但是可以拆分为两个或更多不同类型的消息。例如,一个购买订单可能会包含投递信息、账单以及商品项的信息。投递细节可以通过某个子流来处理,账单由另一个子流来处理,而商品项由其他的子流来处理。在这种情况下,切分器后面通常会紧跟着一个路由器,它根据消息的载荷类型进行路由,确保数据都由正确的子流来进行处理。

在我们将消息载荷切分为两个或更多不同类型的消息时,通常定义一个POJO就足够了,它提取传入消息不同的组成部分,并以元素集合的形式返回。

例如,假设我们想要将带有购买订单的消息切分为两个消息:其中一个携带账单信息,另一个携带商品项的信息。如下的OrderSplitter就可以完成该任务:

1
2
3
4
5
6
7
8
public class OrderSplitter {
public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
ArrayList<Object> parts = new ArrayList<>();
parts.add(po.getBillingInfo());
parts.add(po.getLineItems());
return parts;
}
}

接下来,我们声明一个OrderSplitter bean,并通过@Splitter注解将其作为集成流的一部分:

1
2
3
4
5
6
@Bean
@Splitter(inputChannel="poChannel",
outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
return new OrderSplitter();
}

在这里,购买订单会到达名为poChannel的通道,它们会被OrderSplitter切分。然后,所返回集合中的每个条目都会作为集成流中独立的消息,它们会发布到名为splitOrderChannel的通道上。此时,我们可以在流中声明一个PayloadTypeRouter,将账单信息和商品项分别路由至它们自己的子流上:

1
2
3
4
5
6
7
8
9
10
@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(
BillingInfo.class.getName(), "billingInfoChannel");
router.setChannelMapping(
List.class.getName(), "lineItemsChannel");
return router;
}

顾名思义,PayloadTypeRouter会根据消息的载荷将它们路由至不同的通道。按照这里的配置,载荷为BillingInfo类型的消息将会被路由至名为billingInfoChannel的通道,供后续进行处理。对于商品项来说,它们会放到一个java.util.List集合中,因此,我们将List类型的载荷映射到名为lineItemsChannel的通道中。

按照目前的状况,流将会被切分成两个子流:一个BillingInfo对象的流,另外一个则是List<LineItem>的流。如果我们想要进一步进行拆分,比如不想处理LineItems的列表,而是想要分别处理每个LineItem,又该怎么办呢?要将商品列表拆分为多个消息,其中每个消息包含一个条目,我们只需要编写一个方法(而不是一个bean)即可。这个方法带有@Splitter注解并且要返回LineItem的集合,如下所示:

1
2
3
4
@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
return lineItems;
}

当带有List<LineItem>载荷的消息抵达名为lineItemsChannel的通道时,消息会进入lineItemSplitter()。按照切分器的规则,这个方法必须要返回切分后条目的集合。在本例中,我们已经有了LineItem的集合,所以我们直接返回这个集合就可以了。这样做的结果就是,集合中的每个LineItem都将会发布到一个消息中,这些消息会被发送到名为lineItemChannel的通道中。

如果想要使用Java DSL声明相同的splitter/router配置,那么我们可以通过调用split()和route()来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
return IntegrationFlows
...
.split(orderSplitter())
.<Object, String> route(
p -> {
if (p.getClass().isAssignableFrom(BillingInfo.class)) {
return "BILLING_INFO";
} else {
return "LINE_ITEMS";
}
}, mapping -> mapping
.subFlowMapping("BILLING_INFO", sf -> sf
.<BillingInfo> handle((billingInfo, h) -> {
...
}))
.subFlowMapping("LINE_ITEMS", sf -> sf
.split()
.<LineItem> handle((lineItem, h) -> {
...
}))
)
.get();

DSL所组成的流定义相当简洁,但是可能会有点难以理解。它使用与Java配置样例相同的OrderSplitter来切分订单。在订单切分之后,它根据类型将其路由至两个独立的子流。

9.2.6 服务激活器

服务激活器接收来自输入通道的消息并将这些消息发送至一个MessageHandler的实现,如图9.7所示。

image-20211016093710409

图9.7 在接收到消息时,服务激活器会通过MessageHandler调用某个服务

Spring Integration提供了多个开箱即用的MessageHandler(PayloadTypeRouter甚至就是MessageHandler的一个实现),但是我们通常会需要提供一些自定义的实现作为服务激活器。作为样例,如下的代码展现了如何声明MessageHandler bean并将其配置为服务激活器:

1
2
3
4
5
6
7
@Bean
@ServiceActivator(inputChannel="someChannel")
public MessageHandler sysoutHandler() {
return message -> {
System.out.println("Message payload: " + message.getPayload());
};
}

这个bean使用了@ServiceActivator注解,表明它会作为一个服务激活器处理来自someChannel通道的消息。对于MessageHandler本身来讲,它是通过一个lambda表达式实现的。这是一个简单的MessageHandler,当得到消息之后,它会将消息的载荷打印至标准输出流。

另外,我们还可以声明一个服务激活器,让它在返回新载荷之前处理输入消息中的数据。在这种情况下,bean应该是一个GenericHandler,而不是MessageHandler:

1
2
3
4
5
6
7
8
9
@Bean
@ServiceActivator(inputChannel="orderChannel",
outputChannel="completeOrder")
public GenericHandler<Order> orderHandler(
OrderRepository orderRepo) {
return (payload, headers) -> {
return orderRepo.save(payload);
};
}

在本例中,服务激活器是一个GenericHandler,会接收载荷为Order类型的消息。当订单抵达时,我们会通过一个repository将它保存起来,并返回保存之后的Order,这个Order随后被发送至名为completeChannel的输出通道。

你可能已经注意到了,GenericHandler不仅能够得到载荷,还能得到消息头(虽然我们这个样例根本没有用到这些头信息)。我们还可以在Java DSL配置风格中使用服务激活器,此时,只需要将MessageHandler或GenericHandler传递到流定义的handle()方法中即可:

1
2
3
4
5
6
7
8
public IntegrationFlow someFlow() {
return IntegrationFlows
...
.handle(msg -> {
System.out.println("Message payload: " + msg.getPayload());
})
.get();
}

在本例中,MessageHandler会得到一个lambda表达式,但是我们也可以为其提供一个方法引用,甚至是实现了MessageHandler接口的类实例。如果我们为其提供lambda表达式或方法引用,就需要记住它们均接受消息作为其参数。

类似的,如果服务激活器不想成为流的终点,那么handle()还可以接受GenericHandler。如果要将前面提到的订单保存服务激活器添加进来,我们可以按照如下的形式使用Java DSL配置流:

1
2
3
4
5
6
7
8
9
public IntegrationFlow orderFlow(OrderRepository orderRepo) {
return IntegrationFlows
...
.<Order>handle((payload, headers) -> {
return orderRepo.save(payload);
})
...
.get();
}

在使用GenericHandler的时候,lambda表达式或方法引用会接受消息负载和头信息作为参数。如果你选择使用GenericHandler作为流的终点,就需要返回null;否则,出现错误,提示没有指定输出通道。

9.2.7 网关

通过网关,应用可以提交数据到集成流中,并且能够可选地接收流的结果作为响应。网关会声明为接口,借助Spring Integration的实现,应用可以调用它来发送消息到集成流中(见图9.8)。

image-20211016093824376

图9.8 服务网关是接口

我们已经见过了消息网关的样例,也就是FileWriterGateway。FileWriterGateway是一个单向的网关,它有一个接受String类型的方法,该方法会将文本写入到文件中,返回void。编写双向的网关同样简单。在编写网关接口的时候,只需确保方法要返回某个值,以便于推送到集成流中。

作为样例,假设有个网关,它面对的是一个简单的集成流,这个流会接受一个String并将给定的String转换成全大写的形式。这个网关接口大致如下所示:

1
2
3
4
5
6
7
8
9
package com.example.demo;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel="inChannel",
defaultReplyChannel="outChannel")
public interface UpperCaseGateway {
String uppercase(String in);
}

最让人开心的是,这个接口不需要实现。Spring Integration会自动在运行时提供一个实现,它会通过特定的通道发送和接收消息。

当uppercase()被调用的时候,给定的String会发布到集成流中,进入名为inChannel的通道。不管流是如何定义的或者它都干了些什么,当数据进入名为outChannel的通道时,它将会从uppercase()方法返回。

对于我们这个转换成大写格式的集成流来说,它是一个非常简单的流,只需要一个将String转换成大写格式的步骤就可以。它可以通过Java DSL配置声明如下:

1
2
3
4
5
6
7
8
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows
.from("inChannel")
.<String, String> transform(s -> s.toUpperCase())
.channel("outChannel")
.get();
}

按照定义,这个流会从进入inChannel通道的数据开始。消息载荷会由转换器进行处理,也就是执行大写操作(通过lambda表达式来定义)。结果形成的消息会被发送到名为outChannel的通道,也就是我们在UpperCaseGateway中声明的答复通道。

9.2.8 通道适配器

通道适配器代表了集成流的入口和出口。数据通过入站通道适配器(inboundchannel adapter)进入一个集成流,通过出站通道适配器离开一个集成流,如图9.9所示。

image-20211016093919555

图9.9 通道适配器是集成流的入口和出口

根据要将什么数据源引入集成流,入站通道适配器可以有很多种形式。例如,我们可以声明一个入站通道适配器,将来自AtomicInteger不断递增的数字引入到流中。如果使用Java配置,如下所示:

1
2
3
4
5
6
7
8
@Bean
@InboundChannelAdapter(
poller=@Poller(fixedRate="1000"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
return () -> {
return new GenericMessage<>(source.getAndIncrement());
};
}

这个@Bean方法通过@InboundChannelAdapter注解声明了一个入站通道适配器,根据注入的AtomicInteger每隔一秒(也就是1000毫秒)就提交一个数字给名为numberChannel的通道。

在使用Java配置时,我们可以通过@InboundChannelAdapter注解声明入站通道适配器,而在使用Java DSL定义集成流的时候,我们需要使用from()方法完成同样的事情。如下的流定义程序清单展现了类似的入站通道适配器,它是使用Java DSL定义的:

1
2
3
4
5
6
7
8
@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
return IntegrationFlows
.from(integerSource, "getAndIncrement",
c -> c.poller(Pollers.fixedRate(1000)))
...
.get();
}

通常,通道适配器是由Spring Integration的众多端点模块提供的。假设,我们需要一个入站通道适配器,它会监控一个特定的目录并将写入该目录的文件以消息的形式提交到file-channel通道中。如下的Java配置使用来自Spring Integration的file端点模块实现该功能:

1
2
3
4
5
6
7
8
9
@Bean
@InboundChannelAdapter(channel="file-channel",
poller=@Poller(fixedDelay="1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}

如果使用Java DSL编写同等功能的入站通道适配器,那么我们可以使用Files类的inboundAdapter()。出站通道适配器是集成流的终点,会将最终的消息传递给应用或其他外部系统:

1
2
3
4
5
6
7
@Bean
public IntegrationFlow fileReaderFlow() {
return IntegrationFlows
.from(Files.inboundAdapter(new File(INPUT_DIR))
.patternFilter(FILE_PATTERN))
.get();
}

我们通常会将服务激活器实现为消息处理器,让它作为出站通道适配器,在数据需要传递给应用本身的时候更是如此。我们已经讨论过服务激活器,所以没有必要重复讨论了。

但是,需要注意,Spring Integration端点模块为多个通用场景提供了消息处理器。在程序清单9.3中,我们已经见到过一个这种出站通道适配器的样例,即FileWriting MessageHandler。提到Spring Integration端点模块,我们看一下都有哪些直接可用的集成端点模块。

9.2.9 端点模块

Spring Integration允许我们创建自己的通道适配器,这一点非常好,但是更棒的是Spring Integration提供了二十多个包含通道适配器(同时包括入站和出站的适配器)的端点模块(见表9.1),用于和各种常见的外部系统实现集成。

表9.1 Spring Integration提供的二十多个端点模块

epub_29101559_61

从表9.1我们可以清楚地看到,Spring Integration提供了用途广泛的一组组件,它们能够满足非常多的集成需求。大多数应用程序所使用的只是Spring Integration所提供功能的九牛一毛。需要的话,我们最好还是要知道Spring Integration已经提供了相关的功能。

另外,我们不可能在一章的篇幅中介绍表9.1中的所有通道适配器。我们已经看到了如何使用文件系统模块写入文件的样例,随后将会看到如何使用Email模块来读取Email。

对于每个端点模块的通道适配器,我们可以在Java配置中将其声明为bean,也可以在Java DSL配置中通过静态方法的方式引用。我建议你探索一下自己感兴趣的其他端点模块。你会发现它们在使用方式上是非常一致的。现在,我们关注一下Email端点模块,看一下如何将它用到Taco Cloud应用中。

9.1 声明一个简单的集成流

通常来讲,在使用Spring Integration创建集成流时,是通过声明一个应用程序能够接收或发送哪些数据到应用程序之外的资源来实现的。应用程序可能集成的资源之一就是文件系统。因此,Spring Integration的很多组件都有读入和写入文件的通道适配器(channel adapter)。

为了熟悉Spring Integration,我们将会创建一个集成流,这个流会写入数据到文件系统中。首先,我们需要添加Spring Integration到项目的构建文件中。对于Maven构建来讲,必要的依赖如下所示:

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>

第一项依赖是Spring Integration的Spring Boot starter。不管我们与哪种流进行交互,对于Spring Integration流的开发来讲,这个依赖都是必需的。与所有的Spring Boot starter一样,在Initializr表单中,这个依赖也可以通过复选框进行选择。

第二项依赖是Spring Integration的文件端点模块。这个模块是与外部系统集成的二十多个模块之一。我们会在9.2.9小节中更加详细地讨论端点模块。对于现在来讲,我们只需要知道文件端点模块提供了将文件从文件系统导入集成流和/或将流中的数据写入文件系统的能力即可。

接下来,我们需要为应用创建一种方法,让它能够发送数据到集成流中,这样它才能写入到文件中。为了实现这一点,我们需要创建一个网关接口,这样的网关接口如程序清单9.1所示。

程序清单9.1 将方法调用转换成消息的消息网关接口

1
2
3
4
5
6
7
8
9
10
package sia5;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel="textInChannel") ⇽--- 声明消息网关
public interface FileWriterGateway {
void writeToFile(
@Header(FileHeaders.FILENAME) String filename, ⇽--- 写入文件
String data);
}

尽管这只是一个很简单的Java接口,但是FileWriterGateway有很多东西需要介绍。我们首先看到的是,它使用了@MessagingGateway注解。这个注解会告诉Spring Integration要在运行时生成该接口的实现,这与Spring Data在运行时生成repository接口的实现非常类似。其他地方的代码在希望写入文件的时候将会调用它。

@MessagingGateway的defaultRequestChannel属性表明接口方法调用时所返回的消息要发送至给定的消息通道(message channel)。在本例中,我们声明调用writeToFile()所形成的消息应该发送至名为textInChannel的通道中。

对于writeToFile()方法来说,它以String类型的形式接受一个文件名,另外一个String包含了要写入文件中的文本。关于这个方法的签名,还需要注意filename参数上带有@Header。在本例中,@Header注解表明传递给filename的值应该包含在消息头信息中(通过FileHeaders.FILENAME声明,它将会被解析成file_name),而不是放到消息载荷(payload)中。

现在,我们已经有了消息网关,接下来就需要配置集成流了。尽管我们往构建文件中添加的Spring Integration starter依赖能够启用Spring Integration的自动配置功能,但是满足应用需求的流定义则需要我们自行编写额外的配置。在声明集成流方面,我们有3种配置方案可供选择:

  • XML配置;
  • Java配置;
  • 使用DSL的Java配置。

我们会依次看一下Spring Integration的这3种配置风格,首先从较为老式的XML配置开始。

9.1.1 使用XML定义集成流

尽管在本书中,我尽量避免使用XML配置,但是Spring Integration有使用XML定义集成流的漫长历史。所以,我认为至少展现一个XML定义集成流的样例还是很有价值的。程序清单9.2展现了如何使用XML配置示例集成流。

程序清单9.2 使用Spring XML配置定义集成流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-
file.xsd">
<int:channel id="textInChannel" /> ⇽--- 声明textInChannel
<int:transformer id="upperCase"
input-channel="textInChannel"
output-channel="fileWriterChannel"
expression="payload.toUpperCase()" /> ⇽--- 转换文本
<int:channel id="fileWriterChannel" /> ⇽--- 声明fileWriterChannel
<int-file:outbound-channel-adapter id="writer"
channel="fileWriterChannel"
directory="/tmp/sia5/files"
mode="APPEND"
append-new-line="true" /> ⇽--- 将文本写入到文件中
</beans>

我们将程序清单9.2中的XML拆分讲解一下。

  • 我们首先配置了一个名为textInChannel的通道。你会发现,它就是FileWriterGateway的请求通道。当FileWriterGateway的writeToFile()方法被调用的时候,结果形成的消息将会发布到这个通道上。
  • 我们还配置了一个转换器(transformer),它会从textInChannel接收消息。它使用Spring表达式语言(Spring Expression Language,SpEL)为消息载荷调用了toUpperCase()方法。进行大写操作之后的结果会发布到fileWriterChannel上。
  • 随后,我们配置了名为fileWriterChannel的通道。这个通道会作为一个导线,将转换器与出站通道适配器(outbound channel adapter)连接在一起。
  • 最后,我们使用int-file命名空间配置了出站通道适配器。这个XML命名空间是由Spring Integration的文件模块提供的,实现文件写入的功能。按照我们的配置,它从fileWriterChannel接收消息,并将消息的载荷写入到一个文件中,这个文件的名称是由消息头信息中的file_name属性指定的,而存入的目录则是由这里的directory属性指定的。如果文件已经存在,那么文件内容会以新行的方式进行追加,而不是覆盖该文件。

图9.1使用《企业集成模式》中的图形元素样式阐述了这个流。

image-20211015210914944

图9.1 文件写入器的集成流

如果想要在Spring Boot应用中使用XML配置,那么我们需要将XML作为资源导入到Spring应用中。最简单的实现方式就是在应用的某个Java配置类上使用Spring的@ImportResource注解:

1
2
3
@Configuration
@ImportResource("classpath:/filewriter-config.xml")
public class FileWriterIntegrationConfig { ... }

尽管基于XML的配置能够很好地用于Spring Integration,但是大多数的开发人员对于XML的使用越来越谨慎。(正如我所言,在本书中,我会尽量避免使用XML配置。)现在,我们抛开这些尖括号,看一下Spring Integration的Java配置风格。

9.1.2 使用Java配置集成流

大多数的现代Spring应用程序都会避免使用XML配置,而更加青睐Java配置。实际上,在Spring Boot应用中,Java配置是自动化配置功能更自然的补充形式。因此,如果我们要为Spring Boot应用添加集成流,最好使用Java来定义流程。

程序清单9.3展示了使用Java配置编写集成流的一个样例。这里的代码依然是功能相同的文件写入集成流,但是这次我们选择使用Java来实现。

程序清单9.3 使用Java配置来定义集成流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package sia5;
import java.io.File;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.transformer.GenericTransformer;
@Configuration
public class FileWriterIntegrationConfig {
@Bean
@Transformer(inputChannel="textInChannel", ⇽--- 声明转换器
outputChannel="fileWriterChannel")
public GenericTransformer<String, String> upperCaseTransformer() {
return text -> text.toUpperCase();
}
@Bean
@ServiceActivator(inputChannel="fileWriterChannel")
public FileWritingMessageHandler fileWriter() { ⇽--- 声明文件写入器
FileWritingMessageHandler handler =
new FileWritingMessageHandler(new File("/tmp/sia5/files"));
handler.setExpectReply(false);
handler.setFileExistsMode(FileExistsMode.APPEND);
handler.setAppendNewLine(true);
return handler;
}
}

在Java配置中,我们声明了两个bean:一个转换器,还有一个文件写入的消息处理器。这里的转换器是GenericTransformer。因为GenericTransformer是一个函数式接口,所以我们可以使用lambda表达式为其提供实现,这里调用了消息文本的toUpperCase()方法。转换器bean使用了@Transformer注解,这样会将其声明成集成流中的一个转换器,它接受来自textInChannel通道的消息,然后将消息写入到名为fileWriterChannel的通道中。

负责文件写入的bean则使用了@ServiceActivator注解,表明它会接受来自fileWriter Channel的消息,并且会将消息传递给FileWritingMessageHandler实例所定义的服务。FileWritingMessageHandler是一个消息处理器,它会将消息的载荷写入特定目录的一个文件中,而文件的名称是通过消息的file_name头信息指定的。与XML样例类似,FileWritingMessageHandler也配置为以新行的方式为文件追加内容。

FileWritingMessageHandler bean的一个独特之处在于它调用了setExpectReply(false)方法,通过这个方法能够告知服务激活器(serviceactivator)不要期望会有答复通道(reply channel,通过这样的通道,我们可以将某个值返回到流中的上游组件)。如果我们不调用setExpectReply(),文件写入bean的默认值是true。尽管管道的功能和预期一样,但是在日志中会看到一些错误日志,提示我们没有设置答复通道。

我们在这里没有必要显式声明通道。如果名为textInChannel和fileWriterChannel的bean不存在,那么这两个通道将会自动创建。但是,如果你想要更加精确地控制通道配置,那么可以按照如下的方式显式构建这些bean:

1
2
3
4
5
6
7
8
9
@Bean
public MessageChannel textInChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel fileWriterChannel() {
return new DirectChannel();
}

基于Java的配置方案可能会更易于阅读,也更加简洁,而且符合我在本书中倡导的纯Java配置风格。但是,使用Spring Integration的Java DSL(Domain-SpecificLanguage,领域特定语言)配置风格的话,它可以更加流畅。

9.1.3 使用Spring Integration的DSL配置

我们再次尝试一下文件写入集成流的定义。这一次,我们依然使用Java进行定义,但是会使用Spring Integration的Java DSL。这一次我们不再将流中的每个组件都声明为单独的bean,而是使用一个bean来定义整个流,如程序清单9.4所示。

程序清单9.4 为集成流的设计提供一个流畅的API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package sia5;
import java.io.File;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.support.FileExistsMode;
@Configuration
public class FileWriterIntegrationConfig {
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlows
.from(MessageChannels.direct("textInChannel")) ⇽--- 入站通道
.<String, String>transform(t -> t.toUpperCase()) ⇽--- 声明转换器
.handle(Files ⇽--- 处理文件写入
.outboundAdapter(new File("/tmp/sia5/files"))
.fileExistsMode(FileExistsMode.APPEND)
.appendNewLine(true))
.get();
}
}

这种配置方式在一个bean方法中定义了整个流,已经做到了尽可能简洁。Integration Flows类初始化构建器API,我们可以通过这个API来定义流。

在程序清单9.4中,我们首先从名为textInchannel的通道接收消息,然后进入一个转换器,将消息载荷转换成大写形式。通过转换器之后,消息会由出站通道适配器来进行处理。这个适配器是由Spring Integration file模块的Files类型创建的。最后,通过对get()的调用返回要构建的IntegrationFlow。简而言之,这个bean方法定义了与XML和Java配置样例相同的集成流。

你可能已经发现,与Java配置样例类似,我们不需要显式声明通道bean。我们引用了textInChannel,如果该名字对应的通道不存在,那么Spring Integration会自动创建它。不过,我们也可以显式声明bean。

对于连接转换器和出站通道适配器的通道,我们甚至没有通过名字引用它。如果需要显式配置通道,那么我们可以在流定义的时候通过调用channel()来引用它的名称:

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlows
.from(MessageChannels.direct("textInChannel"))
.<String, String>transform(t -> t.toUpperCase())
.channel(MessageChannels.direct("fileWriterChannel"))
.handle(Files
.outboundAdapter(new File("/tmp/sia5/files"))
.fileExistsMode(FileExistsMode.APPEND)
.appendNewLine(true))
.get();
}

在使用Spring Integration的Java DSL(与其他的fluent API类似)的时候,我们必须要巧妙地使用空格来保持可读性。在这里的样例中,我小心翼翼地使用缩进来保证代码块的可读性。对于更长、更复杂的流,我们甚至可以考虑将流的一部分抽取到单独的方法或子流中,以实现更好的可读性。

现在,我们已经看到了如何使用3种不同的方式来定义一个简单的流。接下来,我们回过头来看一下Spring Integration的全景。

第9章 Spring集成

本章内容:
  • 实时处理数据
  • 定义集成流
  • 使用Spring Integration的Java DSL定义
  • 与Email、文件系统和其他外部系统进行集成

在旅行时,最让我感到沮丧的一件事就是长途飞行时的互联网连接非常差,或者根本就没有。我喜欢利用空中时间完成一些工作,这本书的很多内容就是这样写出来的。但是,如果没有网络连接,恰好我又想获取某个库或者查看一个JavaDoc文档,那么我就无能为力了。因此,我现在会随身带一本书,以便于在这种场合下阅读。

就像我们需要连接互联网才能提高生产效率一样,很多应用都需要连接外部系统才能完成它们的功能。应用程序可能需要读取或发送Email、与外部API交互或者对写入数据库的数据做出反应。而且,由于数据是在外部系统读取或写入的,应用可能需要以某种方式处理这些数据,这样才能转换为应用程序自己的领域类。

在本章中,我们将会看到如何使用Spring Integration实现通用的集成模式。Spring Integration是众多集成模式的现成实现,这些模式在Gregor Hohpe和Bobby Woolf编写的《企业集成模式》(Enterprise Integration Patterns,Addison-Wesley,2003)中进行了归类。每个模式都实现为一个组件,消息会通过该组件在管道中传递数据。借助Spring配置,我们可以将这些组件组装成一个管道,数据可以通过这个管道来流动。我们从定义一个简单的集成流开始,这个流包含了Spring Integration的众多特性和特点。

第2部分 Spring集成

第2部分的章节将会涵盖Spring应用与其他应用集成的话题。

第6章将扩展第2章对Spring MVC的讨论,介绍如何在Spring中编写REST API。我们将会看到如何使用Spring MVC定义REST端点、启用超媒体REST资源以及使用Spring Data REST自动生成基于repository的REST端点。第7章转换视角,关注Spring应用如何消费REST API的话题。在第8章中,我们将会学习如何借助异步通信技术让Spring发送和接收Java Message Service (JMS)、RabbitMQ与Kafka的消息。在最后的第9章中,我们将探讨使用Spring Integration项目实现声明式应用集成的话题。我们会涵盖实时处理数据、定义集成流以及与外部系统(如Email和文件系统)集成的功能。

实例070 实例化Class类的几种方式

实例说明

Java的数据类型可以分成两类,即引用类型和原始类型。无论哪种类型的对象,Java虚拟机都会实例化不可变的java.lang.Class对象。它提供了在运行时检查对象属性的方法,这些属性包括它的成员和类型信息。更重要的是Class对象是所有反射APl的入口。本实例将演示如何获得Class对象。实例的运行效果如图7.15所示。
多学两招:Class类是泛型类,可以使用@SuppressWarnings("unchecked")忽略泛型或者使用Class<?>类型。

实现过程

(2)编写类ClassTest,在该类的main()方法中,演示各种获得Class对象的方法。关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.mingrisoft;

import java.util.Date;

public class ClassTest
{
@SuppressWarnings("unchecked")
public static void main(String[] args) throws ClassNotFoundException
{
System.out.println("第1种方法:Object.getClass()");
// 1.使用getClass()方式获得Class对象
Class c1 = new Date().getClass();
// 输出对象名称
System.out.println(c1.getName());
System.out.println("第2种方法:类名.class语法");
// 2.使用 类名.class 语法获得Class对象
Class c2 = boolean.class;
// 输出对象名称
System.out.println(c2.getName());
System.out.println("第3种方法:Class.forName()");
// 3.使用Class.forName()获得Class对象
Class c3 = Class.forName("java.lang.String");
// 输出对象名称
System.out.println(c3.getName());
System.out.println("第4种方法:包装类.TYPE");
// 4.使用包装类获得Class对象
Class c4 = Double.TYPE;
// 输出对象名称
System.out.println(c4.getName());
}
}

运行效果:

1
2
3
4
5
6
7
8
第1种方法:Object.getClass()
java.util.Date
第2种方法:类名.class语法
boolean
第3种方法:Class.forName()
java.lang.String
第4种方法:包装类.TYPE
double

技术要点

通常有以下4种方式可以获得Class对象。

  • Object.getClass():如果一个类的对象可用,则最简单的获得Class的方法是使用Object.getClass()。当然,这种方式只对引用类型有用。
  • .class语法:如果类型可用,但没有对象则可以在类型后加上“.class”来获得Class对象。这也是使原始类型(基本数据类型)获得Class对象的最简单的方式。
  • Class.forName("类全名"):如果知道类的全名,则可以使用静态方法Class.forName("类全名")来获得Class对象。该方法不能用在原始类型(基本数据类型)上,但是可以用在原始类型数组上。
  • 包装类的TYPE域:每个原始类型和void都有包装类,利用其TYPE域就可以获得Class对象。

实例071 查看类的声明

实例说明

通常类的声明包括常见修饰符(publicprotectedprivateabstractstaticfinalstrictfp等)、类的名称、类的泛型参数、类的继承类(实现的接口)和类的注解等。本实例将演示如何用反射获得这些信息。实例的运行效果如图7.16所示。

实现过程

编写类ClassDeclarationViewer,在main()方法中输出了与类声明相关的各个项。关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.mingrisoft;

import java.lang.annotation.Annotation;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ClassDeclarationViewer
{
public static void main(String[] args) throws ClassNotFoundException
{
// 获得ArrayList类对象
Class<?> clazz = Class.forName("java.util.ArrayList");
System.out.println("类的标准名称:" + clazz.getCanonicalName());
System.out.println("类的修饰符:" + Modifier.toString(clazz.getModifiers()));

// 输出类的泛型参数
TypeVariable<?>[] typeVariables = clazz.getTypeParameters();
System.out.print("类的泛型参数:");
if (typeVariables.length != 0)
{
for (TypeVariable<?> typeVariable : typeVariables)
{
System.out.println(typeVariable + "\t");
}
} else
{
System.out.println("空");
}

// 输出类所实现的所有接口
Type[] interfaces = clazz.getGenericInterfaces();
System.out.println("类所实现的接口:");
if (interfaces.length != 0)
{
for (Type type : interfaces)
{
System.out.println("\t" + type);
}
} else
{
System.out.println("\t" + "空");
}

// 输出类的直接继承类,如果是继承自Object则返回空
Type superClass = clazz.getGenericSuperclass();
System.out.print("类的直接继承类:");
if (superClass != null)
{
System.out.println(superClass);
} else
{
System.out.println("空");
}

// 输出类的所有注释信息,有些注释信息是不能用反射获得的
Annotation[] annotations = clazz.getAnnotations();
System.out.print("类的注解:");
if (annotations.length != 0)
{
for (Annotation annotation : annotations)
{
System.out.println("\t" + annotation);
}
} else
{
System.out.println("空");
}
}
}

多学两招:通常只能通过API来查看类的定义,不过Java反射还提供了另一种方式来获得类的信息,读者也可以在程序中使用这些信息。另外,使用getInterfaces()方法也可以获得对象类的所有接口,但是不包含泛型信息。即使是getSuperclass()方法也不能获得有泛型信息的父类。

技术要点

Class类的实例表示正在运行的Java应用程序中的类和接口。枚举是一种类,注释是一种接口。每个数组属于被映射为Class对象的一个类,所有具有相同元素类型和维数的数组都共享该Class对象。基本的Java类型(booleanbytecharshortintlongfoatdouble)和关键字void也表示为Class对象。它没有公共构造方法。Class对象是在加载类时由Java虚拟机以及通过调用类加载器中的defineClass方法自动构造的。本实例使用的方法如表71所示。

方法 描述
static Class<?> forName(String className) Returns the Class object associated with the class or interface with the given string name.
TypeVariable<Class<T>>[] getTypeParameters() Returns an array of TypeVariable objects that represent the type variables declared by the generic declaration represented by this GenericDeclaration object, in declaration order.
Type[] getGenericInterfaces() Returns the s representing the interfaces directly implemented by the class or interface represented by this object.Type