13.2 搭建服务注册中心

Spring Cloud是一个非常大的伞形项目,由多个独立的子项目组成,每个子项目都以某种形式支撑着微服务的开发。其中有一个子项目叫作Spring Cloud Netflix,它按照Spring的编码风格重新提供了Netflix的多个组件。在这些组件中包括了Netflix的服务注册中心Eureka。

Eureka赤裸裸的历史真相

Eureka这个词最初的含义是当人们找到或发现某件事情时所发出的欢呼。这使得Eureka非常适合用作服务注册中心的名称,微服务要借助注册中心实现彼此发现的功能。

据传说,Eureka最早是由希腊物理学家阿基米德发明的,他坐在浴缸里的时候发现了浮力的原理,于是他跳出浴缸,赤裸裸地跑回家,嘴里喊着“Eureka!”。

关于阿基米德是否真的光着身子跑回家并大喊“Eureka!”还有一些争论,但无论如何,这个故事非常有意思。话说回来,我们倒是可以衣冠整洁地使用Eureka服务注册中心。

在微服务应用中,Eureka会担当所有服务的注册中心。Eureka本身也可以视为一个微服务,只不过在整体应用中它的目的是让其他的服务能够互相发现。

鉴于它在微服务应用中的角色,在创建需要注册的服务之前,我们最好搭建一个Eureka服务注册中心。为了理解Eureka的运行原理,我们可以参见图13.1所述的流动过程。

当服务实例启动的时候,它会按照名称将自己注册到Eureka中。在图13.1中,服务的名称为“some-service”。“some-service”可能会有多个完全等价的实例,但是在Eureka注册时,它们的名称是相同的。

image-20211020204011914

图13.1 服务使用Eureka服务注册中心进行注册(这样其他的服务就能发现并消费它们了)

在某个时间点,另一个服务(图13.1中名为“other-service”)需要使用“some-service”的端点。在这里,“other-service”没有使用特定的主机和端口信息对“some-service”进行硬编码,而是根据名字从Eureka查找“some-service”。Eureka的回应中将会包含它所知道的“some-service”的所有实例。

现在,“other-service”需要做决策了。它该使用“some-service”的哪个实例呢?如果它们都是完全等同的,其实就没什么关系了。为了避免每次都选择同一个实例,最好用一些客户端负载平衡算法来分散请求。这就是Netflix的另一个项目Ribbon的用武之地了。

虽然“other-service”完全可以自行查找和选择“some-service”的实例,但在这里我们让它依赖Ribbon。Ribbon是一个客户端负载平衡器,会帮助“other-service”做出选择。Ribbon做完选择之后,剩下的就是让“other-service”向Ribbon选择的实例发出请求。

为何要使用客户端负载均衡器

通常,我们会认为负载均衡器是一个中心化的服务,它处理所有的请求并将请求分发到多个目标实例中。与之不同,Ribbon是一个客户端负载均衡器,它会在每个客户端上发起请求。

相对于中心化的负载均衡器,Ribbon作为客户端的负载均衡器会有很多额外的收益。因为有一个在客户端本地的负载均衡器,所以负载均衡器能够很自然地按照客户端的数量成比例伸缩。此外,每个负载均衡器都可以配置成最适合对应客户端的负载平衡算法,而不必对所有的服务都使用相同的配置。

如果你觉得它看上去有些复杂,那么不用担心,随后我们就会看到大多数功能都会以自动化、透明的方式来进行处理。在注册和消费服务之前,我们需要先启用Eureka服务器。

要开始使用Spring Cloud和Eureka,我们需要首先为Eureka本身创建一个全新的项目。最简单的方式是使用Spring Initializr,该项目可以使用任何名称,但是我一般会将其称为service-registry。在选择starter依赖的时候,我们只需要一项依赖:带有Eureka Server标签的复选框。在创建完新项目之后,在Initializr为我们生成的项目中,pom.xml将会包含如下依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

在pom.xml文件中,我们还可以看到名为spring-cloud.version的属性以及一个<dependencyManagement>区域,它们指定了Spring Cloud的发布版本。当我创建service-registry的时候,它引用的是Finchley train的第一个服务发布版本(SR1):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<properties>
...
<spring-cloud.version>Finchley.SR1</spring-cloud.version>
</properties>
...
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

如果你想要使用不同版本的Spring Cloud,只需要将spring-cloud.version属性修改为想要的版本即可。

在构建文件中添加完Eureka starter依赖之后,要启用Eureka服务器,我们还需要做一件事情,那就是打开应用的主引导类并为其添加@EnableEurekaServer注解:

1
2
3
4
5
6
7
@SpringBootApplication
@EnableEurekaServer
public class ServiceRegistryApplication {
public static void main(String[] args) {
SpringApplication.run(ServiceRegistryApplication.class, args);
}
}

好的,这样就可以了!如果此时启动应用,Eureka服务注册中心就会运行起来并监听8080端口。如果此时在浏览器上访问http://localhost:8080 ,将会看到如图13.2所示的Web界面。

image-20211020204747659

图13.2 Eureka基于Web的dashboard

Eureka还对外暴露了REST API,借助它们服务可以自行进行注册,也可以发现其他的服务。你可能不会直接使用REST API,但是你会发现“/eureka/apps”端点非常有意思。它会列出注册中心所有服务实例的细节。此时,我们没有注册任何服务,它的响应如下所示。在注册完服务之后,我们还会研究这个端点:

1
2
3
4
<applications>
<versions__delta>1</versions__delta>
<apps__hashcode></apps__hashcode>
</applications>

你会发现,在Eureka的日志中,每隔大约30秒就会打印出一些异常。不用担心,Eureka正在运行,而且完全符合我们的预期。但是,这些异常表明我们还没有完全配置好服务注册中心。接下来,我们添加一些配置属性来消除这些异常。

13.2.1 配置Eureka

Eureka不喜欢独自工作,并相信数量多会更安全的理念,希望能够成为Eureka服务器集群的一部分。如果有多个Eureka服务器,其中有一个遇到问题,就不会出现单点故障。因此,Eureka的默认行为是与其他Eureka服务器建立关联,尝试获取其他Eureka服务器的服务注册中心,甚至还会将自身注册为其他Eureka服务器的服务。

在生产环境中,Eureka的高可用是非常有价值的。但是,对于开发阶段来说,启动多个Eureka服务器既不方便也没有必要。为了达到开发的目的,有一个单独的Eureka服务器就足够了。除非我们正确配置了Eureka服务器,否则它会以日志文件中异常的形式每隔30秒就抱怨孤独状态。这是因为,每隔30秒,Eureka服务器就会尝试与另外的Eureka服务器建立关联,以注册自己并共享其注册中心中的信息。

我们需要做的就是配置Eureka使其接受当前的孤独状态。为了实现这一点,我们需要在application.yml中设置一些属性,代码片段如下所示:

1
2
3
4
5
6
7
8
eureka:
instance:
hostname: localhost
client:
fetch-registry: false
register-with-eureka: false
service-url:
defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka

首先,我们将eureka.instance.hostname属性设置为localhost。这会告诉Eureka它正运行在哪个主机(host)上。这个属性是可选的,如果我们不指定它,那么Eureka会尝试通过环境变量确定它的主机。明确设置这个属性能够让我们更加确定它的值。

接下来的两个属性是eureka.client.fetch-registry和eureka.client.register-with-eureka。在其他的微服务中,我们可能会通过这两个属性告诉它们该如何与Eureka服务器进行交互。但是,不要忘了,Eureka也是一个微服务,所以这些属性也可以用到Eureka服务器上,以便于告诉它该如何与其他Eureka服务器进行交互。

这两个属性的默认值都是true,表明Eureka应该从其他的Eureka实例获取注册信息,并且应该将自身注册为其他Eureka服务器中的服务。因为在开发模式下并没有其他的Eureka服务器,所以我们将它们设置为false,这样Eureka将不会尝试与其他的Eureka服务器建立关联。

最后,我们还设置了eureka.client.service-url属性。这个属性包含了zone名称与该zone下一个或多个Eureka服务器之间的映射关系。defaultZone是一个特殊的key,如果客户端(在本例中,也就是Eureka本身)没有指定所需的zone,就将会使用这个zone。因为我们只有一个Eureka,映射到默认zone的URL就是Eureka服务器本身,所以这里使用了占位符变量,由其他属性填充它的值。

指定Eureka的服务器端口

尽管不一定是强制要求,但是我们可能想要修改默认的服务器端口。虽然Eureka非常乐意监听8080端口,但是在开发代码的时候我们可能会在本地机器同时运行多个应用(微服务),也就无法让所有的应用均监听8080端口。因此,在本地开发的时候,设置server.port属性通常是一个比较好的做法:

1
2
server:
port: 8761

在这里,我们将端口设置成了8761,这是Eureka客户端(我们将会在13.3节中进行讨论)默认监听的端口。

禁用自我保护模式

另外一个我们需要考虑设置的属性是eureka.server.enable-self-preservation。如果我们启动Eureka服务器并让它空闲一分钟以上,可能就会在Eureka UI上看到一个非常吓人的错误信息,如图13.3所示。

image-20211020205004645

图13.3 在自我保护模式下,Eureka会在dashboard显示信息

尽管这里使用了红色字体和大写字母,但是这条信息并不像看上去那么严重。Eureka希望服务实例能够注册上来,并且每隔30秒向它发送一次注册更新的请求。通常,如果Eureka在3个更新周期(或者说90秒)内没有收到服务的更新请求,就会将该服务注销。在本例中,Eureka假定出现了网络问题,进入自我保护模式,所以不会注销服务实例。

在生产环境中,自我保护模式是很好的,可以防止在出现网络故障时更新请求无法发送至Eureka所导致的活跃服务被注销。但是,在我们第一次启动Eureka并且还没有注册任何服务时候,出现这样的告警会让人产生疑虑。我们可以将eureka.server.enable-self- preservation属性设置为false,从而禁用自我保护模式:

1
2
3
4
eureka:
...
server:
enable-self-preservation: false

这个属性在开发环境中是非常有用的。在开发环境中,基于各种原因,Eureka可能会收不到更新请求。在这种环境下,我们可能会频繁地启动或关闭服务实例,自我保护模式会将已停止服务的注册项保留下来,另一个服务访问已经不可用的服务时就会产生问题。禁用自动保护模式将会防止这种诡异的问题。然而,我们付出的代价就是会看到另一条恐怖的红色信息(见图13.4)。

image-20211020205040204

