第19章 同步和协作工具类

我们在15.3节实现了线程的一些基本协作机制,那是利用基本的wait/notify实现的。我们提到,Java并发包中有一些专门的同步和协作工具类,本章,我们就来探讨它们。具体工具类包括:

  • 读写锁ReentrantReadWriteLock。
  • 信号量Semaphore。
  • 倒计时门栓CountDownLatch。
  • 循环栅栏CyclicBarrier。

此外,有一个实现线程安全的特殊概念:线程本地变量ThreadLocal,本章也会进行介绍。

与第15章介绍的显式锁和显式条件类似,除了ThreadLocal外,这些同步和协作类都是基于AQS实现的。在一些特定的同步协作场景中,相比使用最基本的wait/notify以及显式锁/条件,它们更为方便,效率更高。下面,我们就来探讨它们的基本概念、用法、用途和基本原理。

19.1 读写锁ReentrantReadWriteLock

之前章节我们介绍了两种锁:synchronized和显式锁ReentrantLock,对于同一受保护对象的访问,无论是读还是写,它们都要求获得相同的锁。在一些场景中,这是没有必要的,多个线程的读操作完全可以并行,在读多写少的场景中,让读操作并行可以明显提高性能。

怎么让读操作能够并行,又不影响一致性呢?答案是使用读写锁。在Java并发包中,接口ReadWriteLock表示读写锁,主要实现类是可重入读写锁ReentrantReadWriteLock。ReadWriteLock的定义为:

1
2
3
4
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}

通过一个ReadWriteLock产生两个锁:一个读锁,一个写锁。读操作使用读锁,写操作使用写锁。需要注意的是,只有“读-读”操作是可以并行的,“读-写”和“写-写”都不可以。只有一个线程可以进行写操作,在获取写锁时,只有没有任何线程持有任何锁才可以获取到,在持有写锁时,其他任何线程都获取不到任何锁。在没有其他线程持有写锁的情况下,多个线程可以获取和持有读锁。

ReentrantReadWriteLock是可重入的读写锁,它有两个构造方法,如下所示:

1
2
public ReentrantLock()
public ReentrantLock(boolean fair)

fire表示是否公平,如果不传递则是false,含义与16.2节介绍的类似,就不赘述了。

我们看个读写锁的应用,使用ReentrantReadWriteLock实现一个缓存类MyCache,如代码清单19-1所示。

代码清单19-1 使用读写锁实现一个缓存类MyCache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class MyCache {
private Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock readWriteLock =
new ReentrantReadWriteLock();
private Lock readLock = readWriteLock.readLock();
private Lock writeLock = readWriteLock.writeLock();
public Object get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Object put(String key, Object value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
public void clear() {
writeLock.lock();
try {
map.clear();
} finally {
writeLock.unlock();
}
}
}

代码比较简单,就不赘述了。读写锁是怎么实现的呢?读锁和写锁看上去是两个锁,它们是怎么协调的?具体实现比较复杂,我们简述下其思路。

内部,它们使用同一个整数变量表示锁的状态,16位给读锁用,16位给写锁用,使用一个变量便于进行CAS操作,锁的等待队列其实也只有一个

写锁的获取,就是确保当前没有其他线程持有任何锁,否则就等待。写锁释放后,也就是将等待队列中的第一个线程唤醒,唤醒的可能是等待读锁的,也可能是等待写锁的。

读锁的获取不太一样,首先,只要写锁没有被持有,就可以获取到读锁,此外,在获取到读锁后,它会检查等待队列,逐个唤醒最前面的等待读锁的线程,直到第一个等待写锁的线程。如果有其他线程持有写锁,获取读锁会等待。读锁释放后,检查读锁和写锁数是否都变为了0,如果是,唤醒等待队列中的下一个线程。

18.3 定时任务的那些陷阱

本节探讨定时任务,定时任务的应用场景是非常多的,比如:

  • 闹钟程序或任务提醒,指定时间叫床或在指定日期提醒还信用卡。
  • 监控系统,每隔一段时间采集下系统数据,对异常事件报警。
  • 统计系统,一般凌晨一定时间统计昨日的各种数据指标。

在Java中,主要有两种方式实现定时任务:

  • 使用java.util包中的Timer和TimerTask。
  • 使用Java并发包中的ScheduledExecutorService。

它们的基本用法都是比较简单的,但如果对它们没有足够的了解,则很容易陷入其中的一些陷阱。下面,我们就来介绍它们的用法、原理以及那些陷阱。

18.3.1 Timer和TimerTask

我们先介绍它们的基本用法和示例,然后介绍它们的实现原理和一些注意事项。

1.基本用法

TimerTask表示一个定时任务,它是一个抽象类,实现了Runnable,具体的定时任务需要继承该类,实现run方法。Timer是一个具体类,它负责定时任务的调度和执行,主要方法有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//在指定绝对时间time运行任务task
public void schedule(TimerTask task, Date time)
//在当前时间延时delay毫秒后运行任务task
public void schedule(TimerTask task, long delay)
//固定延时重复执行,第一次计划执行时间为firstTime,
//后一次的计划执行时间为前一次"实际"执行时间加上period
public void schedule(TimerTask task, Date firstTime, long period)
//同样是固定延时重复执行,第一次执行时间为当前时间加上delay
public void schedule(TimerTask task, long delay, long period)
//固定频率重复执行,第一次计划执行时间为firstTime,
//后一次的计划执行时间为前一次"计划"执行时间加上period
public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period)
//同样是固定频率重复执行,第一次计划执行时间为当前时间加上delay
public void scheduleAtFixedRate(TimerTask task, long delay, long period)

需要注意固定延时(fixed-delay)与固定频率(fixed-rate)的区别,二者都是重复执行,但后一次任务执行相对的时间是不一样的,对于固定延时,它是基于上次任务的“实际”执行时间来算的,如果由于某种原因,上次任务延时了,则本次任务也会延时,而固定频率会尽量补够运行次数

另外,需要注意的是,如果第一次计划执行的时间firstTime是一个过去的时间,则任务会立即运行,对于固定延时的任务,下次任务会基于第一次执行时间计算,而对于固定频率的任务,则会从firstTime开始算,有可能加上period后还是一个过去时间,从而连续运行很多次,直到时间超过当前时间。我们通过一些简单的例子具体来看下。

2.基本示例

看一个最简单的例子,如代码清单18-3所示。

代码清单18-3 Timer基本示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class BasicTimer {
static class DelayTask extends TimerTask {
@Override
public void run() {
System.out.println("delayed task");
}
}
public static void main(String[] args) throws InterruptedException {
Timer timer = new Timer();
timer.schedule(new DelayTask(), 1000);
Thread.sleep(2000);
timer.cancel();
}
}

创建一个Timer对象,1秒钟后运行DelayTask,最后调用Timer的cancel方法取消所有定时任务。

看一个固定延时的简单例子,如代码清单18-4所示。

代码清单18-4 Timer固定延时示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TimerFixedDelay {
static class LongRunningTask extends TimerTask {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
System.out.println("long running finished");
}
}
static class FixedDelayTask extends TimerTask {
@Override
public void run() {
System.out.println(System.currentTimeMillis());
}
}
public static void main(String[] args) throws InterruptedException {
Timer timer = new Timer();
timer.schedule(new LongRunningTask(), 10);
timer.schedule(new FixedDelayTask(), 100, 1000);
}
}

有两个定时任务,第一个运行一次,但耗时5秒,第二个是重复执行,1秒一次,第一个先运行。运行该程序,会发现,第二个任务只有在第一个任务运行结束后才会开始运行,运行后1秒一次。如果替换上面的代码为固定频率,即变为代码清单18-5所示。

代码清单18-5 Timer固定频率示例
1
2
3
4
5
6
7
8
9
10
11
12
13
public class TimerFixedRate {
static class LongRunningTask extends TimerTask {
//省略,与代码清单18-4一样
}
static class FixedRateTask extends TimerTask {
//省略,与代码清单18-4一样
}
public static void main(String[] args) throws InterruptedException {
Timer timer = new Timer();
timer.schedule(new LongRunningTask(), 10);
timer.scheduleAtFixedRate(new FixedRateTask(), 100, 1000);
}
}

运行该程序,第二个任务同样只有在第一个任务运行结束后才会运行,但它会把之前没有运行的次数补过来,一下子运行5次,输出类似下面这样:

1
2
3
4
5
6
7
long running finished
1489467662330
1489467662330
1489467662330
1489467662330
1489467662330
1489467662419

3.基本原理

Timer内部主要由任务队列和Timer线程两部分组成。任务队列是一个基于堆实现的优先级队列,按照下次执行的时间排优先级。Timer线程负责执行所有的定时任务,需要强调的是,一个Timer对象只有一个Timer线程,所以,对于上面的例子,任务会被延迟。

Timer线程主体是一个循环,从队列中获取任务,如果队列中有任务且计划执行时间小于等于当前时间,就执行它,如果队列中没有任务或第一个任务延时还没到,就睡眠。如果睡眠过程中队列上添加了新任务且新任务是第一个任务,Timer线程会被唤醒,重新进行检查。

在执行任务之前,Timer线程判断任务是否为周期任务,如果是,就设置下次执行的时间并添加到优先级队列中,对于固定延时的任务,下次执行时间为当前时间加上period,对于固定频率的任务,下次执行时间为上次计划执行时间加上period

需要强调是,下次任务的计划是在执行当前任务之前就做出了的,对于固定延时的任务,延时相对的是任务执行前的当前时间,而不是任务执行后,这与后面讲到的Sched-uledExecutorService的固定延时计算方法是不同的,后者的计算方法更合乎一般的期望。对于固定频率的任务,延时相对的是最先的计划,所以,很有可能会出现前面例子中一下子执行很多次任务的情况。

4.死循环

一个Timer对象只有一个Timer线程,这意味着,定时任务不能耗时太长,更不能是无限循环。看个例子,如代码清单18-6所示。

代码清单18-6 Timer死循环示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class EndlessLoopTimer {
static class LoopTask extends TimerTask {
@Override
public void run() {
while (true) {
try {
//模拟执行任务
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//永远也没有机会执行
static class ExampleTask extends TimerTask {
@Override
public void run() {
System.out.println("hello");
}
}
public static void main(String[] args) throws InterruptedException {
Timer timer = new Timer();
timer.schedule(new LoopTask(), 10);
timer.schedule(new ExampleTask(), 100);
}
}

第一个定时任务是一个无限循环,其后的定时任务ExampleTask将永远没有机会执行。

5.异常处理

关于Timer线程,还需要强调非常重要的一点:在执行任何一个任务的run方法时,一旦run抛出异常,Timer线程就会退出,从而所有定时任务都会被取消。我们看个简单的示例,如代码清单18-7所示。

代码清单18-7 Timer异常示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TimerException {
static class TaskA extends TimerTask {
@Override
public void run() {
System.out.println("task A");
}
}
static class TaskB extends TimerTask {
@Override
public void run() {
System.out.println("task B");
throw new RuntimeException();
}
}
public static void main(String[] args) throws InterruptedException {
Timer timer = new Timer();
timer.schedule(new TaskA(), 1, 1000);
timer.schedule(new TaskB(), 2000, 1000);
}
}

期望TaskA每秒执行一次,但TaskB会抛出异常,导致整个定时任务被取消,程序终止,屏幕输出为:

1
2
3
4
5
6
7
task A
task A
task B
Exception in thread "Timer-0" java.lang.RuntimeException
at laoma.demo.timer.TimerException$TaskB.run(TimerException.java:21)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)

所以,如果希望各个定时任务不互相干扰,一定要在run方法内捕获所有异常

6.小结

可以看到,Timer/TimerTask的基本使用是比较简单的,但我们需要注意:

  • 后台只有一个线程在运行;
  • 固定频率的任务被延迟后,可能会立即执行多次,将次数补够;
  • 固定延时任务的延时相对的是任务执行前的时间;
  • 不要在定时任务中使用无限循环;
  • 一个定时任务的未处理异常会导致所有定时任务被取消。

18.3.2 ScheduledExecutorService

由于Timer/TimerTask的一些问题,Java并发包引入了ScheduledExecutorService,下面我们介绍它的基本用法、基本示例和基本原理。

1.基本用法

ScheduledExecutorService是一个接口,其定义为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface ScheduledExecutorService extends ExecutorService {
//单次执行,在指定延时delay后运行command
public ScheduledFuture<? > schedule(Runnable command, long delay,
TimeUnit unit);
//单次执行,在指定延时delay后运行callable
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit unit);
//固定频率重复执行
public ScheduledFuture<? > scheduleAtFixedRate(Runnable command,
long initialDelay, long period, TimeUnit unit);
//固定延时重复执行
public ScheduledFuture<? > scheduleWithFixedDelay(Runnable command,
long initialDelay, long delay, TimeUnit unit);
}

它们的返回类型都是ScheduledFuture,它是一个接口,扩展了Future和Delayed,没有定义额外方法。这些方法的大部分语义与Timer中的基本是类似的。对于固定频率的任务,第一次执行时间为initialDelay后,第二次为initialDelay+period,第三次为initial-Delay+2*period,以此类推。不过,对于固定延时的任务,它是从任务执行后开始算的,第一次为initialDelay后,第二次为第一次任务执行结束后再加上delay。与Timer不同,它不支持以绝对时间作为首次运行的时间。

ScheduledExecutorService的主要实现类是ScheduledThreadPoolExecutor,它是线程池ThreadPoolExecutor的子类,是基于线程池实现的,它的主要构造方法是:

1
public ScheduledThreadPoolExecutor(int corePoolSize)

此外,还有构造方法可以接受参数ThreadFactory和RejectedExecutionHandler,含义与ThreadPoolExecutor一样,我们就不赘述了。

它的任务队列是一个无界的优先级队列,所以最大线程数对它没有作用,即使core-PoolSize设为0,它也会至少运行一个线程。

工厂类Executors也提供了一些方便的方法,以方便创建ScheduledThreadPoolExecutor,如下所示:

1
2
3
4
5
6
7
8
9
//单线程的定时任务执行服务
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newSingleThreadScheduledExecutor(
ThreadFactory threadFactory)
//多线程的定时任务执行服务
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory)

2.基本示例

由于可以有多个线程执行定时任务,一般任务就不会被某个长时间运行的任务所延迟了。比如,对于代码清单18-4所示的TimerFixedDelay,如果改为代码清单18-8所示:

代码清单18-8 多线程的定时任务执行服务示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ScheduledFixedDelay {
static class LongRunningTask implements Runnable {
//省略,与代码清单18-4一样
}
static class FixedDelayTask implements Runnable {
//省略,与代码清单18-4一样
}
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService timer = Executors
.newScheduledThreadPool(10);
timer.schedule(new LongRunningTask(), 10, TimeUnit.MILLISECONDS);
timer.scheduleWithFixedDelay(new FixedDelayTask(), 100, 1000,
TimeUnit.MILLISECONDS);
}
}

