15.3 线程的基本协作机制

15.3 线程的基本协作机制

多线程之间除了竞争访问同一个资源外,也经常需要相互协作,怎么协作呢?本节就来介绍Java中多线程协作的基本机制wait/notify。

都有哪些场景需要协作?wait/notify是什么?如何使用?实现原理是什么?协作的核心是什么?如何实现各种典型的协作场景?本节进行详细讨论,我们先来看看都有哪些协作的场景。

15.3.1 协作的场景

多线程之间需要协作的场景有很多,比如:
1)生产者/消费者协作模式:这是一种常见的协作模式,生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从队列上取数据或任务,如果队列长度有限,在队列满的时候,生产者需要等待,而在队列为空的时候,消费者需要等待。
2)同时开始:类似运动员比赛,在听到比赛开始枪响后同时开始,在一些程序,尤其是模拟仿真程序中,要求多个线程能同时开始。
3)等待结束:主从协作模式也是一种常见的协作模式,主线程将任务分解为若干子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕。
4)异步结果:在主从协作模式中,主线程手工创建子线程的写法往往比较麻烦,一种常见的模式是将子线程的管理封装为异步调用,异步调用马上返回,但返回的不是最终的结果,而是一个一般称为Future的对象,通过它可以在随后获得最终的结果。
5)集合点:类似于学校或公司组团旅游,在旅游过程中有若干集合点,比如出发集合点,每个人从不同地方来到集合点,所有人到齐后进行下一项活动,在一些程序,比如并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。

我们会探讨如何实现这些协作场景,在此之前,我们先来了解协作的基本方法wait/notify。

15.3.2 wait/notify

我们知道,Java的根父类是Object, Java在Object类而非Thread类中定义了一些线程协作的基本方法,使得每个对象都可以调用这些方法,这些方法有两类,一类是wait,另一类是notify。

主要有两个wait方法:

1
2
public final void wait() throws InterruptedException
public final native void wait(long timeout) throws InterruptedException;

一个带时间参数,单位是毫秒,表示最多等待这么长时间,参数为0表示无限期等待;一个不带时间参数,表示无限期等待,实际就是调用wait(0)。在等待期间都可以被中断,如果被中断,会抛出InterruptedException。关于中断及中断处理,我们在下节介绍,本节暂时忽略该异常。

wait实际上做了什么呢?它在等待什么?上节我们说过,每个对象都有一把锁和等待队列,一个线程在进入synchronized代码块时,会尝试获取锁,如果获取不到则会把当前线程加入等待队列中,其实,除了用于锁的等待队列,每个对象还有另一个等待队列,表示条件队列,该队列用于线程间的协作。调用wait就会把当前线程放到条件队列上并阻塞,表示当前线程执行不下去了,它需要等待一个条件,这个条件它自己改变不了,需要其他线程改变。当其他线程改变了条件后,应该调用Object的notify方法:

1
2
public final native void notify();
public final native void notifyAll();

notify做的事情就是从条件队列中选一个线程,将其从队列中移除并唤醒,notifyAll和notify的区别是,它会移除条件队列中所有的线程并全部唤醒。

我们来看个简单的例子,一个线程启动后,在执行一项操作前,它需要等待主线程给它指令,收到指令后才执行,如代码清单15-12所示。

代码清单15-12 简单协作示例WaitThread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class WaitThread extends Thread {
private volatile boolean fire = false;
@Override
public void run() {
try {
synchronized (this) {
while(! fire) {
wait();
}
}
System.out.println("fired");
} catch(InterruptedException e) {
}
}
public synchronized void fire() {
this.fire = true;
notify();
}
public static void main(String[] args) throws InterruptedException {
WaitThread waitThread = new WaitThread();
waitThread.start();
Thread.sleep(1000);
System.out.println("fire");
waitThread.fire();
}
}

示例代码中有两个线程,一个是主线程,一个是WaitThread,协作的条件变量是fire, WaitThread等待该变量变为true,在不为true的时候调用wait,主线程设置该变量并调用notify。

