16.8.2 Java8增强的ForkJoinPool

16.8.2 Java8增强的ForkJoinPool

现在计算机大多已向多CPU方向发展,即使普通PC,甚至小型智能设备(如手机)、多核处理器也已被广泛应用。在未来的日子里,处理器的核心数将会发展到更多。
虽然硬件上的多核CPU已经十分成熟,但很多应用程序并未为这种多核CPU做好准备,因此并不能很好地利用多核CPU的性能优势。
为了充分利用多CPU、多核CPU的性能优势,计算机软件系统应该可以充分“挖掘”每个CPU的计算能力,绝不能让某个CPU处于“空闲”状态。为了充分利用多CPU、多核CPU的优势,可以考虑把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

Java7提供的ForkJoinPool

Java7提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果
ForkJoinPoolExecutorService的实现类,因此是一种特殊的线程池

ForkJoinPool构造器

ForkJoinPool提供了如下两个常用的构造器。

方法 描述
ForkJoinPool(int parallelism) 创建一个包含parallelism个并行线程的ForkJoinPool
ForkJoinPool() Runtime.availableProcessors()方法的返回值作为parallelism参数来创建ForkJoinPool

Java8对ForkJoinPool的扩展

Java8进一步扩展了ForkJoinPool的功能,Java8ForkJoinPool增加了通用池功能。

ForkJoinPool类通过如下两个静态方法提供通用池功能。

方法 描述
static ForkJoinPool commonPool() 该方法返回一个通用池.
通用池的运行状态不会受shutdown()shutdownNow()方法的影响。
当然,如果程序直接执行System.exit(0);来终止虚拟机,
则通用池以及通用池中正在执行的任务都会被自动终止。
static int getCommonPoolParallelism() 返回通用池的并行级别

ForkJoinTask

创建了ForkJoinPool实例之后,就可调用ForkJoinPoolsubmit(ForkJoinTask task)invoke(ForkJoinTask task)方法来执行指定任务了。

  • 其中**ForkJoinTask代表一个可以并行、合并的任务**。
  • 这个ForkJoinTask是一个抽象类,它还有两个抽象子类:RecursiveActionRecursiveTask。其中
    • RecursiveTask代表有返回值的任务,
    • RecursiveAction代表没有返回值的任务

图16.14显示了ForkJoinPoolForkJoinTask等类的类图。
这里有一张图片

无返回值的大任务分解

程序 使用ForkJoinPool将大任务拆分

下面以执行没有返回值的“大任务”(简单地打印0-300的数值)为例,程序将一个“大任务”拆分成多个“小任务”,并将任务交给ForkJoinPool来执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.util.concurrent.*;

// 继承RecursiveAction来实现"可分解"的任务
class PrintTask extends RecursiveAction {
// 每个“小任务”只最多只打印50个数
private static final int THRESHOLD = 50;
private int start;
private int end;

// 打印从start到end的任务
public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected void compute() {
// 当end与start之间的差小于THRESHOLD时,开始打印
if (end - start < THRESHOLD) {
for (int i = start; i < end; i++) {
System.out.println(Thread.currentThread().getName() + "的i值:" + i);
}
} else {
// 如果当end与start之间的差大于THRESHOLD时,即要打印的数超过50个
// 将大任务分解成两个小任务。
int middle = (start + end) / 2;
PrintTask left = new PrintTask(start, middle);
PrintTask right = new PrintTask(middle, end);
// 并行执行两个“小任务”
left.fork();
right.fork();
}
}
}

public class ForkJoinPoolTest {
public static void main(String[] args) throws Exception {
ForkJoinPool pool = new ForkJoinPool();
// 提交可分解的PrintTask任务
pool.submit(new PrintTask(0, 300));
pool.awaitTermination(2, TimeUnit.SECONDS);
// 关闭线程池
pool.shutdown();
}
}

上面程序中的代码:

1
2
3
4
5
6
7
8
// 如果当end与start之间的差大于THRESHOLD时,即要打印的数超过50个
// 将大任务分解成两个小任务。
int middle = (start + end) / 2;
PrintTask left = new PrintTask(start, middle);
PrintTask right = new PrintTask(middle, end);
// 并行执行两个“小任务”
left.fork();
right.fork();

