17.2 ConcurrentHashMap

17.2 ConcurrentHashMap

本节介绍一个常用的并发容器ConcurrentHashMap,它是HashMap的并发版本,与HashMap相比,它有如下特点:

  • 并发安全;
  • 直接支持一些原子复合操作;
  • 支持高并发,读操作完全并行,写操作支持一定程度的并行;
  • 与同步容器Collections.synchronizedMap相比,迭代不用加锁,不会抛出Concurre ntModificationException;
  • 弱一致性。

下面我们分别介绍。

17.2.1 并发安全

需要了解的是,HashMap不是并发安全的,在并发更新的情况下,HashMap可能出现死循环,占满CPU。我们看个例子,如代码清单17-1所示。

代码清单17-1 HashMap死循环示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void unsafeConcurrentUpdate() {
final Map<Integer, Integer> map = new HashMap<>();
for(int i = 0; i < 1000; i++) {
Thread t = new Thread() {
Random rnd = new Random();
@Override
public void run() {
for(int i = 0; i < 1000; i++) {
map.put(rnd.nextInt(), 1);
}
}
};
t.start();
}
}

运行上面的代码,在笔者的计算机中,无论是Java 7还是Java 8环境,每次都会出现死循环,占满CPU。

为什么会出现死循环呢?死循环出现在多个线程同时扩容哈希表的时候,不是同时更新一个链表的时候,那种情况可能会出现更新丢失,但不会死循环,具体过程比较复杂,我们就不解释了。关于Java 7的解释感兴趣的读者可以参考 http://coolshell.cn/articles/9606.html 中的文章。Java 8对HashMap的实现进行了大量优化,减少了死循环的可能,但在扩容的时候还是可能有死循环。

使用Collections.synchronizedMap方法可以生成一个同步容器,以避免产生死循环,替换第一行代码即可:

1
2
final Map<Integer, Integer> map = Collections.synchronizedMap(
new HashMap<Integer, Integer>());

同步容器有几个问题:

  • 每个方法都需要同步,支持的并发度比较低;
  • 对于迭代和复合操作,需要调用方加锁,使用比较麻烦,且容易忘记。

ConcurrentHashMap没有这些问题,它同样实现了Map接口,也是基于哈希表实现的,上面的代码替换第一行即可:

1
final Map<Integer, Integer> map = new ConcurrentHashMap<>();

17.2.2 原子复合操作

除了Map接口,ConcurrentHashMap还实现了一个接口ConcurrentMap,接口定义了一些条件更新操作,Java 7中的具体定义为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface ConcurrentMap<K, V> extends Map<K, V> {
//条件更新,如果Map中没有key,设置key为value,返回原来key对应的值,
//如果没有,返回null
V putIfAbsent(K key, V value);
//条件删除,如果Map中有key,且对应的值为value,则删除,如果删除了,返回true,
//否则返回false
boolean remove(Object key, Object value);
//条件替换,如果Map中有key,且对应的值为oldValue,则替换为newValue,
//如果替换了,返回ture,否则false
boolean replace(K key, V oldValue, V newValue);
//条件替换,如果Map中有key,则替换值为value,返回原来key对应的值,
//如果原来没有,返回null
V replace(K key, V value);
}

Java 8增加了几个默认方法,包括getOrDefault、forEach、computeIfAbsent、merge等,具体可参见API文档,我们就不介绍了。如果使用同步容器,调用方必须加锁,而Concurrent-HashMap将它们实现为了原子操作。实际上,使用ConcurrentHashMap,调用方也没有办法进行加锁,它没有暴露锁接口,也不使用synchronized。

17.2.3 高并发的基本机制

ConcurrentHashMap是为高并发设计的,它是怎么做的呢?具体实现比较复杂,我们简要介绍其思路,在Java 7中,主要有两点:

  • 分段锁
  • 读不需要锁

同步容器使用synchronized,所有方法竞争同一个锁;而ConcurrentHashMap采用分段锁技术,将数据分为多个段,而每个段有一个独立的锁,每一个段相当于一个独立的哈希表,分段的依据也是哈希值,无论是保存键值对还是根据键查找,都先根据键的哈希值映射到段,再在段对应的哈希表上进行操作。

采用分段锁,可以大大提高并发度,多个段之间可以并行读写。默认情况下,段是16个,不过,这个数字可以通过构造方法进行设置,如下所示:

1
2
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel)

concurrencyLevel表示估计的并行更新的线程个数,ConcurrentHashMap会将该数转换为2的整数次幂,比如14转换为16,25转换为32。

在对每个段的数据进行读写时,ConcurrentHashMap也不是简单地使用锁进行同步,内部使用了CAS。对一些写采用原子方式的方法,实现比较复杂,我们就不介绍了。实现的效果是,对于写操作,需要获取锁,不能并行,但是读操作可以,多个读可以并行,写的同时也可以读,这使得ConcurrentHashMap的并行度远高于同步容器。

