Java多线程- ConcurrentHashMap

基础容器类ArrayList、LinkedList、HashMap都是非线程安全的,在多线程环境下使用这些容器会出现线程安全问题,为了保证线程安全,java使用内置锁Synchronized提供了一套线程安全的同步容器类,比如Vector、HashTable、Stack,这些同步容器实现线程安全的方式是在需要同步的方法上添加关键字synchronized,而在线程征用的场景下synchronized会变成重量级锁,严重影响性能,因此有了juc高并发容器

juc高并发容器是基于无锁编程算法实现的容器类,通过CAS保证操作的原子性,通过volatile保障内存的可见性。

ConcurrentHashMap就是juc包中的一个高并发容器类,是一种线程安全的哈希表。

ConcurrentHashMap对应的基础容器是HashMap

1.HashMap

基础容器HashMap是线程不安全的,在扩容的时候会导致死链。

默认负载因子是0.75,即当HashMap的元素个数(使用的桶的个数)超过容量(一共的桶的个数)的3/4时会触发扩容。

下面均以jdk7为例

假如现在HashMap的桶个数为16,添加元素,直到使用的桶个数为12时,如果再添加元素需要放在新的桶中(根据key的hash值磨桶总个数得到的桶的下标值),需要扩容了,假如现在桶下标为1的位置有3个key-value键值对,链为(1,null)->(35,null)->(16,null),在jdk7中后加入的键值对放在链表头,这个添加的顺序就是16,35,1

就在此时,假如有两个线程都要同时往HashMap中添加一个键值对,会先触发扩容,( 扩容后(1,null)和(35,null) 还是在桶下标为1的位置 )

对于线程1,e:1->35->16->null

​ next:35->16->null

对于线程2,e:1->35->16->null

​ next:35->16->null

假设线程1先扩容完毕,此时桶下标为1的位置的链是(35,null)->(1,null)

所以现在线程2的节点引用变为:

​ e:1->null

​ next: 35->1->null

新的桶1:1->null

​ e:35->1->null

​ next:1->null

接着桶1的状况:35->1->null

​ e:1->null

​ next:null

接着桶1的状况:1->35->1->null

​ e:35->1->null

​ next:1->null

注意:现在已经造成死链了!!

2.Hashtable

Hashtable实现同步是利用synchronized关键字进行锁定,即给整个哈希表进行锁定,每次锁住整张表让线程独占,虽然解决了线程安全问题,但是造成了巨大的资源浪费。

当一个线程正在访问HashTable的同步方法时,其他访问Hashtable同步方法的所有线程都必须进入阻塞或轮询状态,相当于将所有的操作串行化。

Hashtable不允许key和value为null

3.ConcurrentHashMap

3.1 JDK1.8中

重要的属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private static final int MAXIMUM_CAPACITY = 1073741824;
private static final int DEFAULT_CAPACITY = 16;
static final int MAX_ARRAY_SIZE = 2147483639;
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
private static final float LOAD_FACTOR = 0.75F;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static final int RESIZE_STAMP_BITS = 16;
private static final int MAX_RESIZERS = 65535;
private static final int RESIZE_STAMP_SHIFT = 16;
//表示正在转移
static final int MOVED = -1;
//表示已经转换成树
static final int TREEBIN = -2;
static final int RESERVED = -3;
static final int HASH_BITS = 2147483647;
//保存元素,数组的每个元素都是一个桶bucket
transient volatile ConcurrentHashMap.Node<K, V>[] table;
//转移时(扩容时)使用的数组
private transient volatile ConcurrentHashMap.Node<K, V>[] nextTable;
private transient volatile long baseCount;
//默认为0,当初始化时,为-1,当扩容时为-(1+扩容线程数),当初始化或扩容完成后,值为下一次扩容的阈值
// sizeCtl = -1,表示有线程正在进行真正的初始化操作
// sizeCtl = -(1 + nThreads),表示有nThreads个线程正在进行扩容操作
private transient volatile int sizeCtl;
//参数 transferIndex 控制各线程迁移哪些节点,从 n 开始,每来一个线程扩容,就减小 一个步长(stride ),即整个数组被分成若干段,一个线程迁移一小段。支持并发扩容
private transient volatile int transferIndex;
private transient volatile int cellsBusy;
private transient volatile ConcurrentHashMap.CounterCell[] counterCells;
private transient ConcurrentHashMap.KeySetView<K, V> keySet;
private transient ConcurrentHashMap.ValuesView<K, V> values;
private transient ConcurrentHashMap.EntrySetView<K, V> entrySet;
private static final Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final int ABASE;
private static final int ASHIFT;

