20.2 线程的协作机制

多线程之间的核心问题,除了竞争,就是协作。我们在15.3节介绍了多种协作场景,比如生产者/消费者协作模式、主从协作模式、同时开始、集合点等。之前章节探讨了协作的多种机制:

  • wait/notify;
  • 显式条件;
  • 线程的中断;
  • 协作工具类;
  • 阻塞队列;
  • Future/FutureTask。

(1)wait/notify

wait/notify与synchronized配合一起使用,是线程的基本协作机制。每个对象都有一把锁和两个等待队列,一个是锁等待队列,放的是等待获取锁的线程;另一个是条件等待队列,放的是等待条件的线程,wait将自己加入条件等待队列,notify从条件等待队列上移除一个线程并唤醒,notifyAll移除所有线程并唤醒。

需要注意的是,wait/notify方法只能在synchronized代码块内被调用,调用wait时,线程会释放对象锁,被notify/notifyAll唤醒后,要重新竞争对象锁,获取到锁后才会从wait调用中返回,返回后,不代表其等待的条件就一定成立了,需要重新检查其等待的条件。

wait/notify方法看上去很简单,但往往难以理解wait等的到底是什么,而notify通知的又是什么,只能有一个条件等待队列,这也是wait/notify机制的局限性,这使得对于等待条件的分析变得复杂,15.3节通过多个例子演示了其用法,这里就不赘述了。

(2)显式条件

显式条件与显式锁配合使用,与wait/notify相比,可以支持多个条件队列,代码更为易读,效率更高。使用时注意不要将signal/signalAll误写为notify/notifyAll。

(3)线程的中断

Java中取消/关闭一个线程的方式是中断。中断并不是强迫终止一个线程,它是一种协作机制,是给线程传递一个取消信号,但是由线程来决定如何以及何时退出,线程在不同状态和IO操作时对中断有不同的反应。作为线程的实现者,应该提供明确的取消/关闭方法,并用文档清楚描述其行为;作为线程的调用者,应该使用其取消/关闭方法,而不是贸然调用interrupt。

(4)协作工具类

除了基本的显式锁和条件,针对常见的协作场景,Java并发包提供了多个用于协作的工具类。

信号量类Semaphore用于限制对资源的并发访问数。

倒计时门栓CountDownLatch主要用于不同角色线程间的同步,比如在裁判/运动员模式中,裁判线程让多个运动员线程同时开始,也可以用于协调主从线程,让主线程等待多个从线程的结果。

循环栅栏CyclicBarrier用于同一角色线程间的协调一致,所有线程在到达栅栏后都需要等待其他线程,等所有线程都到达后再一起通过,它是循环的,可以用作重复的同步。

(5)阻塞队列

对于最常见的生产者/消费者协作模式,可以使用阻塞队列,阻塞队列封装了锁和条件,生产者线程和消费者线程只需要调用队列的入队/出队方法就可以了,不需要考虑同步和协作问题。

阻塞队列有普通的先进先出队列,包括基于数组的ArrayBlockingQueue和基于链表的LinkedBlockingQueue/LinkedBlockingDeque,也有基于堆的优先级阻塞队列PriorityBlock-ingQueue,还有可用于定时任务的延时阻塞队列DelayQueue,以及用于特殊场景的阻塞队列SynchronousQueue和LinkedTransferQueue。

(6)Future/FutureTask

在常见的主从协作模式中,主线程往往是让子线程异步执行一项任务,获取其结果。手工创建子线程的写法往往比较麻烦,常见的模式是使用异步任务执行服务,不再手工创建线程,而只是提交任务,提交后马上得到一个结果,但这个结果不是最终结果,而是一个Future。Future是一个接口,主要实现类是FutureTask。

Future封装了主线程和执行线程关于执行状态和结果的同步,对于主线程而言,它只需要通过Future就可以查询异步任务的状态、获取最终结果、取消任务等,不需要再考虑同步和协作问题。

第20章 并发总结

从第15章到第19章,我们一直在讨论并发,本章进行简要总结。多线程开发有两个核心问题:一个是竞争,另一个是协作。竞争会出现线程安全问题,所以,本章首先总结线程安全的机制,然后是协作的机制。管理竞争和协作是复杂的,所以Java提供了更高层次的服务,比如并发容器类和异步任务执行服务,我们也会进行总结。本章纲要如下:

  • 线程安全的机制;
  • 线程的协作机制;
  • 容器类;
  • 任务执行服务。

20.1 线程安全的机制

线程表示一条单独的执行流,每个线程有自己的执行计数器,有自己的栈,但可以共享内存,共享内存是实现线程协作的基础,但共享内存有两个问题,竞态条件和内存可见性,之前章节探讨了解决这些问题的多种思路:

  • 使用synchronized;
  • 使用显式锁;
  • 使用volatile;
  • 使用原子变量和CAS;
  • 写时复制;
  • 使用ThreadLocal。

(1)synchronized

synchronized简单易用,它只是一个关键字,大部分情况下,放到类的方法声明上就可以了,既可以解决竞态条件问题,也可以解决内存可见性问题。

需要理解的是,它保护的是对象,而不是代码,只有对同一个对象的synchronized方法调用,synchronized才能保证它们被顺序调用。对于实例方法,这个对象是this;对于静态方法,这个对象是类对象;对于代码块,需要指定哪个对象。

另外,需要注意,它不能尝试获取锁,也不响应中断,还可能会死锁。不过,相比显式锁,synchronized简单易用,JVM也可以不断优化它的实现,应该被优先使用。

(2)显式锁

显式锁是相对于synchronized隐式锁而言的,它可以实现synchronized同样的功能,但需要程序员自己创建锁,调用锁相关的接口,主要接口是Lock,主要实现类是Reen-trantLock。

相比synchronized,显式锁支持以非阻塞方式获取锁,可以响应中断,可以限时,可以指定公平性,可以解决死锁问题,这使得它灵活得多。

在读多写少、读操作可以完全并行的场景中,可以使用读写锁以提高并发度,读写锁的接口是ReadWriteLock,实现类是ReentrantReadWriteLock。

(3)volatile

synchronized和显式锁都是锁,使用锁可以实现安全,但使用锁是有成本的,获取不到锁的线程还需要等待,会有线程的上下文切换开销等。保证安全不一定需要锁。如果共享的对象只有一个,操作也只是进行最简单的get/set操作,set也不依赖于之前的值,那就不存在竞态条件问题,而只有内存可见性问题,这时,在变量的声明上加上volatile就可以了。

(4)原子变量和CAS

使用volatile, set的新值不能依赖于旧值,但很多时候,set的新值与原来的值有关,这时,也不一定需要锁,如果需要同步的代码比较简单,可以考虑原子变量,它们包含了一些以原子方式实现组合操作的方法,对于并发环境中的计数、产生序列号等需求,考虑使用原子变量而非锁。

原子变量的基础是CAS,一般的计算机系统都在硬件层次上直接支持CAS指令。通过循环CAS的方式实现原子更新是一种重要的思维。相比synchronized,它是乐观的,而synchronized是悲观的;它是非阻塞式的,而synchronized是阻塞式的。CAS是Java并发包的基础,基于它可以实现高效的、乐观、非阻塞式数据结构和算法,它也是并发包中锁、同步工具和各种容器的基础。

(5)写时复制

之所以会有线程安全的问题,是因为多个线程并发读写同一个对象,如果每个线程读写的对象都是不同的,或者,如果共享访问的对象是只读的,不能修改,那也就不存在线程安全问题了。

我们在介绍容器类CopyOnWriteArrayList和CopyOnWriteArraySet时介绍了写时复制技术,写时复制就是将共享访问的对象变为只读的,写的时候,再使用锁,保证只有一个线程写,写的线程不是直接修改原对象,而是新创建一个对象,对该对象修改完毕后,再原子性地修改共享访问的变量,让它指向新的对象。

(6)ThreadLocal

ThreadLocal就是让每个线程,对同一个变量,都有自己的独有副本,每个线程实际访问的对象都是自己的,自然也就不存在线程安全问题了。

19.5 理解ThreadLocal

本节,我们来探讨一个特殊的概念:线程本地变量。在Java中的实现是类ThreadLocal,它是什么?有什么用?实现原理是什么?让我们接下来逐步探讨。

19.5.1 基本概念和用法

线程本地变量是说,每个线程都有同一个变量的独有拷贝。这个概念听上去比较难以理解,我们先直接来看类TheadLocal的用法。ThreadLocal是一个泛型类,接受一个类型参数T,它只有一个空的构造方法,有两个主要的public方法:

1
2
public T get()
public void set(T value)

set就是设置值,get就是获取值,如果没有值,返回null,看上去,ThreadLocal就是一个单一对象的容器,比如:

