ConcurrentHashMap Implementation Principles (Part 1)
Preface
Having some free time lately, I'm continuing to write up what I've learned. Following the earlier HashMap post, this one reads through the ConcurrentHashMap source to explore how it is implemented. The source is from JDK 1.8.
ConcurrentHashMap differs from HashMap in that it is thread-safe. In JDK 1.7 it maintained multiple Segments, each holding its own set of Node entries (key-value pairs), so a concurrent write only locked one Segment without affecting the others, balancing thread safety against access speed. JDK 1.8 goes further and drops the Segment layer: operations work directly on a Node array, combining CAS with a synchronized lock on individual bin heads, so the effective lock granularity is a single bin.
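As a quick illustration of that thread-safety guarantee, here is a minimal demo (my own sketch, not from the post's source): two threads write concurrently without any external locking, something that could corrupt a plain HashMap.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class ChmDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(2);
        Runnable writer = () -> {
            for (int i = 0; i < 10_000; i++)
                map.merge(i, 1, Integer::sum); // atomic read-modify-write per key
            done.countDown();
        };
        new Thread(writer).start();
        new Thread(writer).start();
        done.await();
        System.out.println(map.size()); // always 10000; each value is exactly 2
    }
}
```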
Initialization
Initialization of ConcurrentHashMap
```java
private transient volatile int sizeCtl;
```
sizeCtl is the control flag:
- A negative value means initialization or resizing is in progress:
  - -1 means the table is being initialized
  - -N means N-1 threads are performing the resize (strictly speaking, in JDK 1.8 the high bits hold a resize stamp, as helpTransfer shows later)
- Zero or a positive value means the hash table has not been initialized yet; once initialization completes, it holds the resize threshold. As we will see later, that threshold is always 0.75 times the current capacity, matching the default load factor.
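To make those states concrete, here is a small interpretive sketch (my own illustration, not JDK code; the helper name describeSizeCtl is made up):

```java
// Hypothetical helper, for illustration only.
static String describeSizeCtl(int sc) {
    if (sc == -1) return "table is being initialized";
    if (sc < -1)  return "resize in progress (high bits carry a resize stamp)";
    if (sc == 0)  return "not initialized; DEFAULT_CAPACITY will be used";
    return "threshold for the next resize (or, before the first init, the requested capacity): " + sc;
}
```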
Segment:
```java
static class Segment<K,V> extends ReentrantLock implements Serializable { ... }
```

In JDK 1.8 this class is retained only for serialization compatibility with older versions; it no longer plays any role in the data structure itself.
The put Process
```java
public V put(K key, V value) {
    return putVal(key, value, false);
}
```

put simply delegates to putVal; the steps below walk through putVal.
First, key and value are both checked for null; if either is null, a NullPointerException is thrown.
Next comes the hash: take the key's hashCode() (call it h), XOR it with h unsigned-right-shifted by 16 bits, then AND the result with HASH_BITS. HASH_BITS is 0x7fffffff, the largest int value (2^31 - 1), so the AND clears the sign bit and the computed hash is never negative.
```java
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
```

Next, check whether the Node array table exists; if it does not, initialize it.
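A worked example of spread (a standalone re-implementation, for illustration):

```java
public class SpreadDemo {
    static final int HASH_BITS = 0x7fffffff;

    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    public static void main(String[] args) {
        int h = 0x87654321;
        // h >>> 16       = 0x00008765
        // h ^ (h >>> 16) = 0x8765c444  (high bits folded into the low bits)
        // & HASH_BITS    = 0x0765c444  (sign bit cleared, so the hash is non-negative)
        System.out.printf("%08x%n", spread(h)); // prints 0765c444
    }
}
```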
```java
transient volatile Node<K,V>[] table;
```
```java
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
```

The initialization process:
- sizeCtl acts as the state flag: a value below zero means another thread is already initializing, so the current thread calls Thread.yield() and spins rather than blocking; the CAS on SIZECTL (setting it to -1) plus the second null check inside the critical section is what makes the initialization thread-safe.
- If the recorded capacity is not positive, it is corrected to DEFAULT_CAPACITY; finally sc = n - (n >>> 2) computes the new threshold, which is written back into sizeCtl (a short arithmetic sketch follows the list).
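The threshold arithmetic, spelled out (illustrative only):

```java
public class ThresholdDemo {
    public static void main(String[] args) {
        int n = 16;             // DEFAULT_CAPACITY after a no-arg construction
        int sc = n - (n >>> 2); // 16 - 16/4 = 12, i.e. 0.75 * n, matching the load factor
        System.out.println(sc); // prints 12
    }
}
```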
Next, putVal checks whether this is the first insert into the target bin; if the slot is empty, the new node is CASed straight into the array.
From the HashMap post we know (n - 1) & hash is the index into table (the reason is explained there). But why go through Unsafe's getObjectVolatile rather than plain array indexing? getObjectVolatile reads the latest value from main memory, whereas a plain indexed read gives no such guarantee: declaring the table field volatile only covers the array reference, not its elements, so a thread could still observe a stale element. (A sketch of both primitives follows the tabAt code below.)
```java
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
```
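If Unsafe feels opaque, the same two primitives, a volatile per-element read and a CAS into a slot, can be illustrated with AtomicReferenceArray (my own sketch of the idea, not how ConcurrentHashMap is actually implemented):

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

public class VolatileArrayDemo {
    public static void main(String[] args) {
        AtomicReferenceArray<String> table = new AtomicReferenceArray<>(16);
        // volatile read of element 5, analogous to tabAt(tab, 5)
        String head = table.get(5);
        // CAS into an empty slot, analogous to casTabAt(tab, 5, null, newNode)
        boolean inserted = table.compareAndSet(5, null, "node");
        System.out.println(head + " / " + inserted); // null / true
    }
}
```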
The U, SIZECTL, ABASE and ASHIFT values used above are computed once in a static initializer:

```java
static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ConcurrentHashMap.class;
        SIZECTL = U.objectFieldOffset
            (k.getDeclaredField("sizeCtl"));
        TRANSFERINDEX = U.objectFieldOffset
            (k.getDeclaredField("transferIndex"));
        BASECOUNT = U.objectFieldOffset
            (k.getDeclaredField("baseCount"));
        CELLSBUSY = U.objectFieldOffset
            (k.getDeclaredField("cellsBusy"));
        Class<?> ck = CounterCell.class;
        CELLVALUE = U.objectFieldOffset
            (ck.getDeclaredField("value"));
        Class<?> ak = Node[].class;
        // base offset of the first array element
        ABASE = U.arrayBaseOffset(ak);
        // bytes per array element; together with the base offset this
        // locates every element's position in memory
        int scale = U.arrayIndexScale(ak);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        // Integer.numberOfLeadingZeros(scale) counts the zero bits above the
        // highest set bit (it returns 0 for negative numbers, whose sign bit is 1);
        // since scale is a power of two, 31 minus that count gives log2(scale)
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }
}
```

When the target slot is confirmed empty, the insert itself is performed with a single CAS (casTabAt); no lock is taken.
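As a concrete illustration of the offset arithmetic (the ABASE and scale values below are typical for a 64-bit JVM with compressed oops; they are assumptions here, not values queried from Unsafe):

```java
public class OffsetDemo {
    public static void main(String[] args) {
        int ABASE = 16; // assumed: header bytes before the first array element
        int scale = 4;  // assumed: bytes per reference with compressed oops
        int ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); // log2(4) = 2
        int i = 5;
        long offset = ((long) i << ASHIFT) + ABASE;
        System.out.println(offset); // 36 = 16-byte header + 5 * 4-byte slots
    }
}
```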
The third case: the bin head's hash is the special value MOVED, which signals that the table is in the middle of a resize. The current thread is then recruited to help finish the transfer before retrying its own insert.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}ForwardingNode
—— A node inserted at head of bins during transfer operations.他是在扩容操作中的一个插入在桶头部的特殊节点,他的含义表明,这个桶已经完成了扩容操作,但是整个哈希表扩容操作还没有结束,如果检测到这种节点,当前线程会被要求一起来完成部分扩容操作首先检测是否存在
ForwardingNode
resizeStamp(tab.length) 获取一个16位的扩容标识,
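Here is that worked example (a standalone re-implementation of resizeStamp; RESIZE_STAMP_BITS is 16 in JDK 1.8):

```java
public class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS = 16;

    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    public static void main(String[] args) {
        int rs = resizeStamp(16);               // numberOfLeadingZeros(16) = 27 -> 27 | 0x8000
        System.out.printf("0x%04x%n", rs);      // prints 0x801b
        int sc = (rs << RESIZE_STAMP_BITS) + 2; // what the thread starting the resize stores in sizeCtl
        System.out.println(sc < 0);             // true: the stamp's top bit makes sizeCtl negative
        // each helper thread CASes sizeCtl up by 1, as helpTransfer shows above
    }
}
```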
```java
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
```
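The lo/hi split in transfer hinges on (hash & n): because the table doubles from n to 2n, every node in bin i either stays at index i or moves to index i + n. A small standalone sketch of that arithmetic:

```java
public class SplitDemo {
    public static void main(String[] args) {
        int n = 16; // old capacity; the new capacity is 32
        int[] hashes = {5, 21, 37, 53}; // all land in bin 5 while capacity is 16
        for (int h : hashes) {
            int oldIndex = h & (n - 1);
            int newIndex = ((h & n) == 0) ? oldIndex : oldIndex + n;
            System.out.println(h + ": bin " + oldIndex + " -> bin " + newIndex);
        }
        // 5 and 37 stay in bin 5; 21 and 53 move to bin 21 (5 + 16)
    }
}
```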
The fourth case, the normal path:
- The bin head f is locked with synchronized, and tabAt is used again to confirm the head has not changed since the lock was acquired.
- If fh (the head node's hash) is non-negative, the bin is a plain linked list: walk it, and when an equal key is found replace the value (unless onlyIfAbsent is set; note the use of && short-circuiting here); if no match is found, append a new node at the tail.
- If fh is negative and the head is a TreeBin, the bin is a red-black tree, which JDK 1.8 uses for crowded bins; the matching tree node's value is replaced, or a tree insert is performed.
- After a list insert completes, the bin's node count is checked once more: when it reaches 8 (TREEIFY_THRESHOLD), the bin is converted to red-black tree storage.