一些重要的静态内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static class Node<K, V> implements Entry<K, V> {....}

//扩容时如果某个桶迁移完毕,用它作为旧的table bin的头结点,防止其他线程影响,ForwardingNode节点的哈希值是-1
static final class ForwardingNode<K, V> extends ConcurrentHashMap.Node<K, V> {
ForwardingNode(ConcurrentHashMap.Node<K, V>[] tab) {
super(-1, (Object)null, (Object)null);
this.nextTable = tab;
}
}

//红黑树,作为treebin的头结点,存储root和first
static final class TreeBin<K, V> extends ConcurrentHashMap.Node<K, V> {....}

//作为treebin的节点,寻出parent,left,right
static final class TreeNode<K, V> extends ConcurrentHashMap.Node<K, V> {....}

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (loadFactor > 0.0F && initialCapacity >= 0 && concurrencyLevel > 0) {
//桶的个数至少要保证并发度的大小,即最多有concurrencyLevel个线程同时操作这个哈希表
if (initialCapacity < concurrencyLevel) {
initialCapacity = concurrencyLevel;
}
long size = (long)(1.0D + (double)((float)((long)initialCapacity) / loadFactor));
//因为上面计算的size很大概率不是2的n次方,所以把他转换成2的n次方
int cap = size >= 1073741824L ? 1073741824 : tableSizeFor((int)size);
//设置table的大小
this.sizeCtl = cap;
} else {
throw new IllegalArgumentException();
}
}

对initialCapacity有点疑惑,很奇怪他存在的意义。问了一下同学,觉得应该是这样的。
定义哈希表在容纳下initialCapacity那么多个元素之前不能扩容,并且哈希表的容量刚开始就大于等于initialCapacity,又因为上面initialCapacity = concurrencyLevel; 保证了至少有concurrencyLevel的容量,也就是这么大的并发度

initialCapacity的作用就是 提前预估table的大小,防止后面多次扩容,消耗性能


get()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public V get(Object key) {
int h = spread(key.hashCode());
ConcurrentHashMap.Node[] tab;
ConcurrentHashMap.Node e;
int n;
// tabAt(tab, n - 1 & h):获取Node[] tab中第(n - 1 & h)个node,就是取模
if ((tab = this.table) != null && (n = tab.length) > 0 && (e = tabAt(tab, n - 1 & h)) != null) {
int eh;
Object ek;
//头节点的哈希码等于我们要查找的这个节点的哈希码
if ((eh = e.hash) == h) {
//头节点的key等于我们要查找的这个key,即这个头节点就是我们要找的那个节点
if ((ek = e.key) == key || ek != null && key.equals(ek)) {
return e.val;
}
//eh为负数表示该头节点正在扩容中,扩容完的头结点是forwardingnode,则调用find到新的哈希表中找,treebin是-2
} else if (eh < 0) {
ConcurrentHashMap.Node p;
return (p = e.find(h, key)) != null ? p.val : null;
}
//上面的都不满足,则到桶的链表中挨个查找
while((e = e.next) != null) {
if (e.hash == h && ((ek = e.key) == key || ek != null && key.equals(ek))) {
return e.val;
}
}
}

return null;
}

put方法

使用synchronized锁住链表头节点,不影响其他线程操作其他链表