图13.4 禁用自我保护模式时,提示自我保护模式已禁用

虽然我们在开发环境可以禁用自我保护模式,但是在投入生产环境时需要将其启用。

13.2.2 扩展Eureka

在开发环境中,单个Eureka实例会更加便利;但是在将应用投入生产环境时,我们可能至少需要两个Eureka实例,以实现高可用性。

生产环境可用的Spring Cloud Services

在将微服务部署到生产环境时,有许多需要考虑的因素。Eureka的高可用性和安全性在开发阶段可能并不太重要,但是在生产环境中就非常关键了。如果你是PivotalCloud Foundry或Pivotal Web Services的客户,就可以让他们来关心这些事情了。

Spring Cloud Services提供了一个Eureka实现,同时还包含了配置服务器和断路器dashboard。我们所需要做的就是从marketplace请求一个p-service-registry服务,然后将自己的微服务绑定到该服务上。在marketplace中,配置服务器和断路器dashboard(我们将会在接下来的两章中讨论它们)的名称分别为p-config-server和p-circuit-breaker-dashboard。

配置两个(或更多)Eureka实例最简单直接的方式就是在application.yml中使用Spring profile,然后针对两个profile各启动一次。例如,程序清单13.1中的配置项会将两个Eureka服务器设置为彼此对等的端。

配置两个(或更多)Eureka实例最简单直接的方式就是在application.yml中使用Spring profile,然后针对两个profile各启动一次。例如,程序清单13.1中的配置项会将两个Eureka服务器设置为彼此对等的端。

程序清单13.1 使用Spring profile将Eureka配置成两个对等的端

eureka:
  client:
    service-url:
      defaultZone: http://${other.eureka.host}:${other.eureka.port}/eureka
---
spring:
  profiles: eureka-1
  application:
    name: eureka-1
server:
  port: 8761
eureka:
  instance:
    hostname: eureka1.tacocloud.com
other:
  eureka:
    host: eureka2.tacocloud.com
    port: 8761
---
spring:
  profiles: eureka-2
  application:
    name: eureka-2
server:
  port: 8762
eureka:
  instance:
    hostname: eureka2.tacocloud.com
other:
  eureka:
    host: eureka1.tacocloud.com
    port: 8762

在默认的profile中(位于程序清单13.1顶部),我们用占位符变量来设置eureka.client. service-url.defaultZone属性,这些占位符都是在每个profile特定的配置中设置的。

在默认的profile之后,我们配置了两个profile,分别为eureka-1和eureka-2。每个profile都按照自己的配置需要指定了端口和eureka.instance.hostname。随后,我们设置了两个略显牵强的other.eureka.host和other.eureka.port属性,在每个profile中它们都指向了其他的Eureka实例。这两个属性与框架本身是没有关系的,但是在默认profile的占位符中会引用它们。

注意,我们在这里没有设置eureka.client.fetch-registry或eureka.client.register-with-eureka。它们的默认值为true,因此能够确保每个Eureka服务器都会向对方进行注册,并且能够从其他Eureka服务器上获取注册信息。

目前,Eureka服务注册中心已经启动并处于运行状态了。但是,它现在就像一个没有人查阅的空电话本。只有让服务开始在注册中心注册,并让其他服务查找和调用它们才行,否则我们的工作都是徒劳的。接下来,我们看一下如何让微服务成为Eureka的客户端。

12.4 小结

  • Spring Data支持为Cassandra、MongoDB、Couchbase和Redis数据库创建反应式repository。
  • Spring Data的反应式repository遵循与非反应式repository相同的编程模型,只不过它们所处理的是反应式发布者,如Flux和Mono。
  • 非反应式repository(比如JPA repository)可以调整为使用Mono和Flux,但是在保存和获取数据时它们依然是阻塞的。
  • 在使用非关系数据库时,需要理解如何恰当地为数据建模,这个建模过程决定了数据库最终如何存储数据。

12.3 编写反应式的MongoDB repository

MongoDB是另一个知名的NoSQL数据库。Cassandra是行存储数据库,而MongoDB则被视为文档数据库。更具体来讲,MongoDB以BSON(BinaryJSON,二进制JSON)格式存储文档,我们可以使用与查询其他数据库中的数据类似的方式查询和检索文档。

与Cassandra一样,必须要明确知道MongoDB不是关系数据库。管理MongoDB服务器集群和数据建模的方式与处理其他类型数据库时的思维方式是不一样的。

不过,使用MongoDB和Spring Data与使用Spring Data处理JPA或Cassandra并没有太大的差异。我们会在领域类上使用注解,将领域类型映射为文档结构。我们还会编写repository接口,这遵循与JPA和Cassandra一样的编程模型。但是在进行任何操作之前,我们必须在项目中启用Spring Data MongoDB。

12.3.1 启用Spring Data MongoDB

要启用Spring Data MongoDB,我们需要将Spring Data MongoDB starter添加到项目的构建文件中。Spring Data MongoDB有两个独立的可选starter。

如果你使用非反应式的MongoDB,那么需要将如下的依赖添加到构建文件中:

1
2
3
4
5
6
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>
spring-boot-starter-data-mongodb
</artifactId>
</dependency>

这项依赖也可以在Spring Initializr中通过选中名为MongoDB的复选框添加进来。但是,本章主要关注的是编写反应式repository,所以我们要选择反应式SpringData MongoDB starter依赖:

1
2
3
4
5
6
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>
spring-boot-starter-data-mongodb-reactive
</artifactId>
</dependency>

在Initializr中,我们可以通过选中Reactive MongoDB复选框将反应式SpringData MongoDB starter添加进来。将这个starter添加到构建文件中之后,自动配置功能将会触发,启用Spring Data对自动化repository接口的支持,这一点与第3章的JPA和第11章的Cassandra类似。

默认情况下,Spring Data MongoDB会假定MongoDB在本地运行并监听27017端口。为了测试和开发的便利性,我们可以选择使用嵌入式的Mongo数据库。为了实现这一点,我们需要将Flapdoodle Embedded MongoDB依赖添加到构建文件中:

1
2
3
4
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
</dependency>

与我们在关系型数据库中使用H2类似,Flapdoodle嵌入式数据库带来了使用内存Mongo数据库的便利性。也就是说,我们不需要运行单独的数据库,但是所有的数据会在应用重启的时候丢掉。

嵌入式数据库对于开发和测试是很不错的,一旦我们将应用部署到生产环境,就需要设置几个属性,让Spring Data MongoDB知道访问何处的Mongo数据库以及该如何进行访问:

1
2
3
4
5
6
7
8
spring:
data:
mongodb:
host: mongodb.tacocloud.com
port: 27018
username: tacocloud
password: s3cr3tp455w0rd
database: tacoclouddb

在这里,并不是所有的属性都是必需的。如果Mongo数据库不在本地运行,那么这些属性能够为Spring Data MongoDB指明正确的方向。拆分一下上面的配置,如下就是要设置的每个属性。

  • spring.data.mongodb.host:Mongo运行的主机名(默认为localhost)。
  • spring.data.mongodb.port:Mongo服务器监听的端口(默认为27017)。
  • spring.data.mongodb.username:访问安全Mongo数据库的用户名。
  • spring.data.mongodb.password:访问安全Mongo数据库的密码。
  • spring.data.mongodb.database:数据库名(默认为test)。

在我们的项目中,已经启用了Spring Data MongoDB,所以接下来我们需要为领域对象添加注解,以便于将它们持久化为MongoDB中的文档。

12.3.2 将领域对象映射为文档

Spring Data MongoDB提供了多个注解。在将领域对象映射为要持久化到MongoDB中的文档结构时,这些注解是非常有用的。尽管Spring DataMongoDB提供了多个用于映射的注解,但是其中的3个是最常用的。

  • @Id:将某个属性指明为文档的ID(来自Spring Data Commons)。
  • @Document:将领域类型声明为要持久化到MongoDB中的文档。
  • @Field:指定某个属性持久化到文档中的字段名称(以及可选的顺序配置)。

在这3个注解中,@Id和@Document是严格需要的。除非显式指定,否则没有使用@Field注解的属性将假定字段名与属性名相同。

将这些注解应用到Ingredient类上的效果如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>
spring-boot-starter-data-mongodb
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>
spring-boot-starter-data-mongodb-reactive
</artifactId>
</dependency>
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
</dependency>
spring:
data:
mongodb:
host: mongodb.tacocloud.com
port: 27018
username: tacocloud
password: s3cr3tp455w0rd
database: tacoclouddb
package tacos;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
@Data
@RequiredArgsConstructor
@NoArgsConstructor(access=AccessLevel.PRIVATE, force=true)
@Document
public class Ingredient {
@Id
private final String id;
private final String name;
private final Type type;
public static enum Type {
WRAP, PROTEIN, VEGGIES, CHEESE, SAUCE
}
}

可以看到,我们在类级别使用了@Document注解,表明Ingredient是一个文档实体,可以在Mongo数据库中执行读取和写入操作。默认情况下,集合名(这是Mongo中与关系型数据库的表对等的概念)是基于类名的,只不过第一个字母会变成小写。因为我们没有特别指定,所以Ingredient对象将会持久化到名为ingredient的集合中。但是,我们可以通过设置@Document的collection属性改变这种行为:

1
2
3
4
5
6
7
@Data
@RequiredArgsConstructor
@NoArgsConstructor(access=AccessLevel.PRIVATE, force=true)
@Document(collection="ingredients")
public class Ingredient {
...
}

我们还会看到,id属性使用了@Id注解。这表明该属性将会作为要持久化的文档的ID。我们可以将@Id注解用到任意Serializable类型的字段上,包括String和Long。在本例中,我们已经使用String定义的id属性作为自然标识符,因此不需要将其更改为其他类型。

到目前为止,一切都很顺利。但是,不要忘了,在本章前面的内容中,我们曾说过Ingredient是进行Cassandra映射时最简单的一个领域类型。其他的类型,比如Taco,就稍微困难一些了。接下来,我们看一下如何映射Taco类,看看它会有哪些惊喜。

在将领域类型映射为MongoDB文档时,我们肯定需要为Taco添加@Document注解。同时,我们还需要通过@Id注解指定ID属性。在添加完支持MongoDB持久化的注解后,我们就会得到如下的Taco类:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Data
@RestResource(rel="tacos", path="tacos")
@Document
public class Taco {
@Id
private String id;
@NotNull
@Size(min=5, message="Name must be at least 5 characters long")
private String name;
private Date createdAt = new Date();
@Size(min=1, message="You must choose at least 1 ingredient")
private List<Ingredient> ingredients;
}

