ConcurrentHashMap实现原理(一)

前言

最近闲下来,接着把自己所理解的一些内容做个整理,接着上次的HashMap继续看ConcurrentHashMap的源码,探究下其实现原理,源码用的JDK1.8版本。

ConcurrentHashMap和HashMap区别在于ConcurrentHashMap是线程安全的。需要注意:内部维护多个Segment、并发写只锁住某个Segment的"分段锁"设计是JDK1.7的实现;JDK1.8已经改为Node数组+链表/红黑树的结构,通过CAS和对桶头节点加synchronized来保证线程安全,锁粒度细化到单个桶,互不影响,兼顾了线程安全和访问速度(Segment仅为兼容旧版本序列化而保留)。

初始化

ConcurrentHashMap的初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
   
//initialCapacity 初始容量,不指定默认为16,相当于Node节点的初始化数组长度
//loadFactor 负载因子,不指定默认为0.75,达到initialCapacity * loadFactor 会触发扩容
//concurrencyLevel 并发级别,不指定默认为1,
//private static final int MAXIMUM_CAPACITY = 1 << 30
//若大于最大值则为最大值,若不为2的幂就转变为大于入参的最小2的幂
// Creates a new, empty map sized to hold the given number of elements
// without an immediate resize. Rejects negative capacities.
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
// Pad the request (cap + cap/2 + 1) then round up to a power of two via
// tableSizeFor, clamped to MAXIMUM_CAPACITY. The result is stored in
// sizeCtl, which holds the table size to allocate until the table is
// lazily created on first insert.
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

// Convenience overload: delegates to the full constructor with
// concurrencyLevel = 1 (i.e. at least one updating thread assumed).
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

// Full constructor. In JDK 1.8, loadFactor and concurrencyLevel only
// influence the INITIAL table sizing; they are not stored, and the
// effective resize threshold is always ~0.75 * capacity.
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
// size ~= 1 + initialCapacity / loadFactor, computed in long/double to
// avoid int overflow before the MAXIMUM_CAPACITY clamp below.
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

sizeCtl:控制标识符,

  • 负数代表正在进行初始化或扩容操作

  • -1代表有线程正在进行初始化操作

  • -N 表示有N-1个线程正在进行扩容操作

  • 正数或0代表hash表还没有被初始化,初始化完毕后代表扩容阈值。后面还可以看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadFactor是对应的

Segment:

1
2
3
4
5
// Stripped-down Segment, retained ONLY so serialized forms produced by
// previous (JDK 1.7) versions remain compatible; it plays no part in the
// JDK 1.8 locking scheme.
static class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
final float loadFactor;
Segment(float lf) { this.loadFactor = lf; }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

// Basic hash-bin node used for key-value entries. val and next are
// volatile so readers (e.g. get()) can traverse bins without locking and
// still observe up-to-date values.
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
// Entries are immutable through the Map.Entry view; updates must go
// through the map itself so they are properly synchronized.
public final V setValue(V value) {
throw new UnsupportedOperationException();
}

// Map.Entry equality: key and value must both match. The null checks
// only reject foreign entries — this map never stores nulls itself.
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}

/**
 * Virtualized support for map.get(); overridden in subclasses.
 */
