线程安全之ConcurrentHashMap源码分析

线程安全之ConcurrentHashMap源码分析,第1张

线程安全之ConcurrentHashMap源码分析

在上一篇文章中,讲解了HashMapjdk1.7使用头插法,容易形成环形链表,在jdk1.8改为尾插法,虽然解决了环形链表的问题,但是会导致数据丢失的问题,所以我们说HashMap在多线程环境下是不安全的。

所以,不管是jdk1.7还是jdk1.8的HashMap 都存在线程安全的问题。那么在多线程环境下应该这样去保证线程安全呢,有什么办法呢?

上一篇文章中有说到3种解决方案:

  • 在多线程环境下用HashTable来解决线程安全的问题,put或者get方法源码如下图:
   //put方法
   public synchronized V put(K key, V value) {
        // Make sure the value is not null
        if (value == null) {
            throw new NullPointerException();
        }

        // Makes sure the key is not already in the hashtable.
        Entry tab[] = table;
        int hash = key.hashCode();
        int index = (hash & 0x7FFFFFFF) % tab.length;
        @SuppressWarnings("unchecked")
        Entry entry = (Entry)tab[index];
        for(; entry != null ; entry = entry.next) {
            if ((entry.hash == hash) && entry.key.equals(key)) {
                V old = entry.value;
                entry.value = value;
                return old;
            }
        }

        addEntry(hash, key, value, index);
        return null;
    }

    //get方法
    @SuppressWarnings("unchecked")
    public synchronized V get(Object key) {
        Entry tab[] = table;
        int hash = key.hashCode();
        int index = (hash & 0x7FFFFFFF) % tab.length;
        for (Entry e = tab[index] ; e != null ; e = e.next) {
            if ((e.hash == hash) && e.key.equals(key)) {
                return (V)e.value;
            }
        }
        return null;
    }

从上面的源码中可以看出不管是put还是get *** 作都会使用synchronized加锁,由此带来的问题是在这个时间段内只能有一个线程可以 *** 纵HashTable,最不能接收的,就是get方法只是一个读取 *** 作为什么要加锁呢?就连删除也都加上锁,你这是为了安全什么都锁啊,你牛!