多线程无锁扩容的关键就是通过CAS设置sizeCtl与transferIndex变量,协调多个线程对table数组中的node进行迁移。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public V put(K key, V value) {
return this.putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
//不允许有空的key和value
if (key != null && value != null) {
int hash = spread(key.hashCode());
int binCount = 0;//链表长度
ConcurrentHashMap.Node[] tab = this.table;

while(true) {
int n;
while(tab == null || (n = tab.length) == 0) {
//懒惰创建哈希表,使用cas初始化
tab = this.initTable();
}

ConcurrentHashMap.Node f;
int i;
//创建链表头节点
if ((f = tabAt(tab, i = n - 1 & hash)) == null) {
//添加链表头使用了cas,如果这时其他线程先创建了头结点,就cas失败,重新进入while循环,这些失败的线程并不会阻塞
if (casTabAt(tab, i, (ConcurrentHashMap.Node)null, new ConcurrentHashMap.Node(hash, key, value))) {
break;
}
} else {
int fh;
//这个头节点是forwardingnode,正在扩容
//线程帮忙扩容
if ((fh = f.hash) == -1) {
//锁住当前链表帮忙扩容
tab = this.helpTransfer(tab, f);
} else {
//桶下标冲突,需要对这个桶的链表进行遍历
Object fk;
Object fv;
if (onlyIfAbsent && fh == hash && ((fk = f.key) == key || fk != null && key.equals(fk)) && (fv = f.val) != null) {
return fv;
}

V oldVal = null;
//只锁住链表头节点
synchronized(f) {
if (tabAt(tab, i) == f) {
if (fh < 0) {
if (f instanceof ConcurrentHashMap.TreeBin) {
binCount = 2;
ConcurrentHashMap.TreeNode p;
if ((p = ((ConcurrentHashMap.TreeBin)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent) {
p.val = value;
}
}
} else if (f instanceof ConcurrentHashMap.ReservationNode) {
throw new IllegalStateException("Recursive update");
}
} else {//fh >= 0
label124: {
binCount = 1;

ConcurrentHashMap.Node e;
Object ek;
for(e = f; e.hash != hash || (ek = e.key) != key && (ek == null || !key.equals(ek)); ++binCount) {
//没找到这个节点,追加
ConcurrentHashMap.Node<K, V> pred = e;
if ((e = e.next) == null) {
pred.next = new ConcurrentHashMap.Node(hash, key, value);
break label124;
}
}

oldVal = e.val;
//更新值
if (!onlyIfAbsent) {
e.val = value;
}
}
}
}
}

if (binCount != 0) {
if (binCount >= 8) {
//链表长度大于树化阈值8,将链表转换成红黑树
//不过不是直接转换,是先扩容,但是等table的长度超过64后,链表长度还是大于8,就会将链表转化成红黑树
this.treeifyBin(tab, i);
}

if (oldVal != null) {
return oldVal;
}
break;
}
}
}
}

this.addCount(1L, binCount);
return null;
} else {
throw new NullPointerException();
}
}

helpTransfer

其他线程帮忙扩容,并发扩容

Cmap 支持并发扩容,实现方式是,将哈希表拆分,让每个线程处理自己的区间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final ConcurrentHashMap.Node<K, V>[] helpTransfer(ConcurrentHashMap.Node<K, V>[] tab, ConcurrentHashMap.Node<K, V> f) {
ConcurrentHashMap.Node[] nextTab;
if (tab != null && f instanceof ConcurrentHashMap.ForwardingNode && (nextTab = ((ConcurrentHashMap.ForwardingNode)f).nextTable) != null) {
int rs = resizeStamp(tab.length);

int sc;
while(nextTab == this.nextTable && this.table == tab && (sc = this.sizeCtl) < 0 && sc >>> 16 == rs && sc != rs + 1 && sc != rs + '\uffff' && this.transferIndex > 0) {
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {
this.transfer(tab, nextTab);
break;
}
}

return nextTab;
} else {
return this.table;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private final void transfer(ConcurrentHashMap.Node<K, V>[] tab, ConcurrentHashMap.Node<K, V>[] nextTab) {
int n = tab.length;
int stride;
if ((stride = NCPU > 1 ? (n >>> 3) / NCPU : n) < 16) {
stride = 16;
}

if (nextTab == null) {
try {
ConcurrentHashMap.Node<K, V>[] nt = new ConcurrentHashMap.Node[n << 1];
nextTab = nt;
} catch (Throwable var27) {
this.sizeCtl = 2147483647;
return;
}

this.nextTable = nextTab;
this.transferIndex = n;
}

addCount

维护哈希表的size,增加size的计数,如果超过阈值,扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private final void addCount(long x, int check) {
ConcurrentHashMap.CounterCell[] cs;
long b;
long s;
int sc;
if ((cs = this.counterCells) != null || !U.compareAndSetLong(this, BASECOUNT, b = this.baseCount, s = b + x)) {
boolean uncontended = true;
ConcurrentHashMap.CounterCell c;
long v;
if (cs == null || (sc = cs.length - 1) < 0 || (c = cs[ThreadLocalRandom.getProbe() & sc]) == null || !(uncontended = U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
this.fullAddCount(x, uncontended);
return;
}

if (check <= 1) {
return;
}

s = this.sumCount();
}

int n;
ConcurrentHashMap.Node[] tab;
if (check >= 0) {
for(; s >= (long)(sc = this.sizeCtl) && (tab = this.table) != null && (n = tab.length) < 1073741824; s = this.sumCount()) {
int rs = resizeStamp(n);
if (sc < 0) {
//别的线程正在扩容,帮助扩容
ConcurrentHashMap.Node[] nt;
if (sc >>> 16 != rs || sc == rs + 1 || sc == rs + '\uffff' || (nt = this.nextTable) == null || this.transferIndex <= 0) {
break;
}

if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {
this.transfer(tab, nt);
}
} else if (U.compareAndSetInt(this, SIZECTL, sc, (rs << 16) + 2)) { //扩容
this.transfer(tab, (ConcurrentHashMap.Node[])null);
}
}
}

}

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private final ConcurrentHashMap.Node<K, V>[] initTable() {
ConcurrentHashMap.Node[] tab;
while((tab = this.table) == null || tab.length == 0) {
int sc;
//sizeCtl=-1说明有其他线程正在初始化table
if ((sc = this.sizeCtl) < 0) {
Thread.yield();//放弃cpu
//尝试将sizeCtl设置为-1,表示初始化table
} else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = this.table) == null || tab.length == 0) {
int n = sc > 0 ? sc : 16;
ConcurrentHashMap.Node<K, V>[] nt = new ConcurrentHashMap.Node[n];
tab = nt;
this.table = nt;
//sc现在代表为下一次要扩容时的阈值
sc = n - (n >>> 2);
}
break;
} finally {
this.sizeCtl = sc;
}
}
}

return tab;
}

扩容

在此处使用synchronized锁住链表头节点,并不影响其他线程操作其他的链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private final void transfer(ConcurrentHashMap.Node<K, V>[] tab, ConcurrentHashMap.Node<K, V>[] nextTab) {
int n = tab.length;
int stride;
if ((stride = NCPU > 1 ? (n >>> 3) / NCPU : n) < 16) {
stride = 16;
}
//nextTable也是懒惰加载,为空就创建一个容量为2倍的table
if (nextTab == null) {...}

int nextn = nextTab.length;
ConcurrentHashMap.ForwardingNode<K, V> fwd = new ConcurrentHashMap.ForwardingNode(nextTab);
boolean advance = true;
boolean finishing = false;
int i = 0;
int bound = 0;

while(true) {
while(true) {
int sc;
while(!advance) {
if (i >= 0 && i < n && i + n < nextn) {
ConcurrentHashMap.Node f;
//这个桶位置的链表已经被处理完了,需要将这个位置替换为forwardingnode
if ((f = tabAt(tab, i)) == null) {
advance = casTabAt(tab, i, (ConcurrentHashMap.Node)null, fwd);
} else {
int fh;
//链表头已经是forwardingnode了(因为它的hash值是-1)
if ((fh = f.hash) == -1) {
//进入下一轮循环,遍历下一个桶
advance = true;
} else {
//链表头有元素,就需要将它们移动到新的table中,这时候需要将链表锁住,保证线程安全
synchronized(f) {
if (tabAt(tab, i) == f) {
//这个节点是普通链表中的节点,进行普通节点的挪动
if (fh >= 0) {
...
advance = true;
//节点是treebin,需要进行树节点的挪动
} else if (f instanceof ConcurrentHashMap.TreeBin) { ....
advance = true;}
}
}
}
}
} else {...}
}

--i;
if (i < bound && !finishing) {
...
advance = false;
} else {
...
advance = false;
}
}
} else {
advance = false;
}
}
}
}

