10.3 使用常见的反应式操作

Flux和Mono是Reactor提供的最基础的构建块,而这两种反应式类型所提供的操作符则是组合使用它们以构建数据流动管线的黏合剂。Flux和Mono共有500多个操作,这些操作都可以大致归类为:

  • 创建操作;
  • 组合操作;
  • 转换操作;
  • 逻辑操作。

虽然对这500多个操作一一探讨会非常有趣,但是本章的篇幅有限,所以我在本节中选择一些有用的操作来进行说明。下面让我们从创建操作开始吧。

注意:Mono的例子呢?因为Mono和Flux的很多操作是相同的,所以没有必要针对Mono和Flux重复进行介绍。此外,虽然Mono的操作也很有用,但是相比而言,Flux上的操作更有趣。我们的大多数示例会使用Flux。读者只需要知道Mono上通常具有相同的名称的操作即可。

10.3.1 创建反应式类型

在Spring中使用反应式类型时,我们通常将会从repository或者service中获取Flux或Mono,并不需要我们自行创建。偶尔,我们可能需要创建一个新的反应式Publisher。

Reactor提供了多种创建Flux和Mono的操作。这里,我们将介绍其中一些有用的创建操作。

根据对象创建

如果你有一个或多个对象,并想据此创建Flux或Mono,那么可以使用Flux或Mono上的静态just()方法来创建一个反应式类型,它们的数据会由这些对象来驱动。例如,下面的测试方法将从5个String对象中创建一个Flux:

1
2
3
4
5
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux
.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
}

此时我们已经创建了Flux,但是它还没有订阅者。如果没有任何的订阅者,那么数据将不会流动。回想一下花园软管的比喻,假设我们已经将花园软管连接到了水龙头上,另一侧是水厂的水——但是在你打开水龙头之前,水不会流动。订阅反应式类型就如同你打开数据流的水龙头。

要添加一个订阅者,我们可以在Flux上调用subscribe()方法:

1
2
3
fruitFlux.subscribe(
f -> System.out.println("Here's some fruit: " + f)
);

这里传递给subscribe()方法的lambda表达式实际上是一个java.util.Consumer,用来创建反应式流的Subscriber。在调用subscribe()之后,数据会开始流动。在这个例子中,没有中间操作,所以数据从Flux直接流向订阅者。

将来自Flux或Mono的数据项打印到控制台是观察反应式类型运行方式的好方法,实际测试Flux或Mono更好的方法是使用Reactor提供的StepVerifier。对于给定的Flux或Mono,StepVerifier将会订阅该反应式类型,在数据流过时对数据应用断言,并在最后验证反应式流是否按预期完成。

例如,要验证预定义的数据是否流经了fruitFlux,我们可以编写如下所示的测试代码:

1
2
3
4
5
6
7
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();

在这个例子中,StepVerifier订阅了fruitFlux,然后断言Flux中的每个数据项是否与预期的水果名称相匹配。最后,它验证Flux在发布完“Strawberry”之后,整个fruitFlux正常完成。

对于本章的其他例子,你可以使用StepVerifier来编写测试,验证Flux或者Mono行为,研究相应的工作原理,从而帮助你学习和了解Reactor中最有用的操作。

根据集合创建

我们还可以根据数组、Iterable或者Java Stream创建Flux。图10.3使用弹珠图展示如何使用这种方式进行创建。

image-20211016185450377

图10.3 可以根据数组、Iterable或者Java Stream创建Flux

要根据数组创建Flux,可以调用Flux上的静态方法fromArray(),并传入一个源数组:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void createAFlux_fromArray() {
String[] fruits = new String[] {
"Apple", "Orange", "Grape", "Banana", "Strawberry" };
Flux<String> fruitFlux = Flux.fromArray(fruits);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}

因为该源数组包含了之前从对象列表创建Flux时所使用的相同的水果名称,所以该Flux发布的数据会有相同的值,可以使用和之前相同的StepVerifier来验证。

如果我们需要根据java.util.List、java.util.Set或者其他任意java.lang.Iterable的实现来创建Flux,那么可以将其传递给静态的fromIterable()方法:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void createAFlux_fromIterable() {
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
Flux<String> fruitFlux = Flux.fromIterable(fruitList);
// ... verify steps
}

或者,我们有一个Java Stream,并且希望将其用作Flux的源,那么可以调用fromStream()方法:

1
2
3
4
5
6
7
@Test
public void createAFlux_fromStream() {
Stream<String> fruitStream =
Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
Flux<String> fruitFlux = Flux.fromStream(fruitStream);
// ... verify steps
}

同样,我们可以使用和之前一样的StepVerifier来验证该Flux发布的数据。

生成Flux的数据

有时候我们根本没有可用的数据,而只是想要一个作为计数器的Flux,它会在每次发送新值时增加1。要创建一个计数器Flux,我们可以使用静态方法range()。图10.4说明了range()方法的工作原理。

image-20211016185716613

图10.4 从区间创建的Flux会以类似计数器的方式发布消息

下面的测试方法展示了如何创建一个区间Flux:

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void createAFlux_range() {
Flux<Integer> intervalFlux =
Flux.range(1, 5);
StepVerifier.create(intervalFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete();
}

在这个例子中,我们创建了一个区间Flux,起始值为1,结束值为5。StepVerifier证明了它将发布5个条目,即整数1到5。

另一个与range()方法类似的Flux创建方法是interval()。与range()方法一样,interval()方法会创建一个发布递增值的Flux。interval()的特殊之处在于,我们不是给它设置一个起始值和结束值,而是指定一个应该每隔多长时间发出值的间隔时间。图10.5展示了interval()方法创建Flux的弹珠图。

image-20211016185806209

图10.5 根据指定间隔创建的Flux会周期性地发布条目

例如,要创建一个每秒发布一个值的Flux,你可以使用Flux上的静态interval() 方法,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void createAFlux_interval() {
Flux<Long> intervalFlux =
Flux.interval(Duration.ofSeconds(1))
.take(5);
StepVerifier.create(intervalFlux)
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.verifyComplete();
}

需要注意的是,通过interval()方法创建的Flux会从0开始发布值,并且后续的条目依次递增。此外,因为interval()方法没有指定最大值,所以它可能会永远运行。我们也可以使用take()方法将结果限制为前5个条目。我们将在下一节中详细讨论take()方法。

10.3.2 组合反应式类型

有时候,我们会需要操作两种反应式类型,并以某种方式将它们合并在一起。或者,在其他情况下,我们可能需要将Flux拆分为多种反应式类型。在本节中,我们将研究组合以及拆分Reactor的Flux和Mono的操作。

合并反应式类型

假设我们有两个Flux流,并且需要据此创建一个结果Flux,这个形成的Flux会在任意上游Flux流有数据时产生数据。要将一个Flux与另一个Flux合并,可以使用mergeWith()方法,如图10.6中的弹珠图所示。

image-20211016185959408

图10.6 合并两个Flux流(它们的消息将会交错合并为一个新的Flux)

例如,假设有一个值是电视和电影角色名称的Flux,还有另一个值是这些角色喜欢吃的食物的名称的Flux。下面的测试方法展示了如何使用mergeWith()方法合并两个Flux对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void mergeFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500));
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500));
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
StepVerifier.create(mergedFlux)
.expectNext("Garfield")
.expectNext("Lasagna")
.expectNext("Kojak")
.expectNext("Lollipops")
.expectNext("Barbossa")
.expectNext("Apples")
.verifyComplete();
}

通常,Flux会尽可能快地发布数据。因此,我们在创建的两个Flux流上使用delayElements()方法来减慢它们的速度——每500毫秒发布一个条目。此外,为了使食物Flux在角色名称Flux之后再开始流式传输,我们调用了食物Flux上的delaySubscription()方法,以便它在订阅后再经过250毫秒后才开始发布数据。

在合并了两个Flux对象后,将会创建一个新的合并过后的Flux。当StepVerifier订阅这个合并过后的Flux时,它将依次订阅两个源Flux流并启动数据流。