两个线程都要访问协作的变量fire,容易出现竞态条件,所以相关代码都需要被synchronized保护。实际上,wait/notify方法只能在synchronized代码块内被调用,如果调用wait/notify方法时,当前线程没有持有对象锁,会抛出异常java.lang.IllegalMonitor-StateException。

你可能会有疑问,如果wait必须被synchronized保护,那一个线程在wait时,另一个线程怎么可能调用同样被synchronized保护的notify方法呢?它不需要等待锁吗?我们需要进一步理解wait的内部过程,虽然是在synchronized方法内,但调用wait时,线程会释放对象锁。wait的具体过程是:
1)把当前线程放入条件等待队列,释放对象锁,阻塞等待,线程状态变为WAITING或TIMED_WAITING。
2)等待时间到或被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁:

  • 如果能够获得锁,线程状态变为RUNNABLE,并从wait调用中返回。
  • 否则,该线程加入对象锁等待队列,线程状态变为BLOCKED,只有在获得锁后才会从wait调用中返回。

线程从wait调用中返回后,不代表其等待的条件就一定成立了,它需要重新检查其等待的条件,一般的调用模式是:

1
2
3
4
5
synchronized (obj) {
while(条件不成立)
obj.wait();
//执行条件满足后的操作
}

比如,上例中的代码是:

1
2
3
4
5
synchronized (this) {
while(! fire) {
wait();
}
}

调用notify会把在条件队列中等待的线程唤醒并从队列中移除,但它不会释放对象锁,也就是说,只有在包含notify的synchronized代码块执行完后,等待的线程才会从wait调用中返回。

简单总结一下,wait/notify方法看上去很简单,但往往难以理解wait等的到底是什么,而notify通知的又是什么,我们需要知道,它们被不同的线程调用,但共享相同的锁和条件等待队列(相同对象的synchronized代码块内),它们围绕一个共享的条件变量进行协作,这个条件变量是程序自己维护的,当条件不成立时,线程调用wait进入条件等待队列,另一个线程修改了条件变量后调用notify,调用wait的线程唤醒后需要重新检查条件变量。从多线程的角度看,它们围绕共享变量进行协作,从调用wait的线程角度看,它阻塞等待一个条件的成立。我们在设计多线程协作时,需要想清楚协作的共享变量和条件是什么,这是协作的核心。接下来,我们通过一些场景进一步理解wait/notify的应用。

15.3.3 生产者/消费者模式

在生产者/消费者模式中,协作的共享变量是队列,生产者往队列上放数据,如果满了就wait,而消费者从队列上取数据,如果队列为空也wait。我们将队列作为单独的类进行设计,如代码清单15-13所示。

代码清单15-13 生产者/消费者协作队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static class MyBlockingQueue<E> {
private Queue<E> queue = null;
private int limit;
public MyBlockingQueue(int limit) {
this.limit = limit;
queue = new ArrayDeque<>(limit);
}
public synchronized void put(E e) throws InterruptedException {
while(queue.size() == limit) {
wait();
}
queue.add(e);
notifyAll();
}
public synchronized E take() throws InterruptedException {
while(queue.isEmpty()) {
wait();
}
E e = queue.poll();
notifyAll();
return e;
}
}

MyBlockingQueue是一个长度有限的队列,长度通过构造方法的参数进行传递,有两个方法:put和take。put是给生产者使用的,往队列上放数据,满了就wait,放完之后调用notifyAll,通知可能的消费者。take是给消费者使用的,从队列中取数据,如果为空就wait,取完之后调用notifyAll,通知可能的生产者。

我们看到,put和take都调用了wait,但它们的目的是不同的,或者说,它们等待的条件是不一样的,put等待的是队列不为满,而take等待的是队列不为空,但它们都会加入相同的条件等待队列。由于条件不同但又使用相同的等待队列,所以要调用notifyAll而不能调用notify,因为notify只能唤醒一个线程,如果唤醒的是同类线程就起不到协调的作用。