不管你是否相信,这就是所有的内容。在Cassandra中,我们还需要处理两个不同的主键字段并且要引用用户定义类型,但这是Cassandra特有的。对于MongoDB来说,Taco的映射要简单得多。

即便如此,在Taco中还是有一些有意思的事情值得关注。首先,我们要注意,id属性变成了String类型(而不是JPA版本中的Long类型或Cassandra版本中的UUID类型)。正如我在前文所述,@Id注解可以用到任意Serializable类型上。如果选择使用String属性作为ID,我们就可以在保存的时候让Mongo自动设置一个值给它。将其设置为String类型之后,我们就得到了一个数据库管理赋值的ID,而不用再担心如何手动设置该属性。

我们再看一下ingredients属性。它是一个List<Ingredient>,与第3章中的JPA版本非常类似。与JPA版本不同的是,这个列表不会存储到单独的MongoDB集合中。与Cassandra对应的功能类似,配料列表会直接、以非规范化的形式存储到taco文档中。不过,与Cassandra不同,我们不需要创建用户定义类型,MongoDB非常乐意使用任何类型,不管它是带有@Document注解的另一个类型还是简单的POJO,都是可以的。

看到将Taco映射为文档持久化非常容易,我们可以松口气了。这种映射的便利性会延续到Order领域类吗?你可以自行看一下带有MongoDB注解的Order类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Data
@Document
public class Order implements Serializable {
private static final long serialVersionUID = 1L;
@Id
private String id;
private Date placedAt = new Date();
@Field("customer")
private User user;
// other properties omitted for brevity's sake
private List<Taco> tacos = new ArrayList<>();
public void addDesign(Taco design) {
this.tacos.add(design);
}
}

简单起见,我删除了投递和信用卡相关的各种字段。从剩下的部分可以清楚地看出,与其他领域类型一样,我们只需要@Document@Id注解。即便如此,我们也为user属性使用了@Field,指定在持久化文档中它将会存储为customer。

User领域类的MongoDB持久化映射依然非常简单,看到这里,相信你并不会对此感到意外:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Data
@NoArgsConstructor(access=AccessLevel.PRIVATE, force=true)
@RequiredArgsConstructor
@Document
public class User implements UserDetails {
private static final long serialVersionUID = 1L;
@Id
private String id;
private final String username;
private final String password;
private final String fullname;
private final String street;
private final String city;
private final String state;
private final String zip;
private final String phoneNumber;
// UserDetails method omitted for brevity's sake
}

虽然有一些更高级和不常见的场景需要额外的映射,但是我们会发现,对于大多数情况,@Document@Id以及偶尔用到的@Field对于MongoDB映射来说已经足够了。对于Taco Cloud的领域类型,它们完全可以胜任。

剩下的事情就是编写repository接口了。

12.3.3 编写反应式的MongoDB repository接口

Spring Data MongoDB提供的自动化repository功能与Spring Data JPA和Spring Data Cassandra类似。在为MongoDB编写反应式repository的时候,我们可以在ReactiveCrudRepository和ReactiveMongoRepository之间进行选择。核心的差异在于,ReactiveMongoRepository提供多个特殊的insert()方法,它们针对新文档的持久化进行了优化,而ReactiveCrudRepository依赖save()方法来保存新文档和已有的文档。

如何编写非反应式的MongoDB repository?

本章主要关注如何使用Spring Data编写反应式的repository。如果出于某种原因,你希望使用非反应式的repository,那么可以通过让repository接口扩展CrudRepository或MongoRepository来实现,而不是选择扩展ReactiveCrudRepository或ReactiveMongo Repository。这样,我们就可以让repository返回带有Mongo注解的领域类型或这些领域类型的集合。

尽管不是严格要求的,但是你可以将spring-boot-starter-data-mongodb-reactive依赖替换为spring-boot-starter- data-mongodb。

首先,我们来定义将Ingredient对象持久化为文档的repository。在数据库初始化完成之后,我们不会频繁地创建配料的文档,甚至有可能永远不会这样做。因此,ReactiveMongoRepository提供的优化没有太多的用处,我们可以让IngredientRepository扩展ReactiveCrudRepository:

1
2
3
4
5
6
7
8
package tacos.data;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.web.bind.annotation.CrossOrigin;
import tacos.Ingredient;
@CrossOrigin(origins="*")
public interface IngredientRepository
extends ReactiveCrudRepository<Ingredient, String> {
}

稍等片刻!它看起来与我们在12.2.4小节中为Cassandra编写的IngredientRepository接口是完全一样的!实际上,这是同一个接口,没有任何变化。这凸显了扩展ReactiveCrudRepository的一个好处,也就是它在各种数据库类型之间具有更强的可移植性,并且针对MongoDB和Cassandra都可以很好地运行。

因为它是一个反应式repository,所以它的方法处理的是Flux和Mono,而不是原始领域类型或这些领域类型的集合。例如,findAll()方法将返回Flux<Ingredient>,而不是Iterable<Ingredient>。同样,findById()将返回Mono<Ingredient>,而不是Optional<Ingredient>。因此,这个反应式repository可以作为端到端反应式流的一部分。

现在,为了将Taco持久化为MongoDB中的文档,我们定义另一个repository。与配料文档不同,我们会频繁创建taco文档。因此,ReactiveMongoRepository优化过的insert()方法就很有价值了。如下的代码片段展现了支持MongoDB的TacoRepository接口:

1
2
3
4
5
6
7
8
package tacos.data;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import reactor.core.publisher.Flux;
import tacos.Taco;
public interface TacoRepository
extends ReactiveMongoRepository<Taco, String> {
Flux<Taco> findByOrderByCreatedAtDesc();
}

相对于ReactiveCrudRepository,使用ReactiveMongoRepository唯一的缺点在于它是专属于MongoDB的,不能迁移至其他数据库。在你的项目中,你需要确定这种代价是否值得。如果你预计不会在某个时刻切换到不同的数据库,那么尽可以选择ReactiveMongoRepository并充分利用它针对数据插入操作所带来的优化。

注意,在TacoRepository中,我们引入了一个新的方法。这个方法支持显示最近创建的taco。在JPA版本的repository中,我们需要通过扩展PagingAndSortingRepository实现该功能。但是,在反应式repository中,PagingAndSortingRepository并没有太大的用处(尤其是分页功能)。在Cassandra版本中,排序是通过表定义中的集群键实现的,所以在repository中获取最近创建的taco时,我们并不需要特殊的处理。

对于MongoDB来说,我们想要获取最近创建的taco。尽管名字看上去有些奇怪,但是findByOrderByCreatedAtDesc()方法遵循自定义查询方法命名约定。它说明我们想要查找Taco对象,没有任何查询条件,我们在这里没有设置任何必须匹配的属性。然后,我们告诉它将结果按照createdAt属性降序排列。

在这里,命名中使用空By子句的原因在于方法名称中还有另一个By,这样做可以避免方法名称出现误解。如果将其命名为findAllOrderByCreatedAtDesc(),那么名称中的AllOrder部分将被忽略,Spring Data将尝试通过匹配createdAtDesc属性来查找taco。因为不存在该属性,所以应用将会报错,无法正常启动。

因为findByOrderByCreatedAtDesc()返回的是一个Flux<Taco>,所以我们不用担心分页的事情。相反,我们只需要使用take操作获取Flux发布的前12个Taco即可。例如,在显示最近创建的taco的控制器中,我们可以按照如下方式调用findByOrderBy CreatedAtDesc():

1
2
Flux<Taco> recents = repo.findByOrderByCreatedAtDesc()
.take(12);

最终得到的Flux所发布的Taco条目不会超过12个。

再看OrderRepository接口,它非常简单:

1
2
3
4
5
6
7
package tacos.data;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import reactor.core.publisher.Flux;
import tacos.Order;
public interface OrderRepository
extends ReactiveMongoRepository<Order, String> {
}

我们会频繁创建Order文档,所以OrderRepository扩展了ReactiveMongoRepository,从而充分利用其insert()方法所带来的优化。除此之外,相对于我们已经定义的repository,它并没有什么新奇之处。

最后,我们看一下将User对象持久化为文档的repository:

1
2
3
4
5
6
7
8
package tacos.data;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import reactor.core.publisher.Mono;
import tacos.User;
public interface UserRepository
extends ReactiveMongoRepository<User, String> {
Mono<User> findByUsername(String username);
}

讲解到现在,你对这个repository接口应该没有丝毫感到惊讶的地方了。与其他repository类似,它扩展了ReactiveMongoRepository(当然,它也可以扩展ReactiveCrudRepository)。唯一的与众不同之处在于,它有一个findByUsername()方式,这是在第4章中我们为了支持认证功能添加上去的。在这里,将它修改为返回Mono<User>,而不是原始的User对象。

12.2 使用反应式的Cassandra repository

Cassandra是一个分布式、高性能、始终可用、最终一致、分区行存储的NoSQL数据库。

描述该数据库的形容词是非常冗长的,但每一个词都准确说明了Cassandra的威力。简而言之,Cassandra处理的是数据行(row of data),这些数据行会在多个分布式节点中分区。不会有任何节点保存所有的数据,但是任何给定的行都会跨多个节点保存副本,从而消除了单点故障。

Spring Data Cassandra为Cassandra数据库提供了自动化repository的支持,这与Spring Data JPA为关系数据库提供的支持非常相似,但又有着明显的差异。此外,Spring Data Cassandra还提供了映射注解,用于将应用的领域类型映射到支撑的数据库结构之上。

在我们进一步探讨Cassandra之前,有一点很重要,那就是尽管Cassandra与关系数据库(如Oracle和SQL Server)有许多相似的概念,但Cassandra并不是关系数据库,在很多方面与关系数据库截然不同。我将尝试解释Cassandra的独特之处,因为这与如何使用Spring Data有关。我鼓励你阅读Cassandra自己的文档,以全面了解Cassandra的工作原理。

下面我们从在Taco Cloud项目中启用Spring Data Cassandra开始。

12.2.1 启用Spring Data Cassandra

要开始使用Spring Data Cassandra的反应式repository功能,我们需要添加反应式Spring Data Cassandra的Spring Boot starter依赖。实际上,我们可以从两个Spring Data Cassandra starter依赖间进行选择。

如果不打算为Cassandra编写反应式repository,那么我们可以在构建文件中添加如下依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>

这个依赖也可以在Initializr中通过选中Cassandra复选框添加进来。

