16.9.4 Java9新增的发布-订阅框架
16.9.4 Java9新增的发布-订阅框架
Java9
新增了一个发布-订阅框架,该框架是基于异步响应流
的。这个发布-订阅框架可以非常方便地处理异步线程之间的流数据交换
(比如两个线程之间需要交换数据)。而且这个发布-订阅框架不需要使用数据中心来缓冲数据,同时具有非常高效的性能。
Flow类静态内部接口
这个发布订阅框架使用Flow
类的4个静态内部接口作为核心API
方法 | 描述 |
---|---|
static interface Flow.Publisher<T> |
代表数据发布者、生产者 |
static interface Flow.Subscriber<T> |
代表数据订阅者、消费者 |
static interface Flow.Subscription |
代表发布者和订阅者之间的链接纽带。 订阅者既可通过调用该对象的 request() 方法来获取数据项,也可通过调用对象的 cancel() 方法来取消订阅。 |
static interface Flow.Processor<T,R> |
数据处理器,它可同时作为发布者和订阅者使用 |
Flow.Publisher
接口方法
Flow.Publisher
作为生产者,负责发布数据项,并注册订阅者。Flow.Publisher
接口定义了如下方法来注册订阅者。
方法 | 描述 |
---|---|
void subscribe(Flow.Subscriber<? super T> subscriber) |
程序调用此方法注册订阅者时,会触发订阅者的onSubscribe() 方法,将Flow.Subscription 对象作为参数传给该方法;如果注册失败,将会触发订阅者的 onError() 方法。 |
Flow.Subscriber
接口方法
Flow.Subscriber
接口定义了如下方法
方法 | 描述 |
---|---|
void onSubscribe(Flow.Subscription subscription) |
订阅者注册时自动触发该方法 |
void onComplete() |
当订阅结束时触发该方法 |
void onError(Throwable throwable) |
当订阅失败时触发该方法 |
void onNext(T item) |
订阅者从发布者处获取数据项时触发该方法,订阅者可通过该方法获取数据项 |
Flow.Publisher
的SubmissionPublisher
实现类
为了处理一些通用发布者的场景,Java9
为Flow.Publisher
提供了一个SubmissionPublisher
实现类,它可向当前订阅者异步提交非空的数据项,直到它被关闭。每个订阅者都能以相同的顺序接收到新提交的数据项。
程序创建SubmissionPublisher
对象时,需要传入一个线程池作为底层支撑;该类也提供了一个无参数的构造器,该构造器使用ForkJoinPool.commonPool()
方法来提交发布者,以此实现发布者向订阅者提供数据项的异步特性。
程序示例 使用SubmissionPublisher
作为发布者
下面程序示范了使用SubmissionPublisher
作为发布者的用法。
1 | import java.util.concurrent.Flow.*; |
上面程序中先创建SubmissionPublisher
对象,该对象可作为发布者;然后创建订阅者对象,该订阅者类是一个自定义类;接着注册订阅者。
完成上面步骤之后,程序即可调用SubmissionPublisher
对象的submit()
方法来发布数据项,发布者通过该方法发布数据项。
上面程序实现了一个自定义的订阅者,该订阅者实现了Subscriber
接口的4个方法,重点就是实现onNext()
方法,当订阅者获取到数据时就会触发这个onNext()
方法,订阅者通过该方法接收数据。
至于订阅者接收到数据项之后的处理,则取决于程序的业务需求
运行该程序,可以看到订阅者逐项获得数据的过程。
1 | 开发发布数据... |