19.3 倒计时门栓CountDownLatch
我们在15.3.5节使用wait/notify实现了一个简单的门栓MyLatch,我们提到,Java并发包中已经提供了类似工具,就是CountDownLatch。它相当于是一个门栓,一开始是关闭的,所有希望通过该门的线程都需要等待,然后开始倒计时,倒计时变为0后,门栓打开,等待的所有线程都可以通过,它是一次性的,打开后就不能再关上了。
CountDownLatch里有一个计数,这个计数通过构造方法进行传递:
1
| public CountDownLatch(int count)
|
多个线程可以基于这个计数进行协作,它的主要方法有:
1 2 3
| public void await() throws InterruptedException public boolean await(long timeout, TimeUnit unit) throws InterruptedException public void countDown()
|
await检查计数是否为0,如果大于0,就等待,await可以被中断,也可以设置最长等待时间。countDown检查计数,如果已经为0,直接返回,否则减少计数,如果新的计数变为0,则唤醒所有等待的线程。
之前,我们介绍了门栓的两种应用场景:一种是同时开始,另一种是主从协作。它们都有两类线程,互相需要同步,我们使用CountDownLatch重新演示。
在同时开始场景中,运行员线程等待主裁判线程发出开始指令的信号,一旦发出后,所有运动员线程同时开始,计数初始为1,运动员线程调用await,主线程调用countDown,如代码清单19-3所示。
代码清单19-3 使用CountDownLatch实现同时开始场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class RacerWithCountDownLatch { static class Racer extends Thread { CountDownLatch latch; public Racer(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { this.latch.await(); System.out.println(getName() + " start run "+System.currentTimeMillis()); } catch (InterruptedException e) { } } } public static void main(String[] args) throws InterruptedException { int num = 10; CountDownLatch latch = new CountDownLatch(1); Thread[] racers = new Thread[num]; for(int i = 0; i < num; i++) { racers[i] = new Racer(latch); racers[i].start(); } Thread.sleep(1000); latch.countDown(); } }
|
代码比较简单,就不赘述了。在主从协作模式中,主线程依赖工作线程的结果,需要等待工作线程结束,这时,计数初始值为工作线程的个数,工作线程结束后调用count-Down,主线程调用await进行等待,如代码清单19-4所示。
代码清单19-4 使用CountDownLatch实现主从协作场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class MasterWorkerDemo { static class Worker extends Thread { CountDownLatch latch; public Worker(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { Thread.sleep((int) (Math.random() * 1000)); if(Math.random() < 0.02) { throw new RuntimeException("bad luck"); } } catch (InterruptedException e) { } finally { this.latch.countDown(); } } } public static void main(String[] args) throws InterruptedException { int workerNum = 100; CountDownLatch latch = new CountDownLatch(workerNum); Worker[] workers = new Worker[workerNum]; for(int i = 0; i < workerNum; i++) { workers[i] = new Worker(latch); workers[i].start(); } latch.await(); System.out.println("collect worker results"); } }
|
需要强调的是,在这里,countDown的调用应该放到finally语句中,确保在工作线程发生异常的情况下也会被调用,使主线程能够从await调用中返回。