Java 8对ConcurrentHashMap的实现进一步做了优化。首先,与HashMap的改进类似,在哈希冲突比较严重的时候,会将单向链表转化为平衡的排序二叉树,提高查找的效率;其次,锁的粒度进一步细化了,以提高并行性,哈希表数组中的每个位置(指向一个单链表或树)都有一个单独的锁,具体比较复杂,我们就不介绍了。

17.2.4 迭代安全

我们在15.2.3节介绍过,使用同步容器,在迭代中需要加锁,否则可能会抛出Concurrent-ModificationException。ConcurrentHashMap没有这个问题,在迭代器创建后,在迭代过程中,如果另一个线程对容器进行了修改,迭代会继续,不会抛出异常。

问题是,迭代会反映其他线程的修改吗?还是像CopyOnWriteArrayList一样,反映的是创建时的副本?答案是,都不是!我们看个例子,如代码清单17-2所示。

代码清单17-2 ConcurrentHashMap的迭代示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public static void unsafeConcurrentUpdate() {
final Map<Integer, Integer> map = new HashMap<>();
for(int i = 0; i < 1000; i++) {
Thread t = new Thread() {
Random rnd = new Random();
@Override
public void run() {
for(int i = 0; i < 1000; i++) {
map.put(rnd.nextInt(), 1);
}
}
};
t.start();
}
}
final Map<Integer, Integer> map = Collections.synchronizedMap(
new HashMap<Integer, Integer>());
public interface ConcurrentMap<K, V> extends Map<K, V> {
//条件更新,如果Map中没有key,设置key为value,返回原来key对应的值,
//如果没有,返回null
V putIfAbsent(K key, V value);
//条件删除,如果Map中有key,且对应的值为value,则删除,如果删除了,返回true,
//否则返回false
boolean remove(Object key, Object value);
//条件替换,如果Map中有key,且对应的值为oldValue,则替换为newValue,
//如果替换了,返回ture,否则false
boolean replace(K key, V oldValue, V newValue);
//条件替换,如果Map中有key,则替换值为value,返回原来key对应的值,
//如果原来没有,返回null
V replace(K key, V value);
}
final Map<Integer, Integer> map = new ConcurrentHashMap<>();
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel)
public class ConcurrentHashMapIteratorDemo {
public static void test() {
final ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("a", "abstract");
map.put("b", "basic");
Thread t1 = new Thread() {
@Override
public void run() {
for(Entry<String, String> entry : map.entrySet()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println(entry.getKey() + ", " + entry.getValue());
}
}
};
t1.start();
// 确保线程t1启动
try {
Thread.sleep(100);
} catch(InterruptedException e) {
}
map.put("c", "call");
}
public static void main(String[] args) {
test();
}
}

t1启动后,创建迭代器,但在迭代输出每个元素前,先睡眠1秒,主线程启动t1后,先睡眠一下,确保t1先运行,然后给map增加了一个元素,程序输出为:

1
2
3
a, abstract
b, basic
c, call

上述代码说明迭代器反映了最新的更新。将添加语句更改为:

1
map.put("g", "call");

会发现程序输出为:

1
2
a, abstract
b, basic

这说明迭代器没有反映最新的更新。需要说明的是,这是Java 7的输出,Java 8和Java 9的实现不太一样,输出也不太一样,但也有相同的问题。到底是怎么回事呢?这需要我们理解ConcurrentHashMap的弱一致性

17.2.5 弱一致性

ConcurrentHashMap的迭代器创建后,就会按照哈希表结构遍历每个元素,但在遍历过程中,内部元素可能会发生变化,如果变化发生在已遍历过的部分,迭代器就不会反映出来,而如果变化发生在未遍历过的部分,迭代器就会发现并反映出来,这就是弱一致性。

类似的情况还会出现在ConcurrentHashMap的另一个方法:

1
2
//批量添加m中的键值对到当前Map
public void putAll(Map<? extends K, ? extends V> m)

该方法并非原子操作,而是调用put方法逐个元素进行添加的,在该方法没有结束的时候,部分修改效果就会体现出来。

17.2.6 小结

本节介绍了ConcurrentHashMap,它是并发版的HashMap,通过降低锁的粒度和CAS等实现了高并发,支持原子条件更新操作,不会抛出ConcurrentModificationException,实现了弱一致性。

Java中没有并发版的HashSet,但可以通过Collections.newSetFromMap方法基于Con-currentHashMap构建一个。

我们知道HashMap/HashSet基于哈希,不能对元素排序,对应的可排序的容器类是TreeMap/TreeSet,并发包中可排序的对应版本不是基于树,而是基于Skip List(跳跃表),类分别是ConcurrentSkipListMap和ConcurrentSkipListSet,它们到底是什么呢?让我们下节讨论。