在本章中,我们主要关注编写反应式repository,所以需要使用另外一个支持反应式Cassandra repository的starter依赖:

1
2
3
4
5
6
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>
spring-boot-starter-data-cassandra-reactive
</artifactId>
</dependency>

如果使用Spring Initializr创建项目,我们可以通过选中Reactive Cassandra复选框将这个依赖添加到构建文件中。

很重要的一点在于,我们使用这个依赖替代了Spring Data JPA starter依赖。此时我们不再通过JPA将数据持久化到关系型数据库中,而是使用Spring Data将数据持久化到Cassandra数据库中。因此,我们可能想要从构建文件中移除SpringData JPA starter依赖和关系型数据库的依赖(如JDBC驱动和H2依赖)。

Spring Data Reactive Cassandra starter依赖会为项目引入多个依赖项,其中包括Spring Data Cassandra库和Reactor。由于这些库位于运行时类路径中,因此将会触发创建反应式Cassandra库的自动配置。这意味着我们马上就能开始编写反应式Cassandra repository,而无须太多显式配置。

不过,少量的配置还是需要的,至少需要配置键空间(key space)的名称,我们的repository要在该键空间中进行操作。为了做到这一点,我们先创建一个键空间。

**注意**:在Cassandra中,键空间是Cassandra节点中的一组表。这与关系数据库中表、视图和约束的分组方式大致类似。

尽管我们可以配置Spring Data Cassandra自动创建键空间,但是手动创建(或使用现有的键空间)通常要容易得多。借助Cassandra CQL (Cassandra QueryLanguage,Cassandra查询语言) shell,我们可以使用如下的create keyspace命令为Taco Cloud应用创建键空间:

1
2
3
cqlsh> create keyspace tacocloud
... with replication={'class':'SimpleStrategy', 'replication_factor':1}
... and durable_writes=true;

简而言之,这里创建了一个名为tacocloud的键空间,并且使用简单策略的复制(replication)和持久性写入(durable writes)。通过将复制因子设置为1,我们希望为每行数据保留一个副本。复制策略决定了该如何处理复制操作。SimpleStrategy复制策略对于单数据中心(和样例)使用来说是不错的选择,但是如果你的Cassandra集群跨多个数据中心,那就应该考虑使用NetworkTopologyStrategy。推荐你阅读一下Cassandra的文档,了解复制策略的更多细节以及创建键空间的其他可选项。

现在,我们已经创建了键空间,接下来应该配置spring.data.cassandra.keyspace-name属性,告诉Spring Data Cassandra该如何使用该键空间:

1
2
3
4
5
spring:
data:
cassandra:
keyspace-name: tacocloud
schema-action: recreate-drop-unused

在这里,我们将spring.data.cassandra.schema-action属性设置为recreate-drop-unused。这项配置在开发阶段非常有用,因为它会保证应用在每次重新启动的时候,所有的表和用户定义类型都将会删除并重建。它的默认值为none,不会对已有模式采取任何操作,在生产环境中,这种设置是非常有用的,因为我们并不想在应用启动的时候删除所有生产环境中的表。

在本地运行Cassandra数据库时,我们只需要设置这两个属性。不过,除了这两个属性之外,你可能还想要设置其他的属性,这取决于你如何配置Cassandra集群。

默认情况下,Spring Data Cassandra会假定Cassandra在本地运行并监听9092端口。如果事实并非如此,那么在生产环境的配置中我们可能还要配置spring.data.cassandra.contact- points和spring.data.cassandra.port属性:

1
2
3
4
5
6
7
8
9
spring:
data:
cassandra:
keyspace-name: tacocloud
contact-points:
- casshost-1.tacocloud.com
- casshost-2.tacocloud.com
- casshost-3.tacocloud.com
port: 9043

注意,spring.data.cassandra.contact-points属性是我们识别Cassandra主机名的地方。每个联系点(contact point)代表了运行Cassandra节点的主机。默认情况下,它会被设置为localhost,但是我们可以将其设置为主机名的一个列表。应用会尝试连接每个连接点,直到能够连接上其中的一个为止。这样能够确保在Cassandra集群中不会出现单点故障,应用能够通过给定的连接点与集群建立连接。

我们可能还需要设置Cassandra集群的用户名和密码。这可以通过设置spring.data.cassandra.username和spring.data.cassandra.password属性来实现:

1
2
3
4
5
6
spring:
data:
cassandra:
...
username: tacocloud
password: s3cr3tP455w0rd

现在,在我们的项目中已经启用和配置好了Spring Data Cassandra,接下来就应该将领域模型与Cassandra表进行映射并编写repository了。在此之前,我们回过头来看一些Cassandra数据模型的基本要点。

12.2.2 理解Cassandra的数据模型

正如前文所述,Cassandra与关系型数据库有很大的不同。在将领域类型映射为Cassandra表之前,理解Cassandra数据模型与关系型数据库数据持久化建模的差异是非常重要的。

关于Cassandra数据模型,有几项很重要的事情需要理解。

  • Cassandra表可能有任意数量的列,但是并不是所有的行都会用到这些列。
  • Cassandra数据库被分割为多个分区。给定表中的任何一行都可以由一个或多个分区管理,但是不太可能每个分区都拥有所有的行。
  • Cassandra表有两种键:分区键(partition key)和集群键(clusteringkey)。Cassandra会对每一行的分区键执行哈希操作,以确定由哪个分区管理该行。集群键决定了行在分区中维护的顺序(不一定是它们在查询结果中出现的顺序)。
  • Cassandra对读操作进行了极大的优化。因此,较为常见和推荐的做法是让表实现高度非规范化,并让数据跨多个表进行复制(比如,客户信息可能会保存在customer表中,同时也会复制到客户所创建的订单表中)。

需要说明一点,将Taco Cloud领域类型调整为使用Cassandra,并不是简单地将几个JPA注解替换为Cassandra注解就可以了。我们必须重新考虑如何对数据进行建模。

12.2.3 将领域对象映射为Cassandra持久化

在第3章中,我们为领域类型(Taco、Ingredient、Order等)添加了JPA规范提供的注解。这些注解会将领域类型映射为要持久化到关系型数据库中的实体。尽管这些注解无法用于Cassandra的持久化,但是Spring Data Cassandra提供了自己的映射注解以达到同样的目的。

我们首先从Ingredient开始,它可以非常容易地映射到Cassandra上。如下是支持Cassandra的新Ingredient类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package tacos;
import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;
import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
@Data
@RequiredArgsConstructor
@NoArgsConstructor(access=AccessLevel.PRIVATE, force=true)
@Table("ingredients")
public class Ingredient {
@PrimaryKey
private final String id;
private final String name;
private final Type type;
public static enum Type {
WRAP, PROTEIN, VEGGIES, CHEESE, SAUCE
}
}

看上去,Ingredient类与我前面所说的只需替换几个注解就可以的说法相矛盾。在这里,我们不再使用JPA持久化中的@Entity注解,而是使用了@Table注解,这表明配料将会持久化到名为ingredients的表中。另外,我们不再为id属性使用@Id,而是使用@PrimaryKey。到现在为止,我们似乎只是替换了几个注解而已。

但是,不要让Ingredient的映射欺骗了你。Ingredient是最简单的领域类型之一。如果我们将Taco类进行Cassandra持久化映射(如程序清单12.1所示),那就更有意思了。

程序清单12.1 为Taco类添加注解实现Cassandra持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package tacos;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import org.springframework.data.cassandra.core.cql.Ordering;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.Column;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.core.mapping.Table;
import org.springframework.data.rest.core.annotation.RestResource;
import com.datastax.driver.core.utils.UUIDs;
import lombok.Data;
@Data
@RestResource(rel="tacos", path="tacos")
@Table("tacos") ⇽--- 持久化到tacos表
public class Taco {
@PrimaryKeyColumn(type=PrimaryKeyType.PARTITIONED) ⇽--- 定义分区键
private UUID id = UUIDs.timeBased();
@NotNull
@Size(min=5, message="Name must be at least 5 characters long")
private String name;
@PrimaryKeyColumn(type=PrimaryKeyType.CLUSTERED, ⇽--- 定义集群键
ordering=Ordering.DESCENDING)
private Date createdAt = new Date();
@Size(min=1, message="You must choose at least 1 ingredient")
@Column("ingredients") ⇽--- 将列表映射到ingredients列
private List<IngredientUDT> ingredients;
}

我们可以看到,Taco类的映射会更加复杂。与Ingredient类似,它也使用@Table注解声明taco应该写入到名为tacos的表中。但是,这是它与Ingredient唯一的相似之处。

id属性依然是主键,但它只是两个主键列中的一个而已。具体来讲,id属性使用了@PrimaryKeyColumn注解,并且type的值为PrimaryKeyType.PARTITIONED。这表明id属性要作为分区键,用来确定taco数据的每一行要写入到哪个分区中。

你可能也会发现,id属性现在是UUID类型,而不是Long类型。虽然不是强制要求,但是保存系统生成的ID值的属性通常是UUID类型的。此外,针对新Taco对象,这里的UUID会使用基于时间的UUID进行初始化(但是,从数据库中读取已有Taco时,它可能会被覆盖)。

我们继续往下看,createdAt属性映射到了另外一个主键列。但是,在本例中,@PrimaryKeyColumn的type属性设置成了PrimaryKeyType.CLUSTERED,这意味着createdAt会作为集群键。按照前文所述,集群键用来确定行在集群中的顺序。更具体来讲,我们将顺序设置为降序,所以,在给定的分区中,较新的行会优先出现在taco表中。

最后,ingredients属性是一个IngredientUDT对象的List,而不再是Ingredient对象的List。Cassandra表是高度非规范化的,因此可能会包含与其他表重复的数据。尽管ingredient表代表了所有可用配料的记录,但是taco所选择的配料会重复保存到ingredients列中。我们不会简单地引用ingredients表中的一行或多行,而是会让ingredients属性包含所有已选配料的完整数据。

但是,我们为什么会引入新的IngredientUDT类呢?为何不重用Ingredient类呢?简而言之,包含数据集合的列,比如ingredients列,必须是原生类型(整型、字符串等)的集合或用户定义类型(user-defined type)的集合。

在Cassandra中,用户定义类型能够让我们声明比原生类型更丰富的表的列。通常,它们会作为关系型结构中外键的非规范化模拟形式。但是,外键只是引用另外一张表中的一行数据,与之不同,用户定义类型实际上会持有其他表中某行数据的副本。在tacos表的ingredients列中,它将会包含配料定义的数据结构集合。