这个合并过后的Flux数据项发布顺序与源Flux的发布时间一致。因为两个Flux对象都设置为以常规速率进行发布,所以这些值在合并后的Flux中会交错在一起,结果是:一个角色、一个食物、另一个角色、另一个食物,以此类推。如果任何一个Flux的计时发生变化,那么你可能会看到接连发布了两个角色或者两个食物。

因为mergeWith()方法不能完美地保证源Flux之间的先后顺序,所以我们可以考虑使用zip()方法。当两个Flux对象压缩在一起的时候,它将会产生一个新的发布元组的Flux,其中每个元组中都包含了来自每个源Flux的数据项。图10.7说明了如何将两个Flux对象压缩在一起。

image-20211016190043625

图10.7 通过zip()方法合并两个Flux流

要查看zip()操作实际如何运行,可以考虑如下所示的测试方法,它将角色Flux和食物Flux合并在一起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void zipFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(characterFlux, foodFlux);
StepVerifier.create(zippedFlux)
.expectNextMatches(p ->
p.getT1().equals("Garfield") &&
p.getT2().equals("Lasagna"))
.expectNextMatches(p ->
p.getT1().equals("Kojak") &&
p.getT2().equals("Lollipops"))
.expectNextMatches(p ->
p.getT1().equals("Barbossa") &&
p.getT2().equals("Apples"))
.verifyComplete();
}

需要注意的是,与mergeWith()方法不同,zip()方法是一个静态的创建操作。创建出来的Flux在角色和他们喜欢的食物之间会完美对齐。从这个合并后的Flux发出的每个条目都是一个Tuple2(一个容纳两个其他对象的容器对象)的实例,其中包含了来自每个源Flux的数据项,并保持着它们发布的顺序。

如果你不想使用Tuple2,而想要使用其他类型,就可以为zip()方法提供一个合并函数来生成你想要的任何对象,合并函数会传入这两个数据项(如图10.8中的弹珠图所示)。

image-20211016190146793

图10.8 zip操作的另一种形式(从每个传入Flux中各取一个元素,然后创建消息对象,并产生这些消息组成的Flux)

例如,下面的测试方法会将角色Flux与食物Flux合并在一起,以便生成一个包含String对象的Flux:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void zipFluxesToObject() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
StepVerifier.create(zippedFlux)
.expectNext("Garfield eats Lasagna")
.expectNext("Kojak eats Lollipops")
.expectNext("Barbossa eats Apples")
.verifyComplete();
}

传递给zip()方法(在这里是一个lambda)的函数只是简单地将两个数据项组装成一个句子,然后通过该合并后的Flux发布出去。

选择第一个反应式类型进行发布

假设我们有两个Flux对象,此时我们不想将它们合并在一起,而是想要创建一个新的Flux,让这个新的Flux从第一个产生值的Flux中发布值。如图10.9所示,first()操作会在两个Flux对象中选择第一个发布值的Flux,并再次发布它的值。

image-20211016192744713

图10.9 first操作将会选择第一个发布消息的Flux并只发布该Flux的值

下面的测试方法创建了一个快速的Flux和一个“缓慢”的Flux(其中“慢”意味着它在被订阅后100毫秒才会发布数据项)。使用first()方法,它将会创建一个新的Flux,这个Flux只会获取第一个源Flux发布的值,并再次发布:

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void firstFlux() {
Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
.delaySubscription(Duration.ofMillis(100));
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
StepVerifier.create(firstFlux)
.expectNext("hare")
.expectNext("cheetah")
.expectNext("squirrel")
.verifyComplete();
}

在这种情况下,因为慢速Flux会在快速Flux开始发布之后的100毫秒才发布值,所以新创建的Flux将会简单地忽略慢的Flux,并仅发布来自快速Flux的值。

10.3.3 转换和过滤反应式流

在数据流经一个流时,我们通常需要过滤掉某些值并对其他的值进行处理。在这一节,我们将介绍转换和过滤流经反应式流的数据的操作。

从反应式类型中过滤数据

数据在从Flux流出时,进行过滤的最基本方法之一是简单地忽略第一批指定数目的数据项。skip操作(如图10.10所示)就能完成这样的工作。

image-20211016192837833

图10.10 skip操作跳过指定数目的消息并将剩下的消息继续在结果Flux上进行传递

针对具有多个数据项的Flux,skip操作将创建一个新的Flux,它会首先跳过指定数量的数据项,然后从源Flux中发布剩余的数据项。下面的测试方法展示如何使用skip()方法:

1
2
3
4
5
6
7
8
9
@Test
public void skipAFew() {
Flux<String> skipFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.skip(3);
StepVerifier.create(skipFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}

在这个场景下,我们有一个具有5个String数据项的Flux。在这个Flux上调用skip(3)方法后会产生一个新的Flux,它会跳过前3个数据项,只发布最后2个数据项。

但是,你可能并不想跳过特定数量的条目,而是想要在一段时间之内跳过所有的第一批数据。这是skip()操作的另一种形式,将会产生一个新Flux,在发布来自源Flux的数据项之前等待指定的一段时间,如图10.11所示。

image-20211016192910234

图10.11 skip操作的另一种形式

下面的测试方法使用skip操作创建了一个在发布值之前会等待4秒的Flux。因为Flux是基于一个在发布数据项之间有1秒延迟的Flux创建的(使用了delayElements()操作),所以它只会发布出最后两个数据项:

1
2
3
4
5
6
7
8
9
10
@Test
public void skipAFewSeconds() {
Flux<String> skipFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.delayElements(Duration.ofSeconds(1))
.skip(Duration.ofSeconds(4));
StepVerifier.create(skipFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}

我们已经看过skip操作的示例,根据对skip操作的描述来看,take可以认为是与skip相反的操作。skip操作会跳过前面几个数据项,而take操作只发布第一批指定数量的数据项,然后将取消订阅(如图10.12中的弹珠图所示)。

1
2
3
4
5
6
7
8
9
10
@Test
public void take() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.take(3);
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}

image-20211016193007459

图10.12 take操作只发布传入Flux中前面指定数目的数据项

与skip()方法一样,take()方法也有另一种替代形式,基于间隔时间而不是数据项个数。它将接受并发布与源Flux一样多的数据项,直到某段时间结束,之后Flux将会完成,如图10.13所示。

image-20211016193028191

图10.13 take操作的另一种形式(在指定的时间过期之前,一直将消息传递给结果Flux)

下面的测试方法使用take()方法的另一种形式,将会在订阅之后的前3.5秒发布数据条目。

1
2
3
4
5
6
7
8
9
10
11
@Test
public void take() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}

skip操作和take操作都可以被认为是过滤操作,其过滤条件是基于计数或者持续时间的,而Flux值的更通用过滤则是filter操作。

我们需要指定一个Predicate,用于决定数据项是否能通过Flux,filter操作允许我们根据任何条件进行选择性地发布。图10.14中的弹珠图显示了filter操作的工作原理。

image-20211016193305271

图10.14 可以对传入的Flux进行过滤,这样结果Flux将只会发布满足指定Predicate的消息

要查看filter()的实际效果,可以参考下面的测试方法:

1
2
3
4
5
6
7
8
9
10
@Test
public void filter() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.filter(np -> !np.contains(" "));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Zion")
.verifyComplete();
}

这里我们将一个只接受不包含空格的字符串的Predicate作为lambda传给了filter()方法,因此在结果Flux中”Grand Canyon”和”Grand Teton”被过滤掉了。

我们还可能想要过滤掉已经接收过的数据条目,可以采用distinct操作(如图10.15所示),形成的Flux将只会发布源Flux中尚未发布过的数据项。

image-20211016193416475

图10.15 distinct操作将会过滤掉重复的消息

在下面的测试中,调用distinct()方法产生的Flux只会发布不同的String值:

1
2
3
4
5
6
7
8
9
@Test
public void distinct() {
Flux<String> animalFlux = Flux.just(
"dog", "cat", "bird", "dog", "bird", "anteater")
.distinct();
StepVerifier.create(animalFlux)
.expectNext("dog", "cat", "bird", "anteater")
.verifyComplete();
}

