18.0 第18章 异步任务执行服务 18.1 基本概念和原理

第18章 异步任务执行服务

在之前的介绍中,线程Thread既表示要执行的任务,又表示执行的机制。Java并发包提供了一套框架,大大简化了执行异步任务所需的开发。这套框架引入了一个“执行服务”的概念,它将“任务的提交”和“任务的执行”相分离,“执行服务”封装了任务执行的细节,对于任务提交者而言,它可以关注于任务本身,如提交任务、获取结果、取消任务,而不需要关注任务执行的细节,如线程创建、任务调度、线程关闭等。

本章我们就来探讨这套框架,具体分为3个小节:18.1节介绍基本概念和原理;18.2节介绍任务执行服务的主要实现机制:线程池;18.3节介绍定时任务的执行服务。

18.1 基本概念和原理

下面,我们来看异步任务执行服务的基本接口、用法和实现原理。

18.1.1 基本接口

首先,我们来看任务执行服务涉及的基本接口:

  • Runnable和Callable:表示要执行的异步任务。
  • Executor和ExecutorService:表示执行服务。
  • Future:表示异步任务的结果。

关于Runnable和Callable,我们在前面章节都已经了解了,都表示任务,Runnable没有返回结果,而Callable有,Runnable不会抛出异常,而Callable会。

Executor表示最简单的执行服务,其定义为:

1
2
3
public interface Executor {
void execute(Runnable command);
}

就是可以执行一个Runnable,没有返回结果。接口没有限定任务如何执行,可能是创建一个新线程,可能是复用线程池中的某个线程,也可能是在调用者线程中执行。

ExecutorService扩展了Executor,定义了更多服务,基本方法有:

1
2
3
4
5
6
public interface ExecutorService extends Executor {
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<? > submit(Runnable task);
//... 其他方法
}

这三个submit都表示提交一个任务,返回值类型都是Future,返回后,只是表示任务已提交,不代表已执行,通过Future可以查询异步任务的状态、获取最终结果、取消任务等。我们知道,对于Callable,任务最终有个返回值,而对于Runnable是没有返回值的;第二个提交Runnable的方法可以同时提供一个结果,在异步任务结束时返回;第三个方法异步任务的最终返回值为null。

我们来看Future接口的定义:

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException;
}

get用于返回异步任务最终的结果,如果任务还未执行完成,会阻塞等待,另一个get方法可以限定阻塞等待的时间,如果超时任务还未结束,会抛出TimeoutException。

cancel用于取消异步任务,如果任务已完成、或已经取消、或由于某种原因不能取消, cancel返回false,否则返回true。如果任务还未开始,则不再运行。但如果任务已经在运行,则不一定能取消,参数mayInterruptIfRunning表示,如果任务正在执行,是否调用interrupt方法中断线程,如果为false就不会,如果为true,就会尝试中断线程,但我们从15.4节知道,中断不一定能取消线程。

isDone和isCancelled用于查询任务状态。isCancelled表示任务是否被取消,只要cancel方法返回了true,随后的isCancelled方法都会返回true,即使执行任务的线程还未真正结束。isDone表示任务是否结束,不管什么原因都算,可能是任务正常结束,可能是任务抛出了异常,也可能是任务被取消。

我们再来看下get方法,任务最终大概有三种结果:
1)正常完成,get方法会返回其执行结果,如果任务是Runnable且没有提供结果,返回null。
2)任务执行抛出了异常,get方法会将异常包装为ExecutionException重新抛出,通过异常的getCause方法可以获取原异常。
3)任务被取消了,get方法会抛出异常CancellationException。

如果调用get方法的线程被中断了,get方法会抛出InterruptedException。

Future是一个重要的概念,是实现“任务的提交”与“任务的执行”相分离的关键,是其中的“纽带”,任务提交者和任务执行服务通过它隔离各自的关注点,同时进行协作

18.1.2 基本用法

说了这么多接口,具体怎么用呢?我们看个简单的例子,如代码清单18-1所示。

