ConcurrentHashMap实现原理(二)

扩容

继续接上文,分析扩容的原理,这里我没能完全弄懂他做了什么,一些问题也记录下来希望以后可以找到答案

ConcurrentHashMap 的扩容是支持并发的,可以大致分为三个步骤:

  1. 扩容准备
    1
    2
    3
    4
       /** Number of CPUS, to place bounds on some sizings */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    private static final int MIN_TRANSFER_STRIDE = 16;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    //核数越大,就取(n >>> 3) / NCPU,显然这个值会很小(如果n=16的话),没理解
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    stride = MIN_TRANSFER_STRIDE; // subdivide range
    //这里判断为null,确保只有一个线程获取
    if (nextTab == null) { // initiating
    try {
    @SuppressWarnings("unchecked")
    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
    nextTab = nt;
    } catch (Throwable ex) { // try to cope with OOME
    sizeCtl = Integer.MAX_VALUE;
    return;
    }
    nextTable = nextTab;
    transferIndex = n;
    }

    NCPU是 返回可用处理器的Java虚拟机的数量

    stride可以译为步幅,代表一个线程处理的节点数量,最小为16,在对多核处理器时,可以允许更大的步幅

    transferIndex表示每次迁移任务的下标(从后往前),如果是第一次那么为transferIndextransferIndex - stride,每次任务分配完毕会被修改

    这一步定义了之后会用到的相关变量,并允许有且只有一条线程来创建新的Node数组,方便迁移

  2. 迁移任务分配
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
    Node<K,V> f; int fh;
    while (advance) {
    int nextIndex, nextBound;
    //--i >= bound代表的含义是不满足则表示代表当前任务完成,需要将i减一,然后advance设置为false,意为分配下一个任务,第一次进入就是这样,finishing为true代表整个扩容操作完成,自然不需要再操作
    if (--i >= bound || finishing)
    advance = false;
    //如果transferIndex<0证明区间任务已经分配完毕,将advance设置为false,退出while循环,i设置为-1,在下方的if与剧中判断
    else if ((nextIndex = transferIndex) <= 0) {
    i = -1;
    advance = false;
    }
    //CAS分配任务给各个线程
    else if (U.compareAndSwapInt
    (this, TRANSFERINDEX, nextIndex,
    nextBound = (nextIndex > stride ?
    nextIndex - stride : 0))) {
    //最小下标
    bound = nextBound;
    //最大下标
    i = nextIndex - 1;
    //
    advance = false;
    }
    }
    //i<0只可能是进入上面的第一种或者第二种情况,代表扩容结束或者分配完毕
    //i >= n感觉不可能出现这种情况,如前所述,那么只有i = nextIndex - 1这里赋值修改了i,而n就是table的长度,nextIndex只会越来越小
    //
    if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    //如果扩容完成
    if (finishing) {
    nextTable = null;
    table = nextTab;
    sizeCtl = (n << 1) - (n >>> 1);
    return;
    }
    //如果扩容未完成,修改sc的值,表示的协助扩容的线程数-1
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //不等于代表扩容未结束,直接return退出方法,否则就更新finishing表示扩容结束
    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
    return;
    finishing = advance = true;
    i = n; // recheck before commit
    }
    }
    //获取旧table的i位置的节点,如果是null,插入fwd表示已经扩容了
    else if ((f = tabAt(tab, i)) == null)
    advance = casTabAt(tab, i, null, fwd);
    //如果hash值是moved证明已被处理过,因为moved是表示该节点正在扩容,证明以有线程处理了
    else if ((fh = f.hash) == MOVED)
    advance = true; // already processed

    ForwardingNode 前面说过表示该节点正在被迁移,在扩容之中,假设tableA中的i位置的节点正在或者已经被迁移到TableB中,那么i位置就会被放上一个ForwardingNode表示已经被迁移

    advance 任务分配标识,true表示经分配完当前任务

    finishing 扩容完成标识, true标识扩容完成

    i 是位置索引,bound 是当前线程可以处理的当前任务区间最小下标

  3. 迁移
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
            else {
    synchronized (f) {
    //再次判断节点是否相同
    if (tabAt(tab, i) == f) {
    Node<K,V> ln, hn;
    //因为TreeBin 的 hash 是 -2,所以这里true代表是链表
    if (fh >= 0) {
    //这里结果只有两种一种是0一种为第n位为1的某个数
    int runBit = fh & n;
    //将第i位的节点复制给lastRun
    Node<K,V> lastRun = f;
    //遍历这个链表
    for (Node<K,V> p = f.next; p != null; p = p.next) {
    int b = p.hash & n;
    if (b != runBit) {
    runBit = b;
    lastRun = p;
    }
    }
    if (runBit == 0) {
    ln = lastRun;
    hn = null;
    }
    else {
    hn = lastRun;
    ln = null;
    }
    for (Node<K,V> p = f; p != lastRun; p = p.next) {
    int ph = p.hash; K pk = p.key; V pv = p.val;
    if ((ph & n) == 0)
    ln = new Node<K,V>(ph, pk, pv, ln);
    else
    hn = new Node<K,V>(ph, pk, pv, hn);
    }
    setTabAt(nextTab, i, ln);
    setTabAt(nextTab, i + n, hn);
    setTabAt(tab, i, fwd);
    advance = true;
    }
    else if (f instanceof TreeBin) {
    TreeBin<K,V> t = (TreeBin<K,V>)f;
    TreeNode<K,V> lo = null, loTail = null;
    TreeNode<K,V> hi = null, hiTail = null;
    int lc = 0, hc = 0;
    for (Node<K,V> e = t.first; e != null; e = e.next) {
    int h = e.hash;
    TreeNode<K,V> p = new TreeNode<K,V>
    (h, e.key, e.val, null, null);
    if ((h & n) == 0) {
    if ((p.prev = loTail) == null)
    lo = p;
    else
    loTail.next = p;
    loTail = p;
    ++lc;
    }
    else {
    if ((p.prev = hiTail) == null)
    hi = p;
    else
    hiTail.next = p;
    hiTail = p;
    ++hc;
    }
    }
    ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
    (hc != 0) ? new TreeBin<K,V>(lo) : t;
    hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
    (lc != 0) ? new TreeBin<K,V>(hi) : t;
    setTabAt(nextTab, i, ln);
    setTabAt(nextTab, i + n, hn);
    setTabAt(tab, i, fwd);
    advance = true;
    }
    }
    }
    }
    }
    }
    1. 其他情况表示该节点不为null,也没有moved,则将该节点锁住,防止其他线程在往其中插入数据
    2. 判断该节点是链表还是红黑树,分别处理
    3. 当为链表时,根据和table.length与运算可以将链表一分为二