11.4 反应式消费REST API

11.4 反应式消费REST API

在第7章中,我们使用RestTemplate发送客户端请求到Taco Cloud API上。RestTemplate有着很久的历史,从Spring 3.0版本就引入了。我们曾经使用它为应用发送了无数的请求,但是RestTemplate提供的方法处理的都是非反应式领域类型和集合。这意味着,如果我们想要以反应式的方式使用响应数据,就需要使用Flux或Mono对其进行包装。如果我们已经有了Flux或Mono,想要通过POST或PUT请求发送它们,那么我们需要在发送请求之前将数据抽取到一个非反应式的类型中。

如果能够有一种方式让RestTemplate原生使用反应式类型那就好了。不用担心,Spring 5提供了WebClient,它可以作为RestTemplate的反应式版本。WebClient能够让我们请求外部API时发送和接收反应式类型。

WebClient的使用方式与RestTemplate有很大的差别。RestTemplate会有多个方法处理不同类型的请求;而WebClient有一个流畅(fluent)的构建者风格接口,能够让我们描述和发送请求。WebClient的通用使用模式如下:

  • 创建WebClient实例(或注入WebClient bean);
  • 指定要发送请求的HTTP方法;
  • 指定请求中URI和头信息;
  • 提交请求;
  • 消费响应。

接下来,我们实际看几个WebClient的例子,首先从如何使用WebClient发送HTTP GET请求开始。

11.4.1 获取资源

作为使用WebClient的样例,假设我们需要通过Taco Cloud API根据ID获取Ingredient对象。如果使用RestTemplate,那么我们可能会使用getForObject()方法。但是,借助WebClient的话,我们会构建请求、获取响应并抽取一个会发布Ingredient对象的Mono:

1
2
3
4
5
6
Mono<Ingredient> ingredient = WebClient.create()
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Ingredient.class);
ingredient.subscribe(i -> { ... })

在这里,我们使用create()创建了一个新的WebClient实例。然后,我们使用get()和uri()定义对http://localhost:8080/ingredients/{id}的GET请求,其中{id}占位符将会被ingredientId的值所替换。接着,retrieve()会执行请求。最后,我们调用bodyToMono()将响应体的载荷抽取到`Mono`中,就可以继续使用Mono的额外操作了。

为了对bodyToMono()返回Mono进行额外的操作,需要注意的很重要的一点是要在请求发送之前对其进行订阅。发送请求获取值的集合是非常容易的。例如,如下的代码片段将获取所有配料:

1
2
3
4
5
6
Flux<Ingredient> ingredients = WebClient.create()
.get()
.uri("http://localhost:8080/ingredients")
.retrieve()
.bodyToFlux(Ingredient.class);
ingredients.subscribe(i -> { ... })

大部分而言,获取多个条目与获取单个条目是相同的。最大的差异在于我们不再是使用bodyToMono()将响应体抽取为Mono,而是使用bodyToFlux()将其抽取为一个Flux。

与bodyToMono()类似,bodyToFlux()返回的Flux还没有被订阅。在数据流过之前,我们可以对Flux添加一些额外的操作(过滤、映射等)。因此,非常重要的一点就是要订阅结果所形成的Flux,否则请求将始终不会发送。

使用基础URI发送请求

你可能会发现在很多请求中都会使用一个通用的基础URI。这样的话,创建WebClient bean的时候设置一个基础URI并将其注入到所需的地方是非常有用的。这样的bean可以按照如下的方式来声明:

1
2
3
4
@Bean
public WebClient webClient() {
return WebClient.create("http://localhost:8080");
}

然后,在想要使用基础URI的任意地方,我们都可以将WebClient bean注入进来并按照如下的方式来使用:

1
2
3
4
5
6
7
8
9
10
@Autowired
WebClient webClient;
public Mono<Ingredient> getIngredientById(String ingredientId) {
Mono<Ingredient> ingredient = webClient
.get()
.uri("/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Ingredient.class);
ingredient.subscribe(i -> { ... })
}

因为WebClient已经创建好了,所以我们可以通过get()方法直接使用它。对于URI来说,我们只需要调用uri()指定相对于基础URI的相对路径即可。

对长时间运行的请求进行超时处理

我们需要考虑的一件事情就是,网络并不是始终可靠的,或者并不像我们预期的那么快,远程服务器在处理请求时有可能会非常缓慢。理想情况下,对远程服务的请求会在一个合理的时间内返回。无法正常返回的话,客户端要是能够避免陷入长时间等待响应的窘境就好了。

为了避免客户端请求被缓慢的网络或服务阻塞,我们可以使用Flux或Mono的timeout()方法,为等待数据发布的过程设置一个时长限制。作为样例,我们考虑一下如何为获取配料数据使用timeout()方法:

1
2
3
4
5
6
7
8
9
10
11
12
Flux<Ingredient> ingredients = WebClient.create()
.get()
.uri("http://localhost:8080/ingredients")
.retrieve()
.bodyToFlux(Ingredient.class);
ingredients
.timeout(Duration.ofSeconds(1))
.subscribe(
i -> { ... },
e -> {
// handle timeout error
})

可以看到,在订阅Flux之前,我们调用了timeout()方法,将持续时间设置成了1秒。如果请求能够在1秒之内返回,就不会有任何问题。如果请求的耗时超过1秒,就会超时,作为第二个参数传递给subscribe()的错误处理器将会被调用。

11.4.2 发送资源

使用WebClient发送数据与接收数据并没有太大的差异。作为样例,假设我们有一个Mono<Ingredient>,并且想要将Mono发布的Ingredient对象以POST请求的形式发送到相对路径“/ingredients”的URI上。我们所需要做的就是使用post()方法来替换get(),并通过body()方法指明要使用Mono来填充请求体:

1
2
3
4
5
6
7
8
Mono<Ingredient> ingredientMono = ...;
Mono<Ingredient> result = webClient
.post()
.uri("/ingredients")
.body(ingredientMono, Ingredient.class)
.retrieve()
.bodyToMono(Ingredient.class);
result.subscribe(i -> { ... })

如果我们没有要发送的Mono或Flux,而只有原始的领域对象,那么可以使用syncBody()方法。例如,假设我们没有Mono<Ingredient>,而是有一个想要在请求体中发送的Ingredient对象,那么可以这样做:

1
2
3
4
5
6
7
8
Ingedient ingredient = ...;
Mono<Ingredient> result = webClient
.post()
.uri("/ingredients")
.syncBody(ingredient)
.retrieve()
.bodyToMono(Ingredient.class);
result.subscribe(i -> { ... })

如果我们不是使用POST请求,而是想要使用PUT请求更新一个Ingredient,就可以使用put()来替换post(),并相应地调整URI路径:

1
2
3
4
5
6
7
Mono<Void> result = webClient
.put()
.uri("/ingredients/{id}", ingredient.getId())
.syncBody(ingredient)
.retrieve()
.bodyToMono(Void.class)
.subscribe();

PUT请求的响应载荷一般是空的,所以我们必须要求bodyToMono()返回一个Void类型的Mono。一旦订阅该Mono,请求就会立即发送。

11.4.3 删除资源

WebClient还支持通过其delete()方法移除资源。例如,根据ID删除配料:

1
2
3
4
5
6
Mono<Void> result = webClient
.delete()
.uri("/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Void.class)
.subscribe();

与PUT请求类似,DELETE请求的响应不会有载荷。同样,我们返回并订阅Mono<Void>就会发送请求。

11.4.4 处理错误

到目前为止,所有的WebClient样例都假设有一个正常的结果:没有400级别和500级别的状态码。如果出现这两种类型的错误状态,WebClient就会记录失败信息;否则,就会默默忽略掉。

如果你需要处理这种错误,那么可以调用onStatus()来指定各种类型的HTTP状态码该如何进行处理。onStatus()接受两个函数:一个断言函数用来匹配HTTP状态;另一个函数会得到ClientResponse对象,并返回Mono<Throwable>

为了阐述如何使用onStatus()创建自定义的错误处理器,请参考如下使用WebClient根据ID获取配料的样例:

1
2
3
4
5
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Ingredient.class);

如果ingredientId的值能够匹配已知的资源,那么结果得到的Mono在订阅时就会发布一个Ingredient。但是,如果找不到匹配的配料呢?

当订阅可能会出现错误的Mono或Flux时,很重要的一点就是在调用subscribe()注册数据消费者的同时,也要注册一个错误消费者:

1
2
3
4
5
6
7
8
9
ingredientMono.subscribe(
ingredient -> {
// handle the ingredient data
...
},
error-> {
// deal with the error
...
});

如果能够找到配料资源,那么传递给subscribe()的第一个lambda表达式(数据消费者)将会被调用,并且会将匹配的Ingredient对象传递过来。但是,如果找不到资源,那么请求将会得到一个HTTP 404 (NOT FOUND)状态码的响应,它将会导致第二个lambda表达式(错误消费者)被调用,并且会传递过来一个默认的WebClientResponseException。

WebClientResponseException最大的问题在于它无法明确指出导致Mono失败的原因是什么。它的名字表明在WebClient发起的请求中,出现了响应错误,但是我们需要深入研究WebClientResponseException才能知道哪里出现了错误。无论如何,如果给错误消费者的异常更加专注业务领域而不是专注WebClient,那就更好了。

我们可以添加一个自定义的错误处理器,在这个处理器中可以提供将状态码转换为自己所选择的Throwable的代码。如果请求配料资源时得到的Mono失败,我们就生成一个UnknownIngredientException。在调用retrieve()之后,我们可以添加一个对onStatus()的调用,从而实现这一点:

1
2
3
4
5
6
7
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.onStatus(HttpStatus::is4xxClientError,
response -> Mono.just(new UnknownIngredientException()))
.bodyToMono(Ingredient.class);

调用onStatus()时第一个参数是断言,它会接受一个HttpStatus,如果状态码是我们想要处理的,就将会返回true。如果状态码匹配,响应将会传递给第二个参数的函数并按需进行处理,最终返回Throwable类型的Mono。

在样例中,如果状态码是400级别的(比如客户端错误),那么将会返回包含UnknownIngredientException的Mono。这会导致ingredientMono因为该异常而失败。

需要注意,HttpStatus::is4xxClientError是对HttpStatus的is4xxClientError的方法引用。此时,将会基于HttpStatus对象调用该方法。如果喜欢,还可以使用HttpStatus的其他方法作为方法引用。你也可以以lambda表达式或方法引用的形式提供其他返回boolean类型的函数。

例如,在错误处理中,我们可以更加精确地检查HTTP 404 (NOT FOUND)状态,只需将对onStatus()的调用修改成如下形式即可:

1
2
3
4
5
6
7
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.onStatus(status -> status == HttpStatus.NOT_FOUND,
response -> Mono.just(new UnknownIngredientException()))
.bodyToMono(Ingredient.class);

值得一提的是,我们可以按需调用onStatus()任意多次,以便于处理响应中可能返回的各种HTTP状态码。

11.4.5 交换请求

到目前为止,在使用WebClient的时候,我们都是利用它的retrieve()方法来发送请求。在这些场景中,retrieve()方法会返回一个ResponseSpec类型的对象,通过调用它的onStatus()、bodyToFlux()和bodyToMono()方法,我们就能处理响应。对于简单的场景来说,使用ResponseSpec就足够了,但是它在很多方面都有局限性。如果我们想要访问响应的头信息或cookie的值,那么ResponseSpec就无能为力了。

在使用ResponseSpec遇到困难时,我们就可以通过调用exchange()方法来替换retrieve()方法。exchange()方法会返回ClientResponse类型的Mono,我们可以对它采用各种反应式操作,以便于探测和使用整个响应中的数据,包括载荷、头信息和cookie。

在了解exchange()和retrieve()的差异之前,我们先看一下它们之间的相似之处。如下的代码片段通过WebClient和exchange()方法,根据ID获取单个配料:

1
2
3
4
5
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.exchange()
.flatMap(cr -> cr.bodyToMono(Ingredient.class));

这几乎与使用retrieve()的样例是相同的:

1
2
3
4
5
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Ingredient.class);

