19.3 倒计时门栓CountDownLatch

19.3 倒计时门栓CountDownLatch

我们在15.3.5节使用wait/notify实现了一个简单的门栓MyLatch,我们提到,Java并发包中已经提供了类似工具,就是CountDownLatch。它相当于是一个门栓,一开始是关闭的,所有希望通过该门的线程都需要等待,然后开始倒计时,倒计时变为0后,门栓打开,等待的所有线程都可以通过,它是一次性的,打开后就不能再关上了。

CountDownLatch里有一个计数,这个计数通过构造方法进行传递:

1
public CountDownLatch(int count)

多个线程可以基于这个计数进行协作,它的主要方法有:

1
2
3
public void await() throws InterruptedException
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
public void countDown()

await检查计数是否为0,如果大于0,就等待,await可以被中断,也可以设置最长等待时间。countDown检查计数,如果已经为0,直接返回,否则减少计数,如果新的计数变为0,则唤醒所有等待的线程。

之前,我们介绍了门栓的两种应用场景:一种是同时开始,另一种是主从协作。它们都有两类线程,互相需要同步,我们使用CountDownLatch重新演示。

在同时开始场景中,运行员线程等待主裁判线程发出开始指令的信号,一旦发出后,所有运动员线程同时开始,计数初始为1,运动员线程调用await,主线程调用countDown,如代码清单19-3所示。

代码清单19-3 使用CountDownLatch实现同时开始场景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RacerWithCountDownLatch {
static class Racer extends Thread {
CountDownLatch latch;
public Racer(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
this.latch.await();
System.out.println(getName()
+ " start run "+System.currentTimeMillis());
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) throws InterruptedException {
int num = 10;
CountDownLatch latch = new CountDownLatch(1);
Thread[] racers = new Thread[num];
for(int i = 0; i < num; i++) {
racers[i] = new Racer(latch);
racers[i].start();
}
Thread.sleep(1000);
latch.countDown();
}
}

代码比较简单,就不赘述了。在主从协作模式中,主线程依赖工作线程的结果,需要等待工作线程结束,这时,计数初始值为工作线程的个数,工作线程结束后调用count-Down,主线程调用await进行等待,如代码清单19-4所示。

代码清单19-4 使用CountDownLatch实现主从协作场景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class MasterWorkerDemo {
static class Worker extends Thread {
CountDownLatch latch;
public Worker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
//模拟执行任务
Thread.sleep((int) (Math.random() * 1000));
//模拟异常情况
if(Math.random() < 0.02) {
throw new RuntimeException("bad luck");
}
} catch (InterruptedException e) {
} finally {
this.latch.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException {
int workerNum = 100;
CountDownLatch latch = new CountDownLatch(workerNum);
Worker[] workers = new Worker[workerNum];
for(int i = 0; i < workerNum; i++) {
workers[i] = new Worker(latch);
workers[i].start();
}
latch.await();
System.out.println("collect worker results");
}
}

需要强调的是,在这里,countDown的调用应该放到finally语句中,确保在工作线程发生异常的情况下也会被调用,使主线程能够从await调用中返回。