Analysis of source code of ConcurrentHashMap: put method

  • 2021-09-12 01:16:54
  • OfStack

Chapter 1: Preheating (internal analysis of 1 small method)

put() The method is the key method of concurrent HashMap source code analysis, which involves concurrent expansion, bucket addressing and so on … Structure diagram of JDK1.8 ConcurrentHashMap:

1. Source code analysis of put method


// 向并发Map中put1个数据
public V put(K key, V value) {
    return putVal(key, value, false);
}
// 向并发Map中put1个数据
// Key: 数据的键
// value:数据的值
// onlyIfAbsent:是否替换数据:
// 如果为false,则当put数据时,遇到Map中有相同K,V的数据,则将其替换
// 如果为true,则当put数据时,遇到Map中有相同K,V的数据,则不做替换,不插入
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 控制k 和 v 不能为null
    if (key == null || value == null) throw new NullPointerException();
    // 通过spread方法,可以让高位也能参与进寻址运算,使最终得到的hash值更分散
    int hash = spread(key.hashCode());
    // binCount表示当前k-v 封装成node插入到指定桶位后,在桶位中的所属链表的下标位置
    // =0 表示当前桶位为null,node可以直接放入
    // >0 表示当前桶位中有节点,遍历链表,将封装k-v的新node放入链表末尾,并记录链表末尾的下标binCount
    // =2 表示当前桶位**可能**已经树化为红黑树了
    int binCount = 0;
    // tab 引用map对象的table
	// 自旋
    for (Node<K,V>[] tab = table;;) {
        // f 表示桶位的头结点
        // n 表示散列表数组的长度
        // i 表示key通过寻址计算后,得到的桶位下标
        // fh 表示桶位头结点的hash值
        Node<K,V> f; int n, i, fh;
        // -----------------------------------------------------------------------------
        // CASE1:成立,表示当前map中的table尚未初始化...
        if (tab == null || (n = tab.length) == 0)
            // 对map中的table进行初始化
            tab = initTable();
        // -----------------------------------------------------------------------------
        // CASE2:table已经初始化,此时根据寻址算法确定1个桶,并且桶的头结点f为null
        // i 表示key使用路由寻址算法得到key对应table数组的下标位置,tabAt方法获取指定桶位i的头结点f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 这时候就可以将封装k-v的结点直接放入桶
            // casTabAt通过CAS的方式去向Node数组指定位置i设置节点值,设置成功返回true 否则返回false
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;
        }
        // -----------------------------------------------------------------------------
        // CASE3:table已经初始化,寻址得到的桶位中的头结点f不是null,如果该头结点f的hash值fh=-1:
        // 则,表示当前节点是FWD(forwarding)节点
        // 如果CASE3条件成立:表示当前桶位的头结点为 FWD结点,表示目前map正处于扩容过程中..
        else if ((fh = f.hash) == MOVED)
            // 发现f是FWD结点后,当前结点有义务去帮助当前map对象完成数据迁移工作...
            // 等学完扩容再来分析这里~
            tab = helpTransfer(tab, f);
        // -----------------------------------------------------------------------------
        // CASE4:CASE1-3都不满足时,那么当前桶位存放的可能是链表也可能是红黑树代理结点TreeBin
        else {
            // 保留替换之前的数据引用:当新插入的key存在后,会将旧值赋给OldVal,返回给put方法调用
            V oldVal = null;
            // 使用synchronized加锁“头节点”(**理论上是“头结点”**)
            synchronized (f) {
                // -----------------------------------------------------------------------
        		// CASE5:tabAt(tab, i) == f
                // 对比1下当前桶位的头节点是否为之前获取的头结点:为什么又要对比1下?
                // 如果其它线程在当前线程之前将该桶位的头结点修改掉了,当前线程再使用sync对该节点f加锁就有问题了(锁本身加错了地方~) 
                if (tabAt(tab, i) == f) {// 如果条件成立,说明加锁的对象f没有问题,持有锁!
                    // ------------------------------------------------------------------
                    // CASE6:fh >= 0)
                    // 如果条件成立,说明当前桶位就是普通链表桶位,这里回顾下常量属性字段:
                    // static final int MOVED = -1; 表示当前节点是FWD(forwarding)节点
                    // static final int TREEBIN = -2; 表示当前节点已经树化
                    if (fh >= 0) {
                        // 1.当前插入key与链表当中所有元素的key都不1致时,当前的插入操作是追加到链表的末尾,binCount此时表示链表长度
                        // 2.当前插入key与链表当中的某个元素的key1致时,当前插入操作可能就是替换了。binCount表示冲突位置(binCount - 1)
                        binCount = 1;
                        // 迭代循环当前桶位的链表,e是每次循环处理节点。
                        for (Node<K,V> e = f;; ++binCount) {
                            // 当前循环遍历节点的key
                            K ek;
                            // --------------------------------------------------------
                    		// CASE7:
                            // 条件1:e.hash == hash
                            // 成立:表示循环的当前元素的hash值与插入节点的hash值1致,需要进1步判断
                            // 条件2:((ek = e.key) == key ||(ek != null && key.equals(ek)))
                            // 成立:说明循环的当前节点与插入节点的key1致,确实发生冲突了~
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 将当前循环的元素的值赋值给oldVal
                                oldVal = e.val;
                                // 传入的putVal方法参数onlyIfAbsent:是否替换数据:
                                // false,当put数据时,遇到Map中有相同K,V的数据,则将其替换
                                // true,当put数据时,遇到Map中有相同K,V的数据,则不做替换,不插入
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // --------------------------------------------------------
                    		// CASE8:
                            // 当前元素与插入元素的key不1致时,会走下面程序:
                            // 1.更新循环遍历的节点为当前节点的下1个节点
                            // 2.判断下1个节点是否为null,如果是null,说明当前节点已经是队尾了,插入数据需要追加到队尾节点的后面。
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // ------------------------------------------------------------------
                    // CASE9:f instanceof TreeBin
                    // 如果条件成立,表示当前桶位是红黑树代理结点TreeBin
                    //(前置条件:该桶位1定不是链表)
                    else if (f instanceof TreeBin) {
                        // p表示红黑树中如果与你插入节点的key 有冲突节点的话,则putTreeVal方法会返回冲突节点的引用。
                        Node<K,V> p;
                        // 强制设置binCount为2,因为binCount <= 1 时有其它含义,所以这里设置为了2 (回头讲addCount的时候会详细介绍)
                        binCount = 2;
                        // 条件1成立,说明当前插入节点的key与红黑树中的某个节点的key1致,冲突了:
                        // putTreeVal向红黑树中插入结点,插入成功返回null,否则返回冲突结点对象
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            // 将冲突节点的值赋值给oldVal
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // ------------------------------------------------------------------
            // CASE10:binCount != 0
            // 说明当前桶位不为null,可能是红黑树也可能是链表
            if (binCount != 0) {
                // 如果binCount>=8 表示处理的桶位1定是链表
                if (binCount >= TREEIFY_THRESHOLD)
                    // 因为桶中链表长度大于了8,需要树化:
                    // 调用转化链表为红黑树的方法
                    treeifyBin(tab, i);
                // 说明当前线程插入的数据key,与原有k-v发生冲突,需要将原数据v返回给调用者。
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // Map中的元素数据量累加方法:有额外的以下功能
    // 1.统计当前table1共有多少数据
    // 2.并判断是否达到扩容阈值标准,触发扩容。
    addCount(1L, binCount);
    return null;
}

2. Source code analysis of initTable method

Initialize the bucket array the first time you put an element:


/** 
 * table初始化
 */
private final Node<K,V>[] initTable() {
    // tab: 引用map.table
	// sc: sizeCtl的临时值
    // sizeCtl:默认为0,用来控制table的状态、以及初始化和扩容操作:
    // sizeCtl<0表示table的状态:
    //(1)=-1,表示有其他线程正在进行初始化操作。(其他线程就不能再进行初始化,相当于1把锁)
	//(2)=-(1 + nThreads),表示有n个线程正在1起扩容。
    // sizeCtl>=0表示table的初始化和扩容相关操作:
	//(3)=0,默认值,后续在真正初始化table的时候使用,设置为默认容量DEFAULT_CAPACITY --> 16。
	//(4)>0,将sizeCtl设置为table初始容量或扩容完成后的下1次扩容的门槛。
    Node<K,V>[] tab; int sc;
    // 附加条件的自旋: 条件是map.table尚未初始化...
    while ((tab = table) == null || tab.length == 0) {
        // -----------------------------------------------------------------------------
        // CASE1: sizeCtl) < 0
        // sizeCtl < 0可能是以下2种情况:
        //(1)-1,表示有其他线程正在进行table初始化操作。
		//(2)-(1 + nThreads),表示有n个线程正在1起扩容。
        if ((sc = sizeCtl) < 0)
            // 这里sizeCtl大概率就是-1,表示其它线程正在进行创建table的过程,当前线程没有竞争到初始化table的锁,进而当前线程被迫等待...
            Thread.yield();
        // -----------------------------------------------------------------------------
        // CASE2:sizeCtl) >= 0 且U.compareAndSwapInt(this, SIZECTL, sc, -1)结果为true
        // U.compareAndSwapInt(this, SIZECTL, sc, -1):以CAS的方式修改当前线程的sizeCtl为-1,
        // sizeCtl如果成功被修改为-1,就返回true,否则返回false。
        // 当返回true时,则该线程就可以进入下面的else if代码块中,这时候sizeCtl=-1相当于是1把锁,表示下面的else if代码块已经被该线程占用,其他线程不能再进入~
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // ----------------------------------------------------------------------
                // CASE3: 这里为什么又要判断呢? 
                // 为了防止其它线程已经初始化table完毕了,然后当前线程再次对其初始化..导致丢失数据。
				// 如果条件成立,说明其它线程都没有进入过这个if块,当前线程就是具备初始化table权利了。
                if ((tab = table) == null || tab.length == 0) {
                    // sc大于等于0的情况如下:
                    //(3)=0,默认值,后续在真正初始化table的时候使用,设置为默认容量DEFAULT_CAPACITY --> 16。
					//(4)>0,将sizeCtl设置为table初始容量或扩容完成后的下1次扩容的门槛。
                    // 如果sc大于0,则创建table时使用sc为指定table初始容量大小,
                    // 否则使用16默认值DEFAULT_CAPACITY
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    // 创建新数组nt
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 将新数组nt赋值给table、tab
                    table = tab = nt;
                    // sc设置为下次散列表扩容的门槛:0.75n
                    sc = n - (n >>> 2);
                }
            } finally {
                // 将sc赋值给sizeCtl,分为1下2种情况:
                // 1、当前线程既通过了CASE2的判断,也通过了CASE3的判断:
                // 则,当前线程是第1次创建map.table的线程,那么,sc就表示下1次扩容的阈值。
                // 2、当线程通过了CASE2的判断,但是没有通过CASE3的判断:
                // 则,当前线程并不是第1次创建map.table的线程,当前线程通过CASE2的判断时,将
                // sizeCtl设置为了-1 ,那么在该线程结束上面的代码逻辑之后,需要将sc修改回进入CASE2之前的sc值。
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

(1) Using CAS lock to control only one thread to initialize bucket array;

(2) sizeCtl stores the expansion threshold after initialization;

(3) The expansion threshold is 0.75 times the size of bucket array, which is the capacity of map, that is, how many elements are stored at most.

3. addCount method (difficulty)

Reading addCount Before the method source code, it is best to familiarize yourself with it again LongAdder Source code: 1 article takes you to analyze the introduction LongAdder source code

addCount Method function: After adding elements every time, the number of elements is increased by 1, and whether the expansion threshold is reached is judged. If it is reached, the expansion is carried out or assisted in expansion.


/**
 * Map中的元素数据量累加方法:有额外的以下功能
 * 1.统计当前table1共有多少数据
 * 2.并判断是否达到扩容阈值标准,触发扩容
 */
private final void addCount(long x, int check) {
    // as 表示 LongAdder.cells
    // b 表示LongAdder.base
    // s 表示当前map.table中元素的数量
    CounterCell[] as; long b, s;
    // -------------------------统计当前table1共有多少数据----------------------------------
    // CASE1: 
    // (as = counterCells) != null
    // 条件1:true->表示cells已经初始化了,当前线程应该去使用hash寻址找到合适的cell 去累加数据
    //        false->表示当前线程应该直接将数据累加到base(没有线程竞争)
    // !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
    // 条件2:false->表示写base成功,数据累加到base中了,当前竞争不激烈,不需要创建cells
    //        true->表示写base失败,与其他线程在base上发生了竞争,当前线程应该去尝试创建cells。
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        // 有几种情况进入到if块中?
        // 条件1为true->表示cells已经初始化了,当前线程应该去使用hash寻址找到合适的cell去累加数据
        // 条件2为true->表示写base失败,与其他线程在base上发生了竞争,当前线程应该去尝试创建cells。
        // a 表示当前线程hash寻址命中的cell
        CounterCell a;
        // v 表示当前线程写cell时的期望值
        long v;
        // m 表示当前cells数组的长度
        int m;
        // true -> 未发生线程竞争  false->发生线程竞争
        boolean uncontended = true;
        // ---------------------------------------------------------------------------
        // CASE2: 
        // 条件1:as == null || (m = as.length - 1) < 0
        // true-> 表示当前线程是通过写base竞争失败(CASE1的条件2),然后进入的if块,就需要调用fullAddCount方法去扩容或者重试.. 
        // (fullAddCount方法就类似于LongAdder.longAccumulate方法)
        // 条件2:a = as[ThreadLocalRandom.getProbe() & m]) == null 前置条件:cells已经初始化了
        // true->表示当前线程命中的cell表格是个空的,需要当前线程进入fullAddCount方法去初始化cell,放入当前位置.
        // 条件3:!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)
        // 前置条件:条件2中当前线程命中的cell表格不是空的~
        //       false->取反得到false,表示当前线程使用cas方式更新当前命中的cell成功
        //       true->取反得到true,表示当前线程使用cas方式更新当前命中的cell失败,需要进入fullAddCount进行重试或者扩容cells。
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
        ) {
            fullAddCount(x, uncontended);
            // 考虑到fullAddCount里面的事情比较累,就让当前线程不参与到扩容相关的逻辑了,直接返回到调用点。
            return;
        }
        if (check <= 1)
            return;
        // sumCount统计当前散列表元素个数sum = base + cells[0] + ... + cells[n],这是1个期望值
        s = sumCount();
    }
    // -------------------------判断是否达到扩容阈值标准,触发扩容----------------------------
    // CASE3: 
    // check >= 0表示1定是1个put操作调用的addCount,check < 0是remove操作调用的addCount方法
    if (check >= 0) {
        // tab 表示 map.table
        // nt 表示 map.nextTable
        // n 表示map.table数组的长度
        // sc 表示sizeCtl的临时值
        Node<K,V>[] tab, nt; int n, sc;
        /**
         * sizeCtl < 0
         * 1. -1 表示当前table正在初始化(有线程在创建table数组),当前线程需要自旋等待..
         * 2.表示当前table数组正在进行扩容 ,高16位表示:扩容的标识戳   低16位表示:(1 + nThread) 当前参与并发扩容的线程数量
         *
         * sizeCtl = 0,表示创建table数组时 使用DEFAULT_CAPACITY为大小
         *
         * sizeCtl > 0
         *
         * 1. 如果table未初始化,表示初始化大小
         * 2. 如果table已经初始化,表示下次扩容时的 触发条件(阈值)
         */
        // 有条件自旋~
        // 条件1:s >= (long)(sc = sizeCtl)
        //        true -> 1.当前sizeCtl为1个负数 表示正在扩容中..
        //                2.当前sizeCtl是1个正数,表示扩容阈值,但是s已经达到扩容阈值(需要扩容)
        //        false -> 表示当前table尚未达到扩容阈值条件(不需要扩容)
        // 条件2:(tab = table) != null 恒成立 true
        // 条件3:(n = tab.length) < MAXIMUM_CAPACITY
        //        true -> 当前table长度小于最大值限制,则可以进行扩容。
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // 获取扩容唯1标识戳
            // eg: 16 -> 32 扩容标识戳为:1000 0000 0001 1011
            // 什么意思呢?
            // 即,所有将map容量由16扩容到32的线程,其拿到的扩容唯1标识戳都是1000 0000 0001 1011
            int rs = resizeStamp(n);
            // --------------------------------------------------------------------------
        	// CASE4: 
            // 条件成立:表示当前table正在扩容,则,当前线程理论上应该协助table完成扩容
            if (sc < 0) {
                // --------------------------------------------------------------
        		// CASE2: 条件1~4只要有个为true就会跳出循环,不会继续扩容~
                // 条件1:(sc >>> RESIZE_STAMP_SHIFT) != rs
                //       true -> 说明当前线程获取到的扩容唯1标识戳 非 本批次扩容
                //       false -> 说明当前线程获取到的扩容唯1标识戳 是 本批次扩容
                // 条件2:JDK1.8 中有bug,jira已经提出来了,其实想表达的是 sc == (rs << 16 ) + 1
                //        true -> 表示扩容完毕,当前线程不需要再参与进来了
                //        false -> 扩容还在进行中,当前线程可以参与
                // 条件3:JDK1.8 中有bug,jira已经提出来了,其实想表达的是:
                // sc == (rs << 16) + MAX_RESIZERS
                //        true-> 表示当前参与并发扩容的线程达到了最大值 65535 - 1
                //        false->表示当前线程可以参与进来
                //条件4:(nt = nextTable) == null
                //        true -> 表示本次扩容结束
                //        false -> 扩容正在进行中
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    // 条件1~4只要有个为true就会跳出循环,不会继续扩容~
                    break;
                // --------------------------------------------------------------
        		// CASE5: 
                // 前置条件:当前table正在执行扩容中(即,CASE4没有被通过)当前线程有机会参与进扩容。
                // 条件成立:说明当前线程成功参与到扩容任务中,并且将sc低16位值加1,表示多了1个线程参与工作~
                // 条件失败:
                // 1.当前有很多线程都在此处尝试修改sizeCtl,有其它1个线程修改成功了
                // 导致你的sc期望值与内存中的值不1致,CAS修改失败。(下次自选大概率不会在来到这里~)
                // 2.transfer任务内部的线程也修改了sizeCtl。
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // 扩容(迁移数据):协助扩容线程,持有nextTable参数,进入该方法协助扩容~
                    transfer(tab, nt);
            }
            // --------------------------------------------------------------------------
        	// CASE6: 以CAS的方式去更新sc,更新成功返回true,否则返回false
            // 条件成立,说明当前线程是触发扩容的**第1个**线程,在transfer方法需要做1些扩容准备工作:
            // rs << RESIZE_STAMP_SHIFT:将扩容唯1标识戳左移16位 eg:
            // 1000 0000 0001 1011 << 16 得到 1000 0000 0001 1011 0000 0000 0000 0000
            // 紧接这 (rs << RESIZE_STAMP_SHIFT) + 2)操作:
            // 1000 0000 0001 1011 0000 0000 0000 0000 + 2 
            // => 1000 0000 0001 1011 0000 0000 0000 0010,这个2进制数有如下含义:
            // sizeCtl的高16位: 1000 0000 0001 1011 表示当前的扩容标识戳
            // sizeCtl的低16位: 0000 0000 0000 0010 表示(1 + nThread),即,目前有多少个线程正在参与扩容~
            // 那么这里的n是怎么来的呢? 
            // eg: (rs << RESIZE_STAMP_SHIFT) + 2 这里的2,就是1 + 1,后面的1就是对1*Thread,即(1+1*Threads)
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // 扩容(迁移数据):触发扩容条件的线程 不持有nextTable
                transfer(tab, null);
            // 重新计算table中的元素个数
            s = sumCount();
        }
    }
}

4. Summary

(1) The storage mode of element number is similar to LongAdder class, which is stored on different segments to reduce the conflict when different threads update size at the same time;

(2) When calculating the number of elements, add the values of these segments and baseCount to calculate the total number of elements;

(3) Under normal circumstances, sizeCtl stores an expansion threshold, which is 0.75 times of the capacity;

(4) During expansion, sizeCtl high-level storage expansion postmark (resizeStamp) and the number of low-level storage expansion threads plus 1 (1 + nThreads);

(5) If other threads find expansion after adding elements, they will also join the expansion ranks;

The above is the ConcurrentHashMap source code put data storage method and related methods, because transfer migration data this method is more complex, we put in the next article separate analysis ~ also hope that you pay more attention to other content of this site!


Related articles: