16.6.3 使用阻塞队列(BlockingQueue)控制线程通信
16.6.3 使用阻塞队列(BlockingQueue)控制线程通信
Java5提供了一个BlockingQueue接口,虽然BlockingQueue也是Queue的子接口,但BlockingQueue的主要用途并不是作为容器,而是作为线程同步的工具。
BlockingQueue特征
BlockingQueue具有一个特征:
- 当生产者线程试图向
BlockingQueue中放入元素时,如果该队列已满,则该生产者线程被阻塞; - 当消费者线程试图从
BlockingQueue中取出元素时,如果该队列已空,则该消费者线程被阻塞
程序的两个线程通过交替向BlockingQueue中放入元素、取出元素,即可很好地控制线程的通信。
BlockingQueue阻塞方法
BlockingQueue提供如下两个支持阻塞的方法。
| 方法 | 描述 |
|---|---|
void put(E e) |
尝试把E元素放入BlockingQueue中,如果该队列的元素已满,则阻塞该线程。 |
E take() |
尝试从BlockingQueue的头部取出元素,如果该队列的元素已空,则阻塞该线程。 |
Queue接口方法
BlockingQueue继承了Queue接口,当然也可使用Queue接口中的方法。这些方法归纳起来可分为如下三组。
- 在队列尾部插入元素。包括
add(E e)、offer(E e)和put(E e)方法,当该队列已满时,这三个方法分别会抛出异常、返回false、阻塞队列。 - 在队列头部删除并返回删除的元素。包括
remove、poll()和take()方法。当该队列已空时,这三个方法分别会抛出异常、返回false、阻塞队列。 - 在队列头部取出但不删除元素。包括
element()和pek()方法,当队列已空时,这两个方法分别抛出异常、返回false
BlockingQueue方法分类
BlockingQueue包含的方法之间的对应关系如表16.1所示。
| 功能 | 抛出异常 | 不同返回值 | 阻塞线程 | 指定超时时长 |
|---|---|---|---|---|
| 队尾插入元素 | add(e) |
offer(e) |
put(e) |
offer(e,time,unit) |
| 队头删除元素 | remove() |
poll() |
take() |
poll(time,unit) |
| 获取、不删除元素 | element() |
peek() |
无 | 无 |
BlockingQueue实现类
BlockingQueue与其实现类之间的类图如图16.11所示。
图16.11中以黑色方框框出的都是Java7新增的阻塞队列。从图16.11可以看到,BlockingQueue包含如下5个实现类。
BlockingQueue实现类 |
描述 |
|---|---|
ArrayBlockingQueue |
基于数组实现的BlockingQueue队列。 |
LinkedBlockingQueue |
基于链表实现的BlockingQueue队列。 |
SynchronousQueue |
同步队列。对该队列的存、取操作必须交替进行。 |
PriorityBlockingQueue |
它并不是标准的阻塞队列。 与前面介绍的 PriorityQueue类似,该队列调用remove()、poll()、take()等方法取出元素时,并不是取出队列中存在时间最长的元素,而是队列中最小的元素。PriorityBlockingQueue判断元素的大小即可根据元素(实现Comparable接口)的本身大小来自然排序,也可使用Comparator进行定制排序。 |
DelayQueue |
它是一个特殊的BlockingQueue,底层基于PriorityBlockingQueue实现。不过,DelayQueue要求集合元素都实现Delay接口(该接口里只有一个longGetDelay()方法),DelayQueue根据集合元素的getDalay()方法的返回值进行排序 |
程序示例 ArrayBlockingQueue阻塞队列
下面以ArrayBlockingQueue为例介绍阻塞队列的功能和用法。下面先用一个最简单的程序来测试BlockingQueue的put()方法。
1 | import java.util.concurrent.*; |
上面程序先定义一个大小为2的BlockingQueue,程序先向该队列中放入两个元素,此时队列还没有满,两个元素都可以放入,因此使用put()、add()和offer()方法效果完全一样。当程序试图放入第三个元素时,
- 如果使用
put()方法尝试放入元素将会阻塞线程,如上面程序①号代码所示。 - 此时,如果使用
add()方法尝试放入元素将会引发异常; - 此时,如果使用
offer()方法尝试放入元素则会返回false,元素不会被放入
与此类似的是,在BlockingQueue已空的情况下,程序
- 使用
take()方法尝试取出元素将会阻塞线程; - 使用
remove()方法尝试取出元素将引发异常; - 使用
poll()方法尝试取出元素将返回false,元素不会被删除.
程序 使用BlockingQueue实现线程通信
掌握了BlockingQueue阻塞队列的特性之后,下面程序就可以利用BlockingQueue来实现线程通信了。
1 | import java.util.concurrent.*; |
上面程序启动了3个生产者线程向BlockingQueue集合放入元素,启动了1个消费者线程从BlockingQueue集合取出元素。
本程序的BlockingQueue集合容量为1,因此3个生产者线程无法连续放入元素,必须等待消费者线程取出一个元素后,3个生产者线程的其中之一才能放入一个元素。运行该程序,会看到如下所示的结果
1 | Thread-1生产者准备生产集合元素! |
3个生产者线程都想向BlockingQueue中放入元素,但只要其中一个线程向该队列中放入元素之后,其他生产者线程就必须等待,等待消费者线程取出BlockingQueue队列里的元素。