如果线程数量大一点的话,HashTable的性能会急剧下降,因为每次 *** 作都需要锁住整个对象,而其他线程在此期间是不能 *** 作的。不仅如此,还会带来额外的上下文切换等开销,所以此时它的吞吐量甚至还不如单线程的情况,因此并不适合高并发场景。

  • Collections.synchronizedMap(Map
  • 它和HashTable差不多,要锁住整张表,效率低下,如下图源码:

    从源码中可以看出SynchronizedMap在Hashtable对比上改进了很多,synchronized不再放在方法上,而是放在方法内部,作为同步块出现,但仍然是对象级别的同步锁,读和写 *** 作都需要获取锁,本质上,仍然只允许一个线程访问,其他线程被排斥在外。

    • 上面的两种方式实现线程安全性都不是很合适,所以剩下ConcurrentHashMap,既然锁住整张表效率低下,那能不能把表分成N份呢,把数据尽量均匀的分布到每个部分中,分别给他们加锁,互相之间并不影响呢?那就使用ConcurrentHashMap实现,因为ConcurrentHashMap将Map分段了,每个段进行加锁,而不是像HashTable,SynchronizedMap都是整个map加锁,这样就可以多线程访问了!

    在jdk1.7中ConcurrentHashMap 是基于分段锁的,就是将内部分成不同的 Segment(段),每个段里面是HashEntry数组

    在jdk1.8之后对jdk1.7做了很大的改进,不再采用分段锁的机制了,而采用的是 Synchronized + CAS(Compare and Swap,即比较并替换,实现并发算法时常用到的一种技术) ,把锁的粒度进一步降低,而放弃了 Segment 分段。

    下面我们就以 jdk1.8的ConcurrentHashMap中put方法源码进行解析

    而put方法中主要有以下几个重点的方法进行讲解:

    • initTable方法:初始化表
    • helpTransfer方法:帮助扩容
    • addCount方法:增加元素个数,并且触发扩容 *** 作
    • fullAddCount方法:初始化CounterCell,来记录元素个数
    • transfer方法:扩容和迁移元素的

    说明一下:由于jdk1.8的HashMap和ConcurrentHashMap的基本属性变量、结构和一些初始化相关的逻辑都差不多,这里不在单独说明介绍。看懂HashMap的伙伴们都知道滴吧,嘻嘻

    put方法

    先从put方法中看一下插入新元素的时候,怎么保证线程安全的吧

        public V put(K key, V value) {
            return putVal(key, value, false);
        }
    
        
        final V putVal(K key, V value, boolean onlyIfAbsent) {
           //在并发情况下,key 和 value 不支持为空的,为空报异常
            if (key == null || value == null) throw new NullPointerException();
            获取key的hash值
            int hash = spread(key.hashCode());
            // 用来记录所在table数组中的桶的中链表的个数,后面会用于判断是否链表过长需要转红黑树
            int binCount = 0;
             // for循环,直到put成功插入数据才会跳出
            for (Node[] tab = table;;) {
            // f=桶头节点  n=table的长度  i=在数组中的哪个下标  fh=头节点的hash值
                Node f; int n, i, fh;
                //如果表为空,则说明还未初始化。
                if (tab == null || (n = tab.length) == 0)
                 //第一次put的时候table没有初始化,则初始化table,只有一个线程可以初始化成功。
                    tab = initTable();
                    //若表已经初始化,则找到当前key所在的桶,并且判断是否为空
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                  //若当前桶为空,则通过ConcurrentHashMap原子 *** 作,把新节点插入到此位置,
                 //保证了只有一个线程可以ConcurrentHashMap成功,其它线程都会失败。
                    if (casTabAt(tab, i, null,
                                 new Node(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                 //若所在桶不为空,则判断节点的 hash 值是否为 MOVED(值是-1),说明正在扩容
                else if ((fh = f.hash) == MOVED)
                //若为-1,说明当前数组正在进行扩容,则需要当前线程帮忙迁移数据
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    //这里用同步锁的方式来保证线程安全,给桶中头节点对象加锁
                    synchronized (f) {
                    
                        // 在一次检测,看在加锁之前,防止该桶的头节点被其他线程修改
                        if (tabAt(tab, i) == f) {
                         //如果hash值大于等于0,说明是正常的链表结构
                            if (fh >= 0) {
                                binCount = 1;
                                //从头结点开始遍历,每遍历一次,binCount计数加1
                                for (Node e = f;; ++binCount) {
                                    K ek;
                                     //要存的元素的hash,key跟要存储的位置的节点相同的时候,则用新值替换旧值
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node pred = e;
                                    //如果不是同样的hash,同样的key的时候,则判断该节点的下一个节点是否为空
                                    if ((e = e.next) == null) {
                                    //为空的话把这个要加入的节点设置为当前节点的下一个节点
                                        pred.next = new Node(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                             //表示已经转化成红黑树了
                            else if (f instanceof TreeBin) {
                                Node p;
                                binCount = 2;
                                //调用putTreeval方法,将该元素添加到树中去
                                if ((p = ((TreeBin)f).putTreeval(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {
                      //如果节点个数大于等于8,则调用treeifyBin方法将链表转换为红黑树
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                        //如果是修改,不是新增,则把旧节点值返回
                            return oldVal;
                        break;
                    }
                }
            }
            //使用cas统计数量增加1,同时判断是否满足扩容需求,进行扩容
            
            // 元素个数增加1,完成新增后并有可能会触发扩容
            addCount(1L, binCount);
            return null;
        }
    

    整个put方法的流程:

    当添加key、value键值对的时候,首先会去判断保存这些键值对的数组是不是初始化了,如果没有的话就初始化数组,然后通过计算hash值来确定放在数组的哪个位置。如果这个位置为空则直接添加,如果不为空的话,则取出这个节点来,如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制。

    最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加 *** 作。然后判断当前取出的节点位置存放的是链表还是树,如果是链表的话,则遍历整个链表,直到取出来的节点的key来个要放的key进行比较,如果key相等,并且key的hash值也相等的话,则说明是同一个key,则覆盖掉value,否则的话则添加到链表的末尾。如果是树的话,则调用putTreeval方法把这个元素添加到树中去,最后在添加完成之后,会判断在该节点处共有多少个节点(注意是添加前的个数),如果达到8个以上了的话,则调用treeifyBin方法来尝试将链表转为树,或者扩容数组。

    initTable方法

    我们看一下当数组为空的时候是怎么初始化的

        private final Node[] initTable() {
            Node[] tab; int sc;
            
            // 循环判断table是否为空,进入while准备开始初始化,直到初始化成功为止
            while ((tab = table) == null || tab.length == 0) {
            //sizeCtl表示有其他线程正在进行初始化 *** 作,把线程挂起。对于table的初始化工作,只能有一个线程在进行
           //sizeCtl值有很多中情况,默认值为0
           //当为-1时,说明有其它线程正在对表进行初始化 *** 作
           //当表初始化成功后,又会把它设置为扩容阈值
           //当为一个小于-1的负数,用来表示当前有几个线程正在帮助扩容
                if ((sc = sizeCtl) < 0)
                //线程等待
                    Thread.yield(); // lost initialization race; just spin
                      //把sc的值设置为-1,表明当前线程正在进行表的初始化,其它失败的线程就会自旋
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                    //重新检查一下table是否为空
                        if ((tab = table) == null || tab.length == 0) {
                        //如果sc大于0,则为sc,否则返回默认容量 16
                        //当调用有参构造创建 Map 时,sc的值是大于0的
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            //这里是创建数组
                            Node[] nt = (Node[])new Node[n];
                            table = tab = nt;
                          //sc赋值,如果n为16,则sc = 16-16/4 = 12
                            sc = n - (n >>> 2);
                        }
                    } finally {
                     	//赋值给sizeCtl,初始化结束,sizeCtl的值>0
                        sizeCtl = sc;
                    }
                     //若当前线程初始化表成功,则跳出循环
                    break;
                }
            }
            return tab;
        }
    
    helpTransfer方法

    当一个线程要对table中元素进行 *** 作的时候,如果检测到节点的hash值为MOVED的时候,就会调用helpTransfer方法,在helpTransfer中再调用transfer方法来帮助完成数组的扩容

        final Node[] helpTransfer(Node[] tab, Node f) {
            Node[] nextTab; int sc;
         //如果table不为空,并且当前桶头节点为ForwardingNode类型,并且nextTab不为空
        // 说明当前桶已经迁移完毕并且去帮助迁移其它桶的元素
        // tab != null:表示数组已经初始化
        // ForwardingNode:表示在扩容时已经对该节点处理完成且扩容尚未结束
        // nextTable != null:表示扩容尚未完成
            if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode)f).nextTable) != null) {
                //这里根据table的length得到一个扩容唯一标识戳
                int rs = resizeStamp(tab.length);
                //Map仍在扩容状态的判断
                //nextTab == nextTable
                //条件成立:表示当前还在扩容中
                //条件不成立:1.扩容完毕后,nextTable会被设为Null
                //table == tab
                //条件成立:表示当前还在扩容中,还未完成
                //条件不成立:表示扩容已经结束了,扩容结束之后,最后退出的线程会设置nextTable为table
                //(sc = sizeCtl) < 0
                //条件成立:表示扩容正在进行中
                //条件不成立:表示sizeCtl当前是一个大于0的数,此时代表下次扩容的阈值,当前扩容已经结束。
                while (nextTab == nextTable && table == tab &&
                       (sc = sizeCtl) < 0) {
                      
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                        break;
                    // 这里将sizeCtl的值自增1,表明参与扩容的线程数量+1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                      // 进行扩容操作
                        transfer(tab, nextTab);
                        break;
                    }
                }
                return nextTab;
            }
            return table;
        }
    
    transfer方法

    transfer这个方法我们一直说是在扩容,要明确的一点是与其说是帮助扩容,其实更准确的说应该是帮助迁移元素,扩容时table容量变为原来的两倍,并把部分元素迁移到其它桶中。该方法发生扩容并且迁移数据。

    还有就是扩容是第一次初始化表,只能由一个线程完成,而其他线程帮助迁移元素到新数组当中。

    看源码之前先看一下扩容迁移数据的流程图,熟悉一下流程:

    下面分析源码:

        private final void transfer(Node[] tab, Node[] nextTab) {
            // n:表示扩容之前table数组的长度
            // stride:表示分配给线程任务的步长
            int n = tab.length, stride;
           //将 (n>>>3相当于n/8)然后除以CPU核心数。如果得到的结果小于 16,那么就使用 16 
    	   // 这里的目的是让每个CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶,也就是长度为16的时候,扩容的时候只会有一个线程来扩容
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                // 控制线程迁移数据的最小步长
                stride = MIN_TRANSFER_STRIDE; // subdivide range
                
    	     //nextTab未初始化,nextTab是用来扩容的node数组 
            if (nextTab == null) {            // initiating
                try {
                    @SuppressWarnings("unchecked")
                    // 如果新出租未初始化,则新建数组为原数组的两倍
                    Node[] nt = (Node[])new Node[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                //扩容失败,sizeCtl使用int的最大值
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                //使用nextTable代指新数组
                nextTable = nextTab;
                 // 记录迁移数据整体位置的一个标记,初始值是原table数组的长度。
                // 可以理解为:全局范围内散列表的数据任务处理到哪个桶的位置了
                transferIndex = n;
            }
             //表示新数组长度
            int nextn = nextTab.length;
             //创建一个标志类(用于表示扩容中状态的节点)
            ForwardingNode fwd = new ForwardingNode(nextTab);
            
    	是否向前推进的标志,首次推进为true,如果等于true,说明需要再次推进一个下标(i--),反之,如果是false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
            boolean advance = true;
            //是否所有线程都全部迁移完成的标志
            boolean finishing = false; // to ensure sweep before committing nextTab
            //i:表示当前线程正在迁移的桶的下标
            //bound:表示它本次可以迁移的范围下限
            for (int i = 0, bound = 0;;) {
             // f:表示桶位的头节点
             // fh:表示头节点的hash
                Node f; int fh;
                // 如果当前线程可以向后推进,整个while循环就是在算i的值,这个循环就是控制i依次递减,同  时,每个线程都会进入这里取得自己需要转移的桶的范围
                while (advance) {
                    int nextIndex, nextBound;
                    //i每次自减 1,直到 bound。若超过bound范围,或者finishing标志为true,则不用向前推进
                    //若未全部完成迁移,还有相应的区间的桶位要处理,--i 就让当前线程处理下一个桶位
                    if (--i >= bound || finishing)
                    //这里设置advance=false,是为了防止在没有成功处理一个桶的情况下却进行了推进
                        advance = false;
                     //这里每次执行都会把transferIndex最新的值赋给nextIndex
                    //transferIndex <= 0 说明原数组中的每个桶位置,都有线程在处理迁移了,需要跳出while循环,并把i改成-1,推进状态变成false,以跳转到以下的if判断在处理的线程是否已经全部完成。
                    else if ((nextIndex = transferIndex) <= 0) {
                        i = -1;
                        //这里设置advance=false,是为了防止在没有成功处理一个桶的情况下却进行了推进
                        advance = false;
                    }
                    
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                       //这个值就是当前线程可以处理的数据迁移下限
                        bound = nextBound;
                       //第一次的i为15,因为长度16的数组,最后一个元素的下标为15
                        i = nextIndex - 1;
                   //这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样会导致漏掉某个桶,下面的tabAt方法判断会出现这样的情况。
                        advance = false;
                    }
                }
                 //i:表示需要转移的桶的下标,n:表示原数组的容量
                if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
               //判断是否已经完成扩容,已完成扩容则做收尾逻辑
                    if (finishing) {
                      //完成扩容后,将引用设置为null
                        nextTable = null;
                         //更新table为新的数组,这里的table是个volatile变量,所以这个赋值 *** 作对其他线程是可见的
                        table = nextTab;
                         //设置新的扩容阈值,将阈值设置为新容量的3/4,也就是新数组长度的0.75倍
                        sizeCtl = (n << 1) - (n >>> 1);
                       // 返回结果,扩容结束
                        return;
                    }
                    //在扩容开始时,会将sizeCtl设置成一个负数,每次有新的线程并发扩容时,会将sizeCtl+1,而当有线程处理完扩容逻辑后再减1,以此来判断是否是最后一个线程
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                   //这里就是去校验当前sizeCtl是否和初始值是否相等。相等,则说明全部线程迁移完成。
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                      //只有此处,才会把finishing设置为true,finishing为true才会走到上面的if扩容结束条件判断
                    //推进标记advance置为true
                        finishing = advance = true;
                     //这里会把i从-1修改为16,
                    //这么做是让i再从后向前扫描一遍数组,看看是不是都迁移完成了
                    //也就是第二次遍历都会走到下面的(fh = f.hash) == MOVED这个条件
                        i = n; // recheck before commit
                    }
                }
                //若i的位置元素为空,则说明当前桶的元素已经被迁移完成,就把头节点设置为fwd标志。
                else if ((f = tabAt(tab, i)) == null)
                    advance = casTabAt(tab, i, null, fwd);
                //若当前桶的头节点是fwd ,说明别的线程已经处理过了,再次推进
                else if ((fh = f.hash) == MOVED)
                    advance = true; // already processed
                else {//处理当前桶的数据迁移
                 //锁定该桶,防止对同一个桶数据并发操作
                    synchronized (f) { 
                //再次判断当前桶是否有修改过,防止获得锁后,该桶内数据被别的线程插入了新的数据,因为这个f是在未加锁之前获取的node对象,在这期间,可能这个下标处插入了新数据
                    //如果不做这层校验,会导致新加入到桶内的数据没有被处理,导致数据丢失
                        if (tabAt(tab, i) == f) {
                         //ln:表示低位链表引用
                         //hn:表示高位链表引用
                             Node ln, hn;
                            //若fh >= 0表示是普通链表节点
                            if (fh >= 0) {
                               //这里主要对长度进行计算
                                 //如果结果为0 ,将其放在低位链表,反之放在高位链表,目的是将链表重新 hash,放到对应的位置上
                               int runBit = fh & n;
                                Node lastRun = f;
                                for (Node p = f.next; p != null; p = p.next) {
                                    //取于桶中每个节点的hash值
                                    int b = p.hash & n;
                                        // 如果节点的hash值和首节点的hash值取于结果不同
                                    if (b != runBit) {
                                    //更新runBit,用于下面判断lastRun该赋值给ln(低位链表)还是 hn(高位链表)。
                                      runBit = b; 
                                      //这个lastRun保证后面的节点与自己的取于值相同,避免后面没有必要的循环
                                      lastRun = p; 
                                    }
                                }
                                //如果runBit == 0说明lastRun引用的链表为低位链表,那么就让ln指向 低位链表
                                if (runBit == 0) {
                                    ln = lastRun;
                                    hn = null;
                                }
                                //否则lastRun引用的链表为高位链表,hn指向高位链表
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
                                //遍历链表,把hash&n为0的放在低位链表中,不为0的放在高位链表中
                                //循环跳出条件:当前循环结点!=lastRun
                                for (Node p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    //如果与运算结果是0,那么就还在低位
                                    if ((ph & n) == 0)// 如果是0 ,那么创建低位节点
                                        ln = new Node(ph, pk, pv, ln);
                                    else//则创建高位
                                        hn = new Node(ph, pk, pv, hn);
                                }
                               //设置低位链表的位置不变
                                setTabAt(nextTab, i, ln);
                                 //设置高位链表,在原有长度上 +n
                                setTabAt(nextTab, i + n, hn);
                                 //标记当前桶已完成迁移
                                setTabAt(tab, i, fwd);
                                 //继续向后推进
                                advance = true;
                            }
                            //这里是红黑树结构的迁移,逻辑与链表差不多
                            else if (f instanceof TreeBin) {
                            
                                TreeBin t = (TreeBin)f;
                                TreeNode lo = null, loTail = null;
                                TreeNode hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                for (Node e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode p = new TreeNode
                                        (h, e.key, e.val, null, null);
                                       //以下的 *** 作和链表相同,结果为0的放在低位
                                    if ((h & n) == 0) {
                                    //为空的话,说明当前低位链表还没有数据
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                            //否则低位链表存在数据了,把数据追加到低位链表的末尾
                                        else
                                            loTail.next = p;
                                            //将低位链表尾指针指向p节点
                                        loTail = p;
                                        ++lc;
                                    }
                                    else {//否则放在高位
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                 //如果树的节点数小于等于6就转成链表,否则创建新的树
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin(hi) : t;
                               //设置低位树的位置不变
                                setTabAt(nextTab, i, ln);
                               //设置高位树,在原有长度上 +n
                                setTabAt(nextTab, i + n, hn);
                               // 标记该桶已迁移
                                setTabAt(tab, i, fwd);
                                 //继续向后推进
                                advance = true;
                            }
                        }
                    }
                }
            }
        }
    

    以 runBit =0 为例,迁移后的新数组链表示意图:

    addCount方法

    addCount 方法的主要作用:

    对table的长度+1。无论是通过修改 baseCount,还是通过使用CounterCell。当CounterCell被初始化了,就优先使用他,不再使用 baseCount。

    检查是否需要扩容,或者是否正在扩容。如果需要扩容,就调用扩容方法,如果正在扩容,就帮助其扩容。

    //x为1,check表示链表上的元素个数
        private final void addCount(long x, int check) {
            CounterCell[] as; long b, s;
               //判断counterCells是否为空. 为空有可能竞争的线程非常少,可通过cas *** 作尝试修改 baseCount 变量,对这个变量进行原子累加 *** 作
              //如果cas失败说明存在竞争,这个时候不能再采用baseCount来累加,而是通过counterCells来记录
            if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, baseCOUNT, b = baseCount, s = b + x)) {
                CounterCell a; long v; int m;
                 //是否冲突标识,标记为true,默认没有冲突
                boolean uncontended = true;
                //条件一:as == null || (m = as.length - 1) < 0
                //若数组为空,进fullAddCount方法
                //条件二:(a = as[ThreadLocalRandom.getProbe() & m]) == null
                //表示方法会给当前线程生成一个随机数,然后用随机数与数组长度取模,计算它所在的表格。若当前线程所分配到的表格为空,需要当前线程进入fullAddCount方法
                //条件三:!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)))
                //若CounterCell不为空,且线程所在表格不为空,则尝试CAS修改表格对应的value值加1。
                //若修改成功,则执行下一步,否则把uncontended值设为fasle,说明产生了竞争,然后进fullAddCount方法
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                     //执行循环让当前线程一定把1加成功,这个会在稍后讲
                    fullAddCount(x, uncontended);
                    return;
                }
                //这里若是返回了,那后边怎么判断扩容?搞不懂
                if (check <= 1)
                    return;
               //统计容量大小,主要用于下边的逻辑,是否达到了扩容阈值。
                s = sumCount();
            }
            //这里用于检查是否需要扩容
            if (check >= 0) {
              //tab:表示table
             //nt:表示nextTable
             //n:表示数组的长度
             //sc:表示sizeCtl的临时值
                Node[] tab, nt; int n, sc;
                //sc此时为阈值,容量达到阈值了且table不为空,且table数组长度小于最大长度(可以扩容)
                //条件一:s >= (long)(sc = sizeCtl)
                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                       (n = tab.length) < MAXIMUM_CAPACITY) {
                       //根据数组长度得到一个标识
                    int rs = resizeStamp(n);
                    //表示当前table正在扩容
                    if (sc < 0) {
                    //这里是有bug的,感兴趣的小伙伴可以看一下:
                    // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427
                    //sc == rs + 1想表达的是sc == (rs << RESIZE_STAMP_SHIFT) + 1
                    //sc == rs + MAX_RESIZERS想表达的是sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
                         //sc的高16位是数据校验标识,低16位代表当前有几个线程正在帮助扩容,RESIZE_STAMP_SHIFT=16
                        //如果sc == rs + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
                        //nextTable=null 说明需要扩容的新数组还未创建完成
                       //transferIndex这个参数小于等于0,说明已经不需要其它线程帮助扩容了
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                          //到这里说明当前线程可以帮助扩容,因此sc值+1,代表扩容的线程数加1
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }
                    //rs << RESIZE_STAMP_SHIFT + 2,这个其实就是将rs的值赋值到高16位
                       //当sc大于0,说明sc代表扩容阈值,因此第一次扩容之前肯定走这个分支,用于初始化数组nextTable
                      //rs<<16 1000 0000 0001 1011 0000 0000 0000 0000 +2 => 1000 0000 0001 1011 0000 0000 0000 0010
       //这个值,转为十进制就是 -2145714174,用于标识,这是扩容时,初始化新数组的状态,
       //扩容时,需要用到这个参数校验是否所有线程都全部帮助扩容完成。
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2))
                      //帮助扩容,第二个参数传入null,则说明是第一次初始化新数组
                        transfer(tab, null);
                    s = sumCount();
                }
            }
        }
    
    fullAddCount方法

    该方法负责初始化CounterCells和更新计数,方法中第二个参数wasUncontended则表示是否不存在竞争,CAS失败之后调用该方法说明存在竞争所以传false。

        private final void fullAddCount(long x, boolean wasUncontended) {
            int h;
            
            ///如果当前线程的随机数为0,则强制初始化一个值
            if ((h = ThreadLocalRandom.getProbe()) == 0) {
                ThreadLocalRandom.localInit();      // force initialization// 初始化
                //得到probe,用于counterCells数组寻址
                h = ThreadLocalRandom.getProbe();
                //把wasUncontended 设置为 true 表示不存在竞争
                wasUncontended = true;
            }
            冲突标志,若为true,表示可能需要扩容,以减少碰撞冲突
            boolean collide = false;                // True if last slot nonempty
            //死循环,表示一定要计数成功
            for (;;) {
                CounterCell[] as; CounterCell a; int n; long v;
                //如果counterCells不为空,说明已经初始化了
                if ((as = counterCells) != null && (n = as.length) > 0) {
                //当前线程所在的CounterCell对象为空
                    if ((a = as[(n - 1) & h]) == null) {
                        if (cellsBusy == 0) {            // Try to attach new Cell
                        //先创建一个CounterCell对象,把x保存进去
                            CounterCell r = new CounterCell(x); // Optimistic create
                            //当前线程占用锁
                            if (cellsBusy == 0 &&
                                U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                                boolean created = false;
                                try {               // Recheck under lock
                                    CounterCell[] rs; int m, j;
                                    //条件三:rs[j = (m - 1) & h] == null
                                    //这里又做了一层判断,如果为空才进入
                                    //因为存在一种情况就是上面第一次判断cellsBusy为0时,
                                    //可能有一个线程在CAS修改cellsBusy并且初始化完成了,
                                    //然后复位cellsBusy
                                    //此时另一个线程进入,已经是初始化过了,所以需要再次判断
                                    if ((rs = counterCells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                        //把新创建的对象r的元素放入对应下标的位置
                                        rs[j] = r;
                                        //created=true表示创建成功
                                        created = true;
                                    }
                                } finally {
                                 //手动释放锁
                                    cellsBusy = 0;
                                }
                                //创建成功,且上边的赋值成功,说明加1成功,退出循环
                                if (created)
                                    break;
                               //说明存在竞争该位置已被其他线程放入了CounterCell,继续下次循环
                                continue;           // Slot is now non-empty
                            }
                        }
                           //若cellsBusy=1,说明有其它线程抢锁成功。或者若抢锁的CAS *** 作失败,
                           //都会走到这里,到下面的h = ThreadLocalRandom.advanceProbe(h)中
                           //重新生成随机数,进行下次循环判断
                        collide = false;
                    }
                      //如果走到这,说明当前方法在被调用之前已经 CAS 失败过一次,
                      //重新获取线程随机数,并把wasUncontended 设置为true,再循环一次
                    else if (!wasUncontended)       // CAS already known to fail
                        wasUncontended = true;      // Continue after rehash
                    //若wasUncontended为true表示不存在竞争,尝试cas修改,成功则退出,否则继续往下走
                    else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                        break;
    
                   //如果发生过扩容或者数组长度已经达到虚拟机最大可以核心数
                   //因为长度超限,则说明已经无法扩容,只能认为无碰撞。
                    else if (counterCells != as || n >= NCPU)
                        collide = false;            // At max size or stale
                   //数组长度小于CPU核心数并且collide为false,就把collide改为true,
                   //说明下次循环可能需要扩容
                    else if (!collide)
                        collide = true;
                    //到了这里,说明竞争激烈,数组容量不够大,需要进行扩容了
                    else if (cellsBusy == 0 &&
                             U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        try {
                            if (counterCells == as) {// Expand table unless stale
                            //创建一个容量为原来两倍的数组,再通过for循环进行数据迁移。
                                CounterCell[] rs = new CounterCell[n << 1];
                              //转移旧数组的值
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                //把扩容后的对象赋值给counterCells。
                                counterCells = rs;
                            }
                        } finally {
                            // 释放锁
                            cellsBusy = 0;
                        }
                      //认为扩容后,下次不会产生冲突了
                        collide = false;
                        //当次扩容后,就不需要重新生成随机数了
                        continue;                   // Retry with expanded table
                    }
                    //上面失败都会到这里来
                    //这里重新生成一个随机数,下一次循环判断
                    h = ThreadLocalRandom.advanceProbe(h);
                }
                //若cellsBusy为0,说明不存在锁,线程都可以抢锁,若为1,表示已经有线程拿到了锁,
                //则其它线程不能抢占锁
               //上面是counterCells数组已初始化的情况,下面是未初始化情况的处理
                else if (cellsBusy == 0 && counterCells == as &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                         //这一步是初始化CounterCells数组
                    boolean init = false;
                    try {                           // Initialize table
                      //重新检查下counterCells数组引用是否有变化
                        if (counterCells == as) {
                            //初始化一个长度为2的数组
                            CounterCell[] rs = new CounterCell[2];
                           //根据当前线程的随机数值,计算下标,只有两个结果0或1,并初始化对象
                            rs[h & 1] = new CounterCell(x);
                            //赋值给counterCells
                            counterCells = rs;
                            //初始化成功的标志
                            init = true;
                        }
                    } finally {
                         //释放锁
                        cellsBusy = 0;
                    }
                     //若初始化成功,则说明当前加1的操作也已经完成了,则退出整个循环
                    if (init)
                        break;
                }
             //到这里标志着线程竞争激烈,其它线程占据着counterCells数组,
             //尝试更新baseCount的值,若成功,也说明加1操作成功,则退出循环。。
                else if (U.compareAndSwapLong(this, baseCOUNT, v = baseCount, v + x))
                   // 更新成功直接返回
                    break;                          // Fall back on using base
            }
        }
    
    最后

    到这里ConcurrentHashMap的put方法插入最主要的逻辑基本上都讲完了,相对于jdk1.7来说,jdk1.8ConcurrentHashMap代码实现相对复杂,但是锁的粒度降低了,效率也提高了。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5564068.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存