1
2
3
4
5
public static void main(String[] args) {
ThreadLocal<Integer> local = new ThreadLocal<>();
local.set(100);
System.out.println(local.get());
}

输出为100。那ThreadLocal有什么特殊的呢?特殊发生在有多个线程的时候,看个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ThreadLocalBasic {
static ThreadLocal<Integer> local = new ThreadLocal<>();
public static void main(String[] args) throws InterruptedException {
Thread child = new Thread() {
@Override
public void run() {
System.out.println("child thread initial: " + local.get());
local.set(200);
System.out.println("child thread final: " + local.get());
}
};
local.set(100);
child.start();
child.join();
System.out.println("main thread final: " + local.get());
}
}

local是一个静态变量,main方法创建了一个子线程child, main和child都访问了local,程序的输出为:

1
2
3
child thread initial: null
child thread final: 200
main thread final: 100

这说明,main线程对local变量的设置对child线程不起作用,child线程对local变量的改变也不会影响main线程,它们访问的虽然是同一个变量local,但每个线程都有自己的独立的值,这就是线程本地变量的含义

除了get/set, ThreadLocal还有两个方法:

1
2
protected T initialValue()
public void remove()

initialValue用于提供初始值,这是一个受保护方法,可以通过匿名内部类的方式提供,当调用get方法时,如果之前没有设置过,会调用该方法获取初始值,默认实现是返回null。remove删掉当前线程对应的值,如果删掉后,再次调用get,会再调用initialValue获取初始值。看个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ThreadLocalInit {
static ThreadLocal<Integer> local = new ThreadLocal<Integer>(){
@Override
protected Integer initialValue() {
return 100;
}
};
public static void main(String[] args) {
System.out.println(local.get());
local.set(200);
local.remove();
System.out.println(local.get());
}
}

输出值都是100。

19.5.2 使用场景

ThreadLocal有什么用呢?我们来看三个例子:日期处理、随机数和上下文信息。

1.日期处理

ThreadLocal是实现线程安全的一种方案,比如对于DateFormat/SimpleDateFormat,我们在介绍日期和时间操作的时候,提到它们是非线程安全的,实现安全的一种方式是使用锁,另一种方式是每次都创建一个新的对象,更好的方式就是使用ThreadLocal,每个线程使用自己的DateFormat,就不存在安全问题了,在线程的整个使用过程中,只需要创建一次,又避免了频繁创建的开销,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ThreadLocalDateFormat {
static ThreadLocal<DateFormat> sdf = new ThreadLocal<DateFormat>() {
@Override
protected DateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
};
public static String date2String(Date date) {
return sdf.get().format(date);
}
public static Date string2Date(String str) throws ParseException {
return sdf.get().parse(str);
}
}

需要说明的是,ThreadLocal对象一般都定义为static,以便于引用。

2.随机数

即使对象是线程安全的,使用ThreadLocal也可以减少竞争,比如,我们在介绍Random类的时候提到,Random是线程安全的,但如果并发访问竞争激烈的话,性能会下降,所以Java并发包提供了类ThreadLocalRandom,它是Random的子类,利用了ThreadLocal,它没有public的构造方法,通过静态方法current获取对象,比如:

1
2
3
4
public static void main(String[] args) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
System.out.println(rnd.nextInt());
}

current方法的实现为:

1
2
3
public static ThreadLocalRandom current() {
return localRandom.get();
}

localRandom就是一个ThreadLocal变量:

1
2
3
4
5
6
private static final ThreadLocal<ThreadLocalRandom> localRandom =
new ThreadLocal<ThreadLocalRandom>() {
protected ThreadLocalRandom initialValue() {
return new ThreadLocalRandom();
}
};

3.上下文信息

ThreadLocal的典型用途是提供上下文信息,比如在一个Web服务器中,一个线程执行用户的请求,在执行过程中,很多代码都会访问一些共同的信息,比如请求信息、用户身份信息、数据库连接、当前事务等,它们是线程执行过程中的全局信息,如果作为参数在不同代码间传递,代码会很烦琐,这时,使用ThreadLocal就很方便,所以它被用于各种框架如Spring中。我们看个简单的示例,如代码清单19-6所示。

代码清单19-6 使用ThreadLocal保存上下文信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class RequestContext {
public static class Request { //...
};
private static ThreadLocal<String> localUserId = new ThreadLocal<>();
private static ThreadLocal<Request> localRequest = new ThreadLocal<>();
public static String getCurrentUserId() {
return localUserId.get();
}
public static void setCurrentUserId(String userId) {
localUserId.set(userId);
}
public static Request getCurrentRequest() {
return localRequest.get();
}
public static void setCurrentRequest(Request request) {
localRequest.set(request);
}
}

在首次获取到信息时,调用set方法如setCurrentRequest/setCurrentUserId进行设置,然后就可以在代码的任意其他地方调用get相关方法进行获取了。

19.5.3 基本实现原理

ThreadLocal是怎么实现的呢?为什么对同一个对象的get/set,每个线程都能有自己独立的值呢?我们直接来看代码(基于Java
7)。set方法的代码为:

1
2
3
4
5
6
7
8
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if(map ! = null)
map.set(this, value);
else
createMap(t, value);
}

它调用了getMap, getMap的代码为:

1
2
3
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

返回线程的实例变量threadLocals,它的初始值为null,在null时,set调用createMap初始化,代码为:

1
2
3
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

从以上代码可以看出,每个线程都有一个Map,类型为ThreadLocalMap,调用set实际上是在线程自己的Map里设置了一个条目,键为当前的ThreadLocal对象,值为value。ThreadLocalMap是一个内部类,它是专门用于ThreadLocal的,与一般的Map不同,它的键类型为WeakReference<ThreadLocal>。我们没有提过WeakReference,它与Java的垃圾回收机制有关,使用它,便于回收内存,具体我们就不探讨了。

get方法的代码为:

1
2
3
4
5
6
7
8
9
10
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if(map ! = null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if(e ! = null)
return (T)e.value;
}
return setInitialValue();
}

通过线程访问到Map,以ThreadLocal对象为键从Map中获取到条目,取其value,如果Map中没有,则调用setInitialValue,其代码为:

1
2
3
4
5
6
7
8
9
10
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if(map ! = null)
map.set(this, value);
else
createMap(t, value);
return value;
}

initialValue()就是之前提到的提供初始值的方法,默认实现就是返回null。

remove方法的代码也很直接,如下所示:

1
2
3
4
5
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if(m ! = null)
m.remove(this);
}

简单总结下,每个线程都有一个Map,对于每个ThreadLocal对象,调用其get/set实际上就是以ThreadLocal对象为键读写当前线程的Map,这样,就实现了每个线程都有自己的独立副本的效果。

本章介绍了Java并发包中的一些同步协作工具:
1)在读多写少的场景中使用ReentrantReadWriteLock替代ReentrantLock,以提高性能。
2)使用Semaphore限制对资源的并发访问数。
3)使用CountDownLatch实现不同角色线程间的同步。
4)使用CyclicBarrier实现同一角色线程间的协调一致。

关于ThreadLocal,本章介绍了它的基本概念、用法用途和实现原理,简单总结来说:
1)ThreadLocal使得每个线程对同一个变量有自己的独立副本,是实现线程安全、减少竞争的一种方案。
2)ThreadLocal经常用于存储上下文信息,避免在不同代码间来回传递,简化代码。
3)每个线程都有一个Map,调用ThreadLocal对象的get/set实际就是以ThreadLocal对象为键读写当前线程的该Map。

至此,关于并发就介绍完了,下一章,让我们一起回顾总结一下。

19.4 循环栅栏CyclicBarrier

我们在15.3.7节使用wait/notify实现了一个简单的集合点AssemblePoint,我们提到, Java并发包中已经提供了类似工具,就是CyclicBarrier。它相当于是一个栅栏,所有线程在到达该栅栏后都需要等待其他线程,等所有线程都到达后再一起通过,它是循环的,可以用作重复的同步。

CyclicBarrier特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。

与CountDownLatch类似,它也有一个数字,但表示的是参与的线程个数,这个数字通过构造方法进行传递:

1
public CyclicBarrier(int parties)

它还有一个构造方法,接受一个Runnable参数,如下所示:

1
public CyclicBarrier(int parties, Runnable barrierAction)

这个参数表示栅栏动作,当所有线程到达栅栏后,在所有线程执行下一步动作前,运行参数中的动作,这个动作由最后一个到达栅栏的线程执行。

CyclicBarrier的主要方法就是await:

1
2
3
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException,
BrokenBarrierException, TimeoutException

await在等待其他线程到达栅栏,调用await后,表示自己已经到达,如果自己是最后一个到达的,就执行可选的命令,执行后,唤醒所有等待的线程,然后重置内部的同步计数,以循环使用。