3.2 JDK1.7及以前

jdk7


下面是jdk1.6

它基于Segment分段锁来提高多并发程序性能。

分段锁是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap,分段锁技术将key分成一个个小的Segment存储,给每一段数据分配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问,能实现真正的并发。

Segment继承了ReentrantLock,Segment内部拥有一个HashEntry数组类型的成员table,要想对table内的节点修改时需要先获取Segment锁。

ConcurrentHashMap在默认并发级别会创建包含16个Segment对象的数组,其中第N个哈希桶由第N mod 16个锁来保护,这个容量初始化指定后就不能改变了,而且不是懒加载的。

采用锁分段技术,每一个Segment读写操作高度自治,互不影响,同一Segment的写和读是可以并发执行的。但Segment的写入是需要上锁的,因此对同一Segment的并发写入会被阻塞。每个Segment各自持有一把锁。在保证线程安全的同时降低了锁的粒度,让并发操作效率更高。


ConcurrentHashMap成员变量

ConcurrentHashMap实际上是一个Segment数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Mask value for indexing into segments. The upper bits of a
* key's hash code are used to choose the segment.
*/
final int segmentMask; // 用于定位段,大小等于segments数组的大小减 1,是不可变的

/**
* Shift value for indexing within segments.
*/
final int segmentShift; // 用于定位段,大小等于32(hash值的位数)减去对segments的大小取以2为底的对数值,是不可变的