只能有一个条件等待队列,这是Java wait/notify机制的局限性,这使得对于等待条件的分析变得复杂,后续章节我们会介绍显式的锁和条件,它可以解决该问题。

一个简单的生产者代码如代码清单15-14所示。

代码清单15-14 一个简单的生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static class Producer extends Thread {
MyBlockingQueue<String> queue;
public Producer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
int num = 0;
try {
while(true) {
String task = String.valueOf(num);
queue.put(task);
System.out.println("produce task " + task);
num++;
Thread.sleep((int) (Math.random() * 100));
}
} catch (InterruptedException e) {
}
}
}

Producer向共享队列中插入模拟的任务数据。一个简单的消费者代码如代码清单15-15所示。

代码清单15-15 一个简单的消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static class Consumer extends Thread {
MyBlockingQueue<String> queue;
public Consumer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while(true) {
String task = queue.take();
System.out.println("handle task " + task);
Thread.sleep((int)(Math.random()*100));
}
} catch(InterruptedException e) {
}
}
}

主程序的示例代码如下所示:

1
2
3
4
5
public static void main(String[] args) {
MyBlockingQueue<String> queue = new MyBlockingQueue<>(10);
new Producer(queue).start();
new Consumer(queue).start();
}

运行该程序,会看到生产者和消费者线程的输出交替出现。

我们实现的MyBlockingQueue主要用于演示,Java提供了专门的阻塞队列实现,包括:

  • 接口BlockingQueue和BlockingDeque。
  • 基于数组的实现类ArrayBlockingQueue。
  • 基于链表的实现类LinkedBlockingQueue和LinkedBlockingDeque。
  • 基于堆的实现类PriorityBlockingQueue。

我们会在后续章节介绍这些类,在实际系统中,应该优先考虑使用这些类。

15.3.4 同时开始

同时开始,类似于运动员比赛,在听到比赛开始枪响后同时开始,下面,我们模拟这个过程。这里,有一个主线程和N个子线程,每个子线程模拟一个运动员,主线程模拟裁判,它们协作的共享变量是一个开始信号。我们用一个类FireFlag来表示这个协作对象,如代码清单15-16所示。

代码清单15-16 协作对象FireFlag
1
2
3
4
5
6
7
8
9
10
11
12
static class FireFlag {
private volatile boolean fired = false;
public synchronized void waitForFire() throws InterruptedException {
while(! fired) {
wait();
}
}
public synchronized void fire() {
this.fired = true;
notifyAll();
}
}

子线程应该调用waitForFire()等待枪响,而主线程应该调用fire()发射比赛开始信号。

表示比赛运动员的类如代码清单15-17所示。

代码清单15-17 表示比赛运动员的类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static class Racer extends Thread {
FireFlag fireFlag;
public Racer(FireFlag fireFlag) {
this.fireFlag = fireFlag;
}
@Override
public void run() {
try {
this.fireFlag.waitForFire();
System.out.println("start run "
+ Thread.currentThread().getName());
} catch (InterruptedException e) {
}
}
}

主程序代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws InterruptedException {
int num = 10;
FireFlag fireFlag = new FireFlag();
Thread[] racers = new Thread[num];
for(int i = 0; i < num; i++) {
racers[i] = new Racer(fireFlag);
racers[i].start();
}
Thread.sleep(1000);
fireFlag.fire();
}

这里,启动了10个子线程,每个子线程启动后等待fire信号,主线程调用fire()后各个子线程才开始执行后续操作。

15.3.5 等待结束

在15.1.2节中我们使用join方法让主线程等待子线程结束,join实际上就是调用了wait,其主要代码是:

1
2
3
while (isAlive()) {
wait(0);
}

只要线程是活着的,isAlive()返回true, join就一直等待。谁来通知它呢?当线程运行结束的时候,Java系统调用notifyAll来通知。

使用join有时比较麻烦,需要主线程逐一等待每个子线程。这里,我们演示一种新的写法。主线程与各个子线程协作的共享变量是一个数,这个数表示未完成的线程个数,初始值为子线程个数,主线程等待该值变为0,而每个子线程结束后都将该值减一,当减为0时调用notifyAll,我们用MyLatch来表示这个协作对象,如代码清单15-18所示。