再次执行,第二个任务就不会被第一个任务延迟了。

另外,与Timer不同,单个定时任务的异常不会再导致整个定时任务被取消,即使后台只有一个线程执行任务。我们看个例子,如代码清单18-9所示。

代码清单18-9 ScheduledExecutorService异常示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ScheduledException {
static class TaskA implements Runnable {
@Override
public void run() {
System.out.println("task A");
}
}
static class TaskB implements Runnable {
@Override
public void run() {
System.out.println("task B");
throw new RuntimeException();
}
}
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService timer = Executors
.newSingleThreadScheduledExecutor();
timer.scheduleWithFixedDelay(new TaskA(), 0, 1, TimeUnit.SECONDS);
timer.scheduleWithFixedDelay(new TaskB(), 2, 1, TimeUnit.SECONDS);
}
}

TaskA和TaskB都是每秒执行一次,TaskB两秒后执行,但一执行就抛出异常,屏幕的输出类似如下:

1
2
3
4
5
6
task A
task A
task B
task A
task A

这说明,定时任务TaskB被取消了,但TaskA不受影响,即使它们是由同一个线程执行的。不过,需要强调的是,与Timer不同,没有异常被抛出,TaskB的异常没有在任何地方体现。所以,与Timer中的任务类似,应该捕获所有异常

3.基本原理

ScheduledThreadPoolExecutor的实现思路与Timer基本是类似的,都有一个基于堆的优先级队列,保存待执行的定时任务,它的主要不同是:

1)它的背后是线程池,可以有多个线程执行任务。
2)它在任务执行后再设置下次执行的时间,对于固定延时的任务更为合理。
3)任务执行线程会捕获任务执行过程中的所有异常,一个定时任务的异常不会影响其他定时任务,不过,发生异常的任务(即使是一个重复任务)不会再被调度。

18.3.3 小结

本节介绍了Java中定时任务的两种实现方式:Timer和ScheduledExecutorService,需要特别注意Timer的一些陷阱,实践中建议使用ScheduledExecutorService。

它们的共同局限是不太胜任复杂的定时任务调度。比如,每周一和周三晚上18:00到22:00,每半小时执行一次。对于类似这种需求,可以利用我们之前在第7章介绍的日期和时间处理方法,或者利用更为强大的第三方类库,比如Quartz(http://www.quartz-scheduler.org/ )。

在并发应用程序中,一般我们应该尽量利用高层次的服务,比如各种并发容器、任务执行服务和线程池等,避免自己管理线程和它们之间的同步。但在个别情况下,自己管理线程及同步是必需的,这时,除了利用前面章节介绍的synchronized显式锁和条件等基本工具,Java并发包还提供了一些高级的同步和协作工具,以方便实现并发应用,让我们下一章来了解它们。

18.2 线程池

线程池是并发程序中一个非常重要的概念和技术。线程池,顾名思义,就是一个线程的池子,里面有若干线程,它们的目的就是执行提交给线程池的任务,执行完一个任务后不会退出,而是继续等待或执行新任务。线程池主要由两个概念组成:一个是任务队列;另一个是工作者线程。工作者线程主体就是一个循环,循环从队列中接受任务并执行,任务队列保存待执行的任务。

线程池的概念类似于生活中的一些排队场景,比如在医院排队挂号、在银行排队办理业务等,一般都由若干窗口提供服务,这些服务窗口类似于工作者线程;队列的概念是类似的,只是在现实场景中,每个窗口经常有一个单独的队列,这种排队难以公平,随着信息化的发展,越来越多的排队场合使用虚拟的统一队列,一般都是先拿一个排队号,然后按号依次服务。

线程池的优点是显而易见的:

  • 它可以重用线程,避免线程创建的开销。
  • 任务过多时,通过排队避免创建过多线程,减少系统资源消耗和竞争,确保任务有序完成。

Java并发包中线程池的实现类是ThreadPoolExecutor,它继承自AbstractExecutor-Service,实现了ExecutorService,基本用法与上节介绍的类似,我们就不赘述了。不过, ThreadPoolExecutor有一些重要的参数,理解这些参数对于合理使用线程池非常重要,接下来,我们探讨这些参数。

18.2.1 理解线程池

先来看ThreadPoolExecutor的构造方法。ThreadPoolExecutor有多个构造方法,都需要一些参数,主要构造方法有:

1
2
3
4
5
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler)

第二个构造方法多了两个参数threadFactory和handler,这两个参数一般不需要,第一个构造方法会设置默认值。参数corePoolSize、maximumPoolSize、keepAliveTime、unit用于控制线程池中线程的个数,workQueue表示任务队列,threadFactory用于对创建的线程进行一些配置,handler表示任务拒绝策略。下面我们详细探讨下这些参数。

1.线程池大小

线程池的大小主要与4个参数有关:

  • corePoolSize:核心线程个数。
  • maximumPoolSize:最大线程个数。
  • keepAliveTime和unit:空闲线程存活时间。

maximumPoolSize表示线程池中的最多线程数,线程的个数会动态变化,但这是最大值,不管有多少任务,都不会创建比这个值大的线程个数。corePoolSize表示线程池中的核心线程个数,不过,并不是一开始就创建这么多线程,刚创建一个线程池后,实际上并不会创建任何线程。

一般情况下,有新任务到来的时候,如果当前线程个数小于corePoolSiz,就会创建一个新线程来执行该任务,需要说明的是,即使其他线程现在也是空闲的,也会创建新线程。不过,如果线程个数大于等于corePoolSiz,那就不会立即创建新线程了,它会先尝试排队,需要强调的是,它是“尝试”排队,而不是“阻塞等待”入队,如果队列满了或其他原因不能立即入队,它就不会排队,而是检查线程个数是否达到了maximumPoolSize,如果没有,就会继续创建线程,直到线程数达到maximumPoolSize。

keepAliveTime的目的是为了释放多余的线程资源,它表示,当线程池中的线程个数大于corePoolSize时额外空闲线程的存活时间。也就是说,一个非核心线程,在空闲等待新任务时,会有一个最长等待时间,即keepAliveTime,如果到了时间还是没有新任务,就会被终止。如果该值为0,则表示所有线程都不会超时终止。

这几个参数除了可以在构造方法中进行指定外,还可以通过getter/setter方法进行查看和修改。

除了这些静态参数,ThreadPoolExecutor还可以查看关于线程和任务数的一些动态数字:

1
2
3
4
5
6
7
8
//返回当前线程个数
public int getPoolSize()
//返回线程池曾经达到过的最大线程个数
public int getLargestPoolSize()
//返回线程池自创建以来所有已完成的任务数
public long getCompletedTaskCount()
//返回所有任务数,包括所有已完成的加上所有排队待执行的
public long getTaskCount()

2.队列

ThreadPoolExecutor要求的队列类型是阻塞队列BlockingQueue,我们在17.4节介绍过多种BlockingQueue,它们都可以用作线程池的队列,比如:

  • LinkedBlockingQueue:基于链表的阻塞队列,可以指定最大长度,但默认是无界的。
  • ArrayBlockingQueue:基于数组的有界阻塞队列。
  • PriorityBlockingQueue:基于堆的无界阻塞优先级队列。
  • SynchronousQueue:没有实际存储空间的同步阻塞队列。

如果用的是无界队列,需要强调的是,线程个数最多只能达到corePoolSize,到达core-PoolSize后,新的任务总会排队,参数maximumPoolSize也就没有意义了。

对于SynchronousQueue,我们知道,它没有实际存储元素的空间,当尝试排队时,只有正好有空闲线程在等待接受任务时,才会入队成功,否则,总是会创建新线程,直到达到maximumPoolSize。

3.任务拒绝策略

如果队列有界,且maximumPoolSize有限,则当队列排满,线程个数也达到了maxi-mumPoolSize,这时,新任务来了,如何处理呢?此时,会触发线程池的任务拒绝策略。

默认情况下,提交任务的方法(如execute/submit/invokeAll等)会抛出异常,类型为RejectedExecutionException。

不过,拒绝策略是可以自定义的,ThreadPoolExecutor实现了4种处理方式。
1)ThreadPoolExecutor.AbortPolicy:这就是默认的方式,抛出异常。
2)ThreadPoolExecutor.DiscardPolicy:静默处理,忽略新任务,不抛出异常,也不执行。
3)ThreadPoolExecutor.DiscardOldestPolicy:将等待时间最长的任务扔掉,然后自己排队。
4)ThreadPoolExecutor.CallerRunsPolicy:在任务提交者线程中执行任务,而不是交给线程池中的线程执行。

它们都是ThreadPoolExecutor的public静态内部类,都实现了RejectedExecutionHandler接口,这个接口的定义为:

1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

当线程池不能接受任务时,调用其拒绝策略的rejectedExecution方法。

拒绝策略可以在构造方法中进行指定,也可以通过如下方法进行指定:

1
public void setRejectedExecutionHandler(RejectedExecutionHandler handler)

默认的RejectedExecutionHandler是一个AbortPolicy实例,如下所示:

1
2
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

而AbortPolicy的rejectedExecution实现就是抛出异常,如下所示:

1
2
3
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +" rejected from " + e.toString());
}

我们需要强调下,拒绝策略只有在队列有界,且maximumPoolSize有限的情况下才会触发。如果队列无界,服务不了的任务总是会排队,但这不一定是期望的结果,因为请求处理队列可能会消耗非常大的内存,甚至引发内存不够的异常。如果队列有界但maxi-mumPoolSize无限,可能会创建过多的线程,占满CPU和内存,使得任何任务都难以完成。所以,在任务量非常大的场景中,让拒绝策略有机会执行是保证系统稳定运行很重要的方面。

4.线程工厂

线程池还可以接受一个参数:ThreadFactory。它是一个接口,定义为:

1
2
3
public interface ThreadFactory {
Thread newThread(Runnable r);
}

这个接口根据Runnable创建一个Thread, ThreadPoolExecutor的默认实现是Executors类中的静态内部类DefaultThreadFactory,主要就是创建一个线程,给线程设置一个名称,设置daemon属性为false,设置线程优先级为标准默认优先级,线程名称的格式为:pool-<线程池编号>-thread-<线程编号>。如果需要自定义一些线程的属性,比如名称,可以实现自定义的ThreadFactory。

5.关于核心线程的特殊配置

线程个数小于等于corePoolSize时,我们称这些线程为核心线程,默认情况下。

  • 核心线程不会预先创建,只有当有任务时才会创建。
  • 核心线程不会因为空闲而被终止,keepAliveTime参数不适用于它。

不过,ThreadPoolExecutor有如下方法,可以改变这个默认行为。

1
2
3
4
5
6
//预先创建所有的核心线程
public int prestartAllCoreThreads()
//创建一个核心线程,如果所有核心线程都已创建,则返回false
public boolean prestartCoreThread()
//如果参数为true,则keepAliveTime参数也适用于核心线程
public void allowCoreThreadTimeOut(boolean value)

18.2.2 工厂类Executors

类Executors提供了一些静态工厂方法,可以方便地创建一些预配置的线程池,主要方法有:

1
2
3
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newCachedThreadPool()

newSingleThreadExecutor基本相当于调用:

1
2
3
public static ExecutorService newSingleThreadExecutor() {
return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

只使用一个线程,使用无界队列LinkedBlockingQueue,线程创建后不会超时终止,该线程顺序执行所有任务。该线程池适用于需要确保所有任务被顺序执行的场合。

newFixedThreadPool的代码为:

1
2
3
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

使用固定数目的n个线程,使用无界队列LinkedBlockingQueue,线程创建后不会超时终止。和newSingleThreadExecutor一样,由于是无界队列,如果排队任务过多,可能会消耗过多的内存。

newCachedThreadPool的代码为:

1
2
3
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

它的corePoolSize为0, maximumPoolSize为Integer.MAX_VALUE,keepAliveTime是60秒,队列为SynchronousQueue。它的含义是:当新任务到来时,如果正好有空闲线程在等待任务,则其中一个空闲线程接受该任务,否则就总是创建一个新线程,创建的总线程个数不受限制,对任一空闲线程,如果60秒内没有新任务,就终止。

实际中,应该使用newFixedThreadPool还是newCachedThreadPool呢?

在系统负载很高的情况下,newFixedThreadPool可以通过队列对新任务排队,保证有足够的资源处理实际的任务,而newCachedThreadPool会为每个任务创建一个线程,导致创建过多的线程竞争CPU和内存资源,使得任何实际任务都难以完成,这时, newFixedThreadPool更为适用。

不过,如果系统负载不太高,单个任务的执行时间也比较短,newCachedThreadPool的效率可能更高,因为任务可以不经排队,直接交给某一个空闲线程。

在系统负载可能极高的情况下,两者都不是好的选择,newFixedThreadPool的问题是队列过长,而newCachedThreadPool的问题是线程过多,这时,应根据具体情况自定义ThreadPoolExecutor,传递合适的参数。

18.2.3 线程池的死锁

关于提交给线程池的任务,我们需要注意一种情况,就是任务之间有依赖,这种情况可能会出现死锁。比如任务A,在它的执行过程中,它给同样的任务执行服务提交了一个任务B,但需要等待任务B结束。

如果任务A是提交给了一个单线程线程池,一定会出现死锁,A在等待B的结果,而B在队列中等待被调度。如果是提交给了一个限定线程个数的线程池,也有可能因线程数限制出现死锁。

怎么解决这种问题呢?可以使用newCachedThreadPool创建线程池,让线程数不受限制。另一个解决方法是使用SynchronousQueue,它可以避免死锁,怎么做到的呢?对于普通队列,入队只是把任务放到了队列中,而对于SynchronousQueue来说,入队成功就意味着已有线程接受处理,如果入队失败,可以创建更多线程直到maximumPoolSize,如果达到了maximumPoolSize,会触发拒绝机制,不管怎么样,都不会死锁。

18.2.4 小结

本节介绍了线程池的基本概念,详细探讨了其主要参数的含义,理解这些参数对于合理使用线程池是非常重要的,对于相互依赖的任务,需要注意避免出现死锁。

ThreadPoolExecutor实现了生产者/消费者模式,工作者线程就是消费者,任务提交者就是生产者,线程池自己维护任务队列。当我们碰到类似生产者/消费者问题时,应该优先考虑直接使用线程池,而非“重新发明轮子”,应自己管理和维护消费者线程及任务队列。

17.4 并发队列

本节,我们介绍Java并发包中的各种队列。Java并发包提供了丰富的队列类,可以简单分为以下几种。

  • 无锁非阻塞并发队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque。
  • 普通阻塞队列:基于数组的ArrayBlockingQueue,基于链表的LinkedBlockingQueue和LinkedBlockingDeque。
  • 优先级阻塞队列:PriorityBlockingQueue。
  • 延时阻塞队列:DelayQueue。
  • 其他阻塞队列:SynchronousQueue和LinkedTransferQueue。

无锁非阻塞是指,这些队列不使用锁,所有操作总是可以立即执行,主要通过循环CAS实现并发安全。阻塞队列是指,这些队列使用锁和条件,很多操作都需要先获取锁或满足特定条件,获取不到锁或等待条件时,会等待(即阻塞),获取到锁或条件满足再返回。

这些队列迭代都不会抛出ConcurrentModificationException,都是弱一致的,后面就不单独强调了。下面,我们来简要介绍每类队列的用途、用法和基本实现原理。

17.4.1 无锁非阻塞并发队列

有两个无锁非阻塞队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque,它们适用于多个线程并发使用一个队列的场合,都是基于链表实现的,都没有限制大小,是无界的,与ConcurrentSkipListMap类似,它们的size方法不是一个常量运算,不过这个方法在并发应用中用处也不大。

ConcurrentLinkedQueue实现了Queue接口,表示一个先进先出的队列,从尾部入队,从头部出队,内部是一个单向链表。ConcurrentLinkedDeque实现了Deque接口,表示一个双端队列,在两端都可以入队和出队,内部是一个双向链表。它们的用法类似于Linked-List,我们就不赘述了。

这两个类最基础的原理是循环CAS, ConcurrentLinkedQueue的算法基于一篇论文《Simple, Fast, and Practical Non-Blocking and Blocking Concurrent QueueAlgorithm》(https://www.research.ibm.com/people/m/michael/podc-1996.pdf )。ConcurrentLinkedDeque扩展了Con-currentLinkedQueue的技术,但它们的具体实现都非常复杂,我们就不探讨了。

17.4.2 普通阻塞队列

除了刚介绍的两个队列,其他队列都是阻塞队列,都实现了接口BlockingQueue,在入队/出队时可能等待,主要方法有:

1
2
3
4
5
6
7
8
//入队,如果队列满,等待直到队列有空间
void put(E e) throws InterruptedException;
//出队,如果队列空,等待直到队列不为空,返回头部元素
E take() throws InterruptedException;
//入队,如果队列满,最多等待指定的时间,如果超时还是满,返回false
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//出队,如果队列空,最多等待指定的时间,如果超时还是空,返回null
E poll(long timeout, TimeUnit unit) throws InterruptedException;

普通阻塞队列是常用的队列,常用于生产者/消费者模式。

ArrayBlockingQueue和LinkedBlockingQueue都实现了Queue接口,表示先进先出的队列,尾部进,头部出,而LinkedBlockingDeque实现了Deque接口,是一个双端队列。

ArrayBlockingQueue是基于循环数组实现的,有界,创建时需要指定大小,且在运行过程中不会改变,这与我们在容器类中介绍的ArrayDeque是不同的,ArrayDeque也是基于循环数组实现的,但是是无界的,会自动扩展。

LinkedBlockingQueue是基于单向链表实现的,在创建时可以指定最大长度,也可以不指定,默认是无限的,节点都是动态创建的。LinkedBlockingDeque与LinkedBlocking-Queue一样,最大长度也是在创建时可选的,默认无限,不过,它是基于双向链表实现的。

内部,它们都是使用显式锁ReentrantLock和显式条件Condition实现的。

ArrayBlockingQueue的实现很直接,有一个数组存储元素,有两个索引表示头和尾,有一个变量表示当前元素个数,有一个锁保护所有访问,有“不满”和“不空”两个条件用于协作,实现思路与我们在15.3.3节实现的类似,就不赘述了。

与ArrayBlockingQueue类似,LinkedBlockingDeque也是使用一个锁和两个条件,使用锁保护所有操作,使用“不满”和“不空”两个条件。LinkedBlockingQueue稍微不同,因为它使用链表,且只从头部出队、从尾部入队,它做了一些优化,使用了两个锁,一个保护头部,一个保护尾部,每个锁关联一个条件。

17.4.3 优先级阻塞队列

普通阻塞队列是先进先出的,而优先级队列是按优先级出队的,优先级高的先出,我们在容器类中介绍过优先级队列PriorityQueue及其背后的数据结构堆。Priority-BlockingQueue是PriorityQueue的并发版本,与PriorityQueue一样,它没有大小限制,是无界的,内部的数组大小会动态扩展,要求元素要么实现Comparable接口,要么创建Priority-BlockingQueue时提供一个Comparator对象。

与PriorityQueue的区别是,PriorityBlockingQueue实现了BlockingQueue接口,在队列为空时,take方法会阻塞等待。另外,PriorityBlockingQueue是线程安全的,它的基本实现原理与PriorityQueue是一样的,也是基于堆,但它使用了一个锁ReentrantLock保护所有访问,使用了一个条件协调阻塞等待。

17.4.4 延时阻塞队列

延时阻塞队列DelayQueue是一种特殊的优先级队列,它是无界的。它要求每个元素都实现Delayed接口,该接口的声明为:

1
2
3
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

Delayed扩展了Comparable接口,也就是说,DelayQueue的每个元素都是可比较的,它有一个额外方法getDelay返回一个给定时间单位unit的整数,表示再延迟多长时间,如果小于等于0,则表示不再延迟。

DelayQueue可以用于实现定时任务,它按元素的延时时间出队。它的特殊之处在于,只有当元素的延时过期之后才能被从队列中拿走,也就是说,take方法总是返回第一个过期的元素,如果没有,则阻塞等待。

DelayQueue是基于PriorityQueue实现的,它使用一个锁ReentrantLock保护所有访问,使用一个条件available表示头部是否有元素,当头部元素的延时未到时,take操作会根据延时计算需睡眠的时间,然后睡眠,如果在此过程中有新的元素入队,且成为头部元素,则阻塞睡眠的线程会被提前唤醒然后重新检查。这是基本思路,DelayQueue的实现有一些优化,以减少不必要的唤醒,具体我们就不探讨了。

17.4.5 其他阻塞队列

Java并发包中还有两个特殊的阻塞队列:SynchronousQueue和LinkedTransferQueue。

SynchronousQueue与一般的队列不同,它不算一种真正的队列,没有存储元素的空间,连存储一个元素的空间都没有。它的入队操作要等待另一个线程的出队操作,反之亦然。如果没有其他线程在等待从队列中接收元素,put操作就会等待。take操作需要等待其他线程往队列中放元素,如果没有,也会等待。SynchronousQueue适用于两个线程之间直接传递信息、事件或任务。

LinkedTransferQueue实现了TransferQueue接口,TransferQueue是BlockingQueue的子接口,但增加了一些额外功能,生产者在往队列中放元素时,可以等待消费者接收后再返回,适用于一些消息传递类型的应用中。TransferQueue的接口定义为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface TransferQueue<E> extends BlockingQueue<E> {
//如果有消费者在等待(执行take或限时的poll),直接转给消费者,
//返回true,否则返回false,不入队
boolean tryTransfer(E e);
//如果有消费者在等待,直接转给消费者,否则入队,阻塞等待直到被消费者接收后再返回
void transfer(E e) throws InterruptedException;
//如果有消费者在等待,直接转给消费者,返回true
//否则入队,阻塞等待限定的时间,如果最后被消费者接收,返回true
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//是否有消费者在等待
boolean hasWaitingConsumer();
//等待的消费者个数
int getWaitingConsumerCount();
}

LinkedTransferQueue是基于链表实现的、无界的TransferQueue,具体实现比较复杂,我们就不探讨了。

关于Java并发包的各种容器,至此就介绍完了,在实际开发中,应该尽量使用这些现成的容器,而非“重新发明轮子”。

Java并发包中还提供了一种方便的任务执行服务,使用它,可以将要执行的并发任务与线程的管理相分离,大大简化并发任务和线程的管理,让我们下一章来探讨。

17.3 基于跳表的Map和Set

Java并发包中与TreeMap/TreeSet对应的并发版本是ConcurrentSkipListMap和Concurrent-SkipListSet,本节就来简要探讨这两个类,先介绍基本概念,然后介绍基本实现原理。

17.3.1 基本概念

我们知道,TreeSet是基于TreeMap实现的,与此类似,ConcurrentSkipListSet也是基于ConcurrentSkipListMap实现的,所以我们主要介绍ConcurrentSkipListMap。

ConcurrentSkipListMap是基于SkipList实现的,SkipList称为跳跃表或跳表,是一种数据结构,稍后我们会进一步介绍。并发版本为什么采用跳表而不是树呢?原因也很简单,因为跳表更易于实现高效并发算法。ConcurrentSkipListMap有如下特点。

1)没有使用锁,所有操作都是无阻塞的,所有操作都可以并行,包括写,多线程可以同时写。
2)与ConcurrentHashMap类似,迭代器不会抛出ConcurrentModificationException,是弱一致的,迭代可能反映最新修改也可能不反映,一些方法如putAll、clear不是原子的。
3)与ConcurrentHashMap类似,同样实现了ConcurrentMap接口,支持一些原子复合操作。
4)与TreeMap一样,可排序,默认按键的自然顺序,也可以传递比较器自定义排序,实现了SortedMap和NavigableMap接口。

看段简单的使用代码:

1
2
3
4
5
6
7
8
public static void main(String[] args) {
Map<String, String> map = new ConcurrentSkipListMap<>(
Collections.reverseOrder());
map.put("a", "abstract");
map.put("c", "call");
map.put("b", "basic");
System.out.println(map.toString());
}

程序输出为:

1
{c=call, b=basic, a=abstract}

表示是有序的。

我们之前介绍过ConcurrentSkipListMap的大部分方法,有序的方法与TreeMap是类似的,原子复合操作与ConcurrentHashMap是类似的,此处不再赘述。

需要说明的是ConcurrentSkipListMa的size方法,与大多数容器实现不同,这个方法不是常量操作,它需要遍历所有元素,复杂度为O(N),而且遍历结束后,元素个数可能已经变了。一般而言,在并发应用中,这个方法用处不大。下面我们主要介绍其基本实现原理。

17.3.2 基本实现原理

我们先来介绍跳表的结构,跳表是基于链表的,在链表的基础上加了多层索引结构。我们通过一个简单的例子来说明。假定容器中包含如下元素:

1
3, 6, 7, 9, 12, 17, 19, 21, 25, 26

对Map来说,这些值可以视为键。ConcurrentSkipListMap会构造类似图17-1所示的跳表结构。

epub_923038_131

图17-1 跳表结构示例

最下面一层就是最基本的单向链表,这个链表是有序的。虽然是有序的,但我们知道,与数组不同,链表不能根据索引直接定位,不能进行二分查找。

为了快速查找,跳表有多层索引结构,这个例子中有两层,第一层有5个节点,第二层有2个节点。高层的索引节点一定同时是低层的索引节点,比如9和21。高层的索引节点少,低层的多。统计概率上,第一层索引节点是实际元素数的1/2,第二层是第一层的1/2,逐层减半,但这不是绝对的,有随机性,只是大致如此。每个索引节点有两个指针:一个向右,指向下一个同层的索引节点;另一个向下,指向下一层的索引节点或基本链表节点。

有了这个结构,就可以实现类似二分查找了。查找元素总是从最高层开始,将待查值与下一个索引节点的值进行比较,如果大于索引节点,就向右移动,继续比较,如果小于索引节点,则向下移动到下一层进行比较。图17-2所示的两条线展示了查找值19和8的过程。

epub_923038_132

图17-2 在跳表中查找的示例

对于值19,查找过程是:
1)与9相比,大于9;
2)向右与21相比,小于21;
3)向下与17相比,大于17;
4)向右与21相比,小于21;
5)向下与19相比,找到。

对于值8,查找过程是:
1)与9相比,小于9;
2)向下与6相比,大于6;
3)向右与9相比,小于9;
4)向下与7相比,大于7;
5)向右与9相比,小于9,不能再向下,没找到。

这个结构是有序的,查找的性能与二叉树类似,复杂度是O(log(N))。不过,这个结构是如何构建起来的呢?与二叉树类似,这个结构是在更新过程中进行保持的,保存元素的基本思路是:
1)先保存到基本链表,找到待插入的位置,找到位置后,插入基本链表;
2)更新索引层。

对于索引更新,随机计算一个数,表示为该元素最高建几层索引,一层的概率为1/2,二层的概率为1/4,三层的概率为1/8,以此类推。然后从最高层到最低层,在每一层,为该元素建立索引节点,建立索引节点的过程也是先查找位置,再插入。

对于删除元素,ConcurrentSkipListMap不是直接进行真正删除,而是为了避免并发冲突,有一个复杂的标记过程,在内部遍历元素的过程中进行真正删除。

以上我们只是介绍了基本思路,为了实现并发安全、高效、无锁非阻塞,Concurrent-SkipListMap的实现非常复杂,具体我们就不探讨了,感兴趣的读者可以参考其源码,其中提到了多篇学术论文,论文中描述了它参考的一些算法。对于常见的操作,如get/put/remove/containsKey, ConcurrentSkipListMap的复杂度都是O(log(N))。

上面介绍的SkipList结构是为了便于并发操作的,如果不需要并发,可以使用另一种更为高效的结构,数据和所有层的索引放到一个节点中,如图17-3所示。

epub_923038_133

图17-3 数据和索引都在一个节点中的跳表

对于一个元素,只有一个节点,只是每个节点的索引个数可能不同,在新建一个节点时,使用随机算法决定它的索引个数。平均而言,1/2的元素有两个索引,1/4的元素有三个索引,以此类推。

简单总结下,ConcurrentSkipListMap和ConcurrentSkipListSet基于跳表实现,有序,无锁非阻塞,完全并行,主要操作复杂度为O(log(N))。

17.2 ConcurrentHashMap

本节介绍一个常用的并发容器ConcurrentHashMap,它是HashMap的并发版本,与HashMap相比,它有如下特点:

  • 并发安全;
  • 直接支持一些原子复合操作;
  • 支持高并发,读操作完全并行,写操作支持一定程度的并行;
  • 与同步容器Collections.synchronizedMap相比,迭代不用加锁,不会抛出Concurre ntModificationException;
  • 弱一致性。

下面我们分别介绍。

17.2.1 并发安全

需要了解的是,HashMap不是并发安全的,在并发更新的情况下,HashMap可能出现死循环,占满CPU。我们看个例子,如代码清单17-1所示。

代码清单17-1 HashMap死循环示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void unsafeConcurrentUpdate() {
final Map<Integer, Integer> map = new HashMap<>();
for(int i = 0; i < 1000; i++) {
Thread t = new Thread() {
Random rnd = new Random();
@Override
public void run() {
for(int i = 0; i < 1000; i++) {
map.put(rnd.nextInt(), 1);
}
}
};
t.start();
}
}

运行上面的代码,在笔者的计算机中,无论是Java 7还是Java 8环境,每次都会出现死循环,占满CPU。

为什么会出现死循环呢?死循环出现在多个线程同时扩容哈希表的时候,不是同时更新一个链表的时候,那种情况可能会出现更新丢失,但不会死循环,具体过程比较复杂,我们就不解释了。关于Java 7的解释感兴趣的读者可以参考 http://coolshell.cn/articles/9606.html 中的文章。Java 8对HashMap的实现进行了大量优化,减少了死循环的可能,但在扩容的时候还是可能有死循环。

使用Collections.synchronizedMap方法可以生成一个同步容器,以避免产生死循环,替换第一行代码即可:

1
2
final Map<Integer, Integer> map = Collections.synchronizedMap(
new HashMap<Integer, Integer>());

同步容器有几个问题:

  • 每个方法都需要同步,支持的并发度比较低;
  • 对于迭代和复合操作,需要调用方加锁,使用比较麻烦,且容易忘记。

ConcurrentHashMap没有这些问题,它同样实现了Map接口,也是基于哈希表实现的,上面的代码替换第一行即可:

1
final Map<Integer, Integer> map = new ConcurrentHashMap<>();

17.2.2 原子复合操作

除了Map接口,ConcurrentHashMap还实现了一个接口ConcurrentMap,接口定义了一些条件更新操作,Java 7中的具体定义为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface ConcurrentMap<K, V> extends Map<K, V> {
//条件更新,如果Map中没有key,设置key为value,返回原来key对应的值,
//如果没有,返回null
V putIfAbsent(K key, V value);
//条件删除,如果Map中有key,且对应的值为value,则删除,如果删除了,返回true,
//否则返回false
boolean remove(Object key, Object value);
//条件替换,如果Map中有key,且对应的值为oldValue,则替换为newValue,
//如果替换了,返回ture,否则false
boolean replace(K key, V oldValue, V newValue);
//条件替换,如果Map中有key,则替换值为value,返回原来key对应的值,
//如果原来没有,返回null
V replace(K key, V value);
}

Java 8增加了几个默认方法,包括getOrDefault、forEach、computeIfAbsent、merge等,具体可参见API文档,我们就不介绍了。如果使用同步容器,调用方必须加锁,而Concurrent-HashMap将它们实现为了原子操作。实际上,使用ConcurrentHashMap,调用方也没有办法进行加锁,它没有暴露锁接口,也不使用synchronized。

17.2.3 高并发的基本机制

ConcurrentHashMap是为高并发设计的,它是怎么做的呢?具体实现比较复杂,我们简要介绍其思路,在Java 7中,主要有两点:

  • 分段锁
  • 读不需要锁

同步容器使用synchronized,所有方法竞争同一个锁;而ConcurrentHashMap采用分段锁技术,将数据分为多个段,而每个段有一个独立的锁,每一个段相当于一个独立的哈希表,分段的依据也是哈希值,无论是保存键值对还是根据键查找,都先根据键的哈希值映射到段,再在段对应的哈希表上进行操作。

采用分段锁,可以大大提高并发度,多个段之间可以并行读写。默认情况下,段是16个,不过,这个数字可以通过构造方法进行设置,如下所示:

1
2
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel)

concurrencyLevel表示估计的并行更新的线程个数,ConcurrentHashMap会将该数转换为2的整数次幂,比如14转换为16,25转换为32。

在对每个段的数据进行读写时,ConcurrentHashMap也不是简单地使用锁进行同步,内部使用了CAS。对一些写采用原子方式的方法,实现比较复杂,我们就不介绍了。实现的效果是,对于写操作,需要获取锁,不能并行,但是读操作可以,多个读可以并行,写的同时也可以读,这使得ConcurrentHashMap的并行度远高于同步容器。

Java 8对ConcurrentHashMap的实现进一步做了优化。首先,与HashMap的改进类似,在哈希冲突比较严重的时候,会将单向链表转化为平衡的排序二叉树,提高查找的效率;其次,锁的粒度进一步细化了,以提高并行性,哈希表数组中的每个位置(指向一个单链表或树)都有一个单独的锁,具体比较复杂,我们就不介绍了。

17.2.4 迭代安全

我们在15.2.3节介绍过,使用同步容器,在迭代中需要加锁,否则可能会抛出Concurrent-ModificationException。ConcurrentHashMap没有这个问题,在迭代器创建后,在迭代过程中,如果另一个线程对容器进行了修改,迭代会继续,不会抛出异常。

问题是,迭代会反映其他线程的修改吗?还是像CopyOnWriteArrayList一样,反映的是创建时的副本?答案是,都不是!我们看个例子,如代码清单17-2所示。

代码清单17-2 ConcurrentHashMap的迭代示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public static void unsafeConcurrentUpdate() {
final Map<Integer, Integer> map = new HashMap<>();
for(int i = 0; i < 1000; i++) {
Thread t = new Thread() {
Random rnd = new Random();
@Override
public void run() {
for(int i = 0; i < 1000; i++) {
map.put(rnd.nextInt(), 1);
}
}
};
t.start();
}
}
final Map<Integer, Integer> map = Collections.synchronizedMap(
new HashMap<Integer, Integer>());
public interface ConcurrentMap<K, V> extends Map<K, V> {
//条件更新,如果Map中没有key,设置key为value,返回原来key对应的值,
//如果没有,返回null
V putIfAbsent(K key, V value);
//条件删除,如果Map中有key,且对应的值为value,则删除,如果删除了,返回true,
//否则返回false
boolean remove(Object key, Object value);
//条件替换,如果Map中有key,且对应的值为oldValue,则替换为newValue,
//如果替换了,返回ture,否则false
boolean replace(K key, V oldValue, V newValue);
//条件替换,如果Map中有key,则替换值为value,返回原来key对应的值,
//如果原来没有,返回null
V replace(K key, V value);
}
final Map<Integer, Integer> map = new ConcurrentHashMap<>();
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel)
public class ConcurrentHashMapIteratorDemo {
public static void test() {
final ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("a", "abstract");
map.put("b", "basic");
Thread t1 = new Thread() {
@Override
public void run() {
for(Entry<String, String> entry : map.entrySet()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println(entry.getKey() + ", " + entry.getValue());
}
}
};
t1.start();
// 确保线程t1启动
try {
Thread.sleep(100);
} catch(InterruptedException e) {
}
map.put("c", "call");
}
public static void main(String[] args) {
test();
}
}

t1启动后,创建迭代器,但在迭代输出每个元素前,先睡眠1秒,主线程启动t1后,先睡眠一下,确保t1先运行,然后给map增加了一个元素,程序输出为:

1
2
3
a, abstract
b, basic
c, call

上述代码说明迭代器反映了最新的更新。将添加语句更改为:

1
map.put("g", "call");

会发现程序输出为:

1
2
a, abstract
b, basic

这说明迭代器没有反映最新的更新。需要说明的是,这是Java 7的输出,Java 8和Java 9的实现不太一样,输出也不太一样,但也有相同的问题。到底是怎么回事呢?这需要我们理解ConcurrentHashMap的弱一致性

17.2.5 弱一致性

ConcurrentHashMap的迭代器创建后,就会按照哈希表结构遍历每个元素,但在遍历过程中,内部元素可能会发生变化,如果变化发生在已遍历过的部分,迭代器就不会反映出来,而如果变化发生在未遍历过的部分,迭代器就会发现并反映出来,这就是弱一致性。

类似的情况还会出现在ConcurrentHashMap的另一个方法:

1
2
//批量添加m中的键值对到当前Map
public void putAll(Map<? extends K, ? extends V> m)

该方法并非原子操作,而是调用put方法逐个元素进行添加的,在该方法没有结束的时候,部分修改效果就会体现出来。

17.2.6 小结

本节介绍了ConcurrentHashMap,它是并发版的HashMap,通过降低锁的粒度和CAS等实现了高并发,支持原子条件更新操作,不会抛出ConcurrentModificationException,实现了弱一致性。

Java中没有并发版的HashSet,但可以通过Collections.newSetFromMap方法基于Con-currentHashMap构建一个。

我们知道HashMap/HashSet基于哈希,不能对元素排序,对应的可排序的容器类是TreeMap/TreeSet,并发包中可排序的对应版本不是基于树,而是基于Skip List(跳跃表),类分别是ConcurrentSkipListMap和ConcurrentSkipListSet,它们到底是什么呢?让我们下节讨论。

16.3 显式条件

16.2节我们介绍了显式锁,本节介绍关联的显式条件,介绍其用法和原理。显式条件在不同上下文中也可以被称为条件变量、条件队列、或条件,后文我们可能会交替使用。

16.3.1 用法

锁用于解决竞态条件问题,条件是线程间的协作机制。显式锁与synchronized相对应,而显式条件与wait/notify相对应。wait/notify与synchronized配合使用,显式条件与显式锁配合使用。条件与锁相关联,创建条件变量需要通过显式锁,Lock接口定义了创建方法:

1
Condition newCondition();

Condition表示条件变量,是一个接口,它的定义为:

1
2
3
4
5
6
7
8
9
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}

await对应于Object的wait, signal对应于notify, signalAll对应于notifyAll,语义也是一样的。

与Object的wait方法类似,await也有几个限定等待时间的方法,但功能更多一些:

1
2
3
4
5
6
//等待时间是相对时间,如果由于等待超时返回,返回值为false,否则为true
boolean await(long time, TimeUnit unit) throws InterruptedException;
//等待时间也是相对时间,但参数单位是纳秒,返回值是nanosTimeout减去实际等待的时间
long awaitNanos(long nanosTimeout) throws InterruptedException;
//等待时间是绝对时间,如果由于等待超时返回,返回值为false,否则为true
boolean awaitUntil(Date deadline) throws InterruptedException;

这些await方法都是响应中断的,如果发生了中断,会抛出InterruptedException,但中断标志位会被清空。Condition还定义了一个不响应中断的等待方法:

1
void awaitUninterruptibly();

该方法不会由于中断结束,但当它返回时,如果等待过程中发生了中断,中断标志位会被设置。

一般而言,与Object的wait方法一样,调用await方法前需要先获取锁,如果没有锁,会抛出异常IllegalMonitorStateException。

await在进入等待队列后,会释放锁,释放CPU,当其他线程将它唤醒后,或等待超时后,或发生中断异常后,它都需要重新获取锁,获取锁后,才会从await方法中退出

另外,与Object的wait方法一样,await返回后,不代表其等待的条件就一定满足了,通常要将await的调用放到一个循环内,只有条件满足后才退出。

一般而言,signal/signalAll与notify/notifyAll一样,调用它们需要先获取锁,如果没有锁,会抛出异常IllegalMonitorStateException。signal与notify一样,挑选一个线程进行唤醒,signalAll与notifyAll一样,唤醒所有等待的线程,但这些线程被唤醒后都需要重新竞争锁,获取锁后才会从await调用中返回。

ReentrantLock实现了newCondition方法,通过它,我们来看下条件的基本用法。我们实现与15.3节类似的例子WaitThread,一个线程启动后,在执行一项操作前,等待主线程给它指令,收到指令后才执行,示例代码如代码清单16-7所示。

代码清单16-7 使用显式条件进行协作的示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class WaitThread extends Thread {
private volatile boolean fire = false;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
@Override
public void run() {
try {
lock.lock();
try {
while (! fire) {
condition.await();
}
} finally {
lock.unlock();
}
System.out.println("fired");
} catch (InterruptedException e) {
Thread.interrupted();
}
}
public void fire() {
lock.lock();
try {
this.fire = true;
condition.signal();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
WaitThread waitThread = new WaitThread();
waitThread.start();
Thread.sleep(1000);
System.out.println("fire");
waitThread.fire();
}
}

需要特别注意的是,不要将signal/signalAll与notify/notifyAll混淆,notify/notifyAll是Object中定义的方法,Condition对象也有,稍不注意就会误用。比如,对上面例子中的fire方法,可能会写为:

1
2
3
4
5
6
7
8
9
public void fire() {
lock.lock();
try {
this.fire = true;
condition.notify();
} finally {
lock.unlock();
}
}

写成这样,编译器不会报错,但运行时会抛出IllegalMonitorStateException,因为notify的调用不在synchronized语句内。同样,避免将锁与synchronized混用,那样非常令人混淆,比如:

1
2
3
4
5
6
public void fire() {
synchronized(lock){
this.fire = true;
condition.signal();
}
}

记住,显式条件与显式锁配合,wait/notify与synchronized配合

16.3.2 生产者/消费者模式

在15.3节,我们用wait/notify实现了生产者/消费者模式,我们提到了wait/notify的一个局限,它只能有一个条件等待队列,分析等待条件也很复杂。在生产者/消费者模式中,其实有两个条件,一个与队列满有关,一个与队列空有关。使用显式锁,可以创建多个条件等待队列。下面,我们用显式锁/条件重新实现下其中的阻塞队列,如代码清单16-8所示。

代码清单16-8 使用显式锁/条件实现的阻塞队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static class MyBlockingQueue<E> {
private Queue<E> queue = null;
private int limit;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public MyBlockingQueue(int limit) {
this.limit = limit;
queue = new ArrayDeque<>(limit);
}
public void put(E e) throws InterruptedException {
lock.lockInterruptibly();
try{
while (queue.size() == limit) {
notFull.await();
}
queue.add(e);
notEmpty.signal();
}finally{
lock.unlock();
}
}
public E take() throws InterruptedException {
lock.lockInterruptibly();
try{
while(queue.isEmpty()) {
notEmpty.await();
}
E e = queue.poll();
notFull.signal();
return e;
}finally{
lock.unlock();
}
}
}

上述代码定义了两个等待条件:不满(notFull)、不空(notEmpty)。在put方法中,如果队列满,则在notFull上等待;在take方法中,如果队列空,则在notEmpty上等待。put操作后通知notEmpty, take操作后通知notFull。这样,代码更为清晰易读,同时避免了不必要的唤醒和检查,提高了效率。Java并发包中的类ArrayBlockingQueue就采用了类似的方式实现。

16.3.3 实现原理

理解了显式条件的概念和用法,我们来看下ReentrantLock是如何实现它的,其new-Condition()的代码为:

1
2
3
public Condition newCondition() {
return sync.newCondition();
}

sync是ReentrantLock的内部类对象,其newCondition()代码为:

1
2
3
final ConditionObject newCondition() {
return new ConditionObject();
}

ConditionObject是AQS中定义的一个内部类,它的实现也比较复杂,我们通过一些主要代码来简要探讨其实现原理。ConditionObject内部也有一个队列,表示条件等待队列,其成员声明为:

1
2
3
4
//条件队列的头节点
private transient Node firstWaiter;
//条件队列的尾节点
private transient Node lastWaiter;

ConditionObject是AQS的成员内部类,它可以直接访问AQS中的数据,比如AQS中定义的锁等待队列。我们看下主要方法的实现。先看await方法,如代码清单16-9所示。我们通过添加注释解释其基本思路。

代码清单16-9 await的实现代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final void await() throws InterruptedException {
//如果等待前中断标志位已被设置,直接抛出异常
if(Thread.interrupted())
throw new InterruptedException();
//1.为当前线程创建节点,加入条件等待队列
Node node = addConditionWaiter();
//2.释放持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//3.放弃CPU,进行等待,直到被中断或isOnSyncQueue变为true
//isOnSyncQueue为true,表示节点被其他线程从条件等待队列
//移到了外部的锁等待队列,等待的条件已满足
while (! isOnSyncQueue(node)) {
LockSupport.park(this);
if((interruptMode = checkInterruptWhileWaiting(node)) ! = 0)
break;
}
//4.重新获取锁
if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE)
interruptMode = REINTERRUPT;
if(node.nextWaiter ! = null) // clean up if cancelled
unlinkCancelledWaiters();
//5.处理中断,抛出异常或设置中断标志位
if(interruptMode ! = 0)
reportInterruptAfterWait(interruptMode);
}

awaitNanos与await的实现是基本类似的,区别主要是会限定等待的时间,具体就不列举了。

signal方法代码为:

1
2
3
4
5
6
7
8
9
public final void signal() {
//验证当前线程持有锁
if(! isHeldExclusively())
throw new IllegalMonitorStateException();
//调用doSignal唤醒等待队列中第一个线程
Node first = firstWaiter;
if(first ! = null)
doSignal(first);
}

doSignal的代码就不列举了,其基本逻辑是:
1)将节点从条件等待队列移到锁等待队列;
2)调用LockSupport.unpark将线程唤醒。

16.3.4 小结

本节介绍了显式条件的用法和实现原理。它与显式锁配合使用,与wait/notify相比,可以支持多个条件队列,代码更为易读,效率更高,使用时注意不要将signal/signalAll误写为notify/notifyAll。

至此,关于并发包的基础:原子变量和CAS、显式锁和条件,就介绍完了,基于这些, Java并发包还提供了很多更为易用的高层数据结构、工具和服务,下一章,我们介绍一些并发容器。

16.2 显式锁

15.2节介绍了利用synchronized实现锁,我们提到了synchronized的一些局限性,本节探讨Java并发包中的显式锁,它可以解决synchronized的限制。

Java并发包中的显式锁接口和类位于包java.util.concurrent.locks下,主要接口和类有:

  • 锁接口Lock,主要实现类是ReentrantLock;
  • 读写锁接口ReadWriteLock,主要实现类是ReentrantReadWriteLock。

本节主要介绍接口Lock和实现类ReentrantLock,关于读写锁,我们后续章节介绍。

16.2.1 接口Lock

显式锁接口Lock的定义为:

1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}

下面解释一下。

1)lock()/unlock():就是普通的获取锁和释放锁方法,lock()会阻塞直到成功。
2)lockInterruptibly():与lock()的不同是,它可以响应中断,如果被其他线程中断了,则抛出InterruptedException。
3)tryLock():只是尝试获取锁,立即返回,不阻塞,如果获取成功,返回true,否则返回false。
4)tryLock(long time, TimeUnit unit):先尝试获取锁,如果能成功则立即返回true,否则阻塞等待,但等待的最长时间由指定的参数设置,在等待的同时响应中断,如果发生了中断,抛出InterruptedException,如果在等待的时间内获得了锁,返回true,否则返回false。
5)newCondition:新建一个条件,一个Lock可以关联多个条件,关于条件,我们留待16.3节介绍。

可以看出,相比synchronized,显式锁支持以非阻塞方式获取锁、可以响应中断、可以限时,这使得它灵活得多。

16.2.2 可重入锁ReentrantLock

下面,先介绍ReentrantLock的基本用法,然后重点介绍如何使用tryLock避免死锁。

1.基本用法

Lock接口的主要实现类是ReentrantLock,它的基本用法lock/unlock实现了与syn-chronized一样的语义,包括:

  • 可重入,一个线程在持有一个锁的前提下,可以继续获得该锁;
  • 可以解决竞态条件问题;
  • 可以保证内存可见性。

ReentrantLock有两个构造方法:

1
2
public ReentrantLock()
public ReentrantLock(boolean fair)

参数fair表示是否保证公平,不指定的情况下,默认为false,表示不保证公平。所谓公平是指,等待时间最长的线程优先获得锁。保证公平会影响性能,一般也不需要,所以默认不保证,synchronized锁也是不保证公平的,16.2.3节还会再分析实现细节。

使用显式锁,一定要记得调用unlock。一般而言,应该将lock之后的代码包装到try语句内,在finally语句内释放锁。比如,使用ReentrantLock实现Counter,代码可以为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Counter {
private final Lock lock = new ReentrantLock();
private volatile int count;
public void incr() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public int getCount() {
return count;
}
}

2.使用tryLock避免死锁

使用tryLock(),可以避免死锁。在持有一个锁获取另一个锁而获取不到的时候,可以释放已持有的锁,给其他线程获取锁的机会,然后重试获取所有锁。

我们来看个例子,银行账户之间转账,用类Account表示账户,如代码清单16-3所示。

代码清单16-3 表示账户的类Account
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Account {
private Lock lock = new ReentrantLock();
private volatile double money;
public Account(double initialMoney) {
this.money = initialMoney;
}
public void add(double money) {
lock.lock();
try {
this.money += money;
} finally {
lock.unlock();
}
}
public void reduce(double money) {
lock.lock();
try {
this.money -= money;
} finally {
lock.unlock();
}
}
public double getMoney() {
return money;
}
void lock() {
lock.lock();
}
void unlock() {
lock.unlock();
}
boolean tryLock() {
return lock.tryLock();
}
}

Account里的money表示当前余额,add/reduce用于修改余额。在账户之间转账,需要两个账户都锁定,如果不使用tryLock,而直接使用lock,则代码如代码清单27-6所示。

代码清单16-4 转账的错误写法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class AccountMgr {
public static class NoEnoughMoneyException extends Exception {}
public static void transfer(Account from, Account to, double money)
throws NoEnoughMoneyException {
from.lock();
try {
to.lock();
try {
if(from.getMoney() >= money) {
from.reduce(money);
to.add(money);
} else {
throw new NoEnoughMoneyException();
}
} finally {
to.unlock();
}
} finally {
from.unlock();
}
}
}

但这么写是有问题的,如果两个账户都同时给对方转账,都先获取了第一个锁,则会发生死锁。我们写段代码来模拟这个过程,如代码清单16-5所示。

代码清单16-5 模拟账户转账的死锁过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void simulateDeadLock() {
final int accountNum = 10;
final Account[] accounts = new Account[accountNum];
final Random rnd = new Random();
for(int i = 0; i < accountNum; i++) {
accounts[i] = new Account(rnd.nextInt(10000));
}
int threadNum = 100;
Thread[] threads = new Thread[threadNum];
for(int i = 0; i < threadNum; i++) {
threads[i] = new Thread() {
public void run() {
int loopNum = 100;
for(int k = 0; k < loopNum; k++) {
int i = rnd.nextInt(accountNum);
int j = rnd.nextInt(accountNum);
int money = rnd.nextInt(10);
if(i ! = j) {
try {
transfer(accounts[i], accounts[j], money);
} catch (NoEnoughMoneyException e) {
}
}
}
}
};
threads[i].start();
}
}

以上代码创建了10个账户,100个线程,每个线程执行100次循环,在每次循环中,随机挑选两个账户进行转账。在笔者的计算机中,每次执行该段代码都会发生死锁。读者可以更改这些数值进行试验。

我们使用tryLock来进行修改,先定义一个tryTransfer方法,如代码清单16-6所示。

代码清单16-6 使用tryLock尝试转账
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static boolean tryTransfer(Account from, Account to, double money)
throws NoEnoughMoneyException {
if(from.tryLock()) {
try {
if(to.tryLock()) {
try {
if(from.getMoney() >= money) {
from.reduce(money);
to.add(money);
} else {
throw new NoEnoughMoneyException();
}
return true;
} finally {
to.unlock();
}
}
} finally {
from.unlock();
}
}
return false;
}

如果两个锁都能够获得,且转账成功,则返回true,否则返回false。不管怎样,结束都会释放所有锁。transfer方法可以循环调用该方法以避免死锁,代码可以为:

1
2
3
4
5
6
7
8
9
10
public static void transfer(Account from, Account to, double money)
throws NoEnoughMoneyException {
boolean success = false;
do {
success = tryTransfer(from, to, money);
if(! success) {
Thread.yield();
}
} while (! success);
}

除了实现Lock接口中的方法,ReentrantLock还有一些其他方法,通过它们,可以获取关于锁的一些信息,这些信息可以用于监控和调试目的,具体可参看API文档,就不介绍了。

16.2.3 ReentrantLock的实现原理

ReentrantLock的用法是比较简单的,它是怎么实现的呢?在最底层,它依赖于16.1节介绍的CAS方法,另外,它依赖于类LockSupport中的一些方法。我们先介绍Lock-Support。

1. LockSupport

类LockSupport也位于包java.util.concurrent.locks下,它的基本方法有:

1
2
3
4
public static void park()
public static void parkNanos(long nanos)
public static void parkUntil(long deadline)
public static void unpark(Thread thread)

park使得当前线程放弃CPU,进入等待状态(WAITING),操作系统不再对它进行调度,什么时候再调度呢?有其他线程对它调用了unpark, unpark使参数指定的线程恢复可运行状态。我们看个例子:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread (){
public void run(){
LockSupport.park(); //放弃CPU
System.out.println("exit");
}
};
t.start(); //启动子线程
Thread.sleep(1000); //睡眠1秒确保子线程先运行
LockSupport.unpark(t);
}

上述例子中,主线程启动子线程t,线程t启动后调用park,放弃CPU,主线程睡眠1秒以确保子线程已执行LockSupport.park(),调用unpark,线程t恢复运行,输出exit。

park不同于Thread.yield(), yield只是告诉操作系统可以先让其他线程运行,但自己依然是可运行状态,而park会放弃调度资格,使线程进入WAITING状态。

需要说明的是,park是响应中断的,当有中断发生时,park会返回,线程的中断状态会被设置。另外还需要说明,park可能会无缘无故地返回,程序应该重新检查park等待的条件是否满足。

park有两个变体:

  • parkNanos:可以指定等待的最长时间,参数是相对于当前时间的纳秒数;
  • parkUntil:可以指定最长等到什么时候,参数是绝对时间,是相对于纪元时的毫秒数。

当等待超时的时候,它们也会返回。

这些park方法还有一些变体,可以指定一个对象,表示是由于该对象而进行等待的,以便于调试,通常传递的值是this,比如:

1
public static void park(Object blocker)

LockSupport有一个方法,可以返回一个线程的blocker对象:

1
public static Object getBlocker(Thread t)

这些park/unpark方法是怎么实现的呢?与CAS方法一样,它们也调用了Unsafe类中的对应方法。Unsafe类最终调用了操作系统的API,从程序员的角度,我们可以认为Lock-Support中的这些方法就是基本操作

2. AQS

利用CAS和LockSupport提供的基本方法,就可以用来实现ReentrantLock了。但Java中还有很多其他并发工具,如ReentrantReadWriteLock、Semaphore、CountDownLatch,它们的实现有很多类似的地方,为了复用代码,Java提供了一个抽象类AbstractQueued-Synchronizer,简称AQS,它简化了并发工具的实现。AQS的整体实现比较复杂,我们主要以ReentrantLock的使用为例进行简要介绍。

AQS封装了一个状态,给子类提供了查询和设置状态的方法:

1
2
3
4
private volatile int state;
protected final int getState()
protected final void setState(int newState)
protected final boolean compareAndSetState(int expect, int update)

用于实现锁时,AQS可以保存锁的当前持有线程,提供了方法进行查询和设置:

1
2
3
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t)
protected final Thread getExclusiveOwnerThread()

AQS内部维护了一个等待队列,借助CAS方法实现了无阻塞算法进行更新。

下面,我们以ReentrantLock的使用为例简要介绍AQS的原理。

3. ReentrantLock

ReentrantLock内部使用AQS,有三个内部类:

1
2
3
abstract static class Sync extends AbstractQueuedSynchronizer
static final class NonfairSync extends Sync
static final class FairSync extends Sync

Sync是抽象类,NonfairSync是fair为false时使用的类,FairSync是fire为true时使用的类。ReentrantLock内部有一个Sync成员:

1
private final Sync sync;

在构造方法中sync被赋值,比如:

1
2
3
public ReentrantLock() {
sync = new NonfairSync();
}

我们来看ReentrantLock中的基本方法lock/unlock的实现。先看lock方法,代码为:

1
2
3
public void lock() {
sync.lock();
}

sync默认类型是NonfairSync, NonfairSync的lock代码为:

1
2
3
4
5
6
final void lock() {
if(compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

ReentrantLock使用state表示是否被锁和持有数量,如果当前未被锁定,则立即获得锁,否则调用acquire(1)获得锁。acquire是AQS中的方法,代码为:

1
2
3
4
5
public final void acquire(int arg) {
if(! tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

它调用tryAcquire获取锁,tryAcquire必须被子类重写。NonfairSync的实现为:

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

nonfairTryAcquire是sync中实现的,代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if(c == 0) {
if(compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if(current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if(nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

这段代码容易理解,如果未被锁定,则使用CAS进行锁定;如果已被当前线程锁定,则增加锁定次数。如果tryAcquire返回false,则AQS会调用:

1
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

其中,addWaiter会新建一个节点Node,代表当前线程,然后加入内部的等待队列中,限于篇幅,具体代码就不列出来了。放入等待队列后,调用acquireQueued尝试获得锁,代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for(; ; ) {
final Node p = node.predecessor();
if(p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if(failed)
cancelAcquire(node);
}
}

主体是一个死循环,在每次循环中,首先检查当前节点是不是第一个等待的节点,如果是且能获得到锁,则将当前节点从等待队列中移除并返回,否则最终调用LockSupport. park放弃CPU,进入等待,被唤醒后,检查是否发生了中断,记录中断标志,在最终方法返回时返回中断标志。如果发生过中断,acquire方法最终会调用selfInterrupt方法设置中断标志位,其代码为:

1
2
3
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}

以上就是lock方法的基本过程,能获得锁就立即获得,否则加入等待队列,被唤醒后检查自己是否是第一个等待的线程,如果是且能获得锁,则返回,否则继续等待。这个过程中如果发生了中断,lock会记录中断标志位,但不会提前返回或抛出异常。

ReentrantLock的unlock方法的代码为:

1
2
3
public void unlock() {
sync.release(1);
}

release是AQS中定义的方法,代码为:

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if(tryRelease(arg)) {
Node h = head;
if(h ! = null && h.waitStatus ! = 0)
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease方法会修改状态释放锁,unparkSuccessor会调用LockSupport.unpark将第一个等待的线程唤醒,具体代码就不列举了。

FairSync和NonfairSync的主要区别是:在获取锁时,即在tryAcquire方法中,如果当前未被锁定,即c==0, FairSync多了一个检查,如下:

1
2
3
4
5
6
7
8
9
10
11
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if(c == 0) {
if(! hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
..

这个检查是指,只有不存在其他等待时间更长的线程,它才会尝试获取锁。

这样保证公平不是很好吗?为什么默认不保证公平呢?保证公平整体性能比较低,低的原因不是这个检查慢,而是会让活跃线程得不到锁,进入等待状态,引起频繁上下文切换,降低了整体的效率,通常情况下,谁先运行关系不大,而且长时间运行,从统计角度而言,虽然不保证公平,也基本是公平的。需要说明是,即使fair参数为true, ReentrantLock中不带参数的tryLock方法也是不保证公平的,它不会检查是否有其他等待时间更长的线程。

16.2.4 对比ReentrantLock和synchronized

相比synchronized, ReentrantLock可以实现与synchronized相同的语义,而且支持以非阻塞方式获取锁,可以响应中断,可以限时,更为灵活。不过,synchronized的使用更为简单,写的代码更少,也更不容易出错。

synchronized代表一种声明式编程思维,程序员更多的是表达一种同步声明,由Java系统负责具体实现,程序员不知道其实现细节;显式锁代表一种命令式编程思维,程序员实现所有细节。

声明式编程的好处除了简单,还在于性能,在较新版本的JVM上,ReentrantLock和synchronized的性能是接近的,但Java编译器和虚拟机可以不断优化synchronized的实现,比如自动分析synchronized的使用,对于没有锁竞争的场景,自动省略对锁获取/释放的调用。

简单总结下,能用synchronized就用synchronized,不满足要求时再考虑Reentrant-Lock

15.4 线程的中断

本节主要讨论一个问题,如何在Java中取消或关闭一个线程?我们先介绍都有哪些场景需要取消/关闭线程,再介绍取消/关闭的机制,以及线程对中断的反应,最后讨论如何正确地取消/关闭线程。

15.4.1 取消/关闭的场景

我们知道,通过线程的start方法启动一个线程后,线程开始执行run方法,run方法运行结束后线程退出,那为什么还需要结束一个线程呢?有多种情况,比如:
1)很多线程的运行模式是死循环,比如在生产者/消费者模式中,消费者主体就是一个死循环,它不停地从队列中接受任务,执行任务,在停止程序时,我们需要一种“优雅”的方法以关闭该线程。
2)在一些图形用户界面程序中,线程是用户启动的,完成一些任务,比如从远程服务器上下载一个文件,在下载过程中,用户可能会希望取消该任务。
3)在一些场景中,比如从第三方服务器查询一个结果,我们希望在限定的时间内得到结果,如果得不到,我们会希望取消该任务。
4)有时,我们会启动多个线程做同一件事,比如类似抢火车票,我们可能会让多个好友帮忙从多个渠道买火车票,只要有一个渠道买到了,我们会通知取消其他渠道。

15.4.2 取消/关闭的机制

Java的Thread类定义了如下方法:

1
public final void stop()

这个方法看上去就可以停止线程,但这个方法被标记为了过时,简单地说,我们不应该使用它,可以忽略它。

在Java中,停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,是给线程传递一个取消信号,但是由线程来决定如何以及何时退出。本节我们主要就是来理解Java的中断机制。

Thread类定义了如下关于中断的方法:

1
2
3
public boolean isInterrupted()
public void interrupt()
public static boolean interrupted()

这三个方法名字类似,比较容易混淆,我们解释一下。isInterrupted()和interrupt()是实例方法,调用它们需要通过线程对象;interrupted()是静态方法,实际会调用Thread. currentThread()操作当前线程。

每个线程都有一个标志位,表示该线程是否被中断了。
1)isInterrupted:返回对应线程的中断标志位是否为true。
2)interrupted:返回当前线程的中断标志位是否为true,但它还有一个重要的副作用,就是清空中断标志位,也就是说,连续两次调用interrupted(),第一次返回的结果为true,第二次一般就是false(除非同时又发生了一次中断)。
3)interrupt:表示中断对应的线程。中断具体意味着什么呢?下面我们进一步来说明。

15.4.3 线程对中断的反应

interrupt()对线程的影响与线程的状态和在进行的IO操作有关。我们主要考虑线程的状态,IO操作的影响和具体IO以及操作系统有关,我们就不讨论了。线程状态有:

  • RUNNABLE:线程在运行或具备运行条件只是在等待操作系统调度。
  • WAITING/TIMED_WAITING:线程在等待某个条件或超时。
  • BLOCKED:线程在等待锁,试图进入同步块。
  • NEW/TERMINATED:线程还未启动或已结束。

1. RUNNABLE

如果线程在运行中,且没有执行IO操作,interrupt()只是会设置线程的中断标志位,没有任何其他作用。线程应该在运行过程中合适的位置检查中断标志位,比如,如果主体代码是一个循环,可以在循环开始处进行检查,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final void stop()
public boolean isInterrupted()
public void interrupt()
public static boolean interrupted()
public class InterruptRunnableDemo extends Thread {
@Override
public void run() {
while(! Thread.currentThread().isInterrupted()) {
//…单次循环代码
}
System.out.println("done ");
}
//其他代码
}

2. WAITING/TIMED_WAITING

线程调用join/wait/sleep方法会进入WAITING或TIMED_WAITING状态,在这些状态时,对线程对象调用interrupt()会使得该线程抛出InterruptedException。需要注意的是,抛出异常后,中断标志位会被清空,而不是被设置。比如,执行如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Thread t = new Thread (){
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println(isInterrupted());
}
}
};
t.start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
t.interrupt();

程序的输出为false。

InterruptedException是一个受检异常,线程必须进行处理。我们在异常处理中介绍过,处理异常的基本思路是:如果知道怎么处理,就进行处理,如果不知道,就应该向上传递,通常情况下不应该捕获异常然后忽略。

捕获到InterruptedException,通常表示希望结束该线程,线程大致有两种处理方式:
1)向上传递该异常,这使得该方法也变成了一个可中断的方法,需要调用者进行处理;
2)有些情况,不能向上传递异常,比如Thread的run方法,它的声明是固定的,不能抛出任何受检异常,这时,应该捕获异常,进行合适的清理操作,清理后,一般应该调用Thread的interrupt方法设置中断标志位,使得其他代码有办法知道它发生了中断。