/**
* The segments, each of which is a specialized hash table
*/
final Segment<K,V>[] segments; // ConcurrentHashMap的底层结构是一个Segment数组

Segment的定义

Segment是一个小的哈希表HashEntry<K,V>[] table,Segment类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。

通过使用段(Segment)将ConcurrentHashMap划分为不同的部分,ConcurrentHashMap就可以使用不同的锁来控制对哈希表的不同部分的修改,从而允许多个修改操作并发进行, 这正是ConcurrentHashMap锁分段技术的核心内涵。

https://github.com/CodePrometheus/Starry-Notes/blob/master/KeyPoints/ConcurrentHashMap1.6&1.7.md

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* Segments are specialized versions of hash tables. This
* subclasses from ReentrantLock opportunistically, just to
* simplify some locking and avoid separate construction.
*/
//Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。
static final class Segment<K,V> extends ReentrantLock implements Serializable {

/**
* The number of elements in this segment's region.
*/
transient volatile int count; // Segment中元素的数量,可见的

/**
* Number of updates that alter the size of the table. This is
* used during bulk-read methods to make sure they see a
* consistent snapshot: If modCounts change during a traversal
* of segments computing size or checking containsValue, then
* we might have an inconsistent view of state so (usually)
* must retry.
*/
transient int modCount; //对count的大小造成影响的操作的次数(比如put或者remove操作)

/**
* The table is rehashed when its size exceeds this threshold.
* (The value of this field is always <tt>(int)(capacity *
* loadFactor)</tt>.)
*/
transient int threshold; // 阈值,段中元素的数量超过这个值就会对Segment进行扩容

/**
* The per-segment table.
*/
transient volatile HashEntry<K,V>[] table; // 链表数组

/**
* The load factor for the hash table. Even though this value
* is same for all segments, it is replicated to avoid needing
* links to outer object.
* @serial
*/
final float loadFactor; // 段的负载因子,其值等同于ConcurrentHashMap的负载因子
...
}