代码清单15-18 协作对象MyLatch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MyLatch {
private int count;
public MyLatch(int count) {
this.count = count;
}
public synchronized void await() throws InterruptedException {
while(count > 0) {
wait();
}
}
public synchronized void countDown() {
count--;
if(count <= 0) {
notifyAll();
}
}
}

这里,MyLatch构造方法的参数count应初始化为子线程的个数,主线程应该调用await(),而子线程在执行完后应该调用countDown()。工作子线程的示例代码如代码清单15-19所示。

代码清单15-19 使用MyLatch的工作子线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static class Worker extends Thread {
MyLatch latch;
public Worker(MyLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
//simulate working on task
Thread.sleep((int) (Math.random() * 1000));
this.latch.countDown();
} catch (InterruptedException e) {
}
}
}

主线程的示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws InterruptedException {
int workerNum = 100;
MyLatch latch = new MyLatch(workerNum);
Worker[] workers = new Worker[workerNum];
for(int i = 0; i < workerNum; i++) {
workers[i] = new Worker(latch);
workers[i].start();
}
latch.await();
System.out.println("collect worker results");
}

MyLatch是一个用于同步协作的工具类,主要用于演示基本原理,在Java中有一个专门的同步类CountDownLatch,在实际开发中应该使用它。关于CountDownLatch,我们会在后续章节介绍。

MyLatch的功能是比较通用的,它也可以应用于上面“同时开始”的场景,初始值设为1, Racer类调用await(),主线程调用countDown()即可,如代码清单15-20所示。

代码清单15-20 使用MyLatch实现同时开始
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RacerWithLatchDemo {
static class Racer extends Thread {
MyLatch latch;
public Racer(MyLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
this.latch.await();
System.out.println("start run "
+ Thread.currentThread().getName());
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) throws InterruptedException {
int num = 10;
MyLatch latch = new MyLatch(1);
Thread[] racers = new Thread[num];
for(int i = 0; i < num; i++) {
racers[i] = new Racer(latch);
racers[i].start();
}
Thread.sleep(1000);
latch.countDown();
}
}

15.3.6 异步结果

在主从模式中,手工创建线程往往比较麻烦,一种常见的模式是异步调用,异步调用返回一个一般称为Future的对象,通过它可以获得最终的结果。在Java中,表示子任务的接口是Callable,声明为:

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

为表示异步调用的结果,我们定义一个接口MyFuture,如下所示:

1
2
3
public interface MyFuture <V> {
V get() throws Exception ;
}

这个接口的get方法返回真正的结果,如果结果还没有计算完成,get方法会阻塞直到计算完成,如果调用过程发生异常,则get方法抛出调用过程中的异常。

为方便主线程调用子任务,我们定义一个类MyExecutor,其中定义一个public方法execute,表示执行子任务并返回异步结果,声明如下:

1
public <V> MyFuture<V> execute(final Callable<V> task)

利用该方法,对于主线程,就不需要创建并管理子线程了,并且可以方便地获取异步调用的结果。比如,在主线程中,可以类似代码清单15-21那样启动异步调用并获取结果:

代码清单15-21 异步调用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) {
MyExecutor executor = new MyExecutor();
//子任务
Callable<Integer> subTask = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
//…执行异步任务
int millis = (int) (Math.random() * 1000);
Thread.sleep(millis);
return millis;
}
};
//异步调用,返回一个MyFuture对象
MyFuture<Integer> future = executor.execute(subTask);
//…执行其他操作
try {
//获取异步调用的结果
Integer result = future.get();
System.out.println(result);
} catch(Exception e) {
e.printStackTrace();
}
}

MyExecutor的execute方法是怎么实现的呢?它封装了创建子线程,同步获取结果的过程,它会创建一个执行子线程,该子线程如代码清单15-22所示。