虽然”dog”和”bird”从源Flux中都发布了两次,但是在调用distinct()方法产生的结果Flux中,它们只被发布了一次。

映射反应式数据

在Flux或Mono上最常见的操作之一就是将已发布的数据项转换为其他的形式或类型。Reactor的反应式类型(Flux和Mono)为此提供了map和flatMap操作。

map操作会创建一个新的Flux,只是在重新发布它所接收的每个对象之前会执行给定Function指定的转换。map操作的工作原理如图10.16所示。

image-20211016193528270

图10.16 map操作将传入的消息转换为结果流上的新消息

在下面的test()方法中,包含代表篮球运动员名字的String值的Flux被转换为一个包含Player对象的新Flux。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void map() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.map(n -> {
String[] split = n.split("\\s");
return new Player(split[0], split[1]);
});
StepVerifier.create(playerFlux)
.expectNext(new Player("Michael", "Jordan"))
.expectNext(new Player("Scottie", "Pippen"))
.expectNext(new Player("Steve", "Kerr"))
.verifyComplete();
}

以lambda形式传递给map()方法的函数会将传入的String值按照空格进行拆分,并使用生成的String数组来创建Player对象。使用just()方法创建的Flux包含了String对象,但是map()方法产生的Flux包含了Player对象。

其中重要的一点是:在每个数据项被源Flux发布时,map操作是同步执行的,如果你想要异步地转换过程,那么你应该考虑使用flatMap操作。

对于flatMap操作,我们可能需要一些思考和练习才能完全掌握。如图10.17所示,flatMap并不像map操作那样简单地将一个对象转换到另一个对象,而是将对象转换为新的Mono或Flux。结果形成的Mono或Flux会扁平化为新的Flux。当与subscribeOn()方法结合使用时,flatMap操作可以释放Reactor反应式的异步能力。

image-20211016193621512

图10.17 flatMap操作使用一个中间的Flux来实现异步转换

下面的测试方法展示如何使用flatMap()方法和subscribeOn()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void flatMap() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.flatMap(n -> Mono.just(n)
.map(p -> {
String[] split = p.split("\\s");
return new Player(split[0], split[1]);
})
.subscribeOn(Schedulers.parallel())
);
List<Player> playerList = Arrays.asList(
new Player("Michael", "Jordan"),
new Player("Scottie", "Pippen"),
new Player("Steve", "Kerr"));
StepVerifier.create(playerFlux)
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.verifyComplete();
}

需要注意的是,我们为flatMap()方法指定了一个lambda形式的函数,传入的String将会转换为一个Mono类型的String,然后在这个Mono上通过map()方法将字符串转换为一个Player。

如果到此为止,那么产生的Flux将同样包含Player对象,与使用map()方法的例子相同,顺序同步地生成。但是我们对Mono做的最后一个动作就是调用subscribeOn()方法,它声明每个订阅都应该在并行线程中进行,因此可以异步并行地执行多个String对象的转换操作。

尽管subscribeOn()方法的命名与subscribe()方法类似,但是它们的含义却完全不同。 subscribe()方法是一个动词,订阅并驱动反应式流;而subscribeOn()方法则更具描述性,指定了如何并发地处理订阅。Reactor本身并不强制使用特定的并发模型,通过subscribeOn()方法,我们可以使用Schedulers中的任意一个静态方法来指定并发模型。在这个例子中,我们使用了parallel()方法,使用来自固定线程池(大小与CPU核心数量相同)的工作线程。Schedulers支持多种并发模型,如表10.1所示。

表10.1 Schedulers支持的并发模型

epub_29101559_80

使用flatMap()和subscribeOn()的好处是:我们可以在多个并行线程之间拆分工作,从而增加流的吞吐量。因为工作是并行完成的,无法保证哪项工作首先完成,所以结果Flux中数据项的发布顺序是未知的。因此,StepVerifier只能验证发出的每个数据项是否存在于预期Player对象列表中,并且在Flux完成之前会有3个这样的数据项。

在反应式流上缓存数据

在处理流经Flux的数据时,你可能会发现将数据流拆分为小块会带来一定的收益。如图10.18所示的buffer操作可以帮助你解决这个问题。

image-20211016193756362

图10.18 buffer操作会产生一个新的包含列表Flux(具备最大长度限制的列表,包含从传入的Flux中收集来的数据)

我们给定一个包含多个String值的Flux,其中每个值代表一种水果的名称,我们可以创建一个新的包含List集合的Flux,其中每个List只有不超过指定数量的元素:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void buffer() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);
StepVerifier
.create(bufferedFlux)
.expectNext(Arrays.asList("apple", "orange", "banana"))
.expectNext(Arrays.asList("kiwi", "strawberry"))
.verifyComplete();
}

在这种情况下,String元素的Flux被缓冲到一个新的包含List集合的Flux中,其中每个集合不超过3个条目。因此,发出5个String值的原始Flux将会被转换为一个新的Flux,它会发出两个List集合,其中一个包含3个水果,而另一个包含2个水果。

这有什么意义呢?将反应式的Flux缓冲到非反应式的Flux中看起来适得其反,但是,当组合使用buffer()方法和flatMap()方法时,我们可以对每个List集合进行并行处理。

1
2
3
4
5
6
7
8
9
Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry")
.buffer(3)
.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
.log()
).subscribe();

在这个新例子中,我们仍然将5个String值的Flux缓冲到一个新的包含List的Flux中,但是这次将flatMap()应用于包含List集合的Flux。这将获取每个List缓冲区,并为其中的元素创建一个新的Flux,然后对其应用map操作。因此,每个List缓冲区都会在各个线程中执行进一步并行处理。

为了观察实际的效果,在代码中还包括了一个log()操作,它用于每个子Flux。log()操作记录了所有的反应式事件,以便于观察实际发生了什么事情。在日志中将会记录如下的条目(为简洁起见,删除了时间戳的部分):

1
2
3
4
5
6
7
8
9
10
11
12
13
[main] INFO reactor.Flux.SubscribeOn.1 -
onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.1 - request(32)
[main] INFO reactor.Flux.SubscribeOn.2 -
onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.2 - request(32)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(APPLE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(KIWI)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(ORANGE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(STRAWBERRY)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(BANANA)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onComplete()
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onComplete()

如同日志记录所清晰展示的,第一个缓冲区(apple、orange和banana)中的水果在parallel-1线程中处理;与此同时,第二个缓冲区(kiwi和strawberry)中的水果在parallel-2线程中处理。从缓冲区的日志记录交织在一起的事实可以明显地看出,对两个缓冲区的处理是并行执行的。

如果由于某些原因需要将Flux发布的所有数据项都收集到一个List中,那么可以使用不带参数的buffer()方法:

1
Flux<List<String>> bufferedFlux = fruitFlux.buffer();

这将会产生一个新的Flux。这个Flux将会发布一个List,其中包含源Flux发布的所有数据项。我们可以使用collectList操作实现相同的功能,如图10.19中的弹珠图所示。

image-20211016195110114

图10.19 collectList操作将产生一个包含传入Flux发布的所有消息的Mono

collectList()方法会产生一个发布List的Mono,而不是发布List的Flux。下面的测试方法展示了它的用法:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void collectList() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Mono<List<String>> fruitListMono = fruitFlux.collectList();
StepVerifier
.create(fruitListMono)
.expectNext(Arrays.asList(
"apple", "orange", "banana", "kiwi", "strawberry"))
.verifyComplete();
}

一种更加有趣的收集Flux发出的数据项的方法是将它们收集到Map中。如图10.20所示,collectMap操作将会产生一个发布Map的Mono,这个Map中填充了由给定Function计算key值所生成的条目。

image-20211016195145928

图10.20 collectMap操作将会产生一个Mono(包含了由传入Flux所发出的消息产生的Map,这个Map的key是从传入消息的某些特征衍生而来的)

要查看collectMap()的效果,请参考下面的测试方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void collectMap() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> animalMapMono =
animalFlux.collectMap(a -> a.charAt(0));
StepVerifier
.create(animalMapMono)
.expectNextMatches(map -> {
return
map.size() == 3 &&
map.get('a').equals("aardvark") &&
map.get('e').equals("eagle") &&
map.get('k').equals("kangaroo");
})
.verifyComplete();
}

