16.9.4 Java9新增的发布-订阅框架

16.9.4 Java9新增的发布-订阅框架

Java9新增了一个发布-订阅框架,该框架是基于异步响应流的。这个发布-订阅框架可以非常方便地处理异步线程之间的流数据交换(比如两个线程之间需要交换数据)。而且这个发布-订阅框架不需要使用数据中心来缓冲数据,同时具有非常高效的性能。

Flow类静态内部接口

这个发布订阅框架使用Flow类的4个静态内部接口作为核心API

方法 描述
static interface Flow.Publisher<T> 代表数据发布者、生产者
static interface Flow.Subscriber<T> 代表数据订阅者、消费者
static interface Flow.Subscription 代表发布者和订阅者之间的链接纽带。
订阅者既可通过调用该对象的request()方法来获取数据项,
也可通过调用对象的cancel()方法来取消订阅。
static interface Flow.Processor<T,​R> 数据处理器,它可同时作为发布者和订阅者使用

Flow.Publisher接口方法

Flow.Publisher作为生产者,负责发布数据项,并注册订阅者。Flow.Publisher接口定义了如下方法来注册订阅者。

方法 描述
void subscribe(Flow.Subscriber<? super T> subscriber) 程序调用此方法注册订阅者时,会触发订阅者的onSubscribe()方法,将Flow.Subscription对象作为参数传给该方法;
如果注册失败,将会触发订阅者的onError()方法。

Flow.Subscriber接口方法

Flow.Subscriber接口定义了如下方法

方法 描述
void onSubscribe(Flow.Subscription subscription) 订阅者注册时自动触发该方法
void onComplete() 当订阅结束时触发该方法
void onError(Throwable throwable) 当订阅失败时触发该方法
void onNext(T item) 订阅者从发布者处获取数据项时触发该方法,订阅者可通过该方法获取数据项

Flow.PublisherSubmissionPublisher实现类

为了处理一些通用发布者的场景,Java9Flow.Publisher提供了一个SubmissionPublisher实现类,它可向当前订阅者异步提交非空的数据项,直到它被关闭。每个订阅者都能以相同的顺序接收到新提交的数据项。
程序创建SubmissionPublisher对象时,需要传入一个线程池作为底层支撑;该类也提供了一个无参数的构造器,该构造器使用ForkJoinPool.commonPool()方法来提交发布者,以此实现发布者向订阅者提供数据项的异步特性。

程序示例 使用SubmissionPublisher作为发布者

下面程序示范了使用SubmissionPublisher作为发布者的用法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import java.util.concurrent.Flow.*;
import java.util.*;
import java.util.concurrent.*;

public class PubSubTest {
public static void main(String[] args) {
// 创建一个SubmissionPublisher作为发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// 创建订阅者
MySubscriber<String> subscriber = new MySubscriber<>();
// 注册订阅者
publisher.subscribe(subscriber);
// 发布几个数据项
System.out.println("开发发布数据...");
List.of("Java", "Kotlin", "Go", "Erlang", "Swift", "Lua").forEach(im -> {
// 提交数据
publisher.submit(im);
try {
Thread.sleep(500);
} catch (Exception ex) {
}
});
// 发布结束
publisher.close();
// 发布结束后,为了让发布者线程不会死亡,暂停线程
synchronized ("fkjava") {
try {
"fkjava".wait();
} catch (Exception ex) {
}
}
}
}

// 创建订阅者
class MySubscriber<T> implements Subscriber<T> {
// 发布者与订阅者之间的纽带
private Subscription subscription;

@Override // 订阅时触发该方法
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
// 开始请求数据
subscription.request(1);
}

@Override // 接收到数据时触发该方法
public void onNext(T item) {
System.out.println("获取到数据: " + item);
// 请求下一条数据
subscription.request(1);
}

@Override // 订阅出错时触发该方法
public void onError(Throwable t) {
t.printStackTrace();
synchronized ("fkjava") {
"fkjava".notifyAll();
}
}

@Override // 订阅结束时触发该方法
public void onComplete() {
System.out.println("订阅结束");
synchronized ("fkjava") {
"fkjava".notifyAll();
}
}
}

上面程序中先创建SubmissionPublisher对象,该对象可作为发布者;然后创建订阅者对象,该订阅者类是一个自定义类;接着注册订阅者。
完成上面步骤之后,程序即可调用SubmissionPublisher对象的submit()方法来发布数据项,发布者通过该方法发布数据项
上面程序实现了一个自定义的订阅者,该订阅者实现了Subscriber接口的4个方法,重点就是实现onNext()方法,当订阅者获取到数据时就会触发这个onNext()方法,订阅者通过该方法接收数据
至于订阅者接收到数据项之后的处理,则取决于程序的业务需求
运行该程序,可以看到订阅者逐项获得数据的过程。

1
2
3
4
5
6
7
8
开发发布数据...
获取到数据: Java
获取到数据: Kotlin
获取到数据: Go
获取到数据: Erlang
获取到数据: Swift
获取到数据: Lua
订阅结束