await可以被中断,可以限定最长等待时间,中断或超时后会抛出异常。需要说明的是异常BrokenBarrierException,它表示栅栏被破坏了,什么意思呢?在CyclicBarrier中,参与的线程是互相影响的,只要其中一个线程在调用await时被中断了,或者超时了,栅栏就会被破坏。此外,如果栅栏动作抛出了异常,栅栏也会被破坏。被破坏后,所有在调用await的线程就会退出,抛出BrokenBarrierException。

我们看一个简单的例子,多个游客线程分别在集合点A和B同步,如代码清单19-5所示。

代码清单19-5 CyclicBarrier应用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException,
BrokenBarrierException, TimeoutException
public class CyclicBarrierDemo {
static class Tourist extends Thread {
CyclicBarrier barrier;
public Tourist(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
//模拟先各自独立运行
Thread.sleep((int) (Math.random() * 1000));
//集合点A
barrier.await();
System.out.println(this.getName() + " arrived A "
+ System.currentTimeMillis());
//集合后模拟再各自独立运行
Thread.sleep((int) (Math.random() * 1000));
//集合点B
barrier.await();
System.out.println(this.getName() + " arrived B "
+ System.currentTimeMillis());
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
}
public static void main(String[] args) {
int num = 3;
Tourist[] threads = new Tourist[num];
CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() {
@Override
public void run() {
System.out.println("all arrived " + System.currentTimeMillis()
+ " executed by " + Thread.currentThread().getName());
}
});
for(int i = 0; i < num; i++) {
threads[i] = new Tourist(barrier);
threads[i].start();
}
}
}

在笔者的计算机中的一次输出为:

1
2
3
4
5
6
7
8
all arrived 1490053578552 executed by Thread-1
Thread-1 arrived A 1490053578555
Thread-2 arrived A 1490053578555
Thread-0 arrived A 1490053578555
all arrived 1490053578889 executed by Thread-0
Thread-0 arrived B 1490053578890
Thread-2 arrived B 1490053578890
Thread-1 arrived B 1490053578890

多个线程到达A和B的时间是一样的,使用CyclicBarrier,达到了重复同步的目的。

CyclicBarrier与CountDownLatch可能容易混淆,我们强调下它们的区别。
1)CountDownLatch的参与线程是有不同角色的,有的负责倒计时,有的在等待倒计时变为0,负责倒计时和等待倒计时的线程都可以有多个,用于不同角色线程间的同步。
2)CyclicBarrier的参与线程角色是一样的,用于同一角色线程间的协调一致。
3)CountDownLatch是一次性的,而CyclicBarrier是可以重复利用的。

19.3 倒计时门栓CountDownLatch

我们在15.3.5节使用wait/notify实现了一个简单的门栓MyLatch,我们提到,Java并发包中已经提供了类似工具,就是CountDownLatch。它相当于是一个门栓,一开始是关闭的,所有希望通过该门的线程都需要等待,然后开始倒计时,倒计时变为0后,门栓打开,等待的所有线程都可以通过,它是一次性的,打开后就不能再关上了。

CountDownLatch里有一个计数,这个计数通过构造方法进行传递:

1
public CountDownLatch(int count)

多个线程可以基于这个计数进行协作,它的主要方法有:

1
2
3
public void await() throws InterruptedException
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
public void countDown()

await检查计数是否为0,如果大于0,就等待,await可以被中断,也可以设置最长等待时间。countDown检查计数,如果已经为0,直接返回,否则减少计数,如果新的计数变为0,则唤醒所有等待的线程。

之前,我们介绍了门栓的两种应用场景:一种是同时开始,另一种是主从协作。它们都有两类线程,互相需要同步,我们使用CountDownLatch重新演示。

在同时开始场景中,运行员线程等待主裁判线程发出开始指令的信号,一旦发出后,所有运动员线程同时开始,计数初始为1,运动员线程调用await,主线程调用countDown,如代码清单19-3所示。

代码清单19-3 使用CountDownLatch实现同时开始场景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RacerWithCountDownLatch {
static class Racer extends Thread {
CountDownLatch latch;
public Racer(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
this.latch.await();
System.out.println(getName()
+ " start run "+System.currentTimeMillis());
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) throws InterruptedException {
int num = 10;
CountDownLatch latch = new CountDownLatch(1);
Thread[] racers = new Thread[num];
for(int i = 0; i < num; i++) {
racers[i] = new Racer(latch);
racers[i].start();
}
Thread.sleep(1000);
latch.countDown();
}
}

代码比较简单,就不赘述了。在主从协作模式中,主线程依赖工作线程的结果,需要等待工作线程结束,这时,计数初始值为工作线程的个数,工作线程结束后调用count-Down,主线程调用await进行等待,如代码清单19-4所示。

代码清单19-4 使用CountDownLatch实现主从协作场景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class MasterWorkerDemo {
static class Worker extends Thread {
CountDownLatch latch;
public Worker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
//模拟执行任务
Thread.sleep((int) (Math.random() * 1000));
//模拟异常情况
if(Math.random() < 0.02) {
throw new RuntimeException("bad luck");
}
} catch (InterruptedException e) {
} finally {
this.latch.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException {
int workerNum = 100;
CountDownLatch latch = new CountDownLatch(workerNum);
Worker[] workers = new Worker[workerNum];
for(int i = 0; i < workerNum; i++) {
workers[i] = new Worker(latch);
workers[i].start();
}
latch.await();
System.out.println("collect worker results");
}
}

需要强调的是,在这里,countDown的调用应该放到finally语句中,确保在工作线程发生异常的情况下也会被调用,使主线程能够从await调用中返回。

19.2 信号量Semaphore

之前介绍的锁都是限制只有一个线程可以同时访问一个资源。现实中,资源往往有多个,但每个同时只能被一个线程访问,比如,饭店的饭桌、火车上的卫生间。有的单个资源即使可以被并发访问,但并发访问数多了可能影响性能,所以希望限制并发访问的线程数。还有的情况,与软件的授权和计费有关,对不同等级的账户,限制不同的最大并发访问数。

信号量类Semaphore就是用来解决这类问题的,它可以限制对资源的并发访问数,它有两个构造方法:

1
2
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)

fire表示公平,含义与之前介绍的是类似的,permits表示许可数量。

Semaphore的方法与锁是类似的,主要的方法有两类,获取许可和释放许可,主要方法有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//阻塞获取许可
public void acquire() throws InterruptedException
//阻塞获取许可,不响应中断
public void acquireUninterruptibly()
//批量获取多个许可
public void acquire(int permits) throws InterruptedException
public void acquireUninterruptibly(int permits)
//尝试获取
public boolean tryAcquire()
//限定等待时间获取
public boolean tryAcquire(int permits, long timeout,
TimeUnit unit) throws InterruptedException
//释放许可
public void release()

我们看个简单的示例,限制并发访问的用户数不超过100,如代码清单19-2所示。

代码清单19-2 Semaphore应用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class AccessControlService {
public static class ConcurrentLimitException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
private static final int MAX_PERMITS = 100;
private Semaphore permits = new Semaphore(MAX_PERMITS, true);
public boolean login(String name, String password) {
if(! permits.tryAcquire()) {
//同时登录用户数超过限制
throw new ConcurrentLimitException();
}
//…其他验证
return true;
}
public void logout(String name) {
permits.release();
}
}

代码比较简单,就不赘述了。需要说明的是,如果我们将permits的值设为1,你可能会认为它就变成了一般的锁,不过,它与一般的锁是不同的。一般锁只能由持有锁的线程释放,而Semaphore表示的只是一个许可数,任意线程都可以调用其release方法。主要的锁实现类ReentrantLock是可重入的,而Semaphore不是,每一次的acquire调用都会消耗一个许可,比如,看下面的代码段:

1
2
3
4
Semaphore permits = new Semaphore(1);
permits.acquire();
permits.acquire();
System.out.println("acquired");

程序会阻塞在第二个acquire调用,永远都不会输出“acquired”。

信号量的基本原理比较简单,也是基于AQS实现的,permits表示共享的锁个数,acquire方法就是检查锁个数是否大于0,大于则减一,获取成功,否则就等待,release就是将锁个数加一,唤醒第一个等待的线程。

第18章 异步任务执行服务

在之前的介绍中,线程Thread既表示要执行的任务,又表示执行的机制。Java并发包提供了一套框架,大大简化了执行异步任务所需的开发。这套框架引入了一个“执行服务”的概念,它将“任务的提交”和“任务的执行”相分离,“执行服务”封装了任务执行的细节,对于任务提交者而言,它可以关注于任务本身,如提交任务、获取结果、取消任务,而不需要关注任务执行的细节,如线程创建、任务调度、线程关闭等。

本章我们就来探讨这套框架,具体分为3个小节:18.1节介绍基本概念和原理;18.2节介绍任务执行服务的主要实现机制:线程池;18.3节介绍定时任务的执行服务。

18.1 基本概念和原理

下面,我们来看异步任务执行服务的基本接口、用法和实现原理。

18.1.1 基本接口

首先,我们来看任务执行服务涉及的基本接口:

  • Runnable和Callable:表示要执行的异步任务。
  • Executor和ExecutorService:表示执行服务。
  • Future:表示异步任务的结果。

关于Runnable和Callable,我们在前面章节都已经了解了,都表示任务,Runnable没有返回结果,而Callable有,Runnable不会抛出异常,而Callable会。

Executor表示最简单的执行服务,其定义为:

1
2
3
public interface Executor {
void execute(Runnable command);
}

就是可以执行一个Runnable,没有返回结果。接口没有限定任务如何执行,可能是创建一个新线程,可能是复用线程池中的某个线程,也可能是在调用者线程中执行。

ExecutorService扩展了Executor,定义了更多服务,基本方法有:

1
2
3
4
5
6
public interface ExecutorService extends Executor {
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<? > submit(Runnable task);
//... 其他方法
}

这三个submit都表示提交一个任务,返回值类型都是Future,返回后,只是表示任务已提交,不代表已执行,通过Future可以查询异步任务的状态、获取最终结果、取消任务等。我们知道,对于Callable,任务最终有个返回值,而对于Runnable是没有返回值的;第二个提交Runnable的方法可以同时提供一个结果,在异步任务结束时返回;第三个方法异步任务的最终返回值为null。

我们来看Future接口的定义:

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException;
}