源Flux会发布一些动物的名字。基于这个Flux,我们使用collectMap操作创建了一个发布Map的新Mono,其中key由动物名称的首字母确定,而值则为动物名称本身。如果两个动物名称以相同的字母开头(如elephant和eagle,或者koala和kangaroo),那么流经该流的最后一个条目将会覆盖先前的条目。

10.3.4 在反应式类型上执行逻辑操作

有时候我们想要知道由Mono或者Flux发布的条目是否满足某些条件,那么all()和any()方法可以实现这样的逻辑。图10.21和图10.22展示了all()和any()的工作方式。

image-20211016195234658

图10.21 可以使用all()方法来确保Flux中的所有消息都满足某些条件

image-20211016195251014

图10.22 可以使用any()方法来确保Flux中至少有一个消息满足某些条件

假设我们想知道Flux发布的每个String中是否都包含了字母a或字母k,那么下面的测试将使用all()方法来检查这个条件:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void all() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
StepVerifier.create(hasKMono)
.expectNext(false)
.verifyComplete();
}

在第一个StepVerifier中,我们检查了字母a。all()方法应用于源Flux,会产生布尔类型的Mono。在本例中,所有动物名称都包含了字母a,所以从生成的Mono中会发布true。但是在第二个StepVerifier中,产生的Mono将会发出false,因为并非所有动物名称都包含字母k。

如果至少有一个元素匹配条件即可,而不是要求所有元素均满足条件,那么any()就是我们所需的方法。下面这个新的测试用例使用any()来检查字母t和z:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void any() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("t"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
StepVerifier.create(hasZMono)
.expectNext(false)
.verifyComplete();
}

在第一个StepVerifier中,我们会看到生成的Mono发布了true,因为至少有一种动物的名称含有字母t(尤其是elephant)。而在第二种情况下,生成的Mono发布了false,因为没有任何一种动物的名称包含字母z。

第3部分 反应式Spring

在第3部分,我们将探索Spring对反应式编程提供的全新支持。第10章讨论使用Reactor项目进行反应式编程的基础知识。Reactor项目是支撑Spring 5反应式特性的反应式编程库。然后,我们将会介绍Reactor中一些常用的反应式操作。在第11章中,我们会重新探讨REST API的开发,介绍全新的Web框架Spring WebFlex。该框架借用了很多Spring MVC的理念,为Web开发提供了新的反应式模型。第12章总结第3部分,介绍如何通过Spring Data对Cassandra和Mongo数据库进行读写,实现反应式数据持久化。

9.4 小结

  • 借助Spring Integration能够定义流,在进入和离开应用的时候可以对数据进行处理。
  • 集成流可以使用XML、Java或简洁的Java DSL配置风格来进行定义。
  • 消息网关和通道适配器会作为集成流的入口和出口。
  • 在流动的过程中,消息可以进行转换、切分、聚合、路由,也可以由服务激活器对其进行处理。
  • 消息通道连接集成流中的各个组件。

第10章 理解反应式编程

本章内容:
  • 反应式编程概览
  • Reactor项目简介
  • 反应式地处理数据

你曾有过订阅报纸或者杂志的经历吗?互联网的确从传统的出版发行商那儿分得了一杯羹,但是过去订阅报纸真的是我们了解时事的最佳方式。那时,我们每天早上都会收到一份新鲜出炉的报纸,并在早饭时间或上班路上阅读。

现在假设一下,在支付完订阅费用之后,几天的时间过去了,你却没有收到任何报纸。又过了几天,你打电话给报社的销售部门询问为什么还没有收到报纸。想象一下,如果他们告诉你:“因为你支付的是一整年的订阅费用,而现在这一年还没有结束,当这一年结束时,你肯定可以一次性完整地收到它们。”那么你会有多么惊讶。

值得庆幸的是,这并非订阅的真正运作方式。报纸具有一定的时效性。在出版后,报纸需要及时投递,以确保在阅读它们时内容仍然是新鲜的。此外,当你在阅读最新一期的报纸时,记者们正在为未来的版本撰写内容,同时印刷机正在满速运转,印刷下一期的内容——一切都是并行的。

在开发应用程序代码时,我们可以编写两种风格的代码,即命令式和反应式。

  • 命令式(Imperative)的代码:非常类似于上文所提的虚构的报纸订阅方式。它由一组任务组成,每次只运行一项任务,每项任务又都依赖于前面的任务。数据会按批次进行处理,在前一项任务还没有完成对当前数据批次的处理时,不能将这些数据递交给下一项处理任务。
  • 反应式(Reactive)的代码:非常类似于真实的报纸订阅方式。它定义了一组用来处理数据的任务,但是这些任务可以并行地执行。每项任务处理数据的一部分子集,并将结果交给处理流程中的下一项任务,同时继续处理数据的另一部分子集。

在本章中,我们将暂别Taco Cloud应用程序,转而探索Reactor项目。Reactor是一个反应式编程库,同时也是Spring家族的一部分。它是Spring 5反应式编程功能的基础,所以在我们学习使用Spring构建反应式控制器和repository之前,理解Reactor是非常重要的。不过,在我们开始学习Reactor之前,还需要花点时间研究一下反应式编程的基本要素。

10.1 反应式编程概览

反应式编程是一种可以替代命令式编程的编程范式。这种可替代性存在的原因在于反应式编程解决了命令式编程中的一些限制。理解这些限制,有助于你更好地理解反应式编程模型的优点。

注意:反应式编程不是银弹。你不应该从这一章或者其他任何关于反应式编程的讨论中得出“命令式编程是邪恶的,而反应式编程才是你的救星”的结论。如同我们作为开发者学习到的任何技术一样,反应式编程对于某些使用场景来说的确是完美的,但是在其他的一些场景中可能不那么适用。建议以实用主义为上。

如果你和我以及绝大多数的开发者一样,是从命令式编程开始入行的,那么很可能你现在编写的大部分(或者所有)代码在将来依然是命令式的。命令式编程相当直观,没有编程经验的学生们可以在学校的STEM教育课程中轻松地学习它,而且足够强大。在驱动大型企业运行的代码中,绝大部分都是命令式的。

它的理念很简单:你可以一次一个地按照顺序将代码编写为需要遵循的指令列表。在某项任务开始执行之后,程序在开始下一项任务之前需要等待当前任务完成。在整个处理过程中的每一步,要处理的数据都必须是完全可用的,以便将它们作为一个整体进行处理。

一开始一切都很美好,直到我们遇到问题。在执行一项任务的时候,特别是IO任务(将数据写入DB或者从远程服务器获取数据),触发这项任务的线程实际上是被阻塞的,在任务完成之前它不能做任何事情。坦白来说,阻塞线程是一种浪费。

大多数编程语言(包括Java)都支持并发编程。在Java中创建一个线程,然后让它执行某些操作,而调用线程继续执行其他工作,这是相当容易实现的。虽然创建线程很简单,但是这些线程多半最终会被阻塞。管理多线程中的并发极具挑战,而更多线程则意味着更多的复杂性。

相比之下,反应式编程本质上是函数式的和声明式的。相对于描述一组将依次执行的步骤,反应式编程描述了数据将会流经的管道或者流。相对于要求将被处理的数据作为一个整体进行处理,反应式流可以在数据可用时立即开始处理。实际上,传入的数据可能是无限的(比如,一个某个地理位置的实时温度测量数据的恒定流)。

拿现实世界类比一下,可以将命令式编程看作是水气球,而将反应式编程看作是花园里的软管。在夏天,这两者都是偷袭和愉悦毫无戒心的朋友的好方式,但是它们的运作方式却不同:

  • 水气球只能一次性地填满有效载荷,并在撞到目标时弄湿对象。水气球的容量有限,如果你想要弄湿更多人(或者把同一个人弄得更加湿透一点),那么唯一的选择就是增加水气球的数量。
  • 花园软管的有效载荷是从水龙头到喷嘴的水流。在特定的时间点,花园软管的容量可能是有限的,但是在打水仗的过程中它的容量却是无限的。只要水源源不断地从龙头流入软管中,水就会源源不断地从喷嘴喷出去。同一个软管非常好扩展,你可以尽情地和更多的朋友打水仗。

