10.1 反应式编程概览

10.1 反应式编程概览

反应式编程是一种可以替代命令式编程的编程范式。这种可替代性存在的原因在于反应式编程解决了命令式编程中的一些限制。理解这些限制,有助于你更好地理解反应式编程模型的优点。

注意:反应式编程不是银弹。你不应该从这一章或者其他任何关于反应式编程的讨论中得出“命令式编程是邪恶的,而反应式编程才是你的救星”的结论。如同我们作为开发者学习到的任何技术一样,反应式编程对于某些使用场景来说的确是完美的,但是在其他的一些场景中可能不那么适用。建议以实用主义为上。

如果你和我以及绝大多数的开发者一样,是从命令式编程开始入行的,那么很可能你现在编写的大部分(或者所有)代码在将来依然是命令式的。命令式编程相当直观,没有编程经验的学生们可以在学校的STEM教育课程中轻松地学习它,而且足够强大。在驱动大型企业运行的代码中,绝大部分都是命令式的。

它的理念很简单:你可以一次一个地按照顺序将代码编写为需要遵循的指令列表。在某项任务开始执行之后,程序在开始下一项任务之前需要等待当前任务完成。在整个处理过程中的每一步,要处理的数据都必须是完全可用的,以便将它们作为一个整体进行处理。

一开始一切都很美好,直到我们遇到问题。在执行一项任务的时候,特别是IO任务(将数据写入DB或者从远程服务器获取数据),触发这项任务的线程实际上是被阻塞的,在任务完成之前它不能做任何事情。坦白来说,阻塞线程是一种浪费。

大多数编程语言(包括Java)都支持并发编程。在Java中创建一个线程,然后让它执行某些操作,而调用线程继续执行其他工作,这是相当容易实现的。虽然创建线程很简单,但是这些线程多半最终会被阻塞。管理多线程中的并发极具挑战,而更多线程则意味着更多的复杂性。

相比之下,反应式编程本质上是函数式的和声明式的。相对于描述一组将依次执行的步骤,反应式编程描述了数据将会流经的管道或者流。相对于要求将被处理的数据作为一个整体进行处理,反应式流可以在数据可用时立即开始处理。实际上,传入的数据可能是无限的(比如,一个某个地理位置的实时温度测量数据的恒定流)。

拿现实世界类比一下,可以将命令式编程看作是水气球,而将反应式编程看作是花园里的软管。在夏天,这两者都是偷袭和愉悦毫无戒心的朋友的好方式,但是它们的运作方式却不同:

  • 水气球只能一次性地填满有效载荷,并在撞到目标时弄湿对象。水气球的容量有限,如果你想要弄湿更多人(或者把同一个人弄得更加湿透一点),那么唯一的选择就是增加水气球的数量。
  • 花园软管的有效载荷是从水龙头到喷嘴的水流。在特定的时间点,花园软管的容量可能是有限的,但是在打水仗的过程中它的容量却是无限的。只要水源源不断地从龙头流入软管中,水就会源源不断地从喷嘴喷出去。同一个软管非常好扩展,你可以尽情地和更多的朋友打水仗。

虽然水气球(或者命令式编程)没有什么固有的问题,但是持有软管(或者能够应用反应式编程)的人通常在伸缩性和性能方面更具优势。

定义反应式流

反应式流(Reactive Streams)是由Netflix、Lightbend和Pivotal(Spring背后的公司)的工程师于2013年年底开始制定的一种规范。反应式流旨在提供无阻塞回压的异步流处理标准。

我们已经触及了反应式编程的异步特性,它使我们能够并行执行任务,从而实现更高的可伸缩性。通过回压,数据消费者可以限制它们想要处理的数据数量,避免被过快的数据源所淹没。

Java的流和反应式流

Java的流和反应式流之间有很多相似之处。首先,它们的名字中都有流(Stream)这个词。它们还提供了用于处理数据的函数式API。事实上,正如你稍后将会在我们介绍Reactor时看到的那样,它们甚至可以共享许多相同的操作。

Java的流通常都是同步的,并且只能处理有限的数据集。从本质上来说,它们只是使用函数来对集合进行迭代的一种方式。

反应式流支持异步处理任意大小的数据集,同样也包括无限数据集。只要数据就绪,它们就能实时地处理数据,并且能够通过回压来避免压垮数据的消费者。

反应式流规范可以总结为4个接口:Publisher、Subscriber、Subscription和Processor。Publisher负责生成数据,并将数据发送给Subscription(每个Subscriber对应一个Subscription)。Publisher接口声明了一个方法subscribe(),Subscriber可以通过该方法向Publisher发起订阅。

反应式流规范可以总结为4个接口:Publisher、Subscriber、Subscription和Processor。Publisher负责生成数据,并将数据发送给Subscription(每个Subscriber对应一个Subscription)。Publisher接口声明了一个方法subscribe(),Subscriber可以通过该方法向Publisher发起订阅。

1
2
3
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}

一旦Subscriber订阅成功,就可以接收来自Publisher的事件。这些事件是通过Subscriber接口上的方法发送的:

1
2
3
4
5
6
public interface Subscriber<T> {
void onSubscribe(Subscription sub);
void onNext(T item);
void onError(Throwable ex);
void onComplete();
}

Subscriber的第一个事件是通过对onSubscribe()方法的调用接收的。Publisher调用onSubscribe() 方法时,会将Subscription对象传递给Subscriber。通过Subscription,Subscriber可以管理其订阅情况:

1
2
3
4
public interface Subscription {
void request(long n);
void cancel();
}

Subscriber可以通过调用request()方法来请求Publisher发送数据,或者通过调用cancel()方法表明它不再对数据感兴趣并且取消订阅。当调用request()时,Subscriber可以传入一个long类型的数值以表明它愿意接受多少数据。这也是回压能够发挥作用的地方,以避免Publisher发送多于Subscriber能够处理的数据量。在Publisher发送完所请求数量的数据项之后,Subscriber可以再次调用request()方法来请求更多的数据。

Subscriber请求数据之后,数据就会开始流经反应式流。Publisher发布的每个数据项都会通过调用Subscriber的onNext()方法递交给Subscriber。如果有任何错误,就会调用onError()方法。如果Publisher没有更多的数据,也不会继续产生更多的数据,那么将会调用Subscriber的onComplete()方法来告知Subscriber它已经结束。

至于Processor接口,它是Subscriber和Publisher的组合,如下所示:

1
2
public interface Processor<T, R>
extends Subscriber<T>, Publisher<R> {}

当作为Subscriber时,Processor会接收数据并以某种方式对数据进行处理。然后它会将角色转变为Publisher,并将处理的结果发布给它的Subscriber。

正如你所看到的,反应式流的规范非常简单,很容易就能想出如何构建一个以Publisher作为开始的数据处理管道,并让数据通过零个或多个Processor,然后将最终结果投递给Subscriber。

然而,反应式流规范的接口本身并不支持以函数式的方式组成这样的流。Reactor项目是反应式流规范的一个实现,提供了一组用于组装反应式流的函数式API。我们将会在后面的内容中看到,Reactor构成了Spring 5反应式编程模型的基础。在本章的其余部分,我们将会探讨(并且,我敢说这个过程非常有意思)Reactor项目。