get用于返回异步任务最终的结果,如果任务还未执行完成,会阻塞等待,另一个get方法可以限定阻塞等待的时间,如果超时任务还未结束,会抛出TimeoutException。

cancel用于取消异步任务,如果任务已完成、或已经取消、或由于某种原因不能取消, cancel返回false,否则返回true。如果任务还未开始,则不再运行。但如果任务已经在运行,则不一定能取消,参数mayInterruptIfRunning表示,如果任务正在执行,是否调用interrupt方法中断线程,如果为false就不会,如果为true,就会尝试中断线程,但我们从15.4节知道,中断不一定能取消线程。

isDone和isCancelled用于查询任务状态。isCancelled表示任务是否被取消,只要cancel方法返回了true,随后的isCancelled方法都会返回true,即使执行任务的线程还未真正结束。isDone表示任务是否结束,不管什么原因都算,可能是任务正常结束,可能是任务抛出了异常,也可能是任务被取消。

我们再来看下get方法,任务最终大概有三种结果:
1)正常完成,get方法会返回其执行结果,如果任务是Runnable且没有提供结果,返回null。
2)任务执行抛出了异常,get方法会将异常包装为ExecutionException重新抛出,通过异常的getCause方法可以获取原异常。
3)任务被取消了,get方法会抛出异常CancellationException。

如果调用get方法的线程被中断了,get方法会抛出InterruptedException。

Future是一个重要的概念,是实现“任务的提交”与“任务的执行”相分离的关键,是其中的“纽带”,任务提交者和任务执行服务通过它隔离各自的关注点,同时进行协作

18.1.2 基本用法

说了这么多接口,具体怎么用呢?我们看个简单的例子,如代码清单18-1所示。

代码清单18-1 任务执行服务的基本示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class BasicDemo {
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sleepSeconds = new Random().nextInt(1000);
Thread.sleep(sleepSeconds);
return sleepSeconds;
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(new Task());
//模拟执行其他任务
Thread.sleep(100);
try {
System.out.println(future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
}

我们使用工厂类Executors创建了一个任务执行服务。Executors有多个静态方法,可以用来创建ExecutorService,这里使用的是:

1
public static ExecutorService newSingleThreadExecutor()

表示使用一个线程执行所有服务,后续我们会详细介绍Executors,注意与Executor相区别,后者是单数,是接口。

不管ExecutorService是如何创建的,对使用者而言,用法都一样,例子提交了一个任务,提交后,可以继续执行其他事情,随后可以通过Future获取最终结果或处理任务执行的异常。

最后,我们调用了ExecutorService的shutdown方法,它会关闭任务执行服务。

前面我们只是介绍了ExecutorService的三个submit方法,其实它还有如下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

有两个关闭方法:shutdown和shutdownNow。区别是,shutdown表示不再接受新任务,但已提交的任务会继续执行,即使任务还未开始执行;shutdownNow不仅不接受新任务,而且会终止已提交但尚未执行的任务,对于正在执行的任务,一般会调用线程的interrupt方法尝试中断,不过,线程可能不响应中断,shutdownNow会返回已提交但尚未执行的任务列表。

shutdown和shutdownNow不会阻塞等待,它们返回后不代表所有任务都已结束,不过isShutdown方法会返回true。调用者可以通过awaitTermination等待所有任务结束,它可以限定等待的时间,如果超时前所有任务都结束了,即isTerminated方法返回true,则返回true,否则返回false。

ExecutorService有两组批量提交任务的方法:invokeAll和invokeAny,它们都有两个版本,其中一个限定等待时间。

invokeAll等待所有任务完成,返回的Future列表中,每个Future的isDone方法都返回true,不过isDone为true不代表任务就执行成功了,可能是被取消了。invokeAll可以指定等待时间,如果超时后有的任务没完成,就会被取消。

而对于invokeAny,只要有一个任务在限时内成功返回了,它就会返回该任务的结果,其他任务会被取消;如果没有任务能在限时内成功返回,抛出TimeoutException;如果限时内所有任务都结束了,但都发生了异常,抛出ExecutionException。

使用ExecutorService,编写并发异步任务的代码就像写顺序程序一样,不用关心线程的创建和协调,只需要提交任务、处理结果就可以了,大大简化了开发工作。

18.1.3 基本实现原理

了解了ExecutorService和Future的基本用法,我们来看下它们的基本实现原理。

ExecutorService的主要实现类是ThreadPoolExecutor,它是基于线程池实现的,关于线程池我们下节再介绍。ExecutorService有一个抽象实现类AbstractExecutorService,本节,我们简要分析其原理,并基于它实现一个简单的ExecutorService。Future的主要实现类是FutureTask,我们也会简要探讨其原理。

1. AbstractExecutorService

AbstractExecutorService提供了submit、invokeAll和invokeAny的默认实现,子类需要实现其他方法。除了execute,其他方法都与执行服务的生命周期管理有关,简化起见,我们忽略其实现,主要考虑execute。submit/invokeAll/invokeAny最终都会调用execute, execute决定了到底如何执行任务,简化起见,我们为每个任务创建一个线程。一个完整的最简单的ExecutorService实现类如代码清单18-2所示。

代码清单18-2 一个简单的ExecutorService实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class SimpleExecutorService extends AbstractExecutorService {
@Override
public void shutdown() {
}
@Override
public List<Runnable> shutdownNow() {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return false;
}
@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}

对于前面的例子,创建ExecutorService的代码可以替换为:

1
ExecutorService executor = new SimpleExecutorService();

可以实现相同的效果。

ExecutorService最基本的方法是submit,它是如何实现的呢?我们来看AbstractExecutor-Service的代码(基于Java
7):

1
2
3
4
5
6
public <T> Future<T> submit(Callable<T> task) {
if(task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

它调用newTaskFor生成了一个RunnableFuture, RunnableFuture是一个接口,既扩展了Runnable,又扩展了Future,没有定义新方法,作为Runnable,它表示要执行的任务,传递给execute方法进行执行,作为Future,它又表示任务执行的异步结果。这可能令人混淆,我们来看具体代码:

1
2
3
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

就是创建了一个FutureTask对象,FutureTask实现了RunnableFuture接口。它是怎么实现的呢?我们接下来看(基于Java
7)。

2. FutureTask

它有一个成员变量表示待执行的任务,声明为:

1
private Callable<V> callable;

有个整数变量state表示状态,声明为:

1
private volatile int state;

取值可能为:

1
2
3
4
5
6
7
NEW            = 0; //刚开始的状态,或任务在运行
COMPLETING = 1; //临时状态,任务即将结束,在设置结果
NORMAL = 2; //任务正常执行完成
EXCEPTIONAL = 3; //任务执行抛出异常结束
CANCELLED = 4; //任务被取消
INTERRUPTING = 5; //任务在被中断
INTERRUPTED = 6; //任务被中断

有个变量表示最终的执行结果或异常,声明为:

1
private Object outcome;

有个变量表示运行任务的线程:

1
private volatile Thread runner;

还有个单向链表表示等待任务执行结果的线程:

1
private volatile WaitNode waiters;

FutureTask的构造方法会初始化callable和状态,如果FutureTask接受的是一个Runnable对象,它会调用Executors.callable转换为Callable对象,如下所示:

1
2
3
4
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; //ensure visibility of callable
}

任务执行服务会使用一个线程执行FutureTask的run方法。run方法的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void run() {
if(state ! = NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if(c ! = null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if(ran)
set(result);
}
} finally {
//runner must be non-null until state is settled to
//prevent concurrent calls to run()
runner = null;
//state must be re-read after nulling runner to prevent
//leaked interrupts
int s = state;
if(s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

其基本逻辑是:
1)调用callable的call方法,捕获任何异常;
2)如果正常执行完成,调用set设置结果,保存到outcome;
3)如果执行过程发生异常,调用setException设置异常,异常也是保存到outcome,但状态不一样;
4)set和setException除了设置结果、修改状态外,还会调用finishCompletion,它会唤醒所有等待结果的线程。

对于任务提交者,它通过get方法获取结果,限时get方法的代码为:

1
2
3
4
5
6
7
8
9
10
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if(unit == null)
throw new NullPointerException();
int s = state;
if(s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

其基本逻辑是:如果任务还未执行完毕,就等待,最后调用report报告结果,report根据状态返回结果或抛出异常,代码为:

1
2
3
4
5
6
7
8
private V report(int s) throws ExecutionException {
Object x = outcome;
if(s == NORMAL)
return (V)x;
if(s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

cancel方法的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean cancel(boolean mayInterruptIfRunning) {
if(state ! = NEW)
return false;
if(mayInterruptIfRunning) {
if(! UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
return false;
Thread t = runner;
if(t ! = null)
t.interrupt();
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
}
else if(! UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
return false;
finishCompletion();
return true;
}

其基本逻辑为:

  • 如果任务已结束或取消,返回false;
  • 如果mayInterruptIfRunning为true,调用interrupt中断线程,设置状态为INTERR-UPTED;
  • 如果mayInterruptIfRunning为false,设置状态为CANCELLED;
  • 调用finishCompletion唤醒所有等待结果的线程。

18.1.4 小结

本节介绍了Java并发包中任务执行服务的基本概念和原理,该服务体现了并发异步开发中“关注点分离”的思想,使用者只需要通过ExecutorService提交任务,通过Future操作任务和结果即可,不需要关注线程创建和协调的细节

本节主要介绍了AbstractExecutorService和FutureTask的基本原理,实现了一个最简单的执行服务SimpleExecutorService,对每个任务创建一个单独的线程。实际中,最经常使用的执行服务是基于线程池实现的ThreadPoolExecutor,让我们下一节来探讨。

第17章 并发容器

本章,我们探讨Java并发包中的容器类,具体包括:

  • 写时复制的List和Set;
  • ConcurrentHashMap;
  • 基于SkipList的Map和Set;
  • 各种并发队列。

它们都有什么用?如何使用?与普通容器类相比,有哪些特点?是如何实现的?本章进行详细讨论。

17.1 写时复制的List和Set

本节先介绍两个简单的类:CopyOnWriteArrayList和CopyOnWriteArraySet,讨论它们的用法和实现原理。它们的用法比较简单,我们需要理解的是它们的实现机制。Copy-On-Write即写时复制,或称写时拷贝,这是解决并发问题的一种重要思路。

17.1.1 CopyOnWriteArrayList

CopyOnWriteArrayList实现了List接口,它的用法与其他List(如ArrayList)基本是一样的。CopyOnWriteArrayList的特点如下:

  • 它是线程安全的,可以被多个线程并发访问;
  • 它的迭代器不支持修改操作,但也不会抛出ConcurrentModificationException;
  • 它以原子方式支持一些复合操作。

我们在15.2.3节提到过基于synchronized的同步容器的几个问题。迭代时,需要对整个列表对象加锁,否则会抛出ConcurrentModificationException,CopyOnWriteArrayList没有这个问题,迭代时不需要加锁。

基于synchronized的同步容器的另一个问题是复合操作,比如先检查再更新,也需要调用方加锁,而CopyOnWriteArrayList直接支持两个原子方法:

1
2
3
4
//不存在才添加,如果添加了,返回true,否则返回false
public boolean addIfAbsent(E e)
//批量添加c中的非重复元素,不存在才添加,返回实际添加的个数
public int addAllAbsent(Collection<? extends E> c)

CopyOnWriteArrayList的内部也是一个数组,但这个数组是以原子方式被整体更新的。每次修改操作,都会新建一个数组,复制原数组的内容到新数组,在新数组上进行需要的修改,然后以原子方式设置内部的数组引用,这就是写时复制。

所有的读操作,都是先拿到当前引用的数组,然后直接访问该数组。在读的过程中,可能内部的数组引用已经被修改了,但不会影响读操作,它依旧访问原数组内容。

换句话说,数组内容是只读的,写操作都是通过新建数组,然后原子性地修改数组引用来实现的。下面我们通过代码具体介绍(基于Java 7),包括内部组成、构造方法、add方法和indexOf方法。

内部数组声明为:

1
private volatile transient Object[] array;

注意:它声明为了volatile,这是必需的,以保证内存可见性,即保证在写操作更改之后读操作能看到。有两个方法用来访问/设置该数组:

1
2
3
4
5
6
final Object[] getArray() {
return array;
}
final void setArray(Object[] a) {
array = a;
}

在CopyOnWriteArrayList中,读不需要锁,可以并行,读和写也可以并行,但多个线程不能同时写,每个写操作都需要先获取锁。CopyOnWriteArrayList内部使用Reentrant-Lock,成员声明为:

1
transient final ReentrantLock lock = new ReentrantLock();

默认构造方法为:

1
2
3
public CopyOnWriteArrayList() {
setArray(new Object[^0]);
}

上述代码就是设置了一个空数组。

add方法的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

上述代码也容易理解,add方法是修改操作,整个过程需要被锁保护,先获取当前数组elements,然后复制出一个长度加1的新数组newElements,在新数组中添加元素,最后调用setArray原子性地修改内部数组引用。

查找元素indexOf的代码为:

1
2
3
4
public int indexOf(Object o) {
Object[] elements = getArray();
return indexOf(o, elements, 0, elements.length);
}

先获取当前数组elements,然后调用另一个indexOf进行查找,具体代码就不列举了。这个indexOf方法访问的所有数据都是通过参数传递进来的,数组内容也不会被修改,不存在并发问题。

每次修改都要创建一个新数组,然后复制所有内容,这听上去是一个难以令人接受的方案,如果数组比较大,修改操作又比较频繁,可以想象,CopyOnWriteArrayList的性能是很低的。事实确实如此,CopyOnWriteArrayList不适用于数组很大且修改频繁的场景。它是以优化读操作为目标的,读不需要同步,性能很高,但在优化读的同时牺牲了写的性能。

之前我们介绍了保证线程安全的两种思路:一种是锁,使用synchronized或Reentrant-Lock;另外一种是循环CAS,写时复制体现了保证线程安全的另一种思路。锁和循环CAS都是控制对同一个资源的访问冲突,而写时复制通过复制资源减少冲突。对于绝大部分访问都是读,且有大量并发线程要求读,只有个别线程进行写,且只是偶尔写的场合,写时复制就是一种很好的解决方案。

写时复制是一种重要的思维,用于各种计算机程序中,比如操作系统内部的进程管理和内存管理。在进程管理中,子进程经常共享父进程的资源,只有在写时才复制。在内存管理中,当多个程序同时访问同一个文件时,操作系统在内存中可能只会加载一份,只有程序要写时才会复制,分配自己的内存,复制可能也不会全部复制,只会复制写的位置所在的^1

17.1.2 CopyOnWriteArraySet

CopyOnWriteArraySet实现了Set接口,不包含重复元素,使用比较简单,我们就不赘述了。下面,主要介绍其内部组成,以及add与contains方法的代码。CopyOnWriteArraySet内部是通过CopyOnWriteArrayList实现的,其成员声明为:

1
private final CopyOnWriteArrayList<E> al;

在构造方法中被初始化,如:

1
2
3
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}

其add方法代码为:

1
2
3
public boolean add(E e) {
return al.addIfAbsent(e);
}

add方法就是调用了CopyOnWriteArrayList的addIfAbsent方法。

contains方法代码为:

1
2
3
public boolean contains(Object o) {
return al.contains(o);
}

由于CopyOnWriteArraySet是基于CopyOnWriteArrayList实现的,所以与之前介绍过的Set的实现类如HashSet/TreeSet相比,它的性能比较低,不适用于元素个数特别多的集合。如果元素个数比较多,可以考虑ConcurrentHashMap或ConcurrentSkipListSet这两个类,我们稍后介绍。

简单总结下,CopyOnWriteArrayList和CopyOnWriteArraySet适用于读远多于写、集合不太大的场合,它们采用了写时复制,这是计算机程序中一种重要的思维和技术。

第16章 并发包的基石

15章介绍了线程的基本内容,在Java中还有一套并发工具包,位于包java.util.concurrent下,里面包括很多易用且高性能的并发开发工具。从本章开始,我们就来探讨Java并发工具包。

本章主要介绍并发包的一些基础内容,分为3个小节:16.1节介绍最基本的原子变量及其背后的原理和思维;16.2节介绍可以替代synchronized的显式锁;16.3节介绍可以替代wait/notify的显式条件。

16.1 原子变量和CAS

什么是原子变量?为什么需要它们呢?我们从synchronized说起。在15.2节,我们介绍过Counter类,使用synchronized关键字保证原子更新操作,代码如下:

1
2
3
4
5
6
7
8
9
public class Counter {
private int count;
public synchronized void incr(){
count ++;
}
public synchronized int getCount() {
return count;
}
}

对于count++这种操作来说,使用synchronized成本太高了,需要先获取锁,最后需要释放锁,获取不到锁的情况下需要等待,还会有线程的上下文切换,这些都需要成本。

对于这种情况,完全可以使用原子变量代替,Java并发包中的基本原子变量类型有以下几种。

  • AtomicBoolean:原子Boolean类型,常用来在程序中表示一个标志位。
  • AtomicInteger:原子Integer类型。
  • AtomicLong:原子Long类型,常用来在程序中生成唯一序列号。
  • AtomicReference:原子引用类型,用来以原子方式更新复杂类型。

限于篇幅,我们主要介绍AtomicInteger。除了这4个类,还有一些其他类,如针对数组类型的类AtomicLongArray、AtomicReferenceArray,以及用于以原子方式更新对象中的字段的类,如AtomicIntegerFieldUpdater、AtomicReferenceFieldUpdater等。Java 8增加了几个类,在高并发统计汇总的场景中更为适合,包括LongAdder、LongAccumulator、Double-Adder和DoubleAccumulator,具体可参见API文档,我们就不介绍了。

16.1.1 AtomicInteger

我们先介绍AtomicInteger的基本用法,然后介绍它的基本原理和逻辑,以及应用。

1.基本用法

AtomicInteger有两个构造方法:

1
2
public AtomicInteger(int initialValue)
public AtomicInteger()

第一个构造方法给定了一个初始值,第二个构造方法的初始值为0。

可以直接获取或设置AtomicInteger中的值,方法是:

1
2
public final int get()
public final void set(int newValue)

之所以称为原子变量,是因为它包含一些以原子方式实现组合操作的方法,部分方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//以原子方式获取旧值并设置新值
public final int getAndSet(int newValue)
//以原子方式获取旧值并给当前值加1
public final int getAndIncrement()
//以原子方式获取旧值并给当前值减1
public final int getAndDecrement()
//以原子方式获取旧值并给当前值加delta
public final int getAndAdd(int delta)
//以原子方式给当前值加1并获取新值
public final int incrementAndGet()
//以原子方式给当前值减1并获取新值
public final int decrementAndGet()
//以原子方式给当前值加delta并获取新值
public final int addAndGet(int delta)

这些方法的实现都依赖另一个public方法:

1
public final boolean compareAndSet(int expect, int update)

compareAndSet是一个非常重要的方法,比较并设置,我们以后将简称为CAS。该方法有两个参数expect和update,以原子方式实现了如下功能:如果当前值等于expect,则更新为update,否则不更新,如果更新成功,返回true,否则返回false。

AtomicInteger可以在程序中用作一个计数器,多个线程并发更新,也总能实现正确性。我们看个例子,如代码清单16-1所示。

代码清单16-1 AtomicInteger的应用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class AtomicIntegerDemo {
private static AtomicInteger counter = new AtomicInteger(0);
static class Visitor extends Thread {
@Override
public void run() {
for(int i = 0; i < 1000; i++) {
counter.incrementAndGet();
}
}
}
public static void main(String[] args) throws InterruptedException {
int num = 1000;
Thread[] threads = new Thread[num];
for(int i = 0; i < num; i++) {
threads[i] = new Visitor();
threads[i].start();
}
for(int i = 0; i < num; i++) {
threads[i].join();
}
System.out.println(counter.get());
}
}

程序的输出总是正确的,为1000000。

2.基本原理和思维

AtomicInteger的使用方法是简单直接的,它是怎么实现的呢?它的主要内部成员是:

1
private volatile int value;

注意:它的声明带有volatile,这是必需的,以保证内存可见性

它的大部分更新方法实现都类似,我们看一个方法incrementAndGet,其代码为:

1
2
3
4
5
6
7
8
public final int incrementAndGet() {
for(; ; ) {
int current = get();
int next = current + 1;
if(compareAndSet(current, next))
return next;
}
}

代码主体是个死循环,先获取当前值current,计算期望的值next,然后调用CAS方法进行更新,如果更新没有成功,说明value被别的线程改了,则再去取最新值并尝试更新直到成功为止。

与synchronized锁相比,这种原子更新方式代表一种不同的思维方式。synchronized是悲观的,它假定更新很可能冲突,所以先获取锁,得到锁后才更新。原子变量的更新逻辑是乐观的,它假定冲突比较少,但使用CAS更新,也就是进行冲突检测,如果确实冲突了,那也没关系,继续尝试就好了。synchronized代表一种阻塞式算法,得不到锁的时候,进入锁等待队列,等待其他线程唤醒,有上下文切换开销。原子变量的更新逻辑是非阻塞式的,更新冲突的时候,它就重试,不会阻塞,不会有上下文切换开销。对于大部分比较简单的操作,无论是在低并发还是高并发情况下,这种乐观非阻塞方式的性能都远高于悲观阻塞式方式。

原子变量相对比较简单,但对于复杂一些的数据结构和算法,非阻塞方式往往难于实现和理解,幸运的是,Java并发包中已经提供了一些非阻塞容器,我们只需要会使用就可以了,比如:

  • ConcurrentLinkedQueue和ConcurrentLinkedDeque:非阻塞并发队列。
  • ConcurrentSkipListMap和ConcurrentSkipListSet:非阻塞并发Map和Set。

这些容器我们在后续章节介绍。

但compareAndSet是怎么实现的呢?我们看代码:

1
2
3
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

它调用了unsafe的compareAndSwapInt方法,unsafe是什么呢?它的类型为sun.misc. Unsafe,定义为:

1
private static final Unsafe unsafe = Unsafe.getUnsafe();

它是Sun的私有实现,从名字看,表示的也是“不安全”,一般应用程序不应该直接使用。原理上,一般的计算机系统都在硬件层次上直接支持CAS指令,而Java的实现都会利用这些特殊指令。从程序的角度看,可以将compareAndSet视为计算机的基本操作,直接接纳就好。

3.实现锁

基于CAS,除了可以实现乐观非阻塞算法之外,还可以实现悲观阻塞式算法,比如锁。实际上,Java并发包中的所有阻塞式工具、容器、算法也都是基于CAS的(不过,也需要一些别的支持)。怎么实现锁呢?我们演示一个简单的例子,用AtomicInteger实现一个锁MyLock,如代码清单16-2所示。

代码清单16-2 使用AtomicInteger实现锁MyLock
1
2
3
4
5
6
7
8
9
10
11
public class MyLock {
private AtomicInteger status = new AtomicInteger(0);
public void lock() {
while(! status.compareAndSet(0, 1)) {
Thread.yield();
}
}
public void unlock() {
status.compareAndSet(1, 0);
}
}

在MyLock中,使用status表示锁的状态,0表示未锁定,1表示锁定,lock()、unlock()使用CAS方法更新,lock()只有在更新成功后才退出,实现了阻塞的效果,不过一般而言,这种阻塞方式过于消耗CPU,我们后续章节介绍更为高效的方式。MyLock只是用于演示基本概念,实际开发中应该使用Java并发包中的类,如ReentrantLock。

16.1.2 ABA问题

使用CAS方式更新有一个ABA问题。该问题是指,假设当前值为A,如果另一个线程先将A修改成B,再修改回成A,当前线程的CAS操作无法分辨当前值发生过变化。

ABA是不是一个问题与程序的逻辑有关,一般不是问题。而如果确实有问题,解决方法是使用AtomicStampedReference,在修改值的同时附加一个时间戳,只有值和时间戳都相同才进行修改,其CAS方法声明为:

1
2
public boolean compareAndSet(
V expectedReference, V newReference, int expectedStamp, int newStamp)

比如:

1
2
3
4
5
6
Pair pair = new Pair(100, 200);
int stamp = 1;
AtomicStampedReference<Pair> pairRef = new
AtomicStampedReference<Pair>(pair, stamp);
int newStamp = 2;
pairRef.compareAndSet(pair, new Pair(200, 200), stamp, newStamp);

AtomicStampedReference在compareAndSet中要同时修改两个值:一个是引用,另一个是时间戳。它怎么实现原子性呢?实际上,内部AtomicStampedReference会将两个值组合为一个对象,修改的是一个值,我们看代码:

1
2
3
4
5
6
7
8
public boolean compareAndSet(V expectedReference, V newReference,int expectedStamp, int newStamp) {
Pair<V> current = pair;
return expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

这个Pair是AtomicStampedReference的一个内部类,成员包括引用和时间戳,具体定义为:

1
2
3
4
5
6
7
8
9
10
11
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}

AtomicStampedReference将对引用值和时间戳的组合比较和修改转换为了对这个内部类Pair单个值的比较和修改。

16.1.3 小结

本节介绍了原子变量的基本用法以及背后的原理CAS,对于并发环境中的计数、产生序列号等需求,应该使用原子变量而非锁,CAS是Java并发包的基础,基于它可以实现高效的、乐观、非阻塞式数据结构和算法,它也是并发包中锁、同步工具和各种容器的基础

第15章 并发基础知识

在之前的章节中,我们都是假设程序中只有一条执行流,程序从main方法的第一条语句逐条执行直到结束。从本章开始,我们讨论并发,在程序中创建线程来启动多条执行流。并发和线程是一个复杂的话题,在本章中,我们讨论关于并发和线程的基础知识,具体来说,分为4个小节:15.1节介绍关于线程的一些基本概念;15.2节介绍线程间安全竞争同一资源的机制:synchronized;15.3节介绍线程间的基本协作机制:wait/notify;15.4节介绍取消/关闭线程的机制:中断。

15.1 线程的基本概念

本节,我们介绍Java中线程的一些基本概念,包括创建线程、线程的基本属性和方法、共享内存及问题、线程的优点及成本。

15.1.1 创建线程

线程表示一条单独的执行流,它有自己的程序执行计数器,有自己的栈。下面,我们通过创建线程来对线程建立一个直观感受。在Java中创建线程有两种方式:一种是继承Thread;另外一种是实现Runnable接口。

1.继承Thread

Java中java.lang.Thread这个类表示线程,一个类可以继承Thread并重写其run方法来实现一个线程,如下所示:

1
2
3
4
5
6
public class HelloThread extends Thread {
@Override
public void run() {
System.out.println("hello");
}
}

HelloThread这个类继承了Thread,并重写了run方法。run方法的方法签名是固定的, public,没有参数,没有返回值,不能抛出受检异常。run方法类似于单线程程序中的main方法,线程从run方法的第一条语句开始执行直到结束。

定义了这个类不代表代码就会开始执行,线程需要被启动,启动需要先创建一个HelloThread对象,然后调用Thread的start方法,如下所示:

1
2
3
4
public static void main(String[] args) {
Thread thread = new HelloThread();
thread.start();
}

我们在main方法中创建了一个线程对象,并调用了其start方法,调用start方法后, HelloThread的run方法就会开始执行,屏幕输出为:

1
hello

为什么调用的是start,执行的却是run方法呢?start表示启动该线程,使其成为一条单独的执行流,操作系统会分配线程相关的资源,每个线程会有单独的程序执行计数器和栈,操作系统会把这个线程作为一个独立的个体进行调度,分配时间片让它执行,执行的起点就是run方法。

如果不调用start,而直接调用run方法呢?屏幕的输出并不会发生变化,但并不会启动一条单独的执行流,run方法的代码依然是在main线程中执行的,run方法只是main方法调用的一个普通方法。怎么确认代码是在哪个线程中执行的呢?Thread有一个静态方法currentThread,返回当前执行的线程对象:

1
public static native Thread currentThread();

每个Thread都有一个id和name:

1
2
public long getId()
public final String getName()

这样,我们就可以判断代码是在哪个线程中执行的。修改HelloThead的run方法:

1
2
3
4
5
@Override
public void run() {
System.out.println("thread name: "+ Thread.currentThread().getName());
System.out.println("hello");
}

如果在main方法中通过start方法启动线程,程序输出为:

1
2
thread name: Thread-0
hello

如果在main方法中直接调用run方法,程序输出为:

1
2
thread name: main
hello

调用start后,就有了两条执行流,新的一条执行run方法,旧的一条继续执行main方法,两条执行流并发执行,操作系统负责调度,在单CPU的机器上,同一时刻只能有一个线程在执行,在多CPU的机器上,同一时刻可以有多个线程同时执行,但操作系统给我们屏蔽了这种差异,给程序员的感觉就是多个线程并发执行,但哪条语句先执行哪条后执行是不一定的。当所有线程都执行完毕的时候,程序退出。

2.实现Runnable接口

通过继承Thread来实现线程虽然比较简单,但Java中只支持单继承,每个类最多只能有一个父类,如果类已经有父类了,就不能再继承Thread,这时,可以通过实现java.lang. Runnable接口来实现线程。Runnable接口的定义很简单,只有一个run方法,如下所示:

1
2
3
public interface Runnable {
public abstract void run();
}

一个类可以实现该接口,并实现run方法,如下所示:

1
2
3
4
5
6
public class HelloRunnable implements Runnable {
@Override
public void run() {
System.out.println("hello");
}
}

仅仅实现Runnable是不够的,要启动线程,还是要创建一个Thread对象,但传递一个Runnable对象,如下所示:

1
2
3
4
public static void main(String[] args) {
Thread helloThread = new Thread(new HelloRunnable());
helloThread.start();
}

无论是通过继承Thead还是实现Runnable接口来创建线程,启动线程都是调用start方法。

15.1.2 线程的基本属性和方法

线程有一些基本属性和方法,包括id、name、优先级、状态、是否daemo线程、sleep方法、yield方法、join方法、过时方法等,我们简要介绍。

1. id和name

前面我们提到,每个线程都有一个id和name。id是一个递增的整数,每创建一个线程就加一。name的默认值是Thread-后跟一个编号,name可以在Thread的构造方法中进行指定,也可以通过setName方法进行设置,给Thread设置一个友好的名字,可以方便调试。

2.优先级

线程有一个优先级的概念,在Java中,优先级从1到10,默认为5,相关方法是:

1
2
public final void setPriority(int newPriority)
public final int getPriority()

这个优先级会被映射到操作系统中线程的优先级,不过,因为操作系统各不相同,不一定都是10个优先级,Java中不同的优先级可能会被映射到操作系统中相同的优先级。另外,优先级对操作系统而言主要是一种建议和提示,而非强制。简单地说,在编程中,不要过于依赖优先级。

3.状态

线程有一个状态的概念,Thread有一个方法用于获取线程的状态:

1
public State getState()

返回值类型为Thread.State,它是一个枚举类型,有如下值:

1
2
3
4
5
6
7
8
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}

关于这些状态,我们简单解释下:
1)NEW:没有调用start的线程状态为NEW。
2)TERMINATED:线程运行结束后状态为TERMINATED。
3)RUNNABLE:调用start后线程在执行run方法且没有阻塞时状态为RUNNABLE,不过,RUNNABLE不代表CPU一定在执行该线程的代码,可能正在执行也可能在等待操作系统分配时间片,只是它没有在等待其他条件。
4)BLOCKED、WAITING、TIMED_WAITING:都表示线程被阻塞了,在等待一些条件,其中的区别我们在后续章节再介绍。

Thread还有一个方法,返回线程是否活着:

1
public final native boolean isAlive()

线程被启动后,run方法运行结束前,返回值都是true。

4.是否daemon线程

Thread有一个是否daemon线程的属性,相关方法是:

1
2
public final void setDaemon(boolean on)
public final boolean isDaemon()

前面我们提到,启动线程会启动一条单独的执行流,整个程序只有在所有线程都结束的时候才退出,但daemon线程是例外,当整个程序中剩下的都是daemon线程的时候,程序就会退出。

daemon线程有什么用呢?它一般是其他线程的辅助线程,在它辅助的主线程退出的时候,它就没有存在的意义了。在我们运行一个即使最简单的”hello world”类型的程序时,实际上,Java也会创建多个线程,除了main线程外,至少还有一个负责垃圾回收的线程,这个线程就是daemon线程,在main线程结束的时候,垃圾回收线程也会退出。

5. sleep方法

Thread有一个静态的sleep方法,调用该方法会让当前线程睡眠指定的时间,单位是毫秒:

1
public static native void sleep(long millis) throws InterruptedException;

睡眠期间,该线程会让出CPU,但睡眠的时间不一定是确切的给定毫秒数,可能有一定的偏差,偏差与系统定时器和操作系统调度器的准确度和精度有关。睡眠期间,线程可以被中断,如果被中断,sleep会抛出InterruptedException,关于中断以及中断处理,我们在15.4节介绍。

6. yield方法

Thread还有一个让出CPU的方法:

1
public static native void yield();

这也是一个静态方法,调用该方法,是告诉操作系统的调度器:我现在不着急占用CPU,你可以先让其他线程运行。不过,这对调度器也仅仅是建议,调度器如何处理是不一定的,它可能完全忽略该调用。

7. join方法

在前面HelloThread的例子中,HelloThread没执行完,main线程可能就执行完了, Thread有一个join方法,可以让调用join的线程等待该线程结束,join方法的声明为:

1
public final void join() throws InterruptedException

在等待线程结束的过程中,这个等待可能被中断,如果被中断,会抛出Interrupted-Exception。

join方法还有一个变体,可以限定等待的最长时间,单位为毫秒,如果为0,表示无期限等待:

1
public final synchronized void join(long millis) throws InterruptedException

在前面HelloThread示例中,如果希望main线程在子线程结束后再退出,main方法可以改为:

1
2
3
4
5
public static void main(String[] args) throws InterruptedException {
Thread thread = new HelloThread();
thread.start();
thread.join();
}

8.过时方法

Thread类中还有一些看上去可以控制线程生命周期的方法,如:

1
2
3
public final void stop()
public final void suspend()
public final void resume()

这些方法因为各种原因已被标记为了过时,我们不应该在程序中使用它们。

15.1.3 共享内存及可能存在的问题

前面我们提到,每个线程表示一条单独的执行流,有自己的程序计数器,有自己的栈,但线程之间可以共享内存,它们可以访问和操作相同的对象。我们看个例子,如代码清单15-1所示。

代码清单15-1 共享内存示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ShareMemoryDemo {
private static int shared = 0;
private static void incrShared(){
shared ++;
}
static class ChildThread extends Thread {
List<String> list;
public ChildThread(List<String> list) {
this.list = list;
}
@Override
public void run() {
incrShared();
list.add(Thread.currentThread().getName());
}
}
public static void main(String[] args) throws InterruptedException {
List<String> list = new ArrayList<String>();
Thread t1 = new ChildThread(list);
Thread t2 = new ChildThread(list);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(shared);
System.out.println(list);
}
}

在代码中,定义了一个静态变量shared和静态内部类ChildThread,在main方法中,创建并启动了两个ChildThread对象,传递了相同的list对象,ChildThread的run方法访问了共享的变量shared和list, main方法最后输出了共享的shared和list的值,大部分情况下,会输出期望的值:

1
2
2
[Thread-0, Thread-1]

通过这个例子,我们想强调说明执行流、内存和程序代码之间的关系。

1)该例中有三条执行流,一条执行main方法,另外两条执行ChildThread的run方法。
2)不同执行流可以访问和操作相同的变量,如本例中的shared和list变量。
3)不同执行流可以执行相同的程序代码,如本例中incrShared方法,ChildThread的run方法,被两条ChildThread执行流执行,incrShared方法是在外部定义的,但被ChildThread的执行流执行。在分析代码执行过程时,理解代码在被哪个线程执行是很重要的
4)当多条执行流执行相同的程序代码时,每条执行流都有单独的栈,方法中的参数和局部变量都有自己的一份。

当多条执行流可以操作相同的变量时,可能会出现一些意料之外的结果,包括竞态条件和内存可见性问题,我们来看下。

1.竞态条件

所谓竞态条件(race condition)是指,当多个线程访问和操作同一个对象时,最终执行结果与执行时序有关,可能正确也可能不正确。我们看一个例子,如代码清单15-2所示。

代码清单15-2 竞态条件示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CounterThread extends Thread {
private static int counter = 0;
@Override
public void run() {
for(int i = 0; i < 1000; i++) {
counter++;
}
}
public static void main(String[] args) throws InterruptedException {
int num = 1000;
Thread[] threads = new Thread[num];
for(int i = 0; i < num; i++) {
threads[i] = new CounterThread();
threads[i].start();
}
for(int i = 0; i < num; i++) {
threads[i].join();
}
System.out.println(counter);
}
}

这段代码容易理解,有一个共享静态变量counter,初始值为0,在main方法中创建了1000个线程,每个线程对counter循环加1000次,main线程等待所有线程结束后输出counter的值。

期望的结果是100万,但实际执行,发现每次输出的结果都不一样,一般都不是100万,经常是99万多。为什么会这样呢?因为counter++这个操作不是原子操作,它分为三个步骤:
1)取counter的当前值;
2)在当前值基础上加1;
3)将新值重新赋值给counter。

两个线程可能同时执行第一步,取到了相同的counter值,比如都取到了100,第一个线程执行完后counter变为101,而第二个线程执行完后还是101,最终的结果就与期望不符。

怎么解决这个问题呢?有多种方法:

  • 使用synchronized关键字;
  • 使用显式锁;
  • 使用原子变量。

关于这些方法,我们在后续章节会逐步介绍。

2.内存可见性

多个线程可以共享访问和操作相同的变量,但一个线程对一个共享变量的修改,另一个线程不一定马上就能看到,甚至永远也看不到。这可能有悖直觉,我们来看一个例子,如代码清单15-3所示。

代码清单15-3 内存可见性示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class VisibilityDemo {
private static boolean shutdown = false;
static class HelloThread extends Thread {
@Override
public void run() {
while(! shutdown){
// do nothing
}
System.out.println("exit hello");
}
}
public static void main(String[] args) throws InterruptedException {
new HelloThread().start();
Thread.sleep(1000);
shutdown = true;
System.out.println("exit main");
}
}

在这个程序中,有一个共享的boolean变量shutdown,初始为false, HelloThread在shutdown不为true的情况下一直死循环,当shutdown为true时退出并输出”exit hello”, main线程启动HelloThread后休息了一会儿,然后设置shutdown为true,最后输出”exit main”。

期望的结果是两个线程都退出,但实际执行时,很可能会发现HelloThread永远都不会退出,也就是说,在HelloThread执行流看来,shutdown永远为false,即使main线程已经更改为了true。

这是怎么回事呢?这就是内存可见性问题。在计算机系统中,除了内存,数据还会被缓存在CPU的寄存器以及各级缓存中,当访问一个变量时,可能直接从寄存器或CPU缓存中获取,而不一定到内存中去取,当修改一个变量时,也可能是先写到缓存中,稍后才会同步更新到内存中。在单线程的程序中,这一般不是问题,但在多线程的程序中,尤其是在有多CPU的情况下,这就是严重的问题。一个线程对内存的修改,另一个线程看不到,一是修改没有及时同步到内存,二是另一个线程根本就没从内存读。

怎么解决这个问题呢?有多种方法:

  • 使用volatile关键字。
  • 使用synchronized关键字或显式锁同步。

关于这些方法,我们在后续章节会逐步介绍。

15.1.4 线程的优点及成本

为什么要创建单独的执行流?或者说线程有什么优点呢?至少有以下几点:
1)充分利用多CPU的计算能力,单线程只能利用一个CPU,使用多线程可以利用多CPU的计算能力。
2)充分利用硬件资源,CPU和硬盘、网络是可以同时工作的,一个线程在等待网络IO的同时,另一个线程完全可以利用CPU,对于多个独立的网络请求,完全可以使用多个线程同时请求。
3)在用户界面(GUI)应用程序中,保持程序的响应性,界面和后台任务通常是不同的线程,否则,如果所有事情都是一个线程来执行,当执行一个很慢的任务时,整个界面将停止响应,也无法取消该任务。
4)简化建模及IO处理,比如,在服务器应用程序中,对每个用户请求使用一个单独的线程进行处理,相比使用一个线程,处理来自各种用户的各种请求,以及各种网络和文件IO事件,建模和编写程序要容易得多。

关于线程,我们需要知道,它是有成本的。创建线程需要消耗操作系统的资源,操作系统会为每个线程创建必要的数据结构、栈、程序计数器等,创建也需要一定的时间。

此外,线程调度和切换也是有成本的,当有大量可运行线程的时候,操作系统会忙于调度,为一个线程分配一段时间,执行完后,再让另一个线程执行,一个线程被切换出去后,操作系统需要保存它的当前上下文状态到内存,上下文状态包括当前CPU寄存器的值、程序计数器的值等,而一个线程被切换回来后,操作系统需要恢复它原来的上下文状态,整个过程称为上下文切换,这个切换不仅耗时,而且使CPU中的很多缓存失效。

当然,这些成本是相对而言的,如果线程中实际执行的事情比较多,这些成本是可以接受的;但如果只是执行本节示例中的counter++,那相对成本就太高了。

另外,如果执行的任务都是CPU密集型的,即主要消耗的都是CPU,那创建超过CPU数量的线程就是没有必要的,并不会加快程序的执行。