虽然水气球(或者命令式编程)没有什么固有的问题,但是持有软管(或者能够应用反应式编程)的人通常在伸缩性和性能方面更具优势。

定义反应式流

反应式流(Reactive Streams)是由Netflix、Lightbend和Pivotal(Spring背后的公司)的工程师于2013年年底开始制定的一种规范。反应式流旨在提供无阻塞回压的异步流处理标准。

我们已经触及了反应式编程的异步特性,它使我们能够并行执行任务,从而实现更高的可伸缩性。通过回压,数据消费者可以限制它们想要处理的数据数量,避免被过快的数据源所淹没。

Java的流和反应式流

Java的流和反应式流之间有很多相似之处。首先,它们的名字中都有流(Stream)这个词。它们还提供了用于处理数据的函数式API。事实上,正如你稍后将会在我们介绍Reactor时看到的那样,它们甚至可以共享许多相同的操作。

Java的流通常都是同步的,并且只能处理有限的数据集。从本质上来说,它们只是使用函数来对集合进行迭代的一种方式。

反应式流支持异步处理任意大小的数据集,同样也包括无限数据集。只要数据就绪,它们就能实时地处理数据,并且能够通过回压来避免压垮数据的消费者。

反应式流规范可以总结为4个接口:Publisher、Subscriber、Subscription和Processor。Publisher负责生成数据,并将数据发送给Subscription(每个Subscriber对应一个Subscription)。Publisher接口声明了一个方法subscribe(),Subscriber可以通过该方法向Publisher发起订阅。

反应式流规范可以总结为4个接口:Publisher、Subscriber、Subscription和Processor。Publisher负责生成数据,并将数据发送给Subscription(每个Subscriber对应一个Subscription)。Publisher接口声明了一个方法subscribe(),Subscriber可以通过该方法向Publisher发起订阅。

1
2
3
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}

一旦Subscriber订阅成功,就可以接收来自Publisher的事件。这些事件是通过Subscriber接口上的方法发送的:

1
2
3
4
5
6
public interface Subscriber<T> {
void onSubscribe(Subscription sub);
void onNext(T item);
void onError(Throwable ex);
void onComplete();
}

Subscriber的第一个事件是通过对onSubscribe()方法的调用接收的。Publisher调用onSubscribe() 方法时,会将Subscription对象传递给Subscriber。通过Subscription,Subscriber可以管理其订阅情况:

1
2
3
4
public interface Subscription {
void request(long n);
void cancel();
}

Subscriber可以通过调用request()方法来请求Publisher发送数据,或者通过调用cancel()方法表明它不再对数据感兴趣并且取消订阅。当调用request()时,Subscriber可以传入一个long类型的数值以表明它愿意接受多少数据。这也是回压能够发挥作用的地方,以避免Publisher发送多于Subscriber能够处理的数据量。在Publisher发送完所请求数量的数据项之后,Subscriber可以再次调用request()方法来请求更多的数据。

Subscriber请求数据之后,数据就会开始流经反应式流。Publisher发布的每个数据项都会通过调用Subscriber的onNext()方法递交给Subscriber。如果有任何错误,就会调用onError()方法。如果Publisher没有更多的数据,也不会继续产生更多的数据,那么将会调用Subscriber的onComplete()方法来告知Subscriber它已经结束。

至于Processor接口,它是Subscriber和Publisher的组合,如下所示:

1
2
public interface Processor<T, R>
extends Subscriber<T>, Publisher<R> {}

当作为Subscriber时,Processor会接收数据并以某种方式对数据进行处理。然后它会将角色转变为Publisher,并将处理的结果发布给它的Subscriber。

正如你所看到的,反应式流的规范非常简单,很容易就能想出如何构建一个以Publisher作为开始的数据处理管道,并让数据通过零个或多个Processor,然后将最终结果投递给Subscriber。

然而,反应式流规范的接口本身并不支持以函数式的方式组成这样的流。Reactor项目是反应式流规范的一个实现,提供了一组用于组装反应式流的函数式API。我们将会在后面的内容中看到,Reactor构成了Spring 5反应式编程模型的基础。在本章的其余部分,我们将会探讨(并且,我敢说这个过程非常有意思)Reactor项目。

9.3 创建Email集成流

我们决定Taco Cloud应该允许客户通过Email提交taco设计和创建订单。我们发放传单并在报纸上刊登外卖广告,邀请每个人通过Email发送taco订单。这非常成功!但是,令人遗憾的是,它过于成功了。有太多的Email涌了进来,我们不得不申请临时帮助,让别人阅读所有的Email并将订单提交到订单系统中。

在本节中,我们将会实现一个集成流,轮询Taco Cloud的taco订单Email的收件箱、解析Email中的订单细节并将订单提交给Taco Cloud来进行处理。简而言之,在我们所创建的集成流中,入站通道适配器将会使用Email端点模块摄取TacoCloud收件箱中的Email到集成流中。

集成流的下一步会将Email解析为订单对象,这些订单对象会被传递给另一个处理器,从而将订单提交至Taco Cloud的REST API中,在这里我们会像其他订单那样处理它们。首先,我们定义一个简单的配置属性类,它会捕获处理Taco CloudEmail的特定信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Data
@ConfigurationProperties(prefix="tacocloud.email")
@Component
public class EmailProperties {
private String username;
private String password;
private String host;
private String mailbox;
private long pollRate = 30000;
public String getImapUrl() {
return String.format("imaps://%s:%s@%s/%s",
this.username, this.password, this.host, this.mailbox);
}
}

我们可以看到,EmailProperties会捕获生成IMAP URL的属性。这个流会使用这个URL连接Taco Cloud Email服务器并轮询Email。在捕获的属性中包括Email用户的用户名和密码以及IMAP服务器的主机、要轮询的邮箱以及邮箱轮询的频率(默认为30秒)。

EmailProperties在类级别使用了@ConfigurationProperties注解,并将prefix属性设置为tacocloud.email。这意味着,我们可以在application.yml文件中按照下述方式配置消费Email的详细信息:

1
2
3
4
5
6
7
tacocloud:
email:
host: imap.tacocloud.com
mailbox: INBOX
username: taco-in-flow
password: 1L0v3T4c0s
poll-rate: 10000

现在,我们使用EmailProperties来配置集成流。我们想要创建的流大致如图9.10所示。

image-20211016100608960

图9.10 通过Email接受taco订单的集成流

我们有两种方案来定义这个流。

  • 在Taco Cloud应用中进行定义:在流的结束点,服务激活器要调用我们之前定义的创建订单的repository。
  • 在单独的应用中进行定义:在流的结束点,服务激活器要发送POST请求到Taco Cloud API以提交taco订单。

不管选择哪种方式,除了服务激活器的实现方式之外,对流的本身影响并不大。但是,因为我们需要一些表示taco、订单和配料的类型,它们与Taco Cloud主应用可能会略微有所差异,所以我们会在一个单独的应用中定义集成流,避免与已有的领域类型相混淆。

我们还可以选择使用XML配置、Java配置或者Java DSL来定义流。我更喜欢DSL的优雅,所以在这里将会使用这种方案。如果你想要一些额外的挑战,也可以选择其他配置风格编写流的定义。现在,我们看一下taco Email订单流的Java DSL配置,如程序清单9.5所示。

程序清单9.5 定义接收Email并将其提交为订单的集成流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package tacos.email;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
@Configuration
public class TacoOrderEmailIntegrationConfig {
@Bean
public IntegrationFlow tacoOrderEmailFlow(
EmailProperties emailProps,
EmailToOrderTransformer emailToOrderTransformer,
OrderSubmitMessageHandler orderSubmitHandler) {
return IntegrationFlows
.from(Mail.imapInboundAdapter(emailProps.getImapUrl()),
e -> e.poller(
Pollers.fixedDelay(emailProps.getPollRate())))
.transform(emailToOrderTransformer)
.handle(orderSubmitHandler)
.get();
}
}