第一种方式的示例代码如下:

1
2
3
4
public void interruptibleMethod() throws InterruptedException{
//…包含wait, join 或 sleep 方法
Thread.sleep(1000);
}

第二种方式的示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class InterruptWaitingDemo extends Thread {
@Override
public void run() {
while(! Thread.currentThread().isInterrupted()) {
try {
//模拟任务代码
Thread.sleep(2000);
} catch(InterruptedException e) {
//…清理操作
//重设中断标志位
Thread.currentThread().interrupt();
}
}
System.out.println(isInterrupted());
}
//其他代码
}

3. BLOCKED

如果线程在等待锁,对线程对象调用interrupt()只是会设置线程的中断标志位,线程依然会处于BLOCKED状态,也就是说,interrupt()并不能使一个在等待锁的线程真正“中断”。我们看段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class InterruptSynchronizedDemo {
private static Object lock = new Object();
private static class A extends Thread {
@Override
public void run() {
synchronized (lock) {
while (! Thread.currentThread().isInterrupted()) {
}
}
System.out.println("exit");
}
}
public static void test() throws InterruptedException {
synchronized (lock) {
A a = new A();
a.start();
Thread.sleep(1000);
a.interrupt();
a.join();
}
}
public static void main(String[] args) throws InterruptedException {
test();
}
}

test方法在持有锁lock的情况下启动线程a,而线程a也去尝试获得锁lock,所以会进入锁等待队列,随后test调用线程a的interrupt方法并调用join等待线程线程a结束,线程a会结束吗?不会,interrupt方法只会设置线程的中断标志,而并不会使它从锁等待队列中出来。

在使用synchronized关键字获取锁的过程中不响应中断请求,这是synchronized的局限性。如果这对程序是一个问题,应该使用显式锁。第16章会介绍显式锁Lock接口,它支持以响应中断的方式获取锁。

4. NEW/TERMINATE

如果线程尚未启动(NEW),或者已经结束(TERMINATED),则调用interrupt()对它没有任何效果,中断标志位也不会被设置。

15.4.4 如何正确地取消/关闭线程

interrupt方法不一定会真正“中断”线程,它只是一种协作机制,如果不明白线程在做什么,不应该贸然地调用线程的interrupt方法,以为这样就能取消线程。

对于以线程提供服务的程序模块而言,它应该封装取消/关闭操作,提供单独的取消/关闭方法给调用者,外部调用者应该调用这些方法而不是直接调用interrupt。Java并发库的一些代码就提供了单独的取消/关闭方法,比如,Future接口提供了如下方法以取消任务:

1
boolean cancel(boolean mayInterruptIfRunning);

再如,ExecutorService提供了如下两个关闭方法:

1
2
void shutdown();
List<Runnable> shutdownNow();

Future和ExecutorService的API文档对这些方法都进行了详细说明,这是我们应该学习的方式。关于这两个接口,我们后续章节介绍。

15.4.5 小结

本节主要介绍了在Java中如何取消/关闭线程,主要依赖的技术是中断,但它是一种协作机制,不会强迫终止线程,我们介绍了线程在不同状态下对中断的反应。作为线程的实现者,应该提供明确的取消/关闭方法,并用文档描述清楚其行为;作为线程的调用者,应该使用其取消/关闭方法,而不是贸然调用interrupt。

至此,关于线程的基础内容就介绍完了。在Java中还有一套并发工具包,位于包java.util.concurrent下,里面包括很多易用且高性能的并发开发工具,从下一章开始,我们就来讨论它,先从最基本的原子变量和CAS(Compare And Set)操作开始。

15.3 线程的基本协作机制

多线程之间除了竞争访问同一个资源外,也经常需要相互协作,怎么协作呢?本节就来介绍Java中多线程协作的基本机制wait/notify。

都有哪些场景需要协作?wait/notify是什么?如何使用?实现原理是什么?协作的核心是什么?如何实现各种典型的协作场景?本节进行详细讨论,我们先来看看都有哪些协作的场景。

15.3.1 协作的场景

多线程之间需要协作的场景有很多,比如:
1)生产者/消费者协作模式:这是一种常见的协作模式,生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从队列上取数据或任务,如果队列长度有限,在队列满的时候,生产者需要等待,而在队列为空的时候,消费者需要等待。
2)同时开始:类似运动员比赛,在听到比赛开始枪响后同时开始,在一些程序,尤其是模拟仿真程序中,要求多个线程能同时开始。
3)等待结束:主从协作模式也是一种常见的协作模式,主线程将任务分解为若干子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕。
4)异步结果:在主从协作模式中,主线程手工创建子线程的写法往往比较麻烦,一种常见的模式是将子线程的管理封装为异步调用,异步调用马上返回,但返回的不是最终的结果,而是一个一般称为Future的对象,通过它可以在随后获得最终的结果。
5)集合点:类似于学校或公司组团旅游,在旅游过程中有若干集合点,比如出发集合点,每个人从不同地方来到集合点,所有人到齐后进行下一项活动,在一些程序,比如并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。

我们会探讨如何实现这些协作场景,在此之前,我们先来了解协作的基本方法wait/notify。

15.3.2 wait/notify

我们知道,Java的根父类是Object, Java在Object类而非Thread类中定义了一些线程协作的基本方法,使得每个对象都可以调用这些方法,这些方法有两类,一类是wait,另一类是notify。

主要有两个wait方法:

1
2
public final void wait() throws InterruptedException
public final native void wait(long timeout) throws InterruptedException;

一个带时间参数,单位是毫秒,表示最多等待这么长时间,参数为0表示无限期等待;一个不带时间参数,表示无限期等待,实际就是调用wait(0)。在等待期间都可以被中断,如果被中断,会抛出InterruptedException。关于中断及中断处理,我们在下节介绍,本节暂时忽略该异常。

wait实际上做了什么呢?它在等待什么?上节我们说过,每个对象都有一把锁和等待队列,一个线程在进入synchronized代码块时,会尝试获取锁,如果获取不到则会把当前线程加入等待队列中,其实,除了用于锁的等待队列,每个对象还有另一个等待队列,表示条件队列,该队列用于线程间的协作。调用wait就会把当前线程放到条件队列上并阻塞,表示当前线程执行不下去了,它需要等待一个条件,这个条件它自己改变不了,需要其他线程改变。当其他线程改变了条件后,应该调用Object的notify方法:

1
2
public final native void notify();
public final native void notifyAll();

notify做的事情就是从条件队列中选一个线程,将其从队列中移除并唤醒,notifyAll和notify的区别是,它会移除条件队列中所有的线程并全部唤醒。

我们来看个简单的例子,一个线程启动后,在执行一项操作前,它需要等待主线程给它指令,收到指令后才执行,如代码清单15-12所示。

代码清单15-12 简单协作示例WaitThread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class WaitThread extends Thread {
private volatile boolean fire = false;
@Override
public void run() {
try {
synchronized (this) {
while(! fire) {
wait();
}
}
System.out.println("fired");
} catch(InterruptedException e) {
}
}
public synchronized void fire() {
this.fire = true;
notify();
}
public static void main(String[] args) throws InterruptedException {
WaitThread waitThread = new WaitThread();
waitThread.start();
Thread.sleep(1000);
System.out.println("fire");
waitThread.fire();
}
}

示例代码中有两个线程,一个是主线程,一个是WaitThread,协作的条件变量是fire, WaitThread等待该变量变为true,在不为true的时候调用wait,主线程设置该变量并调用notify。

两个线程都要访问协作的变量fire,容易出现竞态条件,所以相关代码都需要被synchronized保护。实际上,wait/notify方法只能在synchronized代码块内被调用,如果调用wait/notify方法时,当前线程没有持有对象锁,会抛出异常java.lang.IllegalMonitor-StateException。

你可能会有疑问,如果wait必须被synchronized保护,那一个线程在wait时,另一个线程怎么可能调用同样被synchronized保护的notify方法呢?它不需要等待锁吗?我们需要进一步理解wait的内部过程,虽然是在synchronized方法内,但调用wait时,线程会释放对象锁。wait的具体过程是:
1)把当前线程放入条件等待队列,释放对象锁,阻塞等待,线程状态变为WAITING或TIMED_WAITING。
2)等待时间到或被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁:

  • 如果能够获得锁,线程状态变为RUNNABLE,并从wait调用中返回。
  • 否则,该线程加入对象锁等待队列,线程状态变为BLOCKED,只有在获得锁后才会从wait调用中返回。

线程从wait调用中返回后,不代表其等待的条件就一定成立了,它需要重新检查其等待的条件,一般的调用模式是:

1
2
3
4
5
synchronized (obj) {
while(条件不成立)
obj.wait();
//执行条件满足后的操作
}

比如,上例中的代码是:

1
2
3
4
5
synchronized (this) {
while(! fire) {
wait();
}
}

调用notify会把在条件队列中等待的线程唤醒并从队列中移除,但它不会释放对象锁,也就是说,只有在包含notify的synchronized代码块执行完后,等待的线程才会从wait调用中返回。

简单总结一下,wait/notify方法看上去很简单,但往往难以理解wait等的到底是什么,而notify通知的又是什么,我们需要知道,它们被不同的线程调用,但共享相同的锁和条件等待队列(相同对象的synchronized代码块内),它们围绕一个共享的条件变量进行协作,这个条件变量是程序自己维护的,当条件不成立时,线程调用wait进入条件等待队列,另一个线程修改了条件变量后调用notify,调用wait的线程唤醒后需要重新检查条件变量。从多线程的角度看,它们围绕共享变量进行协作,从调用wait的线程角度看,它阻塞等待一个条件的成立。我们在设计多线程协作时,需要想清楚协作的共享变量和条件是什么,这是协作的核心。接下来,我们通过一些场景进一步理解wait/notify的应用。

15.3.3 生产者/消费者模式

在生产者/消费者模式中,协作的共享变量是队列,生产者往队列上放数据,如果满了就wait,而消费者从队列上取数据,如果队列为空也wait。我们将队列作为单独的类进行设计,如代码清单15-13所示。