我们不能将Ingredient用作用户定义类型,因为@Table注解已经将其映射成了Cassandra中的一个持久化实体。所以,我们必须创建一个新的类,定义该如何将配料信息存储到taco表的ingredients列上。IngredientUDT类(其中UDT代表了用户定义类型,即user-defined type)就是完成这项工作的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package tacos;
import org.springframework.data.cassandra.core.mapping.UserDefinedType;
import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
@Data
@RequiredArgsConstructor
@NoArgsConstructor(access=AccessLevel.PRIVATE, force=true)
@UserDefinedType("ingredient")
public class IngredientUDT {
private final String name;
private final Ingredient.Type type;
}

尽管IngredientUDT和Ingredient看上去非常相似,但是它的映射需求要简单得多。它使用了@UserDefinedType注解,表明这是Cassandra中的用户定义类型。但是就其他方面来讲,它就是有几个属性的简单类。

我们会发现,IngredientUDT类没有包含id属性。尽管它也可以包含源Ingredient中id属性的副本,但是这样没有太大必要。实际上,用户定义类型可以包含任何想要的属性,它没有必要与表定义一一对应。

我发现,可视化用户定义类型与表中的持久化数据之间的关联关系是很困难的。图12.1展现了整个Taco Cloud数据库的数据模型,包含了用户定义类型。

image-20211017213043765

图12.1 在这里不再使用外键和连接,Cassandra表是非规范化的,用户定义类型包含从关联表复制的数据

具体到我们刚刚创建的用户定义类型,需要注意Taco有一个IngredientUDT列表,其中包含了从Ingredient复制而来的数据。当Taco持久化的时候,Taco对象以及IngredientUDT列表都会持久化到tacos表中。IngredientUDT列表会完整地持久化到ingredients列中。

另外一种帮助我们理解用户定义类型如何使用的办法就是从数据库中查询tacos表的各个行。借助Cassandra提供的CQL和cqlsh工具,我们可以看到如下的结果:

1
2
3
4
5
6
7
8
9
cqlsh:tacocloud> select id, name, createdAt, ingredients from tacos;
id | name | createdat | ingredients
----------+-----------+-----------+---------------------------------------
827390...| Carnivore | 2018-04...| [{name: 'Flour Tortilla', type: 'WRAP'},
{name: 'Carnitas', type: 'PROTEIN'},
{name: 'Sour Cream', type: 'SAUCE'},
{name: 'Salsa', type: 'SAUCE'},
{name: 'Cheddar', type: 'CHEESE'}]
(1 rows)

从中可以看到,id、name和createdat列包含的都是简单的值。在这方面,它们与关系数据库的类似查询差别不大。ingredients列就不一样了,按照定义,它包含用户定义的ingredient类型(由IngredientUDT所定义)的集合,所以它的值显示为一个JSON数组,数组中则是JSON对象。

你可能也注意到了在图12.1中还有其他的用户定义类型。在继续将领域对象映射为Cassandra的过程中,我们肯定要创建更多的用户定义类型,其中包括Order类所用到的类型。程序清单12.2显示的Order类,针对Cassandra持久化进行了修改。

程序清单12.2 将Order类映射为Cassandra tacoorders表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Data
@Table("tacoorders") ⇽--- 映射到tacoorders表
public class Order implements Serializable {
private static final long serialVersionUID = 1L;
@PrimaryKey ⇽--- 声明主键
private UUID id = UUIDs.timeBased();
private Date placedAt = new Date();
@Column("user") ⇽--- 映射到user列
private UserUDT user;
// delivery and credit card properties omitted for brevity's sake
@Column("tacos") ⇽--- 将一个列表映射到tacos列
private List<TacoUDT> tacos = new ArrayList<>();
public void addDesign(TacoUTD design) {
this.tacos.add(design);
}
}

程序清单12.2有意省略了Order的许多属性,这些属性不适合Cassandra数据建模的讨论。剩下的属性和映射方式类似于Taco的定义。就像以前使用@Table一样,在这里@Table用于将Order映射到tacoorders表。在本例中,我们不关注顺序,因此id属性只使用了@PrimaryKey注解,将其同时作为分区键和集群键,并采用了默认的排序。

tacos属性比较有趣,因为它是List<TacoUDT>,而不是Taco对象的列表。在这里,Order和Taco/TacoUDT之间的关系类似于前文中Taco和Ingredient/IngredientUDT之间的关系。也就是说,我们不是通过外键将不同表中的多行数据关联在一起,而是让Order表包含所有的taco数据,以便于优化表的快速读取。

类似的,user属性引用了UserUDT对象,它会持久化到user列中。同样,这与关系型数据库中连接另外一张表的策略是不同的。

至于TacoUDT,它与IngredientUDT类非常相似,不过它里面包含了对另外一个用户定义类型的引用:

1
2
3
4
5
6
@Data
@UserDefinedType("taco")
public class TacoUDT {
private final String name;
private final List<IngredientUDT> ingredients;
}

UserUDT更有趣一点,因为它包含了3个属性,而不是两个:

1
2
3
4
5
6
7
@UserDefinedType("user")
@Data
public class UserUDT {
private final String username;
private final String fullname;
private final String phoneNumber;
}

如果能够重用第3章定义的领域类或者仅仅将JPA注解替换为Cassandra注解,那当然很好,但是Cassandra持久化的本质特点是要求我们重新思考数据该如何建模。现在,我们已经映射好了领域模型,接下来该编写repository了。

12.2.4 编写反应式Cassandra repository

正如我们在第3章所看到的,使用Spring Data编写repository只需声明一个接口,让它扩展Spring Data的基础repository,并有选择性地声明用于自定义查询的方法即可。实际上,编写反应式repository并没有太大的不同。主要区别在于,我们需要扩展一个不同的基础repository接口,而且我们的方法将会处理反应式发布者,如Mono和Flux,而不再是领域类型和集合。

在编写反应式Cassandra repository时,我们有两个基础接口可选:ReactiveCassandraRepository和ReactiveCrudRepository。选择哪个接口很大程度上取决于该如何使用repository。ReactiveCassandraRepository扩展了ReactiveCrudRepository,提供了insert()方法的一些变种,如果要保存的对象是新建的,这些变种进行了优化。除此之外,ReactiveCassandraRepository提供了与ReactiveCrudRepository相同的操作。如果我们想要插入很多数据,那么可能需要选择ReactiveCassandraRepository;否则,最好选择ReactiveCrudRepository,因为在不同数据库类型之间它更具可移植性。

Cassandra repository必须是反应式的吗?

尽管我们本章主要关注如何使用Spring Data编写反应式repository,但是你可能想知道该如何为Cassandra编写非反应式的repository。如果是这样,那么我们需要让repository接口扩展非反应式的CrudRepository或CassandraRepository接口,而不是扩展ReactiveCrud Repository或ReactiveCassandraRepository。我们的repository方法就可以返回带有Cassandra相关注解的领域类型或这些领域类型的集合,而不再是Flux和Mono。

如果你准备采用非反应式的repository,那么可以将starter依赖从spring-boot-starter-data-cassandra-reactive修改为spring-boot-starter-data-cassandra,不过这并不是严格要求的。

重新看一下我们为Taco Cloud编写的repository,要使它们变成反应式的,我们首先让它们扩展ReactiveCrudRepository或ReactiveCassandraRepository,而不再是CrudRepository。我们首先看一下IngredientRepository。除了使用配料数据初始化数据库之外,我们不会插入很多的新配料数据。所以,IngredientRepository可以扩展ReactiveCrudRepository,如下所示:

1
2
3
public interface IngredientRepository
extends ReactiveCrudRepository<Ingredient, String> {
}

我们不需要在IngredientRepository中定义任何的自定义查询,所以要将IngredientRepository变成反应式repository,并不需要额外的工作。现在,它扩展了ReactiveCrud Repository,所以它的方法处理的都是Flux和Mono。例如,findAll()方法现在返回的是Flux<Ingredient>,而不是Iterable<Ingredient>。所以,在使用它的时候,要按照正确的方式来使用。比如,IngredientController需要重写为返回Flux<Ingredient>

1
2
3
4
@GetMapping
public Flux<Ingredient> allIngredients() {
return repo.findAll();
}

TacoRepository的变更要稍微复杂一些。我们不用像非反应式repository那样扩展PagingAndSortingRepository,而是可以扩展ReactiveCassandraRepository。在参数化Taco对象的时候,不能使用Long类型的ID属性,在与Taco对象协作的时候,要使用UUID类型的ID:

1
2
3
public interface TacoRepository
extends ReactiveCrudRepository<Taco, UUID> {
}

因为这个新TacoRepository的findAll()方法会返回Flux<Ingredient>,所以我们不用让它扩展PagingAndSortingRepository,也不用操作分页的数据。相反,在DesignTacoController的recentTacos()方法中,我们只需要调用返回的Flux的take()方法来限制要消费的Taco对象的数量即可(实际上,在11.1.2节中,我们已经修改了DesignTacoController和它的recentTacos()方法)。

OrderRepository所需的变更也很简单。我们不再扩展CrudRepository,而是让它扩展ReactiveCassandraRepository:

1
2
3
public interface OrderRepository
extends ReactiveCassandraRepository<Order, UUID> {
}

最后,我们来看一下UserRepository。我们可能还记得,UserRepository有一个自定义的查询方法,即findByUsername()。这个方法让定义Cassandra持久化repository有了一些变化。支持Cassandra的UserRepository代码如下:

1
2
3
4
5
public interface UserRepository
extends ReactiveCassandraRepository<User, UUID> {
@AllowFiltering
Mono<User> findByUsername(String username);
}

与其他的repository接口(除了IngredientRepository)类似,UserRepository也扩展了ReactiveCassandraRepository。到目前为止,没有感到惊讶的地方。但是,它的findByUsername()方法我们需要注意一下。

首先,因为这是一个反应式repository,所以findByUsername()不会再简单地返回User对象。我们对其进行了重新定义,让它返回Mono<User>。一般而言,在反应式repository中,我们自定义的查询方法应该要么返回Mono(要返回的值不超过一个),要么返回Flux(会有多个返回值)。

同时,按照Cassandra的特点,在查询表的时候,我们不能像在关系型数据库的SQL中那样简单地使用where子句。Cassandra对读取进行了优化,但是使用where子句进行过滤可能会拖慢其他快速查询的速度。即便如此,根据一个或多个列对表进行查询还是非常有用的。因此,@AllowFiltering注解使结果的过滤变成了现实,它可以作为这些场景的可用方案。