在exchange()样例中,我们不是使用ResponseSpec对象的bodyToMono()方法来获取Mono<Ingredient>,而是得到了一个Mono<ClientResponse>,通过它我们可以执行扁平化映射(flat-mapping)函数,将ClientResponse映射为Mono<Ingredient>,这样扁平化为最终想要的Mono。

现在,我们看一下exchange()的差异在什么地方。假设请求的响应中会包含一个名为X_UNAVAILABLE的头信息,如果它的值为true,则表明该配料是不可用的(因为某种原因)。为了讨论方便,假设如果这个头信息存在,那么我们希望得到的Mono是空的,不返回任何内容。通过添加另外一个flatMap()调用,我们就能实现这一点。整个的WebClient调用过程如下所示:

1
2
3
4
5
6
7
8
9
10
11
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.exchange()
.flatMap(cr -> {
if (cr.headers().header("X_UNAVAILABLE").contains("true")) {
return Mono.empty();
}
return Mono.just(cr);
})
.flatMap(cr -> cr.bodyToMono(Ingredient.class));

新的flatMap()调用会探查给定ClientRequest对象的响应头,查看是否存在值为true的X_UNAVAILABLE头信息。如果能够找到,就将会返回一个空的Mono;否则,返回一个包含ClientResponse的新Mono。不管是哪种情况,返回的Mono都会扁平化为下一个flatMap()操作所要使用的Mono。