根据tacoOrderEmailFlow()方法的定义,taco Email订单流由3个不同的组件组成。

  • MAP Email入站通道适配器:使用IMP URL创建,而URL是根据EmailProperties的getImapUrl()方法创建的,并且会根据EmailProperties中设置的pollRate属性进行轮询。传入的Email会传递给一个通道,然后连接到转换器。
  • 将Email转换成订单对象的转换器:转换器是通过EmailToOrderTransformer实现的,它会注入tacoOrderEmailFlow()方法中。转换所形成的订单会通过另外一个通道传递给最后一个组件。
  • 处理器(作为出站通道适配器):处理器接受订单对象并将其提交至TacoCloud的REST API。

我们只有将Email端点模块作为依赖项添加到项目构建文件中,才能调用Mail.imap InboundAdapter()。Maven依赖如下所示:

1
2
3
4
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>

EmailToOrderTransformer是Spring Integration Transformer接口的实现,扩展了AbstractMailMessageTransformer(如程序清单9.6所示)。

程序清单9.6 使用集成转换器将传入的Email转换为taco订单

1
2
3
4
5
6
7
8
9
10
11
@Component
public class EmailToOrderTransformer
extends AbstractMailMessageTransformer<Order> {
@Override
protected AbstractIntegrationMessageBuilder<Order>
doTransform(Message mailMessage) throws Exception {
Order tacoOrder = processPayload(mailMessage);
return MessageBuilder.withPayload(tacoOrder);
}
...
}

AbstractMailMessageTransformer是一个很便利的基类,适用于载荷为Email的消息。它会抽取传入消息Email的信息,并将它放到一个Message对象中,传递给doTransform()方法。在doTransform()方法中,我们将Message对象传递给一个名为processPayload()的private方法,将Email解析为Order对象。这个Order对象尽管和主Taco Cloud应用中的Order对象有些相似,但是并不完全相同,这里更加简单一些:

1
2
3
4
5
6
7
8
9
10
11
12
package tacos.email;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
@Data
public class Order {
private final String email;
private List<Taco> tacos = new ArrayList<>();
public void addTaco(Taco taco) {
this.tacos.add(taco);
}
}

这个Order类不包含客户完整的投递信息和账单信息,而是只携带了客户的Email地址(通过传入的Email获取的)。

将Email解析成订单是一项非常重要的任务。实际上,即便最简单的实现也需要几十行代码。这些代码对于进一步讨论Spring Integration和如何实现转换器并没有任何助益。所以,为了节省空间,我在这里省略了processPayload()方法的细节。

EmailToOrderTransformer做的最后一件事情就是返回一个MessageBuilder,让消息的载荷中包含Order对象。MessageBuilder所生成的消息会发送至集成流的最后一个组件:将订单提交至Taco Cloud API的消息处理器。OrderSubmitMessageHandler实现了Spring Integration的GenericHandler,它会处理带有Order载荷的消息,如程序清单9.7所示。

程序清单9.7 通过消息处理器将订单提交至Taco Cloud API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package tacos.email;
import java.util.Map;
import org.springframework.integration.handler.GenericHandler;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
@Component
public class OrderSubmitMessageHandler
implements GenericHandler<Order> {
private RestTemplate rest;
private ApiProperties apiProps;
public OrderSubmitMessageHandler(
ApiProperties apiProps, RestTemplate rest) {
this.apiProps = apiProps;
this.rest = rest;
}
@Override
public Object handle(Order order, Map<String, Object> headers) {
rest.postForObject(apiProps.getUrl(), order, String.class);
return null;
}
}

为了满足GenericHandler接口的要求,OrderSubmitMessageHandler重写了handle()方法,这个方法接收传入的Order对象,并使用注入的RestTemplate利用POST请求将Order提交至ApiProperties对象指定的URL。最后,handle()方法返回null,表明这个处理器是流的终点。

这里使用ApiProperties避免在postForObject()时硬编码URL。它是一个配置属性类,如下所示:

1
2
3
4
5
6
@Data
@ConfigurationProperties(prefix="tacocloud.api")
@Component
public class ApiProperties {
private String url;
}

在application.yml中,Taco Cloud API的URL可能会配置如下:

1
2
3
tacocloud:
api:
url: http://api.tacocloud.com

为了让这个应用能够使用RestTemplate,并自动注入OrderSubmitMessageHandler中,我们需要在项目的构建文件中添加SpringBoot web starter依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

这不仅会将RestTemplate添加到类路径中,还会触发Spring MVC的自动配置功能。作为独立的Spring Integration流,这个应用并不需要Spring MVC,更不需要自动配置所提供的嵌入式Tomcat。所以,我们可以在application.yml中通过如下的配置条目禁用Spring MVC的自动配置:

1
2
3
spring:
main:
web-application-type: none

spring.main.web-application-type属性可以设置为servlet、reactive或none。当Spring MVC位于类路径之中时,自动配置功能会将其设置为servlet。我们在这里将其重写为none,所以Spring MVC和Tomcat将不会进行自动配置(我们将会在第11章介绍反应式Web应用是什么样子的)。

10.2 初识Reactor

反应式编程要求我们采取和命令式编程不一样的思维方式。此时我们不会再描述每一步要进行的步骤,反应式编程意味着要构建数据将要流经的管道。当数据流经管道时,可以对它们进行某种形式的修改或者使用。

例如,假设我们想要接受一个英文人名,然后将所有的字母都转换为大写,并用得到的结果创建一个问候消息,并最终打印它。使用命令式编程模型,代码看起来如下所示:

1
2
3
4
String name = "Craig";
String capitalName = name.toUpperCase();
String greeting = "Hello, " + capitalName + "!";
System.out.println(greeting);

使用命令式编程模型,每行代码执行一个步骤,按部就班,并且肯定在同一个线程中进行。每一步在执行完成之前都会阻止执行线程执行下一步。

与之不同,如下的函数式、反应式代码完成了相同的事情:

1
2
3
4
5
6
7
8
9

String name = "Craig";
String capitalName = name.toUpperCase();
String greeting = "Hello, " + capitalName + "!";
System.out.println(greeting);
Mono.just("Craig")
.map(n -> n.toUpperCase())
.map(cn -> "Hello, " + cn + "!")
.subscribe(System.out::println);

不用过度关心这个例子中的细节,我们很快将会详细讨论just()、map()和subscribe() 方法。现在,重要的是要理解:虽然这个反应式的例子看起来依然保持着按步骤执行的模型,但实际是数据会流经处理管线。在处理管线的每一步,都对数据进行了某种形式的加工,但是我们不能判断数据会在哪个线程上执行操作。它们既可能在同一个线程,也可能在不同的线程。

这个例子中的Mono是Reactor的两种核心类型之一,另一个类型是Flux。两者都实现了反应式流的Publisher接口。Flux代表具有零个、一个或者多个(可能是无限个)数据项的管道。Mono是一种特殊的反应式类型,针对数据项不超过一个的场景,它进行了优化。

Reactor与RxJava(ReactiveX)的对比

如果你熟悉RxJava或者ReactiveX,那么你可能认为Mono和Flux类似于Observable和Single。事实上它们不仅在语义上大致相同,还共享了很多相同的操作符。

虽然我们在本书中主要介绍Reactor,但是Reactor和RxJava的类型可以互相转换,我相信你对这一点会感到很开心。甚至,在接下来的章节中我们还会看到,Spring也可以使用RxJava的类型。

实际上,在前面的例子中有3个Mono。其中,just() 操作创建了第一个Mono。当该Mono发送一个值的时候,这个值被传递给了将字母转换为大写的map()操作,据此又创建了另一个Mono。当第二个Mono发布它的数据时,数据被传递给了第二个map()操作,并且会在此进行一些字符串连接操作,而结果将用于创建第三个Mono。最后,对第三个Mono上的subscribe()方法调用时,会接收数据并将数据打印出来。

10.2.1 绘制反应式流图