代码清单18-1 任务执行服务的基本示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class BasicDemo {
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sleepSeconds = new Random().nextInt(1000);
Thread.sleep(sleepSeconds);
return sleepSeconds;
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(new Task());
//模拟执行其他任务
Thread.sleep(100);
try {
System.out.println(future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
}

我们使用工厂类Executors创建了一个任务执行服务。Executors有多个静态方法,可以用来创建ExecutorService,这里使用的是:

1
public static ExecutorService newSingleThreadExecutor()

表示使用一个线程执行所有服务,后续我们会详细介绍Executors,注意与Executor相区别,后者是单数,是接口。

不管ExecutorService是如何创建的,对使用者而言,用法都一样,例子提交了一个任务,提交后,可以继续执行其他事情,随后可以通过Future获取最终结果或处理任务执行的异常。

最后,我们调用了ExecutorService的shutdown方法,它会关闭任务执行服务。

前面我们只是介绍了ExecutorService的三个submit方法,其实它还有如下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

有两个关闭方法:shutdown和shutdownNow。区别是,shutdown表示不再接受新任务,但已提交的任务会继续执行,即使任务还未开始执行;shutdownNow不仅不接受新任务,而且会终止已提交但尚未执行的任务,对于正在执行的任务,一般会调用线程的interrupt方法尝试中断,不过,线程可能不响应中断,shutdownNow会返回已提交但尚未执行的任务列表。

shutdown和shutdownNow不会阻塞等待,它们返回后不代表所有任务都已结束,不过isShutdown方法会返回true。调用者可以通过awaitTermination等待所有任务结束,它可以限定等待的时间,如果超时前所有任务都结束了,即isTerminated方法返回true,则返回true,否则返回false。

ExecutorService有两组批量提交任务的方法:invokeAll和invokeAny,它们都有两个版本,其中一个限定等待时间。

invokeAll等待所有任务完成,返回的Future列表中,每个Future的isDone方法都返回true,不过isDone为true不代表任务就执行成功了,可能是被取消了。invokeAll可以指定等待时间,如果超时后有的任务没完成,就会被取消。

而对于invokeAny,只要有一个任务在限时内成功返回了,它就会返回该任务的结果,其他任务会被取消;如果没有任务能在限时内成功返回,抛出TimeoutException;如果限时内所有任务都结束了,但都发生了异常,抛出ExecutionException。

使用ExecutorService,编写并发异步任务的代码就像写顺序程序一样,不用关心线程的创建和协调,只需要提交任务、处理结果就可以了,大大简化了开发工作。

18.1.3 基本实现原理

了解了ExecutorService和Future的基本用法,我们来看下它们的基本实现原理。

ExecutorService的主要实现类是ThreadPoolExecutor,它是基于线程池实现的,关于线程池我们下节再介绍。ExecutorService有一个抽象实现类AbstractExecutorService,本节,我们简要分析其原理,并基于它实现一个简单的ExecutorService。Future的主要实现类是FutureTask,我们也会简要探讨其原理。

1. AbstractExecutorService

AbstractExecutorService提供了submit、invokeAll和invokeAny的默认实现,子类需要实现其他方法。除了execute,其他方法都与执行服务的生命周期管理有关,简化起见,我们忽略其实现,主要考虑execute。submit/invokeAll/invokeAny最终都会调用execute, execute决定了到底如何执行任务,简化起见,我们为每个任务创建一个线程。一个完整的最简单的ExecutorService实现类如代码清单18-2所示。

代码清单18-2 一个简单的ExecutorService实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class SimpleExecutorService extends AbstractExecutorService {
@Override
public void shutdown() {
}
@Override
public List<Runnable> shutdownNow() {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return false;
}
@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}

对于前面的例子,创建ExecutorService的代码可以替换为:

1
ExecutorService executor = new SimpleExecutorService();

可以实现相同的效果。

ExecutorService最基本的方法是submit,它是如何实现的呢?我们来看AbstractExecutor-Service的代码(基于Java
7):

1
2
3
4
5
6
public <T> Future<T> submit(Callable<T> task) {
if(task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

它调用newTaskFor生成了一个RunnableFuture, RunnableFuture是一个接口,既扩展了Runnable,又扩展了Future,没有定义新方法,作为Runnable,它表示要执行的任务,传递给execute方法进行执行,作为Future,它又表示任务执行的异步结果。这可能令人混淆,我们来看具体代码:

1
2
3
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

就是创建了一个FutureTask对象,FutureTask实现了RunnableFuture接口。它是怎么实现的呢?我们接下来看(基于Java
7)。

2. FutureTask

它有一个成员变量表示待执行的任务,声明为:

1
private Callable<V> callable;

有个整数变量state表示状态,声明为:

1
private volatile int state;

取值可能为:

1
2
3
4
5
6
7
NEW            = 0; //刚开始的状态,或任务在运行
COMPLETING = 1; //临时状态,任务即将结束,在设置结果
NORMAL = 2; //任务正常执行完成
EXCEPTIONAL = 3; //任务执行抛出异常结束
CANCELLED = 4; //任务被取消
INTERRUPTING = 5; //任务在被中断
INTERRUPTED = 6; //任务被中断

有个变量表示最终的执行结果或异常,声明为:

1
private Object outcome;

有个变量表示运行任务的线程:

1
private volatile Thread runner;

还有个单向链表表示等待任务执行结果的线程:

1
private volatile WaitNode waiters;

FutureTask的构造方法会初始化callable和状态,如果FutureTask接受的是一个Runnable对象,它会调用Executors.callable转换为Callable对象,如下所示:

1
2
3
4
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; //ensure visibility of callable
}

任务执行服务会使用一个线程执行FutureTask的run方法。run方法的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void run() {
if(state ! = NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if(c ! = null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if(ran)
set(result);
}
} finally {
//runner must be non-null until state is settled to
//prevent concurrent calls to run()
runner = null;
//state must be re-read after nulling runner to prevent
//leaked interrupts
int s = state;
if(s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

其基本逻辑是:
1)调用callable的call方法,捕获任何异常;
2)如果正常执行完成,调用set设置结果,保存到outcome;
3)如果执行过程发生异常,调用setException设置异常,异常也是保存到outcome,但状态不一样;
4)set和setException除了设置结果、修改状态外,还会调用finishCompletion,它会唤醒所有等待结果的线程。

对于任务提交者,它通过get方法获取结果,限时get方法的代码为:

1
2
3
4
5
6
7
8
9
10
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if(unit == null)
throw new NullPointerException();
int s = state;
if(s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

其基本逻辑是:如果任务还未执行完毕,就等待,最后调用report报告结果,report根据状态返回结果或抛出异常,代码为:

1
2
3
4
5
6
7
8
private V report(int s) throws ExecutionException {
Object x = outcome;
if(s == NORMAL)
return (V)x;
if(s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

cancel方法的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean cancel(boolean mayInterruptIfRunning) {
if(state ! = NEW)
return false;
if(mayInterruptIfRunning) {
if(! UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
return false;
Thread t = runner;
if(t ! = null)
t.interrupt();
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
}
else if(! UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
return false;
finishCompletion();
return true;
}

其基本逻辑为:

  • 如果任务已结束或取消,返回false;
  • 如果mayInterruptIfRunning为true,调用interrupt中断线程,设置状态为INTERR-UPTED;
  • 如果mayInterruptIfRunning为false,设置状态为CANCELLED;
  • 调用finishCompletion唤醒所有等待结果的线程。

18.1.4 小结

本节介绍了Java并发包中任务执行服务的基本概念和原理,该服务体现了并发异步开发中“关注点分离”的思想,使用者只需要通过ExecutorService提交任务,通过Future操作任务和结果即可,不需要关注线程创建和协调的细节

本节主要介绍了AbstractExecutorService和FutureTask的基本原理,实现了一个最简单的执行服务SimpleExecutorService,对每个任务创建一个单独的线程。实际中,最经常使用的执行服务是基于线程池实现的ThreadPoolExecutor,让我们下一节来探讨。