HashEntry

HashEntry用来封装具体的键值对,它的属性中key、hash、next都是final修饰的,所以这些属性一旦赋值了就不能再改变了,因此当进行新节点的插入时是插入到头节点,如果进行链表的删除节点,需要重新创建所有节点,再连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;

HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}

@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V>[] newArray(int i) {
return new HashEntry[i];
}
}

初始化

主要的参数是初始容量,负载因子,并发级别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();

if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;

// Find power-of-two sizes best matching arguments
int sshift = 0; // 大小为 lg(ssize)
int ssize = 1; // 段的数目,segments数组的大小(2的幂次方)
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift; // 用于定位段
segmentMask = ssize - 1; // 用于定位段
this.segments = Segment.newArray(ssize); // 创建segments数组,不是懒加载

if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize; // 总的桶数/总的段数
if (c * ssize < initialCapacity)
++c;
int cap = 1; // 每个段所拥有的桶的数目(2的幂次方)
while (cap < c)
cap <<= 1;

for (int i = 0; i < this.segments.length; ++i) // 初始化segments数组
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}

put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, value, false);
}
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock(); // 上锁
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash();
HashEntry<K,V>[] tab = table; // table是Volatile的
int index = hash & (tab.length - 1); // 定位到段中特定的桶
HashEntry<K,V> first = tab[index]; // first指向桶中链表的表头
HashEntry<K,V> e = first;

// 检查该桶中是否存在相同key的结点
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;

V oldValue;
if (e != null) {
// 该桶中存在相同key的结点
oldValue = e.value;
if (!onlyIfAbsent)
// 更新value值
e.value = value;
} else { // 该桶中不存在相同key的结点
oldValue = null;
// 结构性修改,modCount加1
++modCount;
// 创建HashEntry并将其链到表头
tab[index] = new HashEntry<K,V>(key, hash, first, value);
// write-volatile,count值的更新一定要放在最后一步(volatile变量)
count = c;
}
// 返回旧值(该桶中不存在相同key的结点,则返回null)
return oldValue;
} finally {
// 在finally子句中解锁
unlock();
}
}

rehash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 扩容时为了不影响正在进行的读线程,最好的方式是全部节点复制一次并重新添加
// 这里根据扩容时节点迁移的性质,最大可能的重用一部分节点,这个性质跟1.8的HashMap中的高低位是一个道理,必须要求hash值是final的
void rehash() {
HashEntry<K,V>[] oldTable = table; // 扩容前的table
int oldCapacity = oldTable.length;
if (oldCapacity >= MAXIMUM_CAPACITY) // 已经扩到最大容量,直接返回
return;

/**
* HashEntry的next是final的,resize/rehash时需要重新new,这里的特殊之处就是最大程度重用HashEntry链尾部的一部分,尽量减少重新new的次数
*/

// 新创建一个table,其容量是原来的2倍
HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
threshold = (int)(newTable.length * loadFactor); // 新的阈值
int sizeMask = newTable.length - 1; // 用于定位桶
for (int i = 0; i < oldCapacity ; i++) {
// We need to guarantee that any existing reads of old Map can
// proceed. So we cannot yet null out each bin.
HashEntry<K,V> e = oldTable[i]; // 依次指向旧table中的每个桶的链表表头

if (e != null) { // 旧table的该桶中链表不为空
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask; // 重哈希已定位到新桶
if (next == null) // 旧table的该桶中只有一个节点
newTable[idx] = e;
else {
// Reuse trailing consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
// 这个循环是寻找HashEntry链最大的可重用的尾部
// 这里重用部分中,所有节点的去向相同,它们可以不用被复制
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
// 寻找k值相同的子链,该子链尾节点与父链的尾节点必须是同一个
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}

// JDK直接将子链lastRun放到newTable[lastIdx]桶中
// 把重用部分整体放在扩容后的hash桶中
newTable[lastIdx] = lastRun;

// 对该子链之前的结点,JDK会挨个遍历并把它们复制到新桶中
// 复制不能重用的部分,并把它们插入到rehash后的所在HashEntry链的头部
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(p.key, p.hash,
n, p.value);
}
}
}
}
table = newTable; // 扩容完成
}


