19.4 循环栅栏CyclicBarrier 我们在15.3.7节使用wait/notify实现了一个简单的集合点AssemblePoint,我们提到, Java并发包中已经提供了类似工具,就是CyclicBarrier。它相当于是一个栅栏,所有线程在到达该栅栏后都需要等待其他线程,等所有线程都到达后再一起通过,它是循环的,可以用作重复的同步。
CyclicBarrier特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。
与CountDownLatch类似,它也有一个数字,但表示的是参与的线程个数,这个数字通过构造方法进行传递:
1 public CyclicBarrier (int parties)
它还有一个构造方法,接受一个Runnable参数,如下所示:
1 public CyclicBarrier (int parties, Runnable barrierAction)
这个参数表示栅栏动作,当所有线程到达栅栏后,在所有线程执行下一步动作前,运行参数中的动作,这个动作由最后一个到达栅栏的线程执行。
CyclicBarrier的主要方法就是await:
1 2 3 public int await () throws InterruptedException, BrokenBarrierExceptionpublic int await (long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
await在等待其他线程到达栅栏,调用await后,表示自己已经到达,如果自己是最后一个到达的,就执行可选的命令,执行后,唤醒所有等待的线程,然后重置内部的同步计数,以循环使用。
await可以被中断,可以限定最长等待时间,中断或超时后会抛出异常。需要说明的是异常BrokenBarrierException,它表示栅栏被破坏了,什么意思呢?在CyclicBarrier中,参与的线程是互相影响的,只要其中一个线程在调用await时被中断了,或者超时了,栅栏就会被破坏。此外,如果栅栏动作抛出了异常,栅栏也会被破坏。被破坏后,所有在调用await的线程就会退出,抛出BrokenBarrierException。
我们看一个简单的例子,多个游客线程分别在集合点A和B同步,如代码清单19-5所示。
代码清单19-5 CyclicBarrier应用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public CyclicBarrier (int parties) public CyclicBarrier (int parties, Runnable barrierAction) public int await () throws InterruptedException, BrokenBarrierExceptionpublic int await (long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException public class CyclicBarrierDemo { static class Tourist extends Thread { CyclicBarrier barrier; public Tourist (CyclicBarrier barrier) { this .barrier = barrier; } @Override public void run () { try { Thread.sleep((int ) (Math.random() * 1000 )); barrier.await(); System.out.println(this .getName() + " arrived A " + System.currentTimeMillis()); Thread.sleep((int ) (Math.random() * 1000 )); barrier.await(); System.out.println(this .getName() + " arrived B " + System.currentTimeMillis()); } catch (InterruptedException e) { } catch (BrokenBarrierException e) { } } } public static void main (String[] args) { int num = 3 ; Tourist[] threads = new Tourist [num]; CyclicBarrier barrier = new CyclicBarrier (num, new Runnable () { @Override public void run () { System.out.println("all arrived " + System.currentTimeMillis() + " executed by " + Thread.currentThread().getName()); } }); for (int i = 0 ; i < num; i++) { threads[i] = new Tourist (barrier); threads[i].start(); } } }
在笔者的计算机中的一次输出为:
1 2 3 4 5 6 7 8 all arrived 1490053578552 executed by Thread-1 Thread-1 arrived A 1490053578555 Thread-2 arrived A 1490053578555 Thread-0 arrived A 1490053578555 all arrived 1490053578889 executed by Thread-0 Thread-0 arrived B 1490053578890 Thread-2 arrived B 1490053578890 Thread-1 arrived B 1490053578890
多个线程到达A和B的时间是一样的,使用CyclicBarrier,达到了重复同步的目的。
CyclicBarrier与CountDownLatch可能容易混淆,我们强调下它们的区别。 1)CountDownLatch的参与线程是有不同角色的,有的负责倒计时,有的在等待倒计时变为0,负责倒计时和等待倒计时的线程都可以有多个,用于不同角色线程间的同步。 2)CyclicBarrier的参与线程角色是一样的,用于同一角色线程间的协调一致。 3)CountDownLatch是一次性的,而CyclicBarrier是可以重复利用的。