16.3 显式条件
16.2节我们介绍了显式锁,本节介绍关联的显式条件,介绍其用法和原理。显式条件在不同上下文中也可以被称为条件变量、条件队列、或条件,后文我们可能会交替使用。
16.3.1 用法
锁用于解决竞态条件问题,条件是线程间的协作机制。显式锁与synchronized相对应,而显式条件与wait/notify相对应。wait/notify与synchronized配合使用,显式条件与显式锁配合使用。条件与锁相关联,创建条件变量需要通过显式锁,Lock接口定义了创建方法:
1
| Condition newCondition();
|
Condition表示条件变量,是一个接口,它的定义为:
1 2 3 4 5 6 7 8 9
| public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
|
await对应于Object的wait, signal对应于notify, signalAll对应于notifyAll,语义也是一样的。
与Object的wait方法类似,await也有几个限定等待时间的方法,但功能更多一些:
1 2 3 4 5 6
| boolean await(long time, TimeUnit unit) throws InterruptedException;
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
|
这些await方法都是响应中断的,如果发生了中断,会抛出InterruptedException,但中断标志位会被清空。Condition还定义了一个不响应中断的等待方法:
1
| void awaitUninterruptibly();
|
该方法不会由于中断结束,但当它返回时,如果等待过程中发生了中断,中断标志位会被设置。
一般而言,与Object的wait方法一样,调用await方法前需要先获取锁,如果没有锁,会抛出异常IllegalMonitorStateException。
await在进入等待队列后,会释放锁,释放CPU,当其他线程将它唤醒后,或等待超时后,或发生中断异常后,它都需要重新获取锁,获取锁后,才会从await方法中退出。
另外,与Object的wait方法一样,await返回后,不代表其等待的条件就一定满足了,通常要将await的调用放到一个循环内,只有条件满足后才退出。
一般而言,signal/signalAll与notify/notifyAll一样,调用它们需要先获取锁,如果没有锁,会抛出异常IllegalMonitorStateException。signal与notify一样,挑选一个线程进行唤醒,signalAll与notifyAll一样,唤醒所有等待的线程,但这些线程被唤醒后都需要重新竞争锁,获取锁后才会从await调用中返回。
ReentrantLock实现了newCondition方法,通过它,我们来看下条件的基本用法。我们实现与15.3节类似的例子WaitThread,一个线程启动后,在执行一项操作前,等待主线程给它指令,收到指令后才执行,示例代码如代码清单16-7所示。
代码清单16-7 使用显式条件进行协作的示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class WaitThread extends Thread { private volatile boolean fire = false; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); @Override public void run() { try { lock.lock(); try { while (! fire) { condition.await(); } } finally { lock.unlock(); } System.out.println("fired"); } catch (InterruptedException e) { Thread.interrupted(); } } public void fire() { lock.lock(); try { this.fire = true; condition.signal(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { WaitThread waitThread = new WaitThread(); waitThread.start(); Thread.sleep(1000); System.out.println("fire"); waitThread.fire(); } }
|
需要特别注意的是,不要将signal/signalAll与notify/notifyAll混淆,notify/notifyAll是Object中定义的方法,Condition对象也有,稍不注意就会误用。比如,对上面例子中的fire方法,可能会写为:
1 2 3 4 5 6 7 8 9
| public void fire() { lock.lock(); try { this.fire = true; condition.notify(); } finally { lock.unlock(); } }
|
写成这样,编译器不会报错,但运行时会抛出IllegalMonitorStateException,因为notify的调用不在synchronized语句内。同样,避免将锁与synchronized混用,那样非常令人混淆,比如:
1 2 3 4 5 6
| public void fire() { synchronized(lock){ this.fire = true; condition.signal(); } }
|
记住,显式条件与显式锁配合,wait/notify与synchronized配合。
16.3.2 生产者/消费者模式
在15.3节,我们用wait/notify实现了生产者/消费者模式,我们提到了wait/notify的一个局限,它只能有一个条件等待队列,分析等待条件也很复杂。在生产者/消费者模式中,其实有两个条件,一个与队列满有关,一个与队列空有关。使用显式锁,可以创建多个条件等待队列。下面,我们用显式锁/条件重新实现下其中的阻塞队列,如代码清单16-8所示。
代码清单16-8 使用显式锁/条件实现的阻塞队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| static class MyBlockingQueue<E> { private Queue<E> queue = null; private int limit; private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public MyBlockingQueue(int limit) { this.limit = limit; queue = new ArrayDeque<>(limit); } public void put(E e) throws InterruptedException { lock.lockInterruptibly(); try{ while (queue.size() == limit) { notFull.await(); } queue.add(e); notEmpty.signal(); }finally{ lock.unlock(); } } public E take() throws InterruptedException { lock.lockInterruptibly(); try{ while(queue.isEmpty()) { notEmpty.await(); } E e = queue.poll(); notFull.signal(); return e; }finally{ lock.unlock(); } } }
|
上述代码定义了两个等待条件:不满(notFull)、不空(notEmpty)。在put方法中,如果队列满,则在notFull上等待;在take方法中,如果队列空,则在notEmpty上等待。put操作后通知notEmpty, take操作后通知notFull。这样,代码更为清晰易读,同时避免了不必要的唤醒和检查,提高了效率。Java并发包中的类ArrayBlockingQueue就采用了类似的方式实现。
16.3.3 实现原理
理解了显式条件的概念和用法,我们来看下ReentrantLock是如何实现它的,其new-Condition()的代码为:
1 2 3
| public Condition newCondition() { return sync.newCondition(); }
|
sync是ReentrantLock的内部类对象,其newCondition()代码为:
1 2 3
| final ConditionObject newCondition() { return new ConditionObject(); }
|
ConditionObject是AQS中定义的一个内部类,它的实现也比较复杂,我们通过一些主要代码来简要探讨其实现原理。ConditionObject内部也有一个队列,表示条件等待队列,其成员声明为:
1 2 3 4
| private transient Node firstWaiter;
private transient Node lastWaiter;
|
ConditionObject是AQS的成员内部类,它可以直接访问AQS中的数据,比如AQS中定义的锁等待队列。我们看下主要方法的实现。先看await方法,如代码清单16-9所示。我们通过添加注释解释其基本思路。
代码清单16-9 await的实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public final void await() throws InterruptedException { if(Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (! isOnSyncQueue(node)) { LockSupport.park(this); if((interruptMode = checkInterruptWhileWaiting(node)) ! = 0) break; } if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT; if(node.nextWaiter ! = null) unlinkCancelledWaiters(); if(interruptMode ! = 0) reportInterruptAfterWait(interruptMode); }
|
awaitNanos与await的实现是基本类似的,区别主要是会限定等待的时间,具体就不列举了。
signal方法代码为:
1 2 3 4 5 6 7 8 9
| public final void signal() { if(! isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if(first ! = null) doSignal(first); }
|
doSignal的代码就不列举了,其基本逻辑是:
1)将节点从条件等待队列移到锁等待队列;
2)调用LockSupport.unpark将线程唤醒。
16.3.4 小结
本节介绍了显式条件的用法和实现原理。它与显式锁配合使用,与wait/notify相比,可以支持多个条件队列,代码更为易读,效率更高,使用时注意不要将signal/signalAll误写为notify/notifyAll。
至此,关于并发包的基础:原子变量和CAS、显式锁和条件,就介绍完了,基于这些, Java并发包还提供了很多更为易用的高层数据结构、工具和服务,下一章,我们介绍一些并发容器。