get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}
V get(Object key, int hash) {
if (count != 0) { // read-volatile,首先读 count 变量
HashEntry<K,V> e = getFirst(hash); // 获取桶中链表头结点
while (e != null) {
// 查找链中是否存在指定Key的键值对
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null) // 如果读到value域不为 null,直接返回
return v;
// 如果读到value域为null,说明发生了重排序,加锁后重新读取
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null; // 如果不存在,直接返回null
}

remove

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 因为1.6的HashEntry的next指针是final的,所以比普通的链表remove要复杂些,只有被删除节点的后面可以被重用,前面的都要再重新insert一次
V remove(Object key, int hash, Object value) {
lock();
try {
int c = count - 1;
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;

V oldValue = null;
if (e != null) {
V v = e.value;
if (value == null || value.equals(v)) {
oldValue = v;
// All entries following removed node can stay in list, but all preceding ones need to be cloned.
// 因为next指针是final的,所以删除不能用简单的链表删除,需要把前面的节点都重新复制再插入一次,后面的节点可以重用
// 删除后,后面的可以重用的那部分顺序不变且还是放在最后,前面的被复制的那部分顺序颠倒地放在前面
++modCount;
HashEntry<K,V> newFirst = e.next;
for (HashEntry<K,V> p = first; p != e; p = p.next)
newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value);
tab[index] = newFirst;
count = c; // write-volatile
}
}
return oldValue;
} finally {
unlock();
}
}

在jdk1.7中的一些改变

HashEntry

所以现在在一个链表中

1
2
3
4
5
6
7
8
9
10
11
12
13
static final class HashEntry<K,V> {
final int hash; // hash是final的,1.7的HashMap中不是final的,用final对扩容比较友好
final K key;
volatile V value;
volatile HashEntry<K,V> next; // jdk1.7中next指针不再是final的,改为volatile,使用 setNext 方法(内部用Unsafe的提供的方法)更新

HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
}

4.总结

(1)HashMap允许键值对为null,而Hashtable和ConcurrentHashMap不允许

//这里有点疑惑

HashMap是线程不安全的,一般用在单线程环境下,在同一个线程下,如果想要得到key的value,调用get(key)

这里如果返回null,会产生二义性:**1.可能是没有找到对应的key,2.**可能是value为null

hashMap的解决办法是:调用containskey就知道key是否为存在了。如果返回false说明不存在这个key,因为这是单线程下的,所以一定正确

1
2
3
4
5
6
7
8
public V get(Object key) {
HashMap.Node e;
return (e = this.getNode(hash(key), key)) == null ? null : e.value;
//节点不存在或者value为null
}
public boolean containsKey(Object key) {
return this.getNode(hash(key), key) != null;
}