实现了对指定打印任务的分解,分解后的任务分别调用fork()方法开始并行执行运行上面程序,可以看到下所示的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ForkJoinPool-1-worker-1的i值:262
ForkJoinPool-1-worker-3的i值:37
ForkJoinPool-1-worker-4的i值:0
ForkJoinPool-1-worker-2的i值:112
ForkJoinPool-1-worker-4的i值:1
ForkJoinPool-1-worker-3的i值:38
ForkJoinPool-1-worker-1的i值:263
ForkJoinPool-1-worker-3的i值:39
ForkJoinPool-1-worker-4的i值:2
ForkJoinPool-1-worker-2的i值:113
ForkJoinPool-1-worker-4的i值:3
ForkJoinPool-1-worker-3的i值:40
ForkJoinPool-1-worker-1的i值:264
ForkJoinPool-1-worker-3的i值:41
ForkJoinPool-1-worker-4的i值:4
ForkJoinPool-1-worker-2的i值:114
ForkJoinPool-1-worker-4的i值:5
......
ForkJoinPool-1-worker-2的i值:257
ForkJoinPool-1-worker-2的i值:258
ForkJoinPool-1-worker-2的i值:259
ForkJoinPool-1-worker-2的i值:260
ForkJoinPool-1-worker-2的i值:261

从执行结果来看,ForkJoinPool启动了4个线程来执行这个打印任务—这是因为测试计算机的CPU是4核的。不仅如此,读者可以看到程序虽然打印了0-299这300个数字,但并不是连续打印的,这是因为程序将这个打印任务进行了分解,分解后的任务会并行执行,所以不会按顺序从0打印到299。

有返回值的大任务分解

上面定义的任务是一个没有返回值的打印任务,如果大任务是有返回值的任务,则可以让任务继承RecursiveTask<T>,其中泛型参数T就代表了该任务的返回值类型。

程序 使用RecursiveTask对数组求和

下面程序示范了使用RecursiveTask对一个长度为100的数组的元素值进行累加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import java.util.concurrent.*;
import java.util.*;

// 继承RecursiveTask来实现"可分解"的任务
class CalTask extends RecursiveTask<Integer> {
// 每个“小任务”只最多只累加20个数
private static final int THRESHOLD = 20;
private int arr[];
private int start;
private int end;

// 累加从start到end的数组元素
public CalTask(int[] arr, int start, int end) {
this.arr = arr;
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;
// 当end与start之间的差小于THRESHOLD时,开始进行实际累加
if (end - start < THRESHOLD) {
for (int i = start; i < end; i++) {
sum += arr[i];
}
return sum;
} else {
// 如果当end与start之间的差大于THRESHOLD时,即要累加的数超过20个时
// 将大任务分解成两个小任务。
int middle = (start + end) / 2;
CalTask left = new CalTask(arr, start, middle);
CalTask right = new CalTask(arr, middle, end);
// 并行执行两个“小任务”
left.fork();
right.fork();
// 把两个“小任务”累加的结果合并起来
return left.join() + right.join(); // ①
}
}
}

public class Sum {
public static void main(String[] args) throws Exception {
int[] arr = new int[100];
Random rand = new Random();
int total = 0;
// 初始化100个数字元素
for (int i = 0, len = arr.length; i < len; i++) {
int tmp = rand.nextInt(20);
// 对数组元素赋值,并将数组元素的值添加到sum总和中。
total += (arr[i] = tmp);
}
System.out.println(total);
// 创建一个通用池
ForkJoinPool pool = ForkJoinPool.commonPool();
// 提交可分解的CalTask任务
Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length));
System.out.println(future.get());
// 关闭线程池
pool.shutdown();
}
}

上面程序与前一个程序基本相似,同样是将任务进行了分解,并调用分解后的任务的fork()方法使它们并行执行。与前一个程序不同的是,现在任务是带返回值的,因此程序还在①号代码处将两个分解后的“小任务”的返回值进行了合并。
运行上面程序,将可以看到程序通过CalTask计算出来的总和,与初始化数组元素时统计出来的总和总是相等,这表明程序一切正常。