在findByUsername()中,我们预期的CQL查询如下所示:

1
select * from users where username='some username';

同样,Cassandra是不允许这样做的。但是,在将@AllowFiltering注解放到findByUsername()方法上之后,所形成的CQL查询如下所示:

1
select * from users where username='some username' allow filtering;

查询末尾的allow filtering子句提醒Cassandra,我们已经意识到查询性能的潜在影响,并且无论如何都需要它。在这种情况下,Cassandra将允许使用where子句并按需过滤结果。

Cassandra中有很多强大功能,当它与Spring Data和Reactor结合使用时,我们可以在Spring应用中充分使用这些功能。但是,让我们把注意力转移到支持反应式repository的另一个数据库上来,那就是MongoDB。

第12章 反应式持久化数据

本章内容
  • - Spring Data的反应式repository
  • - 为Cassandra和MongoDB编写反应式repository
  • - 以反应式的方式使用非反应式的repository
  • - Cassandra的数据模型

在思考非阻塞的反应式代码和阻塞的命令式代码时,我经常会想到上下班的高峰时刻(rush hour)。高峰时刻是一个很奇怪的名字。每个人都急着去他们想到的地方,但是我们通常只能几乎一动不动地坐在车流之中。如果路上没有其他人,我们能够轻而易举地到达目的地。

即便我非常希望到达某个地方(我没有阻塞),但是这并不意味着路上没有其他人挡着我。前面可能有其他司机发生了剐蹭事故,阻塞了其他车辆的通行。所以即使我本可以畅通无阻地回到家中,但此刻我也只能阻塞在这里等待事故清理完成。

在前面的章节中,我们看到了如何使用Spring WebFlux创建反应式、非阻塞的控制器。这样能够帮助我们提升Web层的可扩展性。但是,只有当与这些控制器协作的其他组件都是非阻塞的时候,它们本身才能是非阻塞的。如果我们编写的SpringWebFlux控制器依赖于阻塞的repository,那么反应式控制器需要阻塞等待它们生成数据。

因此,很重要的一点在于,要让整个数据流变成反应式和非阻塞的,也就是从控制器直到数据库。在本章中,我们将会看到如何使用Spring Data编写反应式的repository,这些repository与我们在第3章看到的编程模型非常类似。我们首先从整体上了解一下Spring Data对反应式的支持。

12.1 理解Spring Data的反应式概况

从Spring Data Kay release train开始,Spring Data首次提供对反应式repository的支持,其中包括使用Cassandra、MongoDB、Couchbase或Redis持久化数据的反应式编程模型。

名称的由来

尽管Spring Data的各个项目都有自己的节奏,但是它们都按照一个releasetrain来进行发布,每个release train的命名对应计算机科学中一个重要人物的名字。

这些名字是按照字母排序的,比如Babbage、Codd、Dijkstra、Evans、Fowler、Gosling、Hopper和Ingalls。在编写本书的时候,最新的releasetrain版本是Spring Data Kay,这是根据Alan Kay来命名的,Alan Kay是Smalltalk编程语言的设计者之一。

你可能也发现了,在这里我并没有提到关系型数据库或JPA。令人遗憾的是,目前还没有对反应式JPA的支持。尽管关系型数据库依然是行业中使用最广泛的数据库方案,但是要让Spring Data JPA支持反应式编程模型,需要数据库和相关的JDBC都支持非阻塞的反应式模型。不幸的是,至少目前还不支持关系数据库的反应式处理。希望这种情况能在不久的将来得到解决。[^1]

本章的重点是使用Spring Data为支持反应式模型的数据库开发使用反应式类型的repository。我们首先对比一下Spring Data的反应式模型和非反应式模型。

12.1.1 Spring Data反应式本质论

Spring Data反应式的本质可以概括为一句话,那就是在反应式repository的方法中,要接受和返回Mono和Flux,而不是领域实体和集合。根据配料类型,从后端数据库中获取Ingredient对象的repository,可以声明为如下的repository接口:

1
Flux<Ingredient> findByType(Ingredient.Type type);

我们可以看到,这个findByType()方法会返回Flux<Ingredient>,而不是像对应的非反应式实现那样返回List<Ingredient>Iterable<Ingredient>

类似的,在保存Taco的时候,repository的saveAll()方法签名如下所示:

1
<Taco> Flux<Taco> saveAll(Publisher<Taco> tacoPublisher);

在本例中,saveAll()方法接受一个Taco类型的Publisher(可能是Mono<Taco>Flux<Taco>)并返回一个Flux<Taco>。这与非反应式的repository是不同的,它的save()方法直接处理领域类型,接受Taco对象并返回保存的Taco对象。

简而言之,Spring Data的反应式repository与我们在第3章看到的Spring Data的非反应式repository共享几乎相同的编程模型。唯一重要的区别是,反应式repository的方法接受和返回Flux和Mono,而不是原始的领域类型和集合。

12.1.2 反应式和非反应式类型之间的转换

在进一步研究如何使用Spring Data编写反应式repository之前,我们看一下如何解决遗留的巨大问题。我们可能已经使用了关系型数据库,将数据迁移至SpringData反应式编程模型支持的4种数据库之一是不太现实的,那是否就意味着我们无法在应用中使用反应式编程了呢?

从头到尾使用反应式模型(包括数据库层面)时,我们才能够得到反应式编程的全部收益,但是在非反应式数据库之上使用反应式流的话,我们也能得到一部分收益。即便我们所选择的数据库不支持非阻塞的反应式查询,我们依然可以以阻塞的方式获取数据并将其转换为反应式类型,从而使上游组件从中收益。

例如,假设我们正在使用关系型数据库并利用Spring Data JPA进行持久化。我们的OrderRepository可能会有一个如下签名的方法:

1
List<Order> findByUser(User user);

这个方法会返回一个非反应式的List<Order>,包含给定User的所有Order信息。当findByUser()被调用的时候,查询执行的过程中该方法会阻塞,结果会收集到一个List中。因为List并不是反应式类型,所以我们不能在它上面执行Flux提供的任何操作。另外,如果调用者是控制器,那么它无法以反应式的方式处理结果,实现提高可扩展性的目的。

在JPA repository的阻塞性方面我们确实无能为力。但是,我们可以在接收到非反应式List的时候就将其转换成Flux,这样我们就可以从这里开始以反应式的方式处理结果了。为了实现这一点,我们可以使用Flux.fromIterable():

1
2
List<Order> orders = repo.findByUser(someUser);
Flux<Order> orderFlux = Flux.fromIterable(orders);

与之类似,如果我们想要根据ID获取一个Order,我们就可以立即将其转换为Mono:

1
2
Order order repo.findById(Long id);
Mono<Order> orderMono = Mono.just(order);

通过使用Mono.just()和Flux的fromIterable()、fromArray()和fromStream()方法,我们可以将非反应式阻塞代码隔离在repository中,在应用的其他地方,我们都可以使用反应式类型。

那反方向怎么样呢?如果我们有一个Mono或Flux,此时需要调用非反应式JPArepository的save()方法又该怎么办呢?好消息是,Mono和Flux都提供了将它们发布的数据抽取到领域类型或Iterable中的操作。

例如,假设WebFlux控制器接受的是Mono<Taco>,那么我们需要使用SpringData JPA repository的save()方法将其保存起来。没有问题,我们只需调用Mono的block()方法就可以抽取Taco对象:

1
2
Taco taco = tacoMono.block();
tacoRepo.save(taco);

顾名思义,block()方法会执行一个阻塞操作,完成数据的抽取过程。

如果要从Flux中抽取数据,我们可以使用toIterable()。假设我们有一个Flux<Taco>,并且要调用Spring Data JPA repository的saveAll()方法,如下的代码片段将从Flux<Taco>中抽取Iterable<Taco>

1
2
Iterable<Taco> tacos = tacoFlux.toIterable();
tacoRepo.saveAll(tacos);

与Mono.block()类似,Flux.toIterable()在将Flux发布的对象抽取到Iterable的过程中是阻塞的。因为它们本质上是阻塞的,所以应该谨慎使用Mono.block()和Flux.toIterable(),并且要清楚地认识到使用它们会打破反应式编程模型。

要避免阻塞的抽取操作,还有一种更具反应式的方法,就是订阅Mono或Flux,并在其发布每个元素的时候执行所需的操作。例如,要使用非反应式的repository保存Flux<Taco>发布的Taco对象,我们可以这样做:

1
2
3
tacoFlux.subscribe(taco -> {
tacoRepo.save(taco);
});

虽然调用repository的save()方法依然是非反应式的阻塞操作,但是在消费和处理Flux或Mono发布的数据时,使用subscribe()是一种更自然、更加反应式的方式。

关于非反应式repository,我们已经讨论得够多了。接下来,我们见识一下SpringData反应式功能的真正威力,为Taco Cloud应用创建反应式repository。

12.1.3 开发反应式repository

正如我们在第3章中看到的那样,Spring Data最令人赞叹的特性之一就是我们只须声明repository接口即可,在运行时Spring Data会自动实现它们。在那一章中,我们主要关注Spring Data JPA,但是同样的编程模型也适用于非关系数据库,包括Cassandra和MongoDB。

除了Spring Data Cassandra和Spring Data MongoDB对非反应式repository的支持之外,它们都提供了反应式的编程模型。这些数据库在后端提供数据持久化功能,Spring应用可以真正实现从Web层到数据库的端到端反应式流。我们首先看看如何使用反应式Spring Data repository将数据持久化到Cassandra。

[^1]: Spring Data R2DBC致力于解决关系型数据库的反应式访问问题。——译者注

11.6 小结

  • Spring WebFlux提供了一个反应式的Web框架,它的编程模型是与SpringMVC对应的,甚至共享了很多相同的注解。
  • Spring 5还提供了函数式编程模型,作为Spring WebFlux的替代方案。
  • 反应式控制器可以使用WebTestClient来进行测试。
  • 在客户端,Spring 5提供了WebClient,也就是Spring RestTemplate的反应式等价实现。
  • 在保护Web应用方面,尽管WebFlux在底层有一些区别,但是SpringSecurity 5为反应式安全所提供的编程模型与非反应式Spring MVC应用相比并没有特别大的差异。

11.5 保护反应式Web API