对于Hashtable,整个哈希表只有一个锁,如果一个线程操作这个哈希表,就必须先获得这个锁,假如成功获得锁,调用get(key)得到null,也会出现二义性,但是当调用完containsKey获得结果后再想利用这个结果去进行逻辑处理时可能由于其他线程的并发执行导致错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//Hashtable
public synchronized V get(Object key) {
Hashtable.Entry<?, ?>[] tab = this.table;
int hash = key.hashCode();
int index = (hash & 2147483647) % tab.length;

for(Hashtable.Entry e = tab[index]; e != null; e = e.next) {
if (e.hash == hash && e.key.equals(key)) {
return e.value;
}
}
return null;
}
public synchronized boolean containsKey(Object key) {
Hashtable.Entry<?, ?>[] tab = this.table;
int hash = key.hashCode();
int index = (hash & 2147483647) % tab.length;

for(Hashtable.Entry e = tab[index]; e != null; e = e.next) {
if (e.hash == hash && e.key.equals(key)) {
return true;
}
}

return false;
}

(2)ConcurrentHashMap 和 Hashtable 的区别

  1. 底层数据结构

    JDK1.7的ConcurrentHashMap底层采用分段数组+链表实现,JDK1.8底层采用数组+红黑树/链表实现,而Hashtable则是采用数组+链表实现

  2. 实现线程安全的方式

    • ConcurrentHashMap

      JDK1.7中ConcurrentHashMap对整个桶数组进行了分割分段,每一把锁只锁一个Segment段,所以多线程下,访问不同的Segment段可以并发执行。默认的并发度是16,JDK1.8中ConcurrentHashMap将并发控制的粒度进一步细化,细化到每个桶,并发度等于桶个数,不再将容器分段,而是直接使用数组+链表/红黑树来实现,每次访问只需要对一个桶锁定,利用**CAS(保证桶间并发操作的线程安全)synchronized(保证桶内并发操作的线程安全)**来保证并发更新的安全。

    • Hashtable

      使用synchronized来保证线程安全,只有一把锁(对全表的锁),即方法锁,synchronized修饰普通方法,默认锁对象为当前实例对象this。所以每次只能有一个线程操作哈希表,效率非常低下。

(3)ConcurrentHashMap 扩容

jdk8中,采用多线程扩容。整个扩容过程,通过CAS设置sizeCtl,transferIndex等变量协调多个线程进行并发扩容。多线程无锁扩容的关键就是通过CAS设置sizeCtl与transferIndex变量,协调多个线程对table数组中的node进行迁移。

何时触发扩容?

当往hashMap中成功插入一个key/value节点时,有可能触发扩容动作:

当新增完这个节点后,如果链表长度大于树化阈值8,会先判断table的长度是否超过64,如果没有,则会把数组长度扩大到原来的两倍,并触发transfer方法,重新调整节点的位置。如果大于64,就将链表转换为红黑树。

与此同时,会调用addCount方法记录元素个数,并检查是否需要进行扩容,当数组元素个数达到阈值(table容量*负载因子)时,会触发transfer方法,重新调整节点的位置。

如果当前线程发现此时map正在扩容,则协助扩容

如果准备加入扩容的线程,发现以下情况,放弃扩容,直接返回

a、发现transferIndex=0,即所有node均已分配
b、发现扩容线程已经达到最大扩容线程数

transferIndex 扩容索引

初始值是table的长度,刚开始位于数组的最右边,每个线程最少扩容的桶个数为16,当每次来一个线程进行扩容,transferIndex就向左移动16**(以CAS的方式操作transferIndex变量,确保扩容的并发安全)**,标志还剩多少桶等待扩容,当transferIndex为0时,说明所有桶都被分配了线程扩容。这个扩容线程按照逆序进行节点的迁移工作(逆序遍历扩容)。

其他线程什么时候帮忙扩容?

此时其他线程访问到了ForwardingNode节点,如果这个线程执行的put或remove等写操作,那么就会先帮其扩容。如果这个线程执行的是get等读方法,则会调用ForwardingNode的find方法,去nextTable里面查找相关元素


Java多线程- ConcurrentHashMap
https://vickkkyz.fun/2022/04/10/Java/JUC/ConcurrentHashMap/
作者
Vickkkyz
发布于
2022年4月10日
许可协议