代码清单15-13 生产者/消费者协作队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static class MyBlockingQueue<E> {
private Queue<E> queue = null;
private int limit;
public MyBlockingQueue(int limit) {
this.limit = limit;
queue = new ArrayDeque<>(limit);
}
public synchronized void put(E e) throws InterruptedException {
while(queue.size() == limit) {
wait();
}
queue.add(e);
notifyAll();
}
public synchronized E take() throws InterruptedException {
while(queue.isEmpty()) {
wait();
}
E e = queue.poll();
notifyAll();
return e;
}
}

MyBlockingQueue是一个长度有限的队列,长度通过构造方法的参数进行传递,有两个方法:put和take。put是给生产者使用的,往队列上放数据,满了就wait,放完之后调用notifyAll,通知可能的消费者。take是给消费者使用的,从队列中取数据,如果为空就wait,取完之后调用notifyAll,通知可能的生产者。

我们看到,put和take都调用了wait,但它们的目的是不同的,或者说,它们等待的条件是不一样的,put等待的是队列不为满,而take等待的是队列不为空,但它们都会加入相同的条件等待队列。由于条件不同但又使用相同的等待队列,所以要调用notifyAll而不能调用notify,因为notify只能唤醒一个线程,如果唤醒的是同类线程就起不到协调的作用。

只能有一个条件等待队列,这是Java wait/notify机制的局限性,这使得对于等待条件的分析变得复杂,后续章节我们会介绍显式的锁和条件,它可以解决该问题。

一个简单的生产者代码如代码清单15-14所示。

代码清单15-14 一个简单的生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static class Producer extends Thread {
MyBlockingQueue<String> queue;
public Producer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
int num = 0;
try {
while(true) {
String task = String.valueOf(num);
queue.put(task);
System.out.println("produce task " + task);
num++;
Thread.sleep((int) (Math.random() * 100));
}
} catch (InterruptedException e) {
}
}
}

Producer向共享队列中插入模拟的任务数据。一个简单的消费者代码如代码清单15-15所示。

代码清单15-15 一个简单的消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static class Consumer extends Thread {
MyBlockingQueue<String> queue;
public Consumer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while(true) {
String task = queue.take();
System.out.println("handle task " + task);
Thread.sleep((int)(Math.random()*100));
}
} catch(InterruptedException e) {
}
}
}

主程序的示例代码如下所示:

1
2
3
4
5
public static void main(String[] args) {
MyBlockingQueue<String> queue = new MyBlockingQueue<>(10);
new Producer(queue).start();
new Consumer(queue).start();
}

运行该程序,会看到生产者和消费者线程的输出交替出现。

我们实现的MyBlockingQueue主要用于演示,Java提供了专门的阻塞队列实现,包括:

  • 接口BlockingQueue和BlockingDeque。
  • 基于数组的实现类ArrayBlockingQueue。
  • 基于链表的实现类LinkedBlockingQueue和LinkedBlockingDeque。
  • 基于堆的实现类PriorityBlockingQueue。

我们会在后续章节介绍这些类,在实际系统中,应该优先考虑使用这些类。

15.3.4 同时开始

同时开始,类似于运动员比赛,在听到比赛开始枪响后同时开始,下面,我们模拟这个过程。这里,有一个主线程和N个子线程,每个子线程模拟一个运动员,主线程模拟裁判,它们协作的共享变量是一个开始信号。我们用一个类FireFlag来表示这个协作对象,如代码清单15-16所示。

代码清单15-16 协作对象FireFlag
1
2
3
4
5
6
7
8
9
10
11
12
static class FireFlag {
private volatile boolean fired = false;
public synchronized void waitForFire() throws InterruptedException {
while(! fired) {
wait();
}
}
public synchronized void fire() {
this.fired = true;
notifyAll();
}
}

子线程应该调用waitForFire()等待枪响,而主线程应该调用fire()发射比赛开始信号。

表示比赛运动员的类如代码清单15-17所示。

代码清单15-17 表示比赛运动员的类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static class Racer extends Thread {
FireFlag fireFlag;
public Racer(FireFlag fireFlag) {
this.fireFlag = fireFlag;
}
@Override
public void run() {
try {
this.fireFlag.waitForFire();
System.out.println("start run "
+ Thread.currentThread().getName());
} catch (InterruptedException e) {
}
}
}

主程序代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws InterruptedException {
int num = 10;
FireFlag fireFlag = new FireFlag();
Thread[] racers = new Thread[num];
for(int i = 0; i < num; i++) {
racers[i] = new Racer(fireFlag);
racers[i].start();
}
Thread.sleep(1000);
fireFlag.fire();
}

这里,启动了10个子线程,每个子线程启动后等待fire信号,主线程调用fire()后各个子线程才开始执行后续操作。

15.3.5 等待结束

在15.1.2节中我们使用join方法让主线程等待子线程结束,join实际上就是调用了wait,其主要代码是:

1
2
3
while (isAlive()) {
wait(0);
}

只要线程是活着的,isAlive()返回true, join就一直等待。谁来通知它呢?当线程运行结束的时候,Java系统调用notifyAll来通知。

使用join有时比较麻烦,需要主线程逐一等待每个子线程。这里,我们演示一种新的写法。主线程与各个子线程协作的共享变量是一个数,这个数表示未完成的线程个数,初始值为子线程个数,主线程等待该值变为0,而每个子线程结束后都将该值减一,当减为0时调用notifyAll,我们用MyLatch来表示这个协作对象,如代码清单15-18所示。

代码清单15-18 协作对象MyLatch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MyLatch {
private int count;
public MyLatch(int count) {
this.count = count;
}
public synchronized void await() throws InterruptedException {
while(count > 0) {
wait();
}
}
public synchronized void countDown() {
count--;
if(count <= 0) {
notifyAll();
}
}
}

这里,MyLatch构造方法的参数count应初始化为子线程的个数,主线程应该调用await(),而子线程在执行完后应该调用countDown()。工作子线程的示例代码如代码清单15-19所示。

代码清单15-19 使用MyLatch的工作子线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static class Worker extends Thread {
MyLatch latch;
public Worker(MyLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
//simulate working on task
Thread.sleep((int) (Math.random() * 1000));
this.latch.countDown();
} catch (InterruptedException e) {
}
}
}

主线程的示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws InterruptedException {
int workerNum = 100;
MyLatch latch = new MyLatch(workerNum);
Worker[] workers = new Worker[workerNum];
for(int i = 0; i < workerNum; i++) {
workers[i] = new Worker(latch);
workers[i].start();
}
latch.await();
System.out.println("collect worker results");
}

MyLatch是一个用于同步协作的工具类,主要用于演示基本原理,在Java中有一个专门的同步类CountDownLatch,在实际开发中应该使用它。关于CountDownLatch,我们会在后续章节介绍。

MyLatch的功能是比较通用的,它也可以应用于上面“同时开始”的场景,初始值设为1, Racer类调用await(),主线程调用countDown()即可,如代码清单15-20所示。

代码清单15-20 使用MyLatch实现同时开始
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RacerWithLatchDemo {
static class Racer extends Thread {
MyLatch latch;
public Racer(MyLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
this.latch.await();
System.out.println("start run "
+ Thread.currentThread().getName());
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) throws InterruptedException {
int num = 10;
MyLatch latch = new MyLatch(1);
Thread[] racers = new Thread[num];
for(int i = 0; i < num; i++) {
racers[i] = new Racer(latch);
racers[i].start();
}
Thread.sleep(1000);
latch.countDown();
}
}

15.3.6 异步结果

在主从模式中,手工创建线程往往比较麻烦,一种常见的模式是异步调用,异步调用返回一个一般称为Future的对象,通过它可以获得最终的结果。在Java中,表示子任务的接口是Callable,声明为:

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

为表示异步调用的结果,我们定义一个接口MyFuture,如下所示:

1
2
3
public interface MyFuture <V> {
V get() throws Exception ;
}

这个接口的get方法返回真正的结果,如果结果还没有计算完成,get方法会阻塞直到计算完成,如果调用过程发生异常,则get方法抛出调用过程中的异常。

为方便主线程调用子任务,我们定义一个类MyExecutor,其中定义一个public方法execute,表示执行子任务并返回异步结果,声明如下:

1
public <V> MyFuture<V> execute(final Callable<V> task)

利用该方法,对于主线程,就不需要创建并管理子线程了,并且可以方便地获取异步调用的结果。比如,在主线程中,可以类似代码清单15-21那样启动异步调用并获取结果:

代码清单15-21 异步调用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) {
MyExecutor executor = new MyExecutor();
//子任务
Callable<Integer> subTask = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
//…执行异步任务
int millis = (int) (Math.random() * 1000);
Thread.sleep(millis);
return millis;
}
};
//异步调用,返回一个MyFuture对象
MyFuture<Integer> future = executor.execute(subTask);
//…执行其他操作
try {
//获取异步调用的结果
Integer result = future.get();
System.out.println(result);
} catch(Exception e) {
e.printStackTrace();
}
}

MyExecutor的execute方法是怎么实现的呢?它封装了创建子线程,同步获取结果的过程,它会创建一个执行子线程,该子线程如代码清单15-22所示。

代码清单15-22 执行子线程ExecuteThread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static class ExecuteThread<V> extends Thread {
private V result = null;
private Exception exception = null;
private boolean done = false;
private Callable<V> task;
private Object lock;
public ExecuteThread(Callable<V> task, Object lock) {
this.task = task;
this.lock = lock;
}
@Override
public void run() {
try {
result = task.call();
} catch (Exception e) {
exception = e;
} finally {
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
}
public V getResult() {
return result;
}
public boolean isDone() {
return done;
}
public Exception getException() {
return exception;
}
}

这个子线程执行实际的子任务,记录执行结果到result变量、异常到exception变量,执行结束后设置共享状态变量done为true,并调用notifyAll,以唤醒可能在等待结果的主线程。

MyExecutor的execute方法如代码清单15-23所示。

代码清单15-23 异步执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public <V> MyFuture<V> execute(final Callable<V> task) {
final Object lock = new Object();
final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
thread.start();
MyFuture<V> future = new MyFuture<V>() {
@Override
public V get() throws Exception {
synchronized (lock) {
while(! thread.isDone()) {
try {
lock.wait();
} catch (InterruptedException e) {
}
}
if(thread.getException() ! = null) {
throw thread.getException();
}
return thread.getResult();
}
}
};
return future;
}

execute启动一个线程,并返回MyFuture对象,MyFuture的get方法会阻塞等待直到线程运行结束。

以上的MyExecutore和MyFuture主要用于演示基本原理,实际上,Java中已经包含了一套完善的框架Executors,相关的部分接口和类有:

  • 表示异步结果的接口Future和实现类FutureTask。
  • 用于执行异步任务的接口Executor,以及有更多功能的子接口ExecutorService。
  • 用于创建Executor和ExecutorService的工厂方法类Executors。

后续章节,我们会详细介绍这套框架。

15.3.7 集合点

各个线程先是分头行动,各自到达一个集合点,在集合点需要集齐所有线程,交换数据,然后再进行下一步动作。怎么表示这种协作呢?协作的共享变量依然是一个数,这个数表示未到集合点的线程个数,初始值为子线程个数,每个线程到达集合点后将该值减一,如果不为0,表示还有别的线程未到,进行等待,如果变为0,表示自己是最后一个到的,调用notifyAll唤醒所有线程。我们用AssemblePoint类来表示这个协作对象,如代码清单15-24所示。

代码清单15-24 协作对象AssemblePoint
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class AssemblePoint {
private int n;
public AssemblePoint(int n) {
this.n = n;
}
public synchronized void await() throws InterruptedException {
if(n > 0) {
n--;
if(n == 0) {
notifyAll();
} else {
while(n ! = 0) {
wait();
}
}
}
}
}

多个游客线程各自先独立运行,然后使用该协作对象到达集合点进行同步的示例如代码清单15-25所示。

代码清单15-25 集合点协作示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class AssemblePointDemo {
static class Tourist extends Thread {
AssemblePoint ap;
public Tourist(AssemblePoint ap) {
this.ap = ap;
}
@Override
public void run() {
try {
//模拟先各自独立运行
Thread.sleep((int) (Math.random() * 1000));
//集合
ap.await();
System.out.println("arrived");
//…集合后执行其他操作
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) {
int num = 10;
Tourist[] threads = new Tourist[num];
AssemblePoint ap = new AssemblePoint(num);
for(int i = 0; i < num; i++) {
threads[i] = new Tourist(ap);
threads[i].start();
}
}
}

这里实现的AssemblePoint主要用于演示基本原理,Java中有一个专门的同步工具类CyclicBarrier可以替代它,关于该类,我们后续章节介绍。

15.3.8 小结

本节介绍了Java中线程间协作的基本机制wait/notify,协作关键要想清楚协作的共享变量和条件是什么,为进一步理解,针对多种协作场景,我们演示了wait/notify的用法及基本协作原理。Java中有专门为协作而建的阻塞队列、同步工具类,以及Executors框架,我们会在后续章节介绍。在实际开发中,应该尽量使用这些现成的类,而非“重新发明轮子”。