从Spring Security诞生以来(甚至可以追溯到它叫作Acegi Security的时代),它的Web安全模型就是基于Servlet Filter构建的。毕竟,这样做是有道理的。如果我们希望拦截基于Servlet技术的Web框架的请求,以确保该请求得到了恰当的授权,那么Servlet Filter是显而易见的方案。但是,Spring WebFlux并不适用于这种方式。

在使用Spring WebFlux编写Web应用的时候,我们甚至都不能保证会用到Servlet。实际上,反应式Web应用很有可能构建在Netty或其他非Servlet容器上。这是否意味着基于Servlet Filter的Spring Security不能用来保护我们的Spring WebFlux应用了呢?

在保护Spring WebFlux应用的时候,Servlet Filter确实不是可行方案了。但是,Spring Security依然可以胜任这项任务。从5.0.0版本开始,Spring Security就既能保护基于Servlet的Spring MVC,又能保护反应式的Spring WebFlux应用了。在实现这一点的时候,它使用了Spring的WebFilter,这是Spring模仿ServletFilter的类似方案,但是它不依赖于Servlet API。

然而,更值得注意的是,反应式Spring Security的配置模型与在第4章中看到的没有太大不同。事实上,Spring WebFlux与Spring MVC有着独立的依赖关系,但与之不同,Spring Security是作为同一个Spring Boot Security starter提供的,不管你是打算使用它来保护Spring MVC Web应用,还是保护使用Spring WebFlux编写的应用,都需要添加这项依赖。提醒一下,security starter如下所示:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>

也就是说,Spring Security的反应式和非反应式配置模型仅有几项很小的差异。我们很有必要快速对比一下这两种配置模型。

11.5.1 配置反应式Web应用的安全性

回忆一下,配置Spring Security来保护Spring MVC Web应用通常需要创建一个扩展自WebSecurityConfigurerAdapter的新配置类,并使用@EnableWebSecurity注解。这样的配置类将重写configuration()方法,以指定Web安全的细节,例如特定的请求路径需要哪些权限。下面这个简单的SpringSecurity配置类可以帮助我们回忆如何为非反应式Spring MVC应用配置安全性:

1
2
3
4
5
6
7
8
9
10
11
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/design", "/orders").hasAuthority("USER")
.antMatchers("/**").permitAll();
}
}

现在,我们看一下相同的配置如何用到反应式Spring WebFlux应用中。程序清单11.2展现了一个反应式安全配置类,它的功能与前文的安全配置大致相同:

程序清单11.2 为Spring WebFlux配置Spring Security

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
@EnableWebFluxSecurity
public class SecurityConfig {
@Bean
public SecurityWebFilterChain securityWebFilterChain(
ServerHttpSecurity http) {
return http
.authorizeExchange()
.pathMatchers("/design", "/orders").hasAuthority("USER")
.anyExchange().permitAll()
.and()
.build();
}
}

我们可以看到,有很多类似的地方,同时也有所差异。这个新的配置类没有使用@EnableWebSecurity,而是使用了@EnableWebFluxSecurity注解。除此之外,配置类没有扩展WebSecurityConfigurerAdapter或其他的基类,因此也就没有必要重写configure()。

为了取代configure()的功能,我们通过securityWebFilterChain()方法声明了一个SecurityWebFilterChain类型的bean。securityWebFilterChain()的方法体与前面配置的configure()方法没有太大的差异,但是也有略微的修改。

最重要的是,配置是通过给定的ServerHttpSecurity对象进行声明的,而不是通过HttpSecurity对象。借助ServerHttpSecurity,我们可以调用authorizeExchange(),它大致等价于authorizeRequests(),都是用来声明请求级的安全性的。

**注意**:ServerHttpSecurity是Spring Security 5新引入的,在反应式编程中它模拟了HttpSecurity的功能。

