12.1 理解Spring Data的反应式概况

12.1 理解Spring Data的反应式概况

从Spring Data Kay release train开始,Spring Data首次提供对反应式repository的支持,其中包括使用Cassandra、MongoDB、Couchbase或Redis持久化数据的反应式编程模型。

名称的由来

尽管Spring Data的各个项目都有自己的节奏,但是它们都按照一个releasetrain来进行发布,每个release train的命名对应计算机科学中一个重要人物的名字。

这些名字是按照字母排序的,比如Babbage、Codd、Dijkstra、Evans、Fowler、Gosling、Hopper和Ingalls。在编写本书的时候,最新的releasetrain版本是Spring Data Kay,这是根据Alan Kay来命名的,Alan Kay是Smalltalk编程语言的设计者之一。

你可能也发现了,在这里我并没有提到关系型数据库或JPA。令人遗憾的是,目前还没有对反应式JPA的支持。尽管关系型数据库依然是行业中使用最广泛的数据库方案,但是要让Spring Data JPA支持反应式编程模型,需要数据库和相关的JDBC都支持非阻塞的反应式模型。不幸的是,至少目前还不支持关系数据库的反应式处理。希望这种情况能在不久的将来得到解决。[^1]

本章的重点是使用Spring Data为支持反应式模型的数据库开发使用反应式类型的repository。我们首先对比一下Spring Data的反应式模型和非反应式模型。

12.1.1 Spring Data反应式本质论

Spring Data反应式的本质可以概括为一句话,那就是在反应式repository的方法中,要接受和返回Mono和Flux,而不是领域实体和集合。根据配料类型,从后端数据库中获取Ingredient对象的repository,可以声明为如下的repository接口:

1
Flux<Ingredient> findByType(Ingredient.Type type);

我们可以看到,这个findByType()方法会返回Flux<Ingredient>,而不是像对应的非反应式实现那样返回List<Ingredient>Iterable<Ingredient>

类似的,在保存Taco的时候,repository的saveAll()方法签名如下所示:

1
<Taco> Flux<Taco> saveAll(Publisher<Taco> tacoPublisher);

在本例中,saveAll()方法接受一个Taco类型的Publisher(可能是Mono<Taco>Flux<Taco>)并返回一个Flux<Taco>。这与非反应式的repository是不同的,它的save()方法直接处理领域类型,接受Taco对象并返回保存的Taco对象。

简而言之,Spring Data的反应式repository与我们在第3章看到的Spring Data的非反应式repository共享几乎相同的编程模型。唯一重要的区别是,反应式repository的方法接受和返回Flux和Mono,而不是原始的领域类型和集合。

12.1.2 反应式和非反应式类型之间的转换

在进一步研究如何使用Spring Data编写反应式repository之前,我们看一下如何解决遗留的巨大问题。我们可能已经使用了关系型数据库,将数据迁移至SpringData反应式编程模型支持的4种数据库之一是不太现实的,那是否就意味着我们无法在应用中使用反应式编程了呢?

从头到尾使用反应式模型(包括数据库层面)时,我们才能够得到反应式编程的全部收益,但是在非反应式数据库之上使用反应式流的话,我们也能得到一部分收益。即便我们所选择的数据库不支持非阻塞的反应式查询,我们依然可以以阻塞的方式获取数据并将其转换为反应式类型,从而使上游组件从中收益。

例如,假设我们正在使用关系型数据库并利用Spring Data JPA进行持久化。我们的OrderRepository可能会有一个如下签名的方法:

1
List<Order> findByUser(User user);

这个方法会返回一个非反应式的List<Order>,包含给定User的所有Order信息。当findByUser()被调用的时候,查询执行的过程中该方法会阻塞,结果会收集到一个List中。因为List并不是反应式类型,所以我们不能在它上面执行Flux提供的任何操作。另外,如果调用者是控制器,那么它无法以反应式的方式处理结果,实现提高可扩展性的目的。

在JPA repository的阻塞性方面我们确实无能为力。但是,我们可以在接收到非反应式List的时候就将其转换成Flux,这样我们就可以从这里开始以反应式的方式处理结果了。为了实现这一点,我们可以使用Flux.fromIterable():

1
2
List<Order> orders = repo.findByUser(someUser);
Flux<Order> orderFlux = Flux.fromIterable(orders);

与之类似,如果我们想要根据ID获取一个Order,我们就可以立即将其转换为Mono:

1
2
Order order repo.findById(Long id);
Mono<Order> orderMono = Mono.just(order);

通过使用Mono.just()和Flux的fromIterable()、fromArray()和fromStream()方法,我们可以将非反应式阻塞代码隔离在repository中,在应用的其他地方,我们都可以使用反应式类型。

那反方向怎么样呢?如果我们有一个Mono或Flux,此时需要调用非反应式JPArepository的save()方法又该怎么办呢?好消息是,Mono和Flux都提供了将它们发布的数据抽取到领域类型或Iterable中的操作。

例如,假设WebFlux控制器接受的是Mono<Taco>,那么我们需要使用SpringData JPA repository的save()方法将其保存起来。没有问题,我们只需调用Mono的block()方法就可以抽取Taco对象:

1
2
Taco taco = tacoMono.block();
tacoRepo.save(taco);

顾名思义,block()方法会执行一个阻塞操作,完成数据的抽取过程。

如果要从Flux中抽取数据,我们可以使用toIterable()。假设我们有一个Flux<Taco>,并且要调用Spring Data JPA repository的saveAll()方法,如下的代码片段将从Flux<Taco>中抽取Iterable<Taco>

1
2
Iterable<Taco> tacos = tacoFlux.toIterable();
tacoRepo.saveAll(tacos);

与Mono.block()类似,Flux.toIterable()在将Flux发布的对象抽取到Iterable的过程中是阻塞的。因为它们本质上是阻塞的,所以应该谨慎使用Mono.block()和Flux.toIterable(),并且要清楚地认识到使用它们会打破反应式编程模型。

要避免阻塞的抽取操作,还有一种更具反应式的方法,就是订阅Mono或Flux,并在其发布每个元素的时候执行所需的操作。例如,要使用非反应式的repository保存Flux<Taco>发布的Taco对象,我们可以这样做:

1
2
3
tacoFlux.subscribe(taco -> {
tacoRepo.save(taco);
});

虽然调用repository的save()方法依然是非反应式的阻塞操作,但是在消费和处理Flux或Mono发布的数据时,使用subscribe()是一种更自然、更加反应式的方式。

关于非反应式repository,我们已经讨论得够多了。接下来,我们见识一下SpringData反应式功能的真正威力,为Taco Cloud应用创建反应式repository。

12.1.3 开发反应式repository

正如我们在第3章中看到的那样,Spring Data最令人赞叹的特性之一就是我们只须声明repository接口即可,在运行时Spring Data会自动实现它们。在那一章中,我们主要关注Spring Data JPA,但是同样的编程模型也适用于非关系数据库,包括Cassandra和MongoDB。

除了Spring Data Cassandra和Spring Data MongoDB对非反应式repository的支持之外,它们都提供了反应式的编程模型。这些数据库在后端提供数据持久化功能,Spring应用可以真正实现从Web层到数据库的端到端反应式流。我们首先看看如何使用反应式Spring Data repository将数据持久化到Cassandra。

[^1]: Spring Data R2DBC致力于解决关系型数据库的反应式访问问题。——译者注