16.8.2 Java8增强的ForkJoinPool
现在计算机大多已向多CPU
方向发展,即使普通PC
,甚至小型智能设备(如手机)、多核处理器也已被广泛应用。在未来的日子里,处理器的核心数将会发展到更多。
虽然硬件上的多核CPU
已经十分成熟,但很多应用程序并未为这种多核CPU
做好准备,因此并不能很好地利用多核CPU
的性能优势。
为了充分利用多CPU
、多核CPU
的性能优势,计算机软件系统应该可以充分“挖掘”每个CPU
的计算能力,绝不能让某个CPU
处于“空闲”状态。为了充分利用多CPU
、多核CPU
的优势,可以考虑把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。
Java7提供的ForkJoinPool
Java7
提供了ForkJoinPool
来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。
ForkJoinPool
是ExecutorService
的实现类,因此是一种特殊的线程池。
ForkJoinPool构造器
ForkJoinPool
提供了如下两个常用的构造器。
方法 |
描述 |
ForkJoinPool(int parallelism) |
创建一个包含parallelism 个并行线程的ForkJoinPool 。 |
ForkJoinPool() |
以Runtime.availableProcessors() 方法的返回值作为parallelism 参数来创建ForkJoinPool 。 |
Java8对ForkJoinPool的扩展
Java8
进一步扩展了ForkJoinPool
的功能,Java8
为ForkJoinPool
增加了通用池功能。
ForkJoinPool
类通过如下两个静态方法提供通用池功能。
方法 |
描述 |
static ForkJoinPool commonPool() |
该方法返回一个通用池. 通用池的运行状态不会受shutdown() 或shutdownNow() 方法的影响。 当然,如果程序直接执行System.exit(0); 来终止虚拟机, 则通用池以及通用池中正在执行的任务都会被自动终止。 |
static int getCommonPoolParallelism() |
返回通用池的并行级别 |
ForkJoinTask
创建了ForkJoinPool
实例之后,就可调用ForkJoinPool
的submit(ForkJoinTask task)
或invoke(ForkJoinTask task)
方法来执行指定任务了。
- 其中**
ForkJoinTask
代表一个可以并行、合并的任务**。
- 这个
ForkJoinTask
是一个抽象类,它还有两个抽象子类:RecursiveAction
和RecursiveTask
。其中
RecursiveTask
代表有返回值的任务,
RecursiveAction
代表没有返回值的任务。
图16.14显示了ForkJoinPool
、ForkJoinTask
等类的类图。
无返回值的大任务分解
程序 使用ForkJoinPool将大任务拆分
下面以执行没有返回值的“大任务”(简单地打印0-300的数值)为例,程序将一个“大任务”拆分成多个“小任务”,并将任务交给ForkJoinPool
来执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| import java.util.concurrent.*;
class PrintTask extends RecursiveAction { private static final int THRESHOLD = 50; private int start; private int end;
public PrintTask(int start, int end) { this.start = start; this.end = end; }
@Override protected void compute() { if (end - start < THRESHOLD) { for (int i = start; i < end; i++) { System.out.println(Thread.currentThread().getName() + "的i值:" + i); } } else { int middle = (start + end) / 2; PrintTask left = new PrintTask(start, middle); PrintTask right = new PrintTask(middle, end); left.fork(); right.fork(); } } }
public class ForkJoinPoolTest { public static void main(String[] args) throws Exception { ForkJoinPool pool = new ForkJoinPool(); pool.submit(new PrintTask(0, 300)); pool.awaitTermination(2, TimeUnit.SECONDS); pool.shutdown(); } }
|
上面程序中的代码:
1 2 3 4 5 6 7 8
|
int middle = (start + end) / 2; PrintTask left = new PrintTask(start, middle); PrintTask right = new PrintTask(middle, end);
left.fork(); right.fork();
|
实现了对指定打印任务的分解,分解后的任务分别调用fork()
方法开始并行执行运行上面程序,可以看到下所示的结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| ForkJoinPool-1-worker-1的i值:262 ForkJoinPool-1-worker-3的i值:37 ForkJoinPool-1-worker-4的i值:0 ForkJoinPool-1-worker-2的i值:112 ForkJoinPool-1-worker-4的i值:1 ForkJoinPool-1-worker-3的i值:38 ForkJoinPool-1-worker-1的i值:263 ForkJoinPool-1-worker-3的i值:39 ForkJoinPool-1-worker-4的i值:2 ForkJoinPool-1-worker-2的i值:113 ForkJoinPool-1-worker-4的i值:3 ForkJoinPool-1-worker-3的i值:40 ForkJoinPool-1-worker-1的i值:264 ForkJoinPool-1-worker-3的i值:41 ForkJoinPool-1-worker-4的i值:4 ForkJoinPool-1-worker-2的i值:114 ForkJoinPool-1-worker-4的i值:5 ...... ForkJoinPool-1-worker-2的i值:257 ForkJoinPool-1-worker-2的i值:258 ForkJoinPool-1-worker-2的i值:259 ForkJoinPool-1-worker-2的i值:260 ForkJoinPool-1-worker-2的i值:261
|
从执行结果来看,ForkJoinPool
启动了4个线程来执行这个打印任务—这是因为测试计算机的CPU
是4核的。不仅如此,读者可以看到程序虽然打印了0-299这300个数字,但并不是连续打印的,这是因为程序将这个打印任务进行了分解,分解后的任务会并行执行,所以不会按顺序从0打印到299。
有返回值的大任务分解
上面定义的任务是一个没有返回值的打印任务,如果大任务是有返回值的任务,则可以让任务继承RecursiveTask<T>
,其中泛型参数T
就代表了该任务的返回值类型。
程序 使用RecursiveTask
对数组求和
下面程序示范了使用RecursiveTask
对一个长度为100的数组的元素值进行累加。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| import java.util.concurrent.*; import java.util.*;
class CalTask extends RecursiveTask<Integer> { private static final int THRESHOLD = 20; private int arr[]; private int start; private int end;
public CalTask(int[] arr, int start, int end) { this.arr = arr; this.start = start; this.end = end; }
@Override protected Integer compute() { int sum = 0; if (end - start < THRESHOLD) { for (int i = start; i < end; i++) { sum += arr[i]; } return sum; } else { int middle = (start + end) / 2; CalTask left = new CalTask(arr, start, middle); CalTask right = new CalTask(arr, middle, end); left.fork(); right.fork(); return left.join() + right.join(); } } }
public class Sum { public static void main(String[] args) throws Exception { int[] arr = new int[100]; Random rand = new Random(); int total = 0; for (int i = 0, len = arr.length; i < len; i++) { int tmp = rand.nextInt(20); total += (arr[i] = tmp); } System.out.println(total); ForkJoinPool pool = ForkJoinPool.commonPool(); Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length)); System.out.println(future.get()); pool.shutdown(); } }
|
上面程序与前一个程序基本相似,同样是将任务进行了分解,并调用分解后的任务的fork()
方法使它们并行执行。与前一个程序不同的是,现在任务是带返回值的,因此程序还在①号代码处将两个分解后的“小任务”的返回值进行了合并。
运行上面程序,将可以看到程序通过CalTask
计算出来的总和,与初始化数组元素时统计出来的总和总是相等,这表明程序一切正常。