基础容器类ArrayList、LinkedList、HashMap都是非线程安全的,在多线程环境下使用这些容器会出现线程安全问题,为了保证线程安全,java使用内置锁Synchronized提供了一套线程安全的同步容器类,比如Vector、HashTable 、Stack,这些同步容器实现线程安全的方式是在需要同步的方法上添加关键字synchronized,而在线程征用的场景下synchronized会变成重量级锁,严重影响性能,因此有了juc高并发容器
juc高并发容器 是基于无锁编程算法实现的容器类,通过CAS保证操作的原子性,通过volatile保障内存的可见性。
ConcurrentHashMap就是juc包中的一个高并发容器类,是一种线程安全的哈希表。
ConcurrentHashMap对应的基础容器是HashMap
1.HashMap 基础容器HashMap是线程不安全的,在扩容的时候会导致死链。
默认负载因子是0.75,即当HashMap的元素个数(使用的桶的个数)超过容量(一共的桶的个数)的3/4时会触发扩容。
下面均以jdk7为例
假如现在HashMap的桶个数为16,添加元素,直到使用的桶个数为12时,如果再添加元素需要放在新的桶中(根据key的hash值磨桶总个数得到的桶的下标值),需要扩容了,假如现在桶下标为1的位置有3个key-value键值对,链为(1,null)->(35,null)->(16,null)
,在jdk7中后加入的键值对放在链表头,这个添加的顺序就是16,35,1
就在此时,假如有两个线程都要同时往HashMap中添加一个键值对,会先触发扩容,( 扩容后(1,null)和(35,null) 还是在桶下标为1的位置 )
对于线程1,e:1->35->16->null
next:35->16->null
对于线程2,e:1->35->16->null
next:35->16->null
假设线程1先扩容完毕,此时桶下标为1的位置的链是(35,null)->(1,null)
所以现在线程2的节点引用变为:
e:1->null
next: 35->1->null
新的桶1:1->null
e:35->1->null
next:1->null
接着桶1的状况:35->1->null
e:1->null
next:null
接着桶1的状况:1->35->1->null
e:35->1->null
next:1->null
注意: 现在已经造成死链了!!
2.Hashtable Hashtable实现同步是利用synchronized关键字进行锁定,即给整个哈希表进行锁定,每次锁住整张表让线程独占,虽然解决了线程安全问题,但是造成了巨大的资源浪费。
当一个线程正在访问HashTable的同步方法时,其他访问Hashtable同步方法的所有线程都必须进入阻塞或轮询状态,相当于将所有的操作串行化。
Hashtable不允许key和value为null
3.ConcurrentHashMap 3.1 JDK1.8中
重要的属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private static final int MAXIMUM_CAPACITY = 1073741824 ;private static final int DEFAULT_CAPACITY = 16 ;static final int MAX_ARRAY_SIZE = 2147483639 ;private static final int DEFAULT_CONCURRENCY_LEVEL = 16 ;private static final float LOAD_FACTOR = 0.75F ;static final int TREEIFY_THRESHOLD = 8 ;static final int UNTREEIFY_THRESHOLD = 6 ;static final int MIN_TREEIFY_CAPACITY = 64 ;private static final int MIN_TRANSFER_STRIDE = 16 ;private static final int RESIZE_STAMP_BITS = 16 ;private static final int MAX_RESIZERS = 65535 ;private static final int RESIZE_STAMP_SHIFT = 16 ;static final int MOVED = -1 ;static final int TREEBIN = -2 ;static final int RESERVED = -3 ;static final int HASH_BITS = 2147483647 ;transient volatile ConcurrentHashMap.Node<K, V>[] table;private transient volatile ConcurrentHashMap.Node<K, V>[] nextTable;private transient volatile long baseCount;private transient volatile int sizeCtl;private transient volatile int transferIndex;private transient volatile int cellsBusy;private transient volatile ConcurrentHashMap.CounterCell[] counterCells;private transient ConcurrentHashMap.KeySetView<K, V> keySet;private transient ConcurrentHashMap.ValuesView<K, V> values;private transient ConcurrentHashMap.EntrySetView<K, V> entrySet;private static final Unsafe U;private static final long SIZECTL;private static final long TRANSFERINDEX;private static final long BASECOUNT;private static final long CELLSBUSY;private static final long CELLVALUE;private static final int ABASE;private static final int ASHIFT;
一些重要的静态内部类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static class Node <K, V> implements Entry <K, V> {....}static final class ForwardingNode <K, V> extends ConcurrentHashMap .Node<K, V> { ForwardingNode(ConcurrentHashMap.Node<K, V>[] tab) { super (-1 , (Object)null , (Object)null ); this .nextTable = tab; } }static final class TreeBin <K, V> extends ConcurrentHashMap .Node<K, V> {....}static final class TreeNode <K, V> extends ConcurrentHashMap .Node<K, V> {....}
构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (loadFactor > 0.0F && initialCapacity >= 0 && concurrencyLevel > 0 ) { if (initialCapacity < concurrencyLevel) { initialCapacity = concurrencyLevel; } long size = (long )(1.0D + (double )((float )((long )initialCapacity) / loadFactor)); int cap = size >= 1073741824L ? 1073741824 : tableSizeFor((int )size); this .sizeCtl = cap; } else { throw new IllegalArgumentException (); } }
对initialCapacity有点疑惑,很奇怪他存在的意义。问了一下同学,觉得应该是这样的。 定义哈希表在容纳下initialCapacity那么多个元素之前不能扩容,并且哈希表的容量刚开始就大于等于initialCapacity,又因为上面initialCapacity = concurrencyLevel; 保证了至少有concurrencyLevel的容量,也就是这么大的并发度
initialCapacity的作用就是 提前预估table的大小,防止后面多次扩容,消耗性能
get()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public V get (Object key) { int h = spread(key.hashCode()); ConcurrentHashMap.Node[] tab; ConcurrentHashMap.Node e; int n; if ((tab = this .table) != null && (n = tab.length) > 0 && (e = tabAt(tab, n - 1 & h)) != null ) { int eh; Object ek; if ((eh = e.hash) == h) { if ((ek = e.key) == key || ek != null && key.equals(ek)) { return e.val; } } else if (eh < 0 ) { ConcurrentHashMap.Node p; return (p = e.find(h, key)) != null ? p.val : null ; } while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || ek != null && key.equals(ek))) { return e.val; } } } return null ; }
put方法
使用synchronized锁住链表头节点,不影响其他线程操作其他链表
多线程无锁扩容的关键就是通过CAS设置sizeCtl与transferIndex变量,协调多个线程对table数组中的node进行迁移。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 public V put (K key, V value) { return this .putVal(key, value, false ); }final V putVal (K key, V value, boolean onlyIfAbsent) { if (key != null && value != null ) { int hash = spread(key.hashCode()); int binCount = 0 ; ConcurrentHashMap.Node[] tab = this .table; while (true ) { int n; while (tab == null || (n = tab.length) == 0 ) { tab = this .initTable(); } ConcurrentHashMap.Node f; int i; if ((f = tabAt(tab, i = n - 1 & hash)) == null ) { if (casTabAt(tab, i, (ConcurrentHashMap.Node)null , new ConcurrentHashMap .Node(hash, key, value))) { break ; } } else { int fh; if ((fh = f.hash) == -1 ) { tab = this .helpTransfer(tab, f); } else { Object fk; Object fv; if (onlyIfAbsent && fh == hash && ((fk = f.key) == key || fk != null && key.equals(fk)) && (fv = f.val) != null ) { return fv; } V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh < 0 ) { if (f instanceof ConcurrentHashMap.TreeBin) { binCount = 2 ; ConcurrentHashMap.TreeNode p; if ((p = ((ConcurrentHashMap.TreeBin)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) { p.val = value; } } } else if (f instanceof ConcurrentHashMap.ReservationNode) { throw new IllegalStateException ("Recursive update" ); } } else { label124: { binCount = 1 ; ConcurrentHashMap.Node e; Object ek; for (e = f; e.hash != hash || (ek = e.key) != key && (ek == null || !key.equals(ek)); ++binCount) { ConcurrentHashMap.Node<K, V> pred = e; if ((e = e.next) == null ) { pred.next = new ConcurrentHashMap .Node(hash, key, value); break label124; } } oldVal = e.val; if (!onlyIfAbsent) { e.val = value; } } } } } if (binCount != 0 ) { if (binCount >= 8 ) { this .treeifyBin(tab, i); } if (oldVal != null ) { return oldVal; } break ; } } } } this .addCount(1L , binCount); return null ; } else { throw new NullPointerException (); } }
helpTransfer
其他线程帮忙扩容,并发扩容
Cmap 支持并发扩容,实现方式是,将哈希表拆分,让每个线程处理自己的区间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final ConcurrentHashMap.Node<K, V>[] helpTransfer(ConcurrentHashMap.Node<K, V>[] tab, ConcurrentHashMap.Node<K, V> f) { ConcurrentHashMap.Node[] nextTab; if (tab != null && f instanceof ConcurrentHashMap.ForwardingNode && (nextTab = ((ConcurrentHashMap.ForwardingNode)f).nextTable) != null ) { int rs = resizeStamp(tab.length); int sc; while (nextTab == this .nextTable && this .table == tab && (sc = this .sizeCtl) < 0 && sc >>> 16 == rs && sc != rs + 1 && sc != rs + '\uffff' && this .transferIndex > 0 ) { if (U.compareAndSetInt(this , SIZECTL, sc, sc + 1 )) { this .transfer(tab, nextTab); break ; } } return nextTab; } else { return this .table; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private final void transfer (ConcurrentHashMap.Node<K, V>[] tab, ConcurrentHashMap.Node<K, V>[] nextTab) { int n = tab.length; int stride; if ((stride = NCPU > 1 ? (n >>> 3 ) / NCPU : n) < 16 ) { stride = 16 ; } if (nextTab == null ) { try { ConcurrentHashMap.Node<K, V>[] nt = new ConcurrentHashMap .Node[n << 1 ]; nextTab = nt; } catch (Throwable var27) { this .sizeCtl = 2147483647 ; return ; } this .nextTable = nextTab; this .transferIndex = n; }
addCount
维护哈希表的size,增加size的计数,如果超过阈值,扩容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private final void addCount (long x, int check) { ConcurrentHashMap.CounterCell[] cs; long b; long s; int sc; if ((cs = this .counterCells) != null || !U.compareAndSetLong(this , BASECOUNT, b = this .baseCount, s = b + x)) { boolean uncontended = true ; ConcurrentHashMap.CounterCell c; long v; if (cs == null || (sc = cs.length - 1 ) < 0 || (c = cs[ThreadLocalRandom.getProbe() & sc]) == null || !(uncontended = U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) { this .fullAddCount(x, uncontended); return ; } if (check <= 1 ) { return ; } s = this .sumCount(); } int n; ConcurrentHashMap.Node[] tab; if (check >= 0 ) { for (; s >= (long )(sc = this .sizeCtl) && (tab = this .table) != null && (n = tab.length) < 1073741824 ; s = this .sumCount()) { int rs = resizeStamp(n); if (sc < 0 ) { ConcurrentHashMap.Node[] nt; if (sc >>> 16 != rs || sc == rs + 1 || sc == rs + '\uffff' || (nt = this .nextTable) == null || this .transferIndex <= 0 ) { break ; } if (U.compareAndSetInt(this , SIZECTL, sc, sc + 1 )) { this .transfer(tab, nt); } } else if (U.compareAndSetInt(this , SIZECTL, sc, (rs << 16 ) + 2 )) { this .transfer(tab, (ConcurrentHashMap.Node[])null ); } } } }
初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private final ConcurrentHashMap.Node<K, V>[] initTable() { ConcurrentHashMap.Node[] tab; while ((tab = this .table) == null || tab.length == 0 ) { int sc; if ((sc = this .sizeCtl) < 0 ) { Thread.yield (); } else if (U.compareAndSetInt(this , SIZECTL, sc, -1 )) { try { if ((tab = this .table) == null || tab.length == 0 ) { int n = sc > 0 ? sc : 16 ; ConcurrentHashMap.Node<K, V>[] nt = new ConcurrentHashMap .Node[n]; tab = nt; this .table = nt; sc = n - (n >>> 2 ); } break ; } finally { this .sizeCtl = sc; } } } return tab; }
扩容
在此处使用synchronized锁住链表头节点,并不影响其他线程操作其他的链表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 private final void transfer (ConcurrentHashMap.Node<K, V>[] tab, ConcurrentHashMap.Node<K, V>[] nextTab) { int n = tab.length; int stride; if ((stride = NCPU > 1 ? (n >>> 3 ) / NCPU : n) < 16 ) { stride = 16 ; } if (nextTab == null ) {...} int nextn = nextTab.length; ConcurrentHashMap.ForwardingNode<K, V> fwd = new ConcurrentHashMap .ForwardingNode(nextTab); boolean advance = true ; boolean finishing = false ; int i = 0 ; int bound = 0 ; while (true ) { while (true ) { int sc; while (!advance) { if (i >= 0 && i < n && i + n < nextn) { ConcurrentHashMap.Node f; if ((f = tabAt(tab, i)) == null ) { advance = casTabAt(tab, i, (ConcurrentHashMap.Node)null , fwd); } else { int fh; if ((fh = f.hash) == -1 ) { advance = true ; } else { synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { ... advance = true ; } else if (f instanceof ConcurrentHashMap.TreeBin) { .... advance = true ;} } } } } } else {...} } --i; if (i < bound && !finishing) { ... advance = false ; } else { ... advance = false ; } } } else { advance = false ; } } } }
3.2 JDK1.7及以前 jdk7
下面是jdk1.6
它基于Segment分段锁来提高多并发程序性能。
分段锁是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap,分段锁技术将key分成一个个小的Segment存储,给每一段数据分配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问,能实现真正的并发。
Segment继承了ReentrantLock,Segment内部拥有一个HashEntry数组类型的成员table,要想对table内的节点修改时需要先获取Segment锁。
ConcurrentHashMap在默认并发级别会创建包含16个Segment对象的数组,其中第N个哈希桶由第N mod 16个锁来保护,这个容量初始化指定后就不能改变了,而且不是懒加载的。
采用锁分段技术,每一个Segment读写操作高度自治,互不影响 ,同一Segment的写和读是可以并发执行的。但Segment的写入是需要上锁的 ,因此对同一Segment的并发写入会被阻塞。每个Segment各自持有一把锁。在保证线程安全的同时降低了锁的粒度,让并发操作效率更高。
ConcurrentHashMap成员变量
ConcurrentHashMap实际上是一个Segment数组。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final int segmentMask; final int segmentShift; final Segment<K,V>[] segments;
Segment的定义
Segment是一个小的哈希表HashEntry<K,V>[] table
,Segment类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。
通过使用段(Segment)将ConcurrentHashMap划分为不同的部分,ConcurrentHashMap就可以使用不同的锁来控制对哈希表的不同部分的修改,从而允许多个修改操作并发进行, 这正是ConcurrentHashMap锁分段技术的核心内涵。
https://github.com/CodePrometheus/Starry-Notes/blob/master/KeyPoints/ConcurrentHashMap1.6&1.7.md
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 static final class Segment <K,V> extends ReentrantLock implements Serializable { transient volatile int count; transient int modCount; transient int threshold; transient volatile HashEntry<K,V>[] table; final float loadFactor; ... }
HashEntry
HashEntry用来封装具体的键值对,它的属性中key、hash、next都是final 修饰的,所以这些属性一旦赋值了就不能再改变了,因此当进行新节点的插入时是插入到头节点 ,如果进行链表的删除节点,需要重新创建所有节点,再连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static final class HashEntry <K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; HashEntry(K key, int hash, HashEntry<K,V> next, V value) { this .key = key; this .hash = hash; this .next = next; this .value = value; } @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V>[] newArray(int i) { return new HashEntry [i]; } }
初始化
主要的参数是初始容量,负载因子,并发级别
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0 ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException (); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0 ; int ssize = 1 ; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1 ; } segmentShift = 32 - sshift; segmentMask = ssize - 1 ; this .segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1 ; while (cap < c) cap <<= 1 ; for (int i = 0 ; i < this .segments.length; ++i) this .segments[i] = new Segment <K,V>(cap, loadFactor); }
put
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public V put (K key, V value) { if (value == null ) throw new NullPointerException (); int hash = hash(key.hashCode()); return segmentFor(hash).put(key, hash, value, false ); }final Segment<K,V> segmentFor (int hash) { return segments[(hash >>> segmentShift) & segmentMask]; } V put (K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; if (c++ > threshold) rehash(); HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1 ); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null ) { oldValue = e.value; if (!onlyIfAbsent) e.value = value; } else { oldValue = null ; ++modCount; tab[index] = new HashEntry <K,V>(key, hash, first, value); count = c; } return oldValue; } finally { unlock(); } }
rehash
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 void rehash () { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity >= MAXIMUM_CAPACITY) return ; HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1 ); threshold = (int )(newTable.length * loadFactor); int sizeMask = newTable.length - 1 ; for (int i = 0 ; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null ) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null ) newTable[idx] = e; else { HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null ; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { int k = p.hash & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry <K,V>(p.key, p.hash, n, p.value); } } } } table = newTable; }
get
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public V get (Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); } V get (Object key, int hash) { if (count != 0 ) { HashEntry<K,V> e = getFirst(hash); while (e != null ) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null ) return v; return readValueUnderLock(e); } e = e.next; } } return null ; }
remove
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 V remove (Object key, int hash, Object value) { lock(); try { int c = count - 1 ; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1 ); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null ; if (e != null ) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry <K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; } } return oldValue; } finally { unlock(); } }
在jdk1.7中的一些改变
HashEntry
所以现在在一个链表中
1 2 3 4 5 6 7 8 9 10 11 12 13 static final class HashEntry <K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this .hash = hash; this .key = key; this .value = value; this .next = next; } }
4.总结 (1)HashMap允许键值对为null,而Hashtable和ConcurrentHashMap不允许 //这里有点疑惑
HashMap是线程不安全的,一般用在单线程环境下,在同一个线程下,如果想要得到key的value,调用get(key)
这里如果返回null,会产生二义性:**1.可能是没有找到对应的key, 2.**可能是value为null
hashMap的解决办法是:调用containskey就知道key是否为存在了。如果返回false说明不存在这个key,因为这是单线程下的,所以一定正确
1 2 3 4 5 6 7 8 public V get (Object key) { HashMap.Node e; return (e = this .getNode(hash(key), key)) == null ? null : e.value; }public boolean containsKey (Object key) { return this .getNode(hash(key), key) != null ; }
对于Hashtable,整个哈希表只有一个锁,如果一个线程操作这个哈希表,就必须先获得这个锁,假如成功获得锁,调用get(key)得到null,也会出现二义性,但是当调用完containsKey获得结果后再想利用这个结果去进行逻辑处理时可能由于其他线程的并发执行导致错误。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public synchronized V get (Object key) { Hashtable.Entry<?, ?>[] tab = this .table; int hash = key.hashCode(); int index = (hash & 2147483647 ) % tab.length; for (Hashtable.Entry e = tab[index]; e != null ; e = e.next) { if (e.hash == hash && e.key.equals(key)) { return e.value; } } return null ; }public synchronized boolean containsKey (Object key) { Hashtable.Entry<?, ?>[] tab = this .table; int hash = key.hashCode(); int index = (hash & 2147483647 ) % tab.length; for (Hashtable.Entry e = tab[index]; e != null ; e = e.next) { if (e.hash == hash && e.key.equals(key)) { return true ; } } return false ; }
(2)ConcurrentHashMap 和 Hashtable 的区别
底层数据结构
JDK1.7的ConcurrentHashMap底层采用分段数组+链表 实现,JDK1.8底层采用数组+红黑树/链表 实现,而Hashtable则是采用数组+链表 实现
实现线程安全的方式
ConcurrentHashMap
JDK1.7中ConcurrentHashMap对整个桶数组进行了分割分段,每一把锁只锁一个Segment段,所以多线程下,访问不同的Segment段可以并发执行。默认的并发度是16,JDK1.8中ConcurrentHashMap将并发控制的粒度进一步细化,细化到每个桶,并发度等于桶个数,不再将容器分段,而是直接使用数组+链表/红黑树来实现,每次访问只需要对一个桶锁定,利用**CAS(保证桶间并发操作的线程安全)和 synchronized(保证桶内并发操作的线程安全)**来保证并发更新的安全。
Hashtable
使用synchronized来保证线程安全,只有一把锁(对全表的锁),即方法锁,synchronized修饰普通方法,默认锁对象为当前实例对象this。所以每次只能有一个线程操作哈希表,效率非常低下。
(3)ConcurrentHashMap 扩容 jdk8中,采用多线程扩容。整个扩容过程,通过CAS设置sizeCtl,transferIndex等变量协调多个线程进行并发扩容。多线程无锁扩容的关键就是通过CAS设置sizeCtl与transferIndex变量 ,协调多个线程对table数组中的node进行迁移。
何时触发扩容?
当往hashMap中成功插入一个key/value节点时,有可能触发扩容动作:
当新增完这个节点后,如果链表长度大于树化阈值8,会先判断table的长度是否超过64,如果没有,则会把数组长度扩大到原来的两倍,并触发transfer方法,重新调整节点的位置。如果大于64,就将链表转换为红黑树。
与此同时, 会调用addCount方法记录元素个数,并检查是否需要进行扩容,当数组元素个数达到阈值(table容量*负载因子)时,会触发transfer方法,重新调整节点的位置。
如果当前线程发现此时map正在扩容,则协助扩容
如果准备加入扩容的线程,发现以下情况,放弃扩容,直接返回
a、发现transferIndex=0,即所有node均已分配 b、发现扩容线程已经达到最大扩容线程数
transferIndex 扩容索引
初始值是table的长度,刚开始位于数组的最右边,每个线程最少扩容的桶个数为16,当每次来一个线程进行扩容,transferIndex就向左移动16**(以CAS的方式操作transferIndex变量,确保扩容的并发安全)**,标志还剩多少桶等待扩容,当transferIndex为0时,说明所有桶都被分配了线程扩容。这个扩容线程按照逆序进行节点的迁移工作(逆序遍历扩容 )。
其他线程什么时候帮忙扩容?
此时其他线程访问到了ForwardingNode节点,如果这个线程执行的put或remove等写操作,那么就会先帮其扩容。如果这个线程执行的是get等读方法,则会调用ForwardingNode的find方法,去nextTable里面查找相关元素