反应式流程通常使用弹珠图(marble diagram)表示,如图10.1所示。弹珠图的展现形式非常简单,在顶部描述了数据流经Flux或者Mono的时间线,在中间描述了要执行的操作,在底部描述了结果形成的Flux或者Mono的时间线。我们将会看到,当数据流经原始的Flux时,某些操作将会对它进行处理,并产生一个新的Flux,已经处理过的数据将会在新Flux中流动。

image-20211016111138194

图10.1 描绘Flux基本流程的弹珠图

图10.2展示了一个类似的弹珠图,但是针对的是Mono。我们可以看到,这里主要的不同是Mono将会有零个或者一个数据项,或者一个错误。

image-20211016111229961

图10.2 描绘Mono基本流程的弹珠图

10.2.2 添加Reactor依赖

要开始使用Reactor,请将下面的依赖项添加到项目的构建文件中:

1
2
3
4
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>

Reactor还提供了非常棒的测试支持。我们将会围绕Reactor代码编写大量的测试,因此绝对需要将下面的依赖添加到构建文件中:

1
2
3
4
5
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>

如果你计划将这些依赖添加到一个Spring Boot工程中,那么Spring Boot工程会替你管理依赖。但是,如果要在非Spring Boot项目中使用Reactor,就需要在构建文件中设置Reactor的BOM(Bill Of Materials,物料清单)。下面的依赖管理条目将会把Reactor的Bismuth版本添加到构建文件中:

1
2
3
4
5
6
7
8
9
10
11
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

现在Reactor已经位于工程的构建文件中了,我们可以开始使用Mono和Flux来创建反应式的处理管线。在本章的剩余部分,我们将介绍Mono和Flux所提供的几个操作。

10.4 小结

  • 反应式编程会涉及创建数据流经的处理管道。
  • 反应式流规范定义了4种类型:Publisher、Subscriber、Subscription和Processor(它是Publisher和Subscriber的组合)。
  • Reactor项目实现了反应式流规范,将反应式流的定义抽象为两个主要的类型,即Flux和Mono,并为每种类型都提供数百个操作。
  • Spring 5利用Reactor提供了反应式控制器、repository、REST客户端以及其他反应式框架的支持。

我们已经了解了反应式编程和Reactor项目,现在可以开始在Spring应用程序中使用这些技术了。在本章中,我们将利用Spring 5的反应式编程模型重新讨论在第6章中编写的控制器。

具体来讲,我们将一起探讨Spring 5中新添加的反应式Web框架——SpringWebFlux。我们很快会发现,Spring WebFlux与Spring MVC非常相似,这样使得它非常易于使用,我们已经掌握的如何在Spring中构建REST API的知识依然有用。

11.1 使用Spring WebFlux

传统的基于Servlet的Web框架,如Spring MVC,在本质上都是阻塞和多线程的,每个连接都会使用一个线程。在请求处理的时候,会在线程池中拉取一个工作者(worker)线程来对请求进行处理。同时,请求线程是阻塞的,直到工作者线程提示它已经完成为止。

这样带来的后果就是阻塞式Web框架在大量请求下无法有效地扩展。缓慢的工作者线程所带来的延迟会使情况变得更糟,因为它将花费更长的时间才能将工作者线程送回池中,准备处理另一个请求。在某些场景中,这种设计完全可以接受。事实上,在很大程度上这就是十多年来大多数Web应用程序的开发方式,但是时代在改变。

这些Web应用程序的客户端以前是偶尔浏览网站的人们,而现在这些人会频繁消费内容而且会使用与HTTP API协作的应用程序。如今,物联网(甚至不需要人类)产生了汽车、喷气式发动机和其他非传统的客户端,它们会持续地和Web API交换数据。随着消费Web应用的客户端越来越多,可扩展性比以往任何时候都更加重要。

异步的Web框架能够以更少的线程获得更高的可扩展性,通常它们只需要与CPU核心数量相同的线程。通过使用所谓的事件轮询(event looping)机制(如图11.1所示),这些框架能够用一个线程处理很多请求,这样每次连接的成本会更低。

image-20211016212143634

图11.1 异步Web框架借助事件轮询机制能够以更少的线程处理更多的请求

在事件轮询中,所有事情都是以事件的方式来进行处理的,包括请求以及密集型操作(如数据库和网络操作)的回调。当需要执行成本高昂的操作时,事件轮询会为该操作注册一个回调,这样操作可以并行执行,而事件轮询则会继续处理其他的事件。

当操作完成时,事件轮询机制会将其作为一个事件,这一点与请求是相同的。这样达到的效果就是,在面临大量负载的时候,异步Web框架能够以更少的线程实现更好的可扩展性,这样会减少线程管理的开销。

Spring 5引入了一个非阻塞、异步的Web框架,该框架在很大程度上是基于Reactor项目的,能够解决Web应用和API中对更好的可扩展性的需求。接下来我们看一下Spring WebFlux:面向Spring的反应式Web框架。

11.1.1 Spring WebFlux简介

当Spring团队思考如何向Web层添加反应式编程模型时,如果不在Spring MVC中做大量工作,显然很难实现这一点。这会在代码中产生分支以决定是否要以反应式的方式来处理请求。如果这样做,本质上就是将两个Web框架打包成一个,依靠if语句来区分反应式和非反应式。

与其将反应式编程模型硬塞进Spring MVC中,还不如创建一个单独的反应式Web框架,并尽可能多地借鉴Spring MVC。这样,Spring WebFlux就应运而生了。Spring 5定义的完整Web开发技术栈如图11.2所示。

image-20211016212213773

图11.2 Spring 5通过名为WebFlux的新Web框架来支持反应式Web应用

在图11.2的左侧,我们会看到Spring MVC技术栈,这是Spring框架2.5版本就引入的。Spring MVC(在第2章和第6章已经进行了讨论)建立在Java Servlet API之上,因此需要Servlet容器(比如Tomcat)才能执行。

与之不同,Spring WebFlux(在图11.2的右侧,和Spring MVC系出同门,并且很多核心组件都是公用的)并不会绑定Servlet API,所以它构建在Reactive HTTPAPI之上,这个API与Servlet API具有相同的功能,只不过是采用了反应式的方式。因为Spring WebFlux没有与Servlet API耦合,所以它的运行并不需要Servlet容器。它可以运行在任意非阻塞Web容器中,包括Netty、Undertow、Tomcat、Jetty或任意Servlet 3.1及以上的容器。

在图11.2中,最值得注意的是左上角,它代表了Spring MVC和Spring WebFlux公用的组件,主要用来定义控制器的注解。因为Spring MVC和Spring WebFlux会使用相同的注解,所以Spring WebFlux与Spring MVC在很多方面并没有区别。

右上角的方框表示另一种编程模型,它使用函数式编程范式来定义控制器,而不是使用注解。在11.2节中,我们将更多地讨论Spring的函数式Web编程模型。

Spring MVC和Spring WebFlux之间最显著的区别在于,我们要将哪个依赖项添加到构建文件中。在使用Spring WebFlux时,我们需要添加Spring Boot WebFluxstarter依赖项,而不是标准的Web starter(例如,spring-boot-starter-web)。在项目的pom.xml文件中,如下所示:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
注意:与Spring Boot的大多数starter依赖类似,这个starter也可以在Initializr中通过选中Reactive Web复选框添加到项目中。

使用WebFlux有一个很有意思的副作用,即WebFlux的默认嵌入式服务器是Netty而不是Tomcat。Netty是一个异步、事件驱动的服务器,非常适合SpringWebFlux这样的反应式Web框架。

除了使用不同的starter依赖之外,Spring WebFlux的控制器方法要接受和返回反应式类型,如Mono和Flux,而不是领域类型和集合。Spring WebFlux控制器也能处理RxJava类型,如Observable、Single和Completable。

反应式Spring MVC

尽管Spring WebFlux控制器通常会返回Mono和Flux,但是这并不意味着SpringMVC无法体验反应式类型的乐趣。如果你愿意,那么Spring MVC也可以返回Mono和Flux。

