16.6.3 使用阻塞队列(BlockingQueue)控制线程通信

16.6.3 使用阻塞队列(BlockingQueue)控制线程通信

Java5提供了一个BlockingQueue接口,虽然BlockingQueue也是Queue的子接口,但BlockingQueue的主要用途并不是作为容器,而是作为线程同步的工具。

BlockingQueue特征

BlockingQueue具有一个特征:

  • 当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该生产者线程被阻塞;
  • 当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该消费者线程被阻塞

程序的两个线程通过交替向BlockingQueue中放入元素、取出元素,即可很好地控制线程的通信。

BlockingQueue阻塞方法

BlockingQueue提供如下两个支持阻塞的方法。

方法 描述
void put(E e) 尝试把E元素放入BlockingQueue中,如果该队列的元素已满,则阻塞该线程。
E take() 尝试从BlockingQueue的头部取出元素,如果该队列的元素已空,则阻塞该线程。

Queue接口方法

BlockingQueue继承了Queue接口,当然也可使用Queue接口中的方法。这些方法归纳起来可分为如下三组。

  1. 在队列尾部插入元素。包括add(E e)offer(E e)put(E e)方法,当该队列已满时,这三个方法分别会抛出异常、返回false阻塞队列
  2. 在队列头部删除并返回删除的元素。包括removepoll()take()方法。当该队列已空时,这三个方法分别会抛出异常、返回false、阻塞队列。
  3. 在队列头部取出但不删除元素。包括element()pek()方法,当队列已空时,这两个方法分别抛出异常、返回false

BlockingQueue方法分类

BlockingQueue包含的方法之间的对应关系如表16.1所示。

功能 抛出异常 不同返回值 阻塞线程 指定超时时长
队尾插入元素 add(e) offer(e) put(e) offer(e,time,unit)
队头删除元素 remove() poll() take() poll(time,unit)
获取、不删除元素 element() peek()

BlockingQueue实现类

BlockingQueue与其实现类之间的类图如图16.11所示。
图片

图16.11中以黑色方框框出的都是Java7新增的阻塞队列。从图16.11可以看到,BlockingQueue包含如下5个实现类。

BlockingQueue实现类 描述
ArrayBlockingQueue 基于数组实现的BlockingQueue队列。
LinkedBlockingQueue 基于链表实现的BlockingQueue队列。
SynchronousQueue 同步队列。对该队列的存、取操作必须交替进行。
PriorityBlockingQueue 它并不是标准的阻塞队列。
与前面介绍的PriorityQueue类似,该队列调用remove()poll()take()等方法取出元素时,并不是取出队列中存在时间最长的元素,而是队列中最小的元素
PriorityBlockingQueue判断元素的大小即可根据元素(实现Comparable接口)的本身大小来自然排序,也可使用Comparator进行定制排序。
DelayQueue 它是一个特殊的BlockingQueue,底层基于PriorityBlockingQueue实现。不过,DelayQueue要求集合元素都实现Delay接口(该接口里只有一个longGetDelay()方法),DelayQueue根据集合元素的getDalay()方法的返回值进行排序

程序示例 ArrayBlockingQueue阻塞队列

下面以ArrayBlockingQueue为例介绍阻塞队列的功能和用法。下面先用一个最简单的程序来测试BlockingQueueput()方法。

1
2
3
4
5
6
7
8
9
10
11
import java.util.concurrent.*;

public class BlockingQueueTest {
public static void main(String[] args) throws Exception {
// 定义一个长度为2的阻塞队列
BlockingQueue<String> bq = new ArrayBlockingQueue<>(2);
bq.put("Java"); // 与bq.add("Java"、bq.offer("Java")相同
bq.put("Java"); // 与bq.add("Java"、bq.offer("Java")相同
bq.put("Java"); // ① 阻塞线程。
}
}

上面程序先定义一个大小为2的BlockingQueue,程序先向该队列中放入两个元素,此时队列还没有满,两个元素都可以放入,因此使用put()add()offer()方法效果完全一样。当程序试图放入第三个元素时,

  • 如果使用put()方法尝试放入元素将会阻塞线程,如上面程序①号代码所示。
  • 此时,如果使用add()方法尝试放入元素将会引发异常;
  • 此时,如果使用offer()方法尝试放入元素则会返回false,元素不会被放入

与此类似的是,在BlockingQueue已空的情况下,程序

  • 使用take()方法尝试取出元素将会阻塞线程;
  • 使用remove()方法尝试取出元素将引发异常;
  • 使用poll()方法尝试取出元素将返回false,元素不会被删除.

程序 使用BlockingQueue实现线程通信

掌握了BlockingQueue阻塞队列的特性之后,下面程序就可以利用BlockingQueue来实现线程通信了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.util.concurrent.*;

// 生产者线程
class Producer extends Thread {
private BlockingQueue<String> bq;

public Producer(BlockingQueue<String> bq) {
this.bq = bq;
}

public void run() {
String[] strArr = new String[] { "Java", "Struts", "Spring" };
for (int i = 0; i < 999999999; i++) {
System.out.println(getName() + "生产者准备生产集合元素!");
try {
Thread.sleep(200);
// 尝试放入元素,如果队列已满,线程被阻塞
bq.put(strArr[i % 3]);
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println(getName() + "生产完成:" + bq);
}
}
}
// 消费者线程
class Consumer extends Thread {
private BlockingQueue<String> bq;

public Consumer(BlockingQueue<String> bq) {
this.bq = bq;
}

public void run() {
while (true) {
System.out.println(getName() + "消费者准备消费集合元素!");
try {
Thread.sleep(200);
// 尝试取出元素,如果队列已空,线程被阻塞
bq.take();
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println(getName() + "消费完成:" + bq);
}
}
}

public class BlockingQueueTest2 {
public static void main(String[] args) {
// 创建一个容量为1的BlockingQueue
BlockingQueue<String> bq = new ArrayBlockingQueue<>(1);
// 启动3条生产者线程
new Producer(bq).start();
new Producer(bq).start();
new Producer(bq).start();
// 启动一条消费者线程
new Consumer(bq).start();
}
}

上面程序启动了3个生产者线程向BlockingQueue集合放入元素,启动了1个消费者线程从BlockingQueue集合取出元素。
本程序的BlockingQueue集合容量为1,因此3个生产者线程无法连续放入元素,必须等待消费者线程取出一个元素后,3个生产者线程的其中之一才能放入一个元素。运行该程序,会看到如下所示的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Thread-1生产者准备生产集合元素!
Thread-0生产者准备生产集合元素!
Thread-2生产者准备生产集合元素!
Thread-3消费者准备消费集合元素!
Thread-0生产完成:[Java]
Thread-0生产者准备生产集合元素!
Thread-3消费完成:[]
Thread-3消费者准备消费集合元素!
Thread-2生产完成:[Java]
Thread-2生产者准备生产集合元素!
Thread-1生产完成:[Java]
Thread-1生产者准备生产集合元素!
Thread-3消费完成:[Java]
Thread-3消费者准备消费集合元素!
Thread-3消费完成:[]
Thread-3消费者准备消费集合元素!

3个生产者线程都想向BlockingQueue中放入元素,但只要其中一个线程向该队列中放入元素之后,其他生产者线程就必须等待,等待消费者线程取出BlockingQueue队列里的元素。