16.2 显式锁 15.2节介绍了利用synchronized实现锁,我们提到了synchronized的一些局限性,本节探讨Java并发包中的显式锁,它可以解决synchronized的限制。
Java并发包中的显式锁接口和类位于包java.util.concurrent.locks下,主要接口和类有:
锁接口Lock,主要实现类是ReentrantLock;
读写锁接口ReadWriteLock,主要实现类是ReentrantReadWriteLock。
本节主要介绍接口Lock和实现类ReentrantLock,关于读写锁,我们后续章节介绍。
16.2.1 接口Lock 显式锁接口Lock的定义为:
1 2 3 4 5 6 7 8 public interface Lock { void lock () ; void lockInterruptibly () throws InterruptedException; boolean tryLock () ; boolean tryLock (long time, TimeUnit unit) throws InterruptedException; void unlock () ; Condition newCondition () ; }
下面解释一下。
1)lock()/unlock():就是普通的获取锁和释放锁方法,lock()会阻塞直到成功。 2)lockInterruptibly():与lock()的不同是,它可以响应中断,如果被其他线程中断了,则抛出InterruptedException。 3)tryLock():只是尝试获取锁,立即返回,不阻塞,如果获取成功,返回true,否则返回false。 4)tryLock(long time, TimeUnit unit):先尝试获取锁,如果能成功则立即返回true,否则阻塞等待,但等待的最长时间由指定的参数设置,在等待的同时响应中断,如果发生了中断,抛出InterruptedException,如果在等待的时间内获得了锁,返回true,否则返回false。 5)newCondition:新建一个条件,一个Lock可以关联多个条件,关于条件,我们留待16.3节介绍。
可以看出,相比synchronized,显式锁支持以非阻塞方式获取锁、可以响应中断、可以限时 ,这使得它灵活得多。
16.2.2 可重入锁ReentrantLock 下面,先介绍ReentrantLock的基本用法,然后重点介绍如何使用tryLock避免死锁。
1.基本用法 Lock接口的主要实现类是ReentrantLock,它的基本用法lock/unlock实现了与syn-chronized一样的语义,包括:
可重入,一个线程在持有一个锁的前提下,可以继续获得该锁;
可以解决竞态条件问题;
可以保证内存可见性。
ReentrantLock有两个构造方法:
1 2 public ReentrantLock () public ReentrantLock (boolean fair)
参数fair表示是否保证公平,不指定的情况下,默认为false,表示不保证公平。所谓公平是指,等待时间最长的线程优先获得锁。保证公平会影响性能,一般也不需要,所以默认不保证,synchronized锁也是不保证公平的 ,16.2.3节还会再分析实现细节。
使用显式锁,一定要记得调用unlock。一般而言,应该将lock之后的代码包装到try语句内,在finally语句内释放锁。比如,使用ReentrantLock实现Counter,代码可以为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Counter { private final Lock lock = new ReentrantLock (); private volatile int count; public void incr () { lock.lock(); try { count++; } finally { lock.unlock(); } } public int getCount () { return count; } }
2.使用tryLock避免死锁 使用tryLock(),可以避免死锁 。在持有一个锁获取另一个锁而获取不到的时候,可以释放已持有的锁,给其他线程获取锁的机会,然后重试获取所有锁。
我们来看个例子,银行账户之间转账,用类Account表示账户,如代码清单16-3所示。
代码清单16-3 表示账户的类Account
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class Account { private Lock lock = new ReentrantLock (); private volatile double money; public Account (double initialMoney) { this .money = initialMoney; } public void add (double money) { lock.lock(); try { this .money += money; } finally { lock.unlock(); } } public void reduce (double money) { lock.lock(); try { this .money -= money; } finally { lock.unlock(); } } public double getMoney () { return money; } void lock () { lock.lock(); } void unlock () { lock.unlock(); } boolean tryLock () { return lock.tryLock(); } }
Account里的money表示当前余额,add/reduce用于修改余额。在账户之间转账,需要两个账户都锁定,如果不使用tryLock,而直接使用lock,则代码如代码清单27-6所示。
代码清单16-4 转账的错误写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class AccountMgr { public static class NoEnoughMoneyException extends Exception {} public static void transfer (Account from, Account to, double money) throws NoEnoughMoneyException { from.lock(); try { to.lock(); try { if (from.getMoney() >= money) { from.reduce(money); to.add(money); } else { throw new NoEnoughMoneyException (); } } finally { to.unlock(); } } finally { from.unlock(); } } }
但这么写是有问题的,如果两个账户都同时给对方转账,都先获取了第一个锁,则会发生死锁。我们写段代码来模拟这个过程,如代码清单16-5所示。
代码清单16-5 模拟账户转账的死锁过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public static void simulateDeadLock () { final int accountNum = 10 ; final Account[] accounts = new Account [accountNum]; final Random rnd = new Random (); for (int i = 0 ; i < accountNum; i++) { accounts[i] = new Account (rnd.nextInt(10000 )); } int threadNum = 100 ; Thread[] threads = new Thread [threadNum]; for (int i = 0 ; i < threadNum; i++) { threads[i] = new Thread () { public void run () { int loopNum = 100 ; for (int k = 0 ; k < loopNum; k++) { int i = rnd.nextInt(accountNum); int j = rnd.nextInt(accountNum); int money = rnd.nextInt(10 ); if (i ! = j) { try { transfer(accounts[i], accounts[j], money); } catch (NoEnoughMoneyException e) { } } } } }; threads[i].start(); } }
以上代码创建了10个账户,100个线程,每个线程执行100次循环,在每次循环中,随机挑选两个账户进行转账。在笔者的计算机中,每次执行该段代码都会发生死锁。读者可以更改这些数值进行试验。
我们使用tryLock来进行修改,先定义一个tryTransfer方法,如代码清单16-6所示。
代码清单16-6 使用tryLock尝试转账
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static boolean tryTransfer (Account from, Account to, double money) throws NoEnoughMoneyException { if (from.tryLock()) { try { if (to.tryLock()) { try { if (from.getMoney() >= money) { from.reduce(money); to.add(money); } else { throw new NoEnoughMoneyException (); } return true ; } finally { to.unlock(); } } } finally { from.unlock(); } } return false ; }
如果两个锁都能够获得,且转账成功,则返回true,否则返回false。不管怎样,结束都会释放所有锁。transfer方法可以循环调用该方法以避免死锁,代码可以为:
1 2 3 4 5 6 7 8 9 10 public static void transfer (Account from, Account to, double money) throws NoEnoughMoneyException { boolean success = false ; do { success = tryTransfer(from, to, money); if (! success) { Thread.yield(); } } while (! success); }
除了实现Lock接口中的方法,ReentrantLock还有一些其他方法,通过它们,可以获取关于锁的一些信息,这些信息可以用于监控和调试目的,具体可参看API文档,就不介绍了。
16.2.3 ReentrantLock的实现原理 ReentrantLock的用法是比较简单的,它是怎么实现的呢?在最底层,它依赖于16.1节介绍的CAS方法,另外,它依赖于类LockSupport中的一些方法。我们先介绍Lock-Support。
1. LockSupport 类LockSupport也位于包java.util.concurrent.locks下,它的基本方法有:
1 2 3 4 public static void park () public static void parkNanos (long nanos) public static void parkUntil (long deadline) public static void unpark (Thread thread)
park使得当前线程放弃CPU,进入等待状态(WAITING),操作系统不再对它进行调度,什么时候再调度呢?有其他线程对它调用了unpark, unpark使参数指定的线程恢复可运行状态。我们看个例子:
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) throws InterruptedException { Thread t = new Thread (){ public void run () { LockSupport.park(); System.out.println("exit" ); } }; t.start(); Thread.sleep(1000 ); LockSupport.unpark(t); }
上述例子中,主线程启动子线程t,线程t启动后调用park,放弃CPU,主线程睡眠1秒以确保子线程已执行LockSupport.park(),调用unpark,线程t恢复运行,输出exit。
park不同于Thread.yield(), yield只是告诉操作系统可以先让其他线程运行,但自己依然是可运行状态,而park会放弃调度资格,使线程进入WAITING状态。
需要说明的是,park是响应中断的 ,当有中断发生时,park会返回,线程的中断状态会被设置。另外还需要说明,park可能会无缘无故地返回,程序应该重新检查park等待的条件是否满足。
park有两个变体:
parkNanos:可以指定等待的最长时间,参数是相对于当前时间的纳秒数;
parkUntil:可以指定最长等到什么时候,参数是绝对时间,是相对于纪元时的毫秒数。
当等待超时的时候,它们也会返回。
这些park方法还有一些变体,可以指定一个对象,表示是由于该对象而进行等待的,以便于调试,通常传递的值是this,比如:
1 public static void park (Object blocker)
LockSupport有一个方法,可以返回一个线程的blocker对象:
1 public static Object getBlocker (Thread t)
这些park/unpark方法是怎么实现的呢?与CAS方法一样,它们也调用了Unsafe类中的对应方法。Unsafe类最终调用了操作系统的API,从程序员的角度,我们可以认为Lock-Support中的这些方法就是基本操作 。
2. AQS 利用CAS和LockSupport提供的基本方法,就可以用来实现ReentrantLock了。但Java中还有很多其他并发工具,如ReentrantReadWriteLock、Semaphore、CountDownLatch,它们的实现有很多类似的地方,为了复用代码,Java提供了一个抽象类AbstractQueued-Synchronizer,简称AQS,它简化了并发工具的实现。AQS的整体实现比较复杂,我们主要以ReentrantLock的使用为例进行简要介绍。
AQS封装了一个状态,给子类提供了查询和设置状态的方法:
1 2 3 4 private volatile int state;protected final int getState () protected final void setState (int newState) protected final boolean compareAndSetState (int expect, int update)
用于实现锁时,AQS可以保存锁的当前持有线程,提供了方法进行查询和设置:
1 2 3 private transient Thread exclusiveOwnerThread;protected final void setExclusiveOwnerThread (Thread t) protected final Thread getExclusiveOwnerThread ()
AQS内部维护了一个等待队列,借助CAS方法实现了无阻塞算法进行更新。
下面,我们以ReentrantLock的使用为例简要介绍AQS的原理。
3. ReentrantLock ReentrantLock内部使用AQS,有三个内部类:
1 2 3 abstract static class Sync extends AbstractQueuedSynchronizer static final class NonfairSync extends Sync static final class FairSync extends Sync
Sync是抽象类,NonfairSync是fair为false时使用的类,FairSync是fire为true时使用的类。ReentrantLock内部有一个Sync成员:
1 private final Sync sync;
在构造方法中sync被赋值,比如:
1 2 3 public ReentrantLock () { sync = new NonfairSync (); }
我们来看ReentrantLock中的基本方法lock/unlock的实现。先看lock方法,代码为:
1 2 3 public void lock () { sync.lock(); }
sync默认类型是NonfairSync, NonfairSync的lock代码为:
1 2 3 4 5 6 final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); }
ReentrantLock使用state表示是否被锁和持有数量,如果当前未被锁定,则立即获得锁,否则调用acquire(1)获得锁。acquire是AQS中的方法,代码为:
1 2 3 4 5 public final void acquire (int arg) { if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
它调用tryAcquire获取锁,tryAcquire必须被子类重写。NonfairSync的实现为:
1 2 3 protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); }
nonfairTryAcquire是sync中实现的,代码为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
这段代码容易理解,如果未被锁定,则使用CAS进行锁定;如果已被当前线程锁定,则增加锁定次数。如果tryAcquire返回false,则AQS会调用:
1 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
其中,addWaiter会新建一个节点Node,代表当前线程,然后加入内部的等待队列中,限于篇幅,具体代码就不列出来了。放入等待队列后,调用acquireQueued尝试获得锁,代码为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (; ; ) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
主体是一个死循环,在每次循环中,首先检查当前节点是不是第一个等待的节点,如果是且能获得到锁,则将当前节点从等待队列中移除并返回,否则最终调用LockSupport. park放弃CPU,进入等待,被唤醒后,检查是否发生了中断,记录中断标志,在最终方法返回时返回中断标志。如果发生过中断,acquire方法最终会调用selfInterrupt方法设置中断标志位,其代码为:
1 2 3 private static void selfInterrupt () { Thread.currentThread().interrupt(); }
以上就是lock方法的基本过程,能获得锁就立即获得,否则加入等待队列,被唤醒后检查自己是否是第一个等待的线程,如果是且能获得锁,则返回,否则继续等待。这个过程中如果发生了中断,lock会记录中断标志位,但不会提前返回或抛出异常。
ReentrantLock的unlock方法的代码为:
1 2 3 public void unlock () { sync.release(1 ); }
release是AQS中定义的方法,代码为:
1 2 3 4 5 6 7 8 9 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h ! = null && h.waitStatus ! = 0 ) unparkSuccessor(h); return true ; } return false ; }
tryRelease方法会修改状态释放锁,unparkSuccessor会调用LockSupport.unpark将第一个等待的线程唤醒,具体代码就不列举了。
FairSync和NonfairSync的主要区别是:在获取锁时,即在tryAcquire方法中,如果当前未被锁定,即c==0, FairSync多了一个检查,如下:
1 2 3 4 5 6 7 8 9 10 11 protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (! hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } ..
这个检查是指,只有不存在其他等待时间更长的线程,它才会尝试获取锁。
这样保证公平不是很好吗?为什么默认不保证公平呢?保证公平整体性能比较低,低的原因不是这个检查慢,而是会让活跃线程得不到锁,进入等待状态,引起频繁上下文切换,降低了整体的效率 ,通常情况下,谁先运行关系不大,而且长时间运行,从统计角度而言,虽然不保证公平,也基本是公平的。需要说明是,即使fair参数为true, ReentrantLock中不带参数的tryLock方法也是不保证公平的 ,它不会检查是否有其他等待时间更长的线程。
16.2.4 对比ReentrantLock和synchronized 相比synchronized, ReentrantLock可以实现与synchronized相同的语义,而且支持以非阻塞方式获取锁,可以响应中断,可以限时,更为灵活。不过,synchronized的使用更为简单,写的代码更少,也更不容易出错。
synchronized代表一种声明式编程思维 ,程序员更多的是表达一种同步声明,由Java系统负责具体实现,程序员不知道其实现细节;显式锁代表一种命令式编程思维 ,程序员实现所有细节。
声明式编程的好处除了简单,还在于性能,在较新版本的JVM上,ReentrantLock和synchronized的性能是接近的,但Java编译器和虚拟机可以不断优化synchronized的实现 ,比如自动分析synchronized的使用,对于没有锁竞争的场景,自动省略对锁获取/释放的调用。
简单总结下,能用synchronized就用synchronized,不满足要求时再考虑Reentrant-Lock 。