这里的区别在于,这些类型会如何被使用。Spring WebFlux是真正的反应式Web框架,允许在事件轮询中处理请求;而Spring MVC是基于Servlet的,依赖于多线程来处理多个请求。

接下来,我们让Spring WebFlux运行起来,借助Spring WebFlux重新编写TacoCloud的API控制器。

11.1.2 编写反应式控制器

你可能还记得在第6章中我们为Taco Cloud的REST API创建了一些控制器,这些控制器中包含请求处理方法,这些方法会以领域类型(如Order和Taco)或领域类型集合的方式处理输入和输出。作为提醒,我们看一下在第6章所编写的DesignTacoController片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RestController
@RequestMapping(path="/design",
produces="application/json")
@CrossOrigin(origins="*")
public class DesignTacoController {
...
@GetMapping("/recent")
public Iterable<Taco> recentTacos() {
PageRequest page = PageRequest.of(
0, 12, Sort.by("createdAt").descending());
return tacoRepo.findAll(page).getContent();
}
...
}

按照以上的编写形式,recentTacos()控制器会处理对“/design/recent”的HTTPGET请求,返回最近创建的taco列表。具体来讲,它会返回Taco类型的Iterable。这主要是因为repository的findAll()方法返回的就是该类型,或者更准确地说,这个结果来自findAll()方法所返回的Page对象的getContent()方法。

这样能够很好地运行,但是Iterable并不是反应式类型。我们不能对它使用任何反应式操作,也不能让框架将它视为反应式类型,从而将工作切分到多个线程中。我们希望recentTacos()方法能够返回Flux<Taco>

这里有一个简单但效果有限的方案:重写recentTacos(),将Iterable转换为Flux。而且,在重写的时候,我们可以去掉分页代码,将其替换为调用Flux的take():

1
2
3
4
@GetMapping("/recent")
public Flux<Taco> recentTacos() {
return Flux.fromIterable(tacoRepo.findAll()).take(12);
}

借助Flux.fromIterable(),我们可以将Iterable<Taco>转换为Flux<Taco>。既然我们可以使用Flux了,那么就能使用take操作将Flux返回的值限制为最多12个Taco对象。不仅代码更加简洁,而且我们能够处理反应式的Flux,而不是简单的Iterable。

到目前为止,我们编写反应式代码一切都很顺利。如果repository一开始就给我们一个Flux那就更好了,就没有必要进行转换了。如果能够实现这一点,那么recentTacos()将会写成如下形式:

1
2
3
4
@GetMapping("/recent")
public Flux<Taco> recentTacos() {
return tacoRepo.findAll().take(12);
}

这样就更好了!在理想情况下,反应式控制器将会位于反应式端到端栈的顶部,这个栈包括了控制器、repository、数据库以及在它们之间可能还会包含的服务。这样的端到端反应式栈如图11.3所示。

image-20211016212402176

图11.3 它应该成为完整的端到端反应式栈的一部分(为了最大化反应式Web框架的收益)

这样的端到端技术栈要求repository返回Flux,而不是Iterable。在第12章中,我们将会详细研究如何编写反应式repository,但是反应式TacoRepository大致会如下所示:

1
2
3
public interface TacoRepository
extends ReactiveCrudRepository<Taco, Long> {
}

此时,最需要注意的事情在于除了使用Flux来替换Iterable以及如何获取Flux之外,定义反应式WebFlux控制器的编程模型与非反应式Spring MVC控制器并没有什么差异。它们都使用了@RestController注解并且都在类级别使用了@RequestMapping。它们都有在方法级别使用@GetMapping注解的请求处理函数。真正重要的是处理器方法返回了什么类型。

另外值得注意的是,尽管我们从repository得到了Flux<Taco>,但是我们直接将它返回了,并没有调用subscribe()。框架将会为我们调用subscribe()。这意味着当处理“/design/recent”请求的时候,recentTacos()方法会被调用,在数据真正从数据库取出之前它就能立即返回。

返回单个值

作为另外一个样例,我们思考一下在第6章中编写的DesignTacoController的tacoById()方法:

1
2
3
4
5
6
7
8
@GetMapping("/{id}")
public Taco tacoById(@PathVariable("id") Long id) {
Optional<Taco> optTaco = tacoRepo.findById(id);
if (optTaco.isPresent()) {
return optTaco.get();
}
return null;
}

在这里,该方法处理对“/design/{id}”的GET请求并返回单个Taco对象。因为repository的findById()返回的是Optional,所以我们必须编写一些烦琐的代码处理它。如果findById()返回的是Mono<Taco>,而不是Optional<Taco>,那么我们可以按照如下的方式重写控制器的tacoById():

1
2
3
4
@GetMapping("/{id}")
public Mono<Taco> tacoById(@PathVariable("id") Long id) {
return tacoRepo.findById(id);
}

这样看上去简单多了。更重要的是,通过返回Mono<Taco>来替代Taco,我们能够让Spring WebFlux以反应式的方式处理响应。这样做的结果就是在面临高负载的时候我们的API能够更好地进行扩展。

使用RxJava类型

值得一提的是,在使用Spring WebFlux时,虽然Flux和Mono是自然而然的选择,但是我们也可以使用像Observable和Single这样的RxJava类型。例如,假设在DesignTacoController和后端repository之间有一个服务,处理的是RxJava类型,那么recentTacos()方法可以编写为:

1
2
3
4
@GetMapping("/recent")
public Observable<Taco> recentTacos() {
return tacoService.getRecentTacos();
}

类似的,tacoById()方法可以编写成处理RxJava Single类型,而不是Mono类型:

1
2
3
4
@GetMapping("/{id}")
public Single<Taco> tacoById(@PathVariable("id") Long id) {
return tacoService.lookupTaco(id);
}

除此之外,Spring WebFlux控制器方法还可以返回RxJava的Completable,后者等价于Reactor中的Mono<Void>。WebFlux也可以返回Flowable,替换Observable或Reactor的Flux。

实现输入的反应式

到目前为止,我们只关心了控制器方法返回什么样的反应式类型。但是,借助Spring WebFlux,我们还可以接受Mono或Flux作为处理器方法的输入。为了阐述这一点,请思考DesignTacoController中原始的postTaco()实现:

1
2
3
4
5
@PostMapping(consumes="application/json")
@ResponseStatus(HttpStatus.CREATED)
public Taco postTaco(@RequestBody Taco taco) {
return tacoRepo.save(taco);
}

按照原始的编写方式,postTaco()不仅会返回一个简单的Taco对象,还会接受一个Taco,这个对象绑定了请求体中的内容。这意味着在请求载荷完成解析并初始化为Taco对象之前,postTaco()方法是不会被调用的。这同时也意味着,在对repository的save()方法的阻塞调用返回之前,postTaco()是不能返回的。简而言之,这个请求阻塞了两次:在进入postTaco()的时候以及在postTaco()调用的过程中。通过为postTaco()添加一些反应式代码,我们能够将它变成完全非阻塞的请求处理方法:

1
2
3
4
5
@PostMapping(consumes="application/json")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Taco> postTaco(@RequestBody Mono<Taco> tacoMono) {
return tacoRepo.saveAll(tacoMono).next();
}

在这里,postTaco()接受一个Mono<Taco>并调用了repository的saveAll()方法。我们将会在第12章看到这个repository能够接受反应式流Publisher的任意实现,包括Mono或Flux。saveAll()方法返回了一个Flux<Taco>,但我们想要的是Mono。我们知道该Flux最多只能发布一个Taco,所以调用next()方法获取postTaco()方法要返回的Mono<Taco>

通过接受Mono<Taco>作为输入,方法会立即调用,不用等待从请求体中解析生成Taco。另外,repository也是反应式的,它接受一个Mono并立即返回Flux<Taco>,所以我们调用Flux的next()来获取最终的Mono<Taco>。方法在请求真正处理之前就能返回。

Spring WebFlux是一个非常棒的Spring MVC替代方案,提供了与Spring MVC相同的开发模型来编写反应式Web应用。其实Spring 5还有另外一项技巧,下面让我们看看如何使用Spring 5的新函数式编程风格创建反应式API。