在映射路径的时候,我们依然可以使用Ant风格的通配符路径,但是这里要使用pathMatchers(),而不是antMatchers()。这样做的结果就是,我们不再需要声明Ant风格的路径“/**”来捕获所有请求,因为anyExchange()会映射所有的路径。

最后,因为我们将SecurityWebFilterChain声明为一个bean,而不是重写框架方法,所以我们需要调用build()方法将所有的安全规则聚合到一个要返回的SecurityWebFilterChain对象中。

除了这些微小的差异外,配置Spring WebFlux和Spring MVC的Web安全性并没有太多不同。那么如何获取用户的详情信息呢?

11.5.2 配置反应式的用户详情服务

在扩展WebSecurityConfigurerAdapter的时候,我们会重写configure()方法以声明安全规则,并且还会重写另外一个configure()方法来配置认证逻辑,通常这需要定义一个UserDetails。为了提醒一下代码会是什么样子,如下的代码重写了configure()方法,并且在UserDetailsService的匿名实现中,使用了注入的UserRepository对象以提供根据用户名查找用户的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Autowired
UserRepository userRepo;
@Override
protected void
configure(AuthenticationManagerBuilder auth)
throws Exception {
auth
.userDetailsService(new UserDetailsService() {
@Override
public UserDetails loadUserByUsername(String username)
throws UsernameNotFoundException {
User user = userRepo.findByUsername(username)
if (user == null) {
throw new UsernameNotFoundException(
username " + not found")
}
return user.toUserDetails();
}
});
}

在这个非反应式的配置中,我们重写了UserDetailsService唯一要求的方法,也就是loadUserByUsername()。在这个方法内部,我们使用给定的UserRepository,实现根据用户名来查找用户的功能。如果没有找到该名称的用户,就会抛出UsernameNotFoundException。如果能够找到,就调用一个辅助方法toUserDetails(),返回最终的UserDetails对象。

在反应式的安全配置中,我们不再重写configure()方法,而是声明一个ReactiveUserDetailsService bean。ReactiveUserDetailsService是UserDetailsService的反应式等价形式。与UserDetailsService类似,ReactiveUserDetailsService只需要实现一个方法。具体来讲,就是一个返回Mono<UserDetails>的findByUsername()方法,这里返回的不再是UserDetails对象。

在下面的样例中,ReactiveUserDetailsService bean会给定一个UserRepository,我们假设它是一个反应式的Spring Data repository(在第12章我们将会详细讨论):

1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public ReactiveUserDetailsService userDetailsService(
UserRepository userRepo) {
return new ReactiveUserDetailsService() {
@Override
public Mono<UserDetails> findByUsername(String username) {
return userRepo.findByUsername(username)
.map(user -> {
return user.toUserDetails();
});
}
};
}

在这里,按需要返回一个Mono<UserDetails>,但是UserRepository.findByUsername()方法所返回的是Mono<User>。因为它是一个Mono,所以可以对它进行链式操作,比如进行map()操作,将Mono<User>映射为Mono<UserDetails>

在本例中,map()操作使用了一个lambda表达式,它调用了Mono所发布的User对象上的toUserDetails()方法。这个方法会将User转换为UserDetails。这样的话,“.map()”操作会返回一个Mono<UserDetails>,恰好就是ReactiveUserDetailsService.findByUsername()方法所需要的。

11.4 反应式消费REST API

在第7章中,我们使用RestTemplate发送客户端请求到Taco Cloud API上。RestTemplate有着很久的历史,从Spring 3.0版本就引入了。我们曾经使用它为应用发送了无数的请求,但是RestTemplate提供的方法处理的都是非反应式领域类型和集合。这意味着,如果我们想要以反应式的方式使用响应数据,就需要使用Flux或Mono对其进行包装。如果我们已经有了Flux或Mono,想要通过POST或PUT请求发送它们,那么我们需要在发送请求之前将数据抽取到一个非反应式的类型中。

如果能够有一种方式让RestTemplate原生使用反应式类型那就好了。不用担心,Spring 5提供了WebClient,它可以作为RestTemplate的反应式版本。WebClient能够让我们请求外部API时发送和接收反应式类型。

WebClient的使用方式与RestTemplate有很大的差别。RestTemplate会有多个方法处理不同类型的请求;而WebClient有一个流畅(fluent)的构建者风格接口,能够让我们描述和发送请求。WebClient的通用使用模式如下:

  • 创建WebClient实例(或注入WebClient bean);
  • 指定要发送请求的HTTP方法;
  • 指定请求中URI和头信息;
  • 提交请求;
  • 消费响应。

接下来,我们实际看几个WebClient的例子,首先从如何使用WebClient发送HTTP GET请求开始。

11.4.1 获取资源

作为使用WebClient的样例,假设我们需要通过Taco Cloud API根据ID获取Ingredient对象。如果使用RestTemplate,那么我们可能会使用getForObject()方法。但是,借助WebClient的话,我们会构建请求、获取响应并抽取一个会发布Ingredient对象的Mono:

1
2
3
4
5
6
Mono<Ingredient> ingredient = WebClient.create()
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Ingredient.class);
ingredient.subscribe(i -> { ... })

在这里,我们使用create()创建了一个新的WebClient实例。然后,我们使用get()和uri()定义对http://localhost:8080/ingredients/{id}的GET请求,其中{id}占位符将会被ingredientId的值所替换。接着,retrieve()会执行请求。最后,我们调用bodyToMono()将响应体的载荷抽取到`Mono`中,就可以继续使用Mono的额外操作了。

为了对bodyToMono()返回Mono进行额外的操作,需要注意的很重要的一点是要在请求发送之前对其进行订阅。发送请求获取值的集合是非常容易的。例如,如下的代码片段将获取所有配料:

1
2
3
4
5
6
Flux<Ingredient> ingredients = WebClient.create()
.get()
.uri("http://localhost:8080/ingredients")
.retrieve()
.bodyToFlux(Ingredient.class);
ingredients.subscribe(i -> { ... })

大部分而言,获取多个条目与获取单个条目是相同的。最大的差异在于我们不再是使用bodyToMono()将响应体抽取为Mono,而是使用bodyToFlux()将其抽取为一个Flux。

与bodyToMono()类似,bodyToFlux()返回的Flux还没有被订阅。在数据流过之前,我们可以对Flux添加一些额外的操作(过滤、映射等)。因此,非常重要的一点就是要订阅结果所形成的Flux,否则请求将始终不会发送。

使用基础URI发送请求

你可能会发现在很多请求中都会使用一个通用的基础URI。这样的话,创建WebClient bean的时候设置一个基础URI并将其注入到所需的地方是非常有用的。这样的bean可以按照如下的方式来声明:

1
2
3
4
@Bean
public WebClient webClient() {
return WebClient.create("http://localhost:8080");
}

然后,在想要使用基础URI的任意地方,我们都可以将WebClient bean注入进来并按照如下的方式来使用:

1
2
3
4
5
6
7
8
9
10
@Autowired
WebClient webClient;
public Mono<Ingredient> getIngredientById(String ingredientId) {
Mono<Ingredient> ingredient = webClient
.get()
.uri("/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Ingredient.class);
ingredient.subscribe(i -> { ... })
}

因为WebClient已经创建好了,所以我们可以通过get()方法直接使用它。对于URI来说,我们只需要调用uri()指定相对于基础URI的相对路径即可。

对长时间运行的请求进行超时处理

我们需要考虑的一件事情就是,网络并不是始终可靠的,或者并不像我们预期的那么快,远程服务器在处理请求时有可能会非常缓慢。理想情况下,对远程服务的请求会在一个合理的时间内返回。无法正常返回的话,客户端要是能够避免陷入长时间等待响应的窘境就好了。

为了避免客户端请求被缓慢的网络或服务阻塞,我们可以使用Flux或Mono的timeout()方法,为等待数据发布的过程设置一个时长限制。作为样例,我们考虑一下如何为获取配料数据使用timeout()方法:

1
2
3
4
5
6
7
8
9
10
11
12
Flux<Ingredient> ingredients = WebClient.create()
.get()
.uri("http://localhost:8080/ingredients")
.retrieve()
.bodyToFlux(Ingredient.class);
ingredients
.timeout(Duration.ofSeconds(1))
.subscribe(
i -> { ... },
e -> {
// handle timeout error
})

可以看到,在订阅Flux之前,我们调用了timeout()方法,将持续时间设置成了1秒。如果请求能够在1秒之内返回,就不会有任何问题。如果请求的耗时超过1秒,就会超时,作为第二个参数传递给subscribe()的错误处理器将会被调用。

11.4.2 发送资源

使用WebClient发送数据与接收数据并没有太大的差异。作为样例,假设我们有一个Mono<Ingredient>,并且想要将Mono发布的Ingredient对象以POST请求的形式发送到相对路径“/ingredients”的URI上。我们所需要做的就是使用post()方法来替换get(),并通过body()方法指明要使用Mono来填充请求体:

1
2
3
4
5
6
7
8
Mono<Ingredient> ingredientMono = ...;
Mono<Ingredient> result = webClient
.post()
.uri("/ingredients")
.body(ingredientMono, Ingredient.class)
.retrieve()
.bodyToMono(Ingredient.class);
result.subscribe(i -> { ... })

如果我们没有要发送的Mono或Flux,而只有原始的领域对象,那么可以使用syncBody()方法。例如,假设我们没有Mono<Ingredient>,而是有一个想要在请求体中发送的Ingredient对象,那么可以这样做:

1
2
3
4
5
6
7
8
Ingedient ingredient = ...;
Mono<Ingredient> result = webClient
.post()
.uri("/ingredients")
.syncBody(ingredient)
.retrieve()
.bodyToMono(Ingredient.class);
result.subscribe(i -> { ... })

如果我们不是使用POST请求,而是想要使用PUT请求更新一个Ingredient,就可以使用put()来替换post(),并相应地调整URI路径:

1
2
3
4
5
6
7
Mono<Void> result = webClient
.put()
.uri("/ingredients/{id}", ingredient.getId())
.syncBody(ingredient)
.retrieve()
.bodyToMono(Void.class)
.subscribe();

PUT请求的响应载荷一般是空的,所以我们必须要求bodyToMono()返回一个Void类型的Mono。一旦订阅该Mono,请求就会立即发送。

11.4.3 删除资源

WebClient还支持通过其delete()方法移除资源。例如,根据ID删除配料:

1
2
3
4
5
6
Mono<Void> result = webClient
.delete()
.uri("/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Void.class)
.subscribe();

与PUT请求类似,DELETE请求的响应不会有载荷。同样,我们返回并订阅Mono<Void>就会发送请求。

11.4.4 处理错误

到目前为止,所有的WebClient样例都假设有一个正常的结果:没有400级别和500级别的状态码。如果出现这两种类型的错误状态,WebClient就会记录失败信息;否则,就会默默忽略掉。

如果你需要处理这种错误,那么可以调用onStatus()来指定各种类型的HTTP状态码该如何进行处理。onStatus()接受两个函数:一个断言函数用来匹配HTTP状态;另一个函数会得到ClientResponse对象,并返回Mono<Throwable>

为了阐述如何使用onStatus()创建自定义的错误处理器,请参考如下使用WebClient根据ID获取配料的样例:

1
2
3
4
5
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Ingredient.class);

如果ingredientId的值能够匹配已知的资源,那么结果得到的Mono在订阅时就会发布一个Ingredient。但是,如果找不到匹配的配料呢?

当订阅可能会出现错误的Mono或Flux时,很重要的一点就是在调用subscribe()注册数据消费者的同时,也要注册一个错误消费者:

1
2
3
4
5
6
7
8
9
ingredientMono.subscribe(
ingredient -> {
// handle the ingredient data
...
},
error-> {
// deal with the error
...
});

如果能够找到配料资源,那么传递给subscribe()的第一个lambda表达式(数据消费者)将会被调用,并且会将匹配的Ingredient对象传递过来。但是,如果找不到资源,那么请求将会得到一个HTTP 404 (NOT FOUND)状态码的响应,它将会导致第二个lambda表达式(错误消费者)被调用,并且会传递过来一个默认的WebClientResponseException。

WebClientResponseException最大的问题在于它无法明确指出导致Mono失败的原因是什么。它的名字表明在WebClient发起的请求中,出现了响应错误,但是我们需要深入研究WebClientResponseException才能知道哪里出现了错误。无论如何,如果给错误消费者的异常更加专注业务领域而不是专注WebClient,那就更好了。

我们可以添加一个自定义的错误处理器,在这个处理器中可以提供将状态码转换为自己所选择的Throwable的代码。如果请求配料资源时得到的Mono失败,我们就生成一个UnknownIngredientException。在调用retrieve()之后,我们可以添加一个对onStatus()的调用,从而实现这一点:

1
2
3
4
5
6
7
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.onStatus(HttpStatus::is4xxClientError,
response -> Mono.just(new UnknownIngredientException()))
.bodyToMono(Ingredient.class);

调用onStatus()时第一个参数是断言,它会接受一个HttpStatus,如果状态码是我们想要处理的,就将会返回true。如果状态码匹配,响应将会传递给第二个参数的函数并按需进行处理,最终返回Throwable类型的Mono。

在样例中,如果状态码是400级别的(比如客户端错误),那么将会返回包含UnknownIngredientException的Mono。这会导致ingredientMono因为该异常而失败。

需要注意,HttpStatus::is4xxClientError是对HttpStatus的is4xxClientError的方法引用。此时,将会基于HttpStatus对象调用该方法。如果喜欢,还可以使用HttpStatus的其他方法作为方法引用。你也可以以lambda表达式或方法引用的形式提供其他返回boolean类型的函数。

例如,在错误处理中,我们可以更加精确地检查HTTP 404 (NOT FOUND)状态,只需将对onStatus()的调用修改成如下形式即可:

1
2
3
4
5
6
7
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.onStatus(status -> status == HttpStatus.NOT_FOUND,
response -> Mono.just(new UnknownIngredientException()))
.bodyToMono(Ingredient.class);

值得一提的是,我们可以按需调用onStatus()任意多次,以便于处理响应中可能返回的各种HTTP状态码。

11.4.5 交换请求

到目前为止,在使用WebClient的时候,我们都是利用它的retrieve()方法来发送请求。在这些场景中,retrieve()方法会返回一个ResponseSpec类型的对象,通过调用它的onStatus()、bodyToFlux()和bodyToMono()方法,我们就能处理响应。对于简单的场景来说,使用ResponseSpec就足够了,但是它在很多方面都有局限性。如果我们想要访问响应的头信息或cookie的值,那么ResponseSpec就无能为力了。

在使用ResponseSpec遇到困难时,我们就可以通过调用exchange()方法来替换retrieve()方法。exchange()方法会返回ClientResponse类型的Mono,我们可以对它采用各种反应式操作,以便于探测和使用整个响应中的数据,包括载荷、头信息和cookie。

在了解exchange()和retrieve()的差异之前,我们先看一下它们之间的相似之处。如下的代码片段通过WebClient和exchange()方法,根据ID获取单个配料:

1
2
3
4
5
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.exchange()
.flatMap(cr -> cr.bodyToMono(Ingredient.class));

这几乎与使用retrieve()的样例是相同的:

1
2
3
4
5
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Ingredient.class);

在exchange()样例中,我们不是使用ResponseSpec对象的bodyToMono()方法来获取Mono<Ingredient>,而是得到了一个Mono<ClientResponse>,通过它我们可以执行扁平化映射(flat-mapping)函数,将ClientResponse映射为Mono<Ingredient>,这样扁平化为最终想要的Mono。

现在,我们看一下exchange()的差异在什么地方。假设请求的响应中会包含一个名为X_UNAVAILABLE的头信息,如果它的值为true,则表明该配料是不可用的(因为某种原因)。为了讨论方便,假设如果这个头信息存在,那么我们希望得到的Mono是空的,不返回任何内容。通过添加另外一个flatMap()调用,我们就能实现这一点。整个的WebClient调用过程如下所示:

1
2
3
4
5
6
7
8
9
10
11
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.exchange()
.flatMap(cr -> {
if (cr.headers().header("X_UNAVAILABLE").contains("true")) {
return Mono.empty();
}
return Mono.just(cr);
})
.flatMap(cr -> cr.bodyToMono(Ingredient.class));

新的flatMap()调用会探查给定ClientRequest对象的响应头,查看是否存在值为true的X_UNAVAILABLE头信息。如果能够找到,就将会返回一个空的Mono;否则,返回一个包含ClientResponse的新Mono。不管是哪种情况,返回的Mono都会扁平化为下一个flatMap()操作所要使用的Mono。

第11章 开发反应式API

本章内容:
  • 使用Spring WebFlux
  • 编写和测试反应式的控制器以及客户端
  • 消费REST API
  • 保护反应式Web应用

我们已经了解了反应式编程和Reactor项目,现在可以开始在Spring应用程序中使用这些技术了。在本章中,我们将利用Spring 5的反应式编程模型重新讨论在第6章中编写的控制器。

具体来讲,我们将一起探讨Spring 5中新添加的反应式Web框架——SpringWebFlux。我们很快会发现,Spring WebFlux与Spring MVC非常相似,这样使得它非常易于使用,我们已经掌握的如何在Spring中构建REST API的知识依然有用。