Node<K,V> find(int h, Object k) {
// Linear scan of this bin's chain for a matching hash and key.
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

PUT过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// Maps key to value; neither may be null. Delegates to putVal with
// onlyIfAbsent = false, so an existing mapping is overwritten.
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// Null keys AND null values are rejected (unlike HashMap).
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0; // nodes traversed in the bin; drives treeification below
// Retry loop: restarts after table init, a lost CAS, or helping a resize.
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// Table is created lazily on first insert.
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// Empty bin: install the new node with a lock-free CAS.
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// Head is a ForwardingNode: a resize is in progress — help finish it.
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// Lock only this bin's head node; all other bins stay concurrent.
synchronized (f) {
if (tabAt(tab, i) == f) { // re-check head is unchanged under the lock
if (fh >= 0) { // non-negative hash => ordinary linked list
binCount = 1;
for (Node<K,V> e = f;;++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// Reached the tail without a match: append a new node.
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// Red-black tree bin: delegate insert/replace to the tree.
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// Chains reaching TREEIFY_THRESHOLD (8) are candidates for treeifying.
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal; // replaced an existing mapping; size unchanged
break;
}
}
}
addCount(1L, binCount); // bump element count; may trigger a resize
return null;
}
  1. 首先判定key,value是否有null值,有则抛出异常

  2. 获取key的hashCode值,将hashCode与其无符号右移16位的结果做异或运算,再和HASH_BITS做与运算得到hash值;HASH_BITS为0x7fffffff,即2^31-1,与运算后保证hash为非负数(负的hash值被保留用作MOVED等特殊标记)

    1
    2
    3
    4
       // Mixes the high 16 bits of the hash into the low bits (the table
       // index only uses low bits), then masks with HASH_BITS to force a
       // non-negative result — negative hashes are reserved for special
       // nodes such as MOVED.
       static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
    }
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
  3. 判定Node节点数组table是否存在,不存在则初始化table

    1
    // The bin array; lazily initialized on first insert and always a power
    // of two in length. volatile so all threads see the latest table
    // reference after initialization or resizing.
    transient volatile Node<K,V>[] table;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // Lazily initializes the table on first use. Exactly one thread wins the
    // CAS on sizeCtl (setting it to -1); losers spin with Thread.yield()
    // until the winner publishes the table.
    private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
    if ((sc = sizeCtl) < 0)
    Thread.yield(); // lost initialization race; just spin
    else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
    try {
    // Re-check: another thread may have finished between the read and CAS.
    if ((tab = table) == null || tab.length == 0) {
    // A positive sizeCtl holds the requested capacity; 0 means default (16).
    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
    @SuppressWarnings("unchecked")
    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
    table = tab = nt;
    // New threshold: n - n/4, i.e. 0.75 * n.
    sc = n - (n >>> 2);
    }
    } finally {
    sizeCtl = sc;
    }
    break;
    }
    }
    return tab;
    }

    初始化过程

    1. sizeCtl是状态标识,小于零代表已有线程正在进行初始化操作,其他线程通过Thread.yield()让出CPU自旋等待(并非阻塞);抢占初始化权的线程用CAS将sizeCtl置为-1来保证线程安全,成功后在try块内再做第二次判断,避免重复初始化
    2. 初始化容量小于0,修正为默认容量,最后计算的出sizeCtl新的阈值
  4. 判断key对应的桶位是否为空,是就通过CAS来直接替换内存中的值

    1. 前面了解了HashMap的可以知道(n - 1) & hash就是table中的索引值(在HashMap里面解释了原因)。这里为什么使用Unsafe中的getObjectVolatile方法,而不直接通过索引来获取呢?原因是getObjectVolatile按volatile语义读取内存中最新的值;而普通的数组下标访问,哪怕数组引用用了volatile关键字修饰,数组元素本身的读取也不具备volatile语义,依旧存在读到旧值的可能。

    2. 1
      2
      3
      // Volatile read of tab[i] straight from main memory via Unsafe: plain
      // array element reads are not volatile even when the array reference
      // itself is, so this guarantees a fresh value.
      static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
      return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
      }
    3. 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      // Resolves Unsafe field and array offsets once at class-load time;
      // these offsets back all CAS and volatile-array operations above.
      static {
      try {
      U = sun.misc.Unsafe.getUnsafe();
      Class<?> k = ConcurrentHashMap.class;
      SIZECTL = U.objectFieldOffset
      (k.getDeclaredField("sizeCtl"));
      TRANSFERINDEX = U.objectFieldOffset
      (k.getDeclaredField("transferIndex"));
      BASECOUNT = U.objectFieldOffset
      (k.getDeclaredField("baseCount"));
      CELLSBUSY = U.objectFieldOffset
      (k.getDeclaredField("cellsBusy"));
      Class<?> ck = CounterCell.class;
      CELLVALUE = U.objectFieldOffset
      (ck.getDeclaredField("value"));
      Class<?> ak = Node[].class;
      // Byte offset of the first element of a Node[] array.
      ABASE = U.arrayBaseOffset(ak);
      // Size in bytes of each array element; together with the base offset
      // it locates element i in memory.
      int scale = U.arrayIndexScale(ak);
      if ((scale & (scale - 1)) != 0)
      throw new Error("data type scale not a power of two");
      // Integer.numberOfLeadingZeros(scale) counts the zero bits above the
      // highest set bit (it returns 0 for negatives, whose sign bit is 1),
      // so 31 minus it yields log2(scale) — the shift that converts an
      // index into a byte offset in tabAt/casTabAt.
      ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
      } catch (Exception e) {
      throw new Error(e);
      }
      }
    4. 确认第一次插入的情况下,CAS执行更新

  5. 第三种情况,当桶头节点的hash值为特殊值MOVED,表示当前哈希表正在执行resize操作,需要当前线程去帮助完成数据迁移

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // Called when an update hits a ForwardingNode: the table is being
    // resized, so this thread joins the transfer instead of waiting idle.
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
    (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
    int rs = resizeStamp(tab.length);
    // Keep trying to help while this particular resize is still active.
    while (nextTab == nextTable && table == tab &&
    (sc = sizeCtl) < 0) {
    // Bail out if the stamp no longer matches this resize, the resize is
    // finishing, helper count is saturated, or no work ranges remain.
    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    sc == rs + MAX_RESIZERS || transferIndex <= 0)
    break;
    // CAS sizeCtl + 1: register this thread as one more resizer.
    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
    transfer(tab, nextTab);
    break;
    }
    }
    return nextTab;
    }
    return table;
    }

    // Stamp identifying a resize of a table of length n; the forced high
    // bit makes sizeCtl negative once the stamp is shifted into its upper
    // half, marking "resize in progress".
    static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    ForwardingNode—— A node inserted at head of bins during transfer operations. 它是在扩容操作中插入到桶头部的特殊节点,它的出现表明这个桶已经完成了迁移,但整个哈希表的扩容操作还没有结束;如果检测到这种节点,当前线程会被要求一起来完成部分扩容操作

    1. 首先检测是否存在ForwardingNode

    2. resizeStamp(tab.length) 获取一个16位的扩容标识,

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      // Moves/copies bins from tab to nextTab (twice the size). Work is
      // split into stride-sized index ranges claimed via CAS on
      // transferIndex, so multiple threads can resize cooperatively.
      private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
      int n = tab.length, stride;
      // Each thread claims at least MIN_TRANSFER_STRIDE bins per batch.
      if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
      stride = MIN_TRANSFER_STRIDE; // subdivide range
      if (nextTab == null) { // initiating
      try {
      @SuppressWarnings("unchecked")
      Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
      nextTab = nt;
      } catch (Throwable ex) { // try to cope with OOME
      sizeCtl = Integer.MAX_VALUE;
      return;
      }
      nextTable = nextTab;
      transferIndex = n; // bins are claimed from high index down to 0
      }
      int nextn = nextTab.length;
      // Placed at the head of each migrated bin so readers/writers are
      // redirected (readers follow it to nextTab, writers come help).
      ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
      boolean advance = true;
      boolean finishing = false; // to ensure sweep before committing nextTab
      for (int i = 0, bound = 0;;) {
      Node<K,V> f; int fh;
      // Advance i within the claimed range, or CAS-claim a new range.
      while (advance) {
      int nextIndex, nextBound;
      if (--i >= bound || finishing)
      advance = false;
      else if ((nextIndex = transferIndex) <= 0) {
      i = -1; // no ranges left to claim
      advance = false;
      }
      else if (U.compareAndSwapInt
      (this, TRANSFERINDEX, nextIndex,
      nextBound = (nextIndex > stride ?
      nextIndex - stride : 0))) {
      bound = nextBound;
      i = nextIndex - 1;
      advance = false;
      }
      }
      if (i < 0 || i >= n || i + n >= nextn) {
      int sc;
      if (finishing) {
      // Commit: publish nextTab and set the new threshold 1.5*n = 0.75*2n.
      nextTable = null;
      table = nextTab;
      sizeCtl = (n << 1) - (n >>> 1);
      return;
      }
      // Deregister this resizer; the last one out re-scans then finishes.
      if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
      if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
      return; // other resizers still active
      finishing = advance = true;
      i = n; // recheck before commit
      }
      }
      else if ((f = tabAt(tab, i)) == null)
      // Empty bin: mark it migrated by installing the forwarding node.
      advance = casTabAt(tab, i, null, fwd);
      else if ((fh = f.hash) == MOVED)
      advance = true; // already processed
      else {
      // Non-empty bin: lock its head and split it into low/high lists.
      synchronized (f) {
      if (tabAt(tab, i) == f) {
      Node<K,V> ln, hn;
      if (fh >= 0) { // linked-list bin
      // (hash & n) decides whether a node stays at index i ("low")
      // or moves to index i + n ("high") in the doubled table.
      int runBit = fh & n;
      Node<K,V> lastRun = f;
      // Find the trailing run of nodes that all land on the same
      // side; that whole suffix can be reused without copying.
      for (Node<K,V> p = f.next; p != null; p = p.next) {
      int b = p.hash & n;
      if (b != runBit) {
      runBit = b;
      lastRun = p;
      }
      }
      if (runBit == 0) {
      ln = lastRun;
      hn = null;
      }
      else {
      hn = lastRun;
      ln = null;
      }
      // Copy the remaining (pre-lastRun) nodes, prepending each to
      // its side's list — the original chain is left untouched.
      for (Node<K,V> p = f; p != lastRun; p = p.next) {
      int ph = p.hash; K pk = p.key; V pv = p.val;
      if ((ph & n) == 0)
      ln = new Node<K,V>(ph, pk, pv, ln);
      else
      hn = new Node<K,V>(ph, pk, pv, hn);
      }
      setTabAt(nextTab, i, ln);
      setTabAt(nextTab, i + n, hn);
      setTabAt(tab, i, fwd); // mark old bin as migrated
      advance = true;
      }
      else if (f instanceof TreeBin) { // tree bin: split the same way
      TreeBin<K,V> t = (TreeBin<K,V>)f;
      TreeNode<K,V> lo = null, loTail = null;
      TreeNode<K,V> hi = null, hiTail = null;
      int lc = 0, hc = 0;
      // Walk the tree's node list, appending to low/high lists and
      // counting nodes on each side.
      for (Node<K,V> e = t.first; e != null; e = e.next) {
      int h = e.hash;
      TreeNode<K,V> p = new TreeNode<K,V>
      (h, e.key, e.val, null, null);
      if ((h & n) == 0) {
      if ((p.prev = loTail) == null)
      lo = p;
      else
      loTail.next = p;
      loTail = p;
      ++lc;
      }
      else {
      if ((p.prev = hiTail) == null)
      hi = p;
      else
      hiTail.next = p;
      hiTail = p;
      ++hc;
      }
      }
      // Shrunk sides fall back to plain lists (untreeify); a side that
      // got everything can reuse the original TreeBin t.
      ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
      (hc != 0) ? new TreeBin<K,V>(lo) : t;
      hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
      (lc != 0) ? new TreeBin<K,V>(hi) : t;
      setTabAt(nextTab, i, ln);
      setTabAt(nextTab, i + n, hn);
      setTabAt(tab, i, fwd); // mark old bin as migrated
      advance = true;
      }
      }
      }
      }
      }
      }
  1. 第四种情况,通常情况的一般处理

    1. 先对table进行加锁处理,再次确认值未被修改
    2. 判断fh(桶头节点的hash值)大于等于0,表示该桶为链表结构,key值相等则替换(onlyIfAbsent为true时只在缺失的情况下插入),这里也用到了&&的短路特性
    3. 当fh小于0时,判断头节点是否为TreeBin(1.8在链表过长的时候会转变为红黑树),是则在树中插入或替换对应的value值
    4. 最后链表插入完成后会再次检测桶中节点数是否达到TREEIFY_THRESHOLD(8)个,是则调用treeifyBin尝试转化成红黑树结构存储

相关:

  1. java8—ConcurrentHashMap实现原理
  2. Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析