代码清单15-22 执行子线程ExecuteThread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static class ExecuteThread<V> extends Thread {
private V result = null;
private Exception exception = null;
private boolean done = false;
private Callable<V> task;
private Object lock;
public ExecuteThread(Callable<V> task, Object lock) {
this.task = task;
this.lock = lock;
}
@Override
public void run() {
try {
result = task.call();
} catch (Exception e) {
exception = e;
} finally {
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
}
public V getResult() {
return result;
}
public boolean isDone() {
return done;
}
public Exception getException() {
return exception;
}
}

这个子线程执行实际的子任务,记录执行结果到result变量、异常到exception变量,执行结束后设置共享状态变量done为true,并调用notifyAll,以唤醒可能在等待结果的主线程。

MyExecutor的execute方法如代码清单15-23所示。

代码清单15-23 异步执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public <V> MyFuture<V> execute(final Callable<V> task) {
final Object lock = new Object();
final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
thread.start();
MyFuture<V> future = new MyFuture<V>() {
@Override
public V get() throws Exception {
synchronized (lock) {
while(! thread.isDone()) {
try {
lock.wait();
} catch (InterruptedException e) {
}
}
if(thread.getException() ! = null) {
throw thread.getException();
}
return thread.getResult();
}
}
};
return future;
}

execute启动一个线程,并返回MyFuture对象,MyFuture的get方法会阻塞等待直到线程运行结束。

以上的MyExecutore和MyFuture主要用于演示基本原理,实际上,Java中已经包含了一套完善的框架Executors,相关的部分接口和类有:

  • 表示异步结果的接口Future和实现类FutureTask。
  • 用于执行异步任务的接口Executor,以及有更多功能的子接口ExecutorService。
  • 用于创建Executor和ExecutorService的工厂方法类Executors。

后续章节,我们会详细介绍这套框架。

15.3.7 集合点

各个线程先是分头行动,各自到达一个集合点,在集合点需要集齐所有线程,交换数据,然后再进行下一步动作。怎么表示这种协作呢?协作的共享变量依然是一个数,这个数表示未到集合点的线程个数,初始值为子线程个数,每个线程到达集合点后将该值减一,如果不为0,表示还有别的线程未到,进行等待,如果变为0,表示自己是最后一个到的,调用notifyAll唤醒所有线程。我们用AssemblePoint类来表示这个协作对象,如代码清单15-24所示。

代码清单15-24 协作对象AssemblePoint
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class AssemblePoint {
private int n;
public AssemblePoint(int n) {
this.n = n;
}
public synchronized void await() throws InterruptedException {
if(n > 0) {
n--;
if(n == 0) {
notifyAll();
} else {
while(n ! = 0) {
wait();
}
}
}
}
}

多个游客线程各自先独立运行,然后使用该协作对象到达集合点进行同步的示例如代码清单15-25所示。

代码清单15-25 集合点协作示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class AssemblePointDemo {
static class Tourist extends Thread {
AssemblePoint ap;
public Tourist(AssemblePoint ap) {
this.ap = ap;
}
@Override
public void run() {
try {
//模拟先各自独立运行
Thread.sleep((int) (Math.random() * 1000));
//集合
ap.await();
System.out.println("arrived");
//…集合后执行其他操作
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) {
int num = 10;
Tourist[] threads = new Tourist[num];
AssemblePoint ap = new AssemblePoint(num);
for(int i = 0; i < num; i++) {
threads[i] = new Tourist(ap);
threads[i].start();
}
}
}

这里实现的AssemblePoint主要用于演示基本原理,Java中有一个专门的同步工具类CyclicBarrier可以替代它,关于该类,我们后续章节介绍。

15.3.8 小结

本节介绍了Java中线程间协作的基本机制wait/notify,协作关键要想清楚协作的共享变量和条件是什么,为进一步理解,针对多种协作场景,我们演示了wait/notify的用法及基本协作原理。Java中有专门为协作而建的阻塞队列、同步工具类,以及Executors框架,我们会在后续章节介绍。在实际开发中,应该尽量使用这些现成的类,而非“重新发明轮子”。