在上一篇文章中,讲解了HashMapjdk1.7使用头插法,容易形成环形链表,在jdk1.8改为尾插法,虽然解决了环形链表的问题,但是会导致数据丢失的问题,所以我们说HashMap在多线程环境下是不安全的。
所以,不管是jdk1.7还是jdk1.8的HashMap 都存在线程安全的问题。那么在多线程环境下应该这样去保证线程安全呢,有什么办法呢?
上一篇文章中有说到3种解决方案:
- 在多线程环境下用HashTable来解决线程安全的问题,put或者get方法源码如下图:
//put方法 public synchronized V put(K key, V value) { // Make sure the value is not null if (value == null) { throw new NullPointerException(); } // Makes sure the key is not already in the hashtable. Entry,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; @SuppressWarnings("unchecked") Entryentry = (Entry )tab[index]; for(; entry != null ; entry = entry.next) { if ((entry.hash == hash) && entry.key.equals(key)) { V old = entry.value; entry.value = value; return old; } } addEntry(hash, key, value, index); return null; } //get方法 @SuppressWarnings("unchecked") public synchronized V get(Object key) { Entry,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; for (Entry,?> e = tab[index] ; e != null ; e = e.next) { if ((e.hash == hash) && e.key.equals(key)) { return (V)e.value; } } return null; }
从上面的源码中可以看出不管是put还是get *** 作都会使用synchronized加锁,由此带来的问题是在这个时间段内只能有一个线程可以 *** 纵HashTable,最不能接收的,就是get方法只是一个读取 *** 作为什么要加锁呢?就连删除也都加上锁,你这是为了安全什么都锁啊,你牛!
如果线程数量大一点的话,HashTable的性能会急剧下降,因为每次 *** 作都需要锁住整个对象,而其他线程在此期间是不能 *** 作的。不仅如此,还会带来额外的上下文切换等开销,所以此时它的吞吐量甚至还不如单线程的情况,因此并不适合高并发场景。
- Collections.synchronizedMap(Map
它和HashTable差不多,要锁住整张表,效率低下,如下图源码:
从源码中可以看出SynchronizedMap在Hashtable对比上改进了很多,synchronized不再放在方法上,而是放在方法内部,作为同步块出现,但仍然是对象级别的同步锁,读和写 *** 作都需要获取锁,本质上,仍然只允许一个线程访问,其他线程被排斥在外。
- 上面的两种方式实现线程安全性都不是很合适,所以剩下ConcurrentHashMap,既然锁住整张表效率低下,那能不能把表分成N份呢,把数据尽量均匀的分布到每个部分中,分别给他们加锁,互相之间并不影响呢?那就使用ConcurrentHashMap实现,因为ConcurrentHashMap将Map分段了,每个段进行加锁,而不是像HashTable,SynchronizedMap都是整个map加锁,这样就可以多线程访问了!
在jdk1.7中ConcurrentHashMap 是基于分段锁的,就是将内部分成不同的 Segment(段),每个段里面是HashEntry数组。
在jdk1.8之后对jdk1.7做了很大的改进,不再采用分段锁的机制了,而采用的是 Synchronized + CAS(Compare and Swap,即比较并替换,实现并发算法时常用到的一种技术) ,把锁的粒度进一步降低,而放弃了 Segment 分段。
下面我们就以 jdk1.8的ConcurrentHashMap中put方法源码进行解析
而put方法中主要有以下几个重点的方法进行讲解:
- initTable方法:初始化表
- helpTransfer方法:帮助扩容
- addCount方法:增加元素个数,并且触发扩容 *** 作
- fullAddCount方法:初始化CounterCell,来记录元素个数
- transfer方法:扩容和迁移元素的
说明一下:由于jdk1.8的HashMap和ConcurrentHashMap的基本属性变量、结构和一些初始化相关的逻辑都差不多,这里不在单独说明介绍。看懂HashMap的伙伴们都知道滴吧,嘻嘻
put方法先从put方法中看一下插入新元素的时候,怎么保证线程安全的吧
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { //在并发情况下,key 和 value 不支持为空的,为空报异常 if (key == null || value == null) throw new NullPointerException(); 获取key的hash值 int hash = spread(key.hashCode()); // 用来记录所在table数组中的桶的中链表的个数,后面会用于判断是否链表过长需要转红黑树 int binCount = 0; // for循环,直到put成功插入数据才会跳出 for (Node
[] tab = table;;) { // f=桶头节点 n=table的长度 i=在数组中的哪个下标 fh=头节点的hash值 Node f; int n, i, fh; //如果表为空,则说明还未初始化。 if (tab == null || (n = tab.length) == 0) //第一次put的时候table没有初始化,则初始化table,只有一个线程可以初始化成功。 tab = initTable(); //若表已经初始化,则找到当前key所在的桶,并且判断是否为空 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //若当前桶为空,则通过ConcurrentHashMap原子 *** 作,把新节点插入到此位置, //保证了只有一个线程可以ConcurrentHashMap成功,其它线程都会失败。 if (casTabAt(tab, i, null, new Node (hash, key, value, null))) break; // no lock when adding to empty bin } //若所在桶不为空,则判断节点的 hash 值是否为 MOVED(值是-1),说明正在扩容 else if ((fh = f.hash) == MOVED) //若为-1,说明当前数组正在进行扩容,则需要当前线程帮忙迁移数据 tab = helpTransfer(tab, f); else { V oldVal = null; //这里用同步锁的方式来保证线程安全,给桶中头节点对象加锁 synchronized (f) { // 在一次检测,看在加锁之前,防止该桶的头节点被其他线程修改 if (tabAt(tab, i) == f) { //如果hash值大于等于0,说明是正常的链表结构 if (fh >= 0) { binCount = 1; //从头结点开始遍历,每遍历一次,binCount计数加1 for (Node e = f;; ++binCount) { K ek; //要存的元素的hash,key跟要存储的位置的节点相同的时候,则用新值替换旧值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; //如果不是同样的hash,同样的key的时候,则判断该节点的下一个节点是否为空 if ((e = e.next) == null) { //为空的话把这个要加入的节点设置为当前节点的下一个节点 pred.next = new Node (hash, key, value, null); break; } } } //表示已经转化成红黑树了 else if (f instanceof TreeBin) { Node p; binCount = 2; //调用putTreeval方法,将该元素添加到树中去 if ((p = ((TreeBin )f).putTreeval(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //如果节点个数大于等于8,则调用treeifyBin方法将链表转换为红黑树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) //如果是修改,不是新增,则把旧节点值返回 return oldVal; break; } } } //使用cas统计数量增加1,同时判断是否满足扩容需求,进行扩容 // 元素个数增加1,完成新增后并有可能会触发扩容 addCount(1L, binCount); return null; } 整个put方法的流程:
当添加key、value键值对的时候,首先会去判断保存这些键值对的数组是不是初始化了,如果没有的话就初始化数组,然后通过计算hash值来确定放在数组的哪个位置。如果这个位置为空则直接添加,如果不为空的话,则取出这个节点来,如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制。
最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加 *** 作。然后判断当前取出的节点位置存放的是链表还是树,如果是链表的话,则遍历整个链表,直到取出来的节点的key来个要放的key进行比较,如果key相等,并且key的hash值也相等的话,则说明是同一个key,则覆盖掉value,否则的话则添加到链表的末尾。如果是树的话,则调用putTreeval方法把这个元素添加到树中去,最后在添加完成之后,会判断在该节点处共有多少个节点(注意是添加前的个数),如果达到8个以上了的话,则调用treeifyBin方法来尝试将链表转为树,或者扩容数组。
initTable方法我们看一下当数组为空的时候是怎么初始化的
private final Node
helpTransfer方法[] initTable() { Node [] tab; int sc; // 循环判断table是否为空,进入while准备开始初始化,直到初始化成功为止 while ((tab = table) == null || tab.length == 0) { //sizeCtl表示有其他线程正在进行初始化 *** 作,把线程挂起。对于table的初始化工作,只能有一个线程在进行 //sizeCtl值有很多中情况,默认值为0 //当为-1时,说明有其它线程正在对表进行初始化 *** 作 //当表初始化成功后,又会把它设置为扩容阈值 //当为一个小于-1的负数,用来表示当前有几个线程正在帮助扩容 if ((sc = sizeCtl) < 0) //线程等待 Thread.yield(); // lost initialization race; just spin //把sc的值设置为-1,表明当前线程正在进行表的初始化,其它失败的线程就会自旋 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //重新检查一下table是否为空 if ((tab = table) == null || tab.length == 0) { //如果sc大于0,则为sc,否则返回默认容量 16 //当调用有参构造创建 Map 时,sc的值是大于0的 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //这里是创建数组 Node [] nt = (Node [])new Node,?>[n]; table = tab = nt; //sc赋值,如果n为16,则sc = 16-16/4 = 12 sc = n - (n >>> 2); } } finally { //赋值给sizeCtl,初始化结束,sizeCtl的值>0 sizeCtl = sc; } //若当前线程初始化表成功,则跳出循环 break; } } return tab; } 当一个线程要对table中元素进行 *** 作的时候,如果检测到节点的hash值为MOVED的时候,就会调用helpTransfer方法,在helpTransfer中再调用transfer方法来帮助完成数组的扩容
final Node
transfer方法[] helpTransfer(Node [] tab, Node f) { Node [] nextTab; int sc; //如果table不为空,并且当前桶头节点为ForwardingNode类型,并且nextTab不为空 // 说明当前桶已经迁移完毕并且去帮助迁移其它桶的元素 // tab != null:表示数组已经初始化 // ForwardingNode:表示在扩容时已经对该节点处理完成且扩容尚未结束 // nextTable != null:表示扩容尚未完成 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode )f).nextTable) != null) { //这里根据table的length得到一个扩容唯一标识戳 int rs = resizeStamp(tab.length); //Map仍在扩容状态的判断 //nextTab == nextTable //条件成立:表示当前还在扩容中 //条件不成立:1.扩容完毕后,nextTable会被设为Null //table == tab //条件成立:表示当前还在扩容中,还未完成 //条件不成立:表示扩容已经结束了,扩容结束之后,最后退出的线程会设置nextTable为table //(sc = sizeCtl) < 0 //条件成立:表示扩容正在进行中 //条件不成立:表示sizeCtl当前是一个大于0的数,此时代表下次扩容的阈值,当前扩容已经结束。 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 这里将sizeCtl的值自增1,表明参与扩容的线程数量+1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 进行扩容操作 transfer(tab, nextTab); break; } } return nextTab; } return table; } transfer这个方法我们一直说是在扩容,要明确的一点是与其说是帮助扩容,其实更准确的说应该是帮助迁移元素,扩容时table容量变为原来的两倍,并把部分元素迁移到其它桶中。该方法发生扩容并且迁移数据。
还有就是扩容是第一次初始化表,只能由一个线程完成,而其他线程帮助迁移元素到新数组当中。
看源码之前先看一下扩容迁移数据的流程图,熟悉一下流程:
下面分析源码:
private final void transfer(Node
[] tab, Node [] nextTab) { // n:表示扩容之前table数组的长度 // stride:表示分配给线程任务的步长 int n = tab.length, stride; //将 (n>>>3相当于n/8)然后除以CPU核心数。如果得到的结果小于 16,那么就使用 16 // 这里的目的是让每个CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶,也就是长度为16的时候,扩容的时候只会有一个线程来扩容 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 控制线程迁移数据的最小步长 stride = MIN_TRANSFER_STRIDE; // subdivide range //nextTab未初始化,nextTab是用来扩容的node数组 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") // 如果新出租未初始化,则新建数组为原数组的两倍 Node [] nt = (Node [])new Node,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME //扩容失败,sizeCtl使用int的最大值 sizeCtl = Integer.MAX_VALUE; return; } //使用nextTable代指新数组 nextTable = nextTab; // 记录迁移数据整体位置的一个标记,初始值是原table数组的长度。 // 可以理解为:全局范围内散列表的数据任务处理到哪个桶的位置了 transferIndex = n; } //表示新数组长度 int nextn = nextTab.length; //创建一个标志类(用于表示扩容中状态的节点) ForwardingNode fwd = new ForwardingNode (nextTab); 是否向前推进的标志,首次推进为true,如果等于true,说明需要再次推进一个下标(i--),反之,如果是false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进 boolean advance = true; //是否所有线程都全部迁移完成的标志 boolean finishing = false; // to ensure sweep before committing nextTab //i:表示当前线程正在迁移的桶的下标 //bound:表示它本次可以迁移的范围下限 for (int i = 0, bound = 0;;) { // f:表示桶位的头节点 // fh:表示头节点的hash Node f; int fh; // 如果当前线程可以向后推进,整个while循环就是在算i的值,这个循环就是控制i依次递减,同 时,每个线程都会进入这里取得自己需要转移的桶的范围 while (advance) { int nextIndex, nextBound; //i每次自减 1,直到 bound。若超过bound范围,或者finishing标志为true,则不用向前推进 //若未全部完成迁移,还有相应的区间的桶位要处理,--i 就让当前线程处理下一个桶位 if (--i >= bound || finishing) //这里设置advance=false,是为了防止在没有成功处理一个桶的情况下却进行了推进 advance = false; //这里每次执行都会把transferIndex最新的值赋给nextIndex //transferIndex <= 0 说明原数组中的每个桶位置,都有线程在处理迁移了,需要跳出while循环,并把i改成-1,推进状态变成false,以跳转到以下的if判断在处理的线程是否已经全部完成。 else if ((nextIndex = transferIndex) <= 0) { i = -1; //这里设置advance=false,是为了防止在没有成功处理一个桶的情况下却进行了推进 advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { //这个值就是当前线程可以处理的数据迁移下限 bound = nextBound; //第一次的i为15,因为长度16的数组,最后一个元素的下标为15 i = nextIndex - 1; //这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样会导致漏掉某个桶,下面的tabAt方法判断会出现这样的情况。 advance = false; } } //i:表示需要转移的桶的下标,n:表示原数组的容量 if (i < 0 || i >= n || i + n >= nextn) { int sc; //判断是否已经完成扩容,已完成扩容则做收尾逻辑 if (finishing) { //完成扩容后,将引用设置为null nextTable = null; //更新table为新的数组,这里的table是个volatile变量,所以这个赋值 *** 作对其他线程是可见的 table = nextTab; //设置新的扩容阈值,将阈值设置为新容量的3/4,也就是新数组长度的0.75倍 sizeCtl = (n << 1) - (n >>> 1); // 返回结果,扩容结束 return; } //在扩容开始时,会将sizeCtl设置成一个负数,每次有新的线程并发扩容时,会将sizeCtl+1,而当有线程处理完扩容逻辑后再减1,以此来判断是否是最后一个线程 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //这里就是去校验当前sizeCtl是否和初始值是否相等。相等,则说明全部线程迁移完成。 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; //只有此处,才会把finishing设置为true,finishing为true才会走到上面的if扩容结束条件判断 //推进标记advance置为true finishing = advance = true; //这里会把i从-1修改为16, //这么做是让i再从后向前扫描一遍数组,看看是不是都迁移完成了 //也就是第二次遍历都会走到下面的(fh = f.hash) == MOVED这个条件 i = n; // recheck before commit } } //若i的位置元素为空,则说明当前桶的元素已经被迁移完成,就把头节点设置为fwd标志。 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //若当前桶的头节点是fwd ,说明别的线程已经处理过了,再次推进 else if ((fh = f.hash) == MOVED) advance = true; // already processed else {//处理当前桶的数据迁移 //锁定该桶,防止对同一个桶数据并发操作 synchronized (f) { //再次判断当前桶是否有修改过,防止获得锁后,该桶内数据被别的线程插入了新的数据,因为这个f是在未加锁之前获取的node对象,在这期间,可能这个下标处插入了新数据 //如果不做这层校验,会导致新加入到桶内的数据没有被处理,导致数据丢失 if (tabAt(tab, i) == f) { //ln:表示低位链表引用 //hn:表示高位链表引用 Node ln, hn; //若fh >= 0表示是普通链表节点 if (fh >= 0) { //这里主要对长度进行计算 //如果结果为0 ,将其放在低位链表,反之放在高位链表,目的是将链表重新 hash,放到对应的位置上 int runBit = fh & n; Node lastRun = f; for (Node p = f.next; p != null; p = p.next) { //取于桶中每个节点的hash值 int b = p.hash & n; // 如果节点的hash值和首节点的hash值取于结果不同 if (b != runBit) { //更新runBit,用于下面判断lastRun该赋值给ln(低位链表)还是 hn(高位链表)。 runBit = b; //这个lastRun保证后面的节点与自己的取于值相同,避免后面没有必要的循环 lastRun = p; } } //如果runBit == 0说明lastRun引用的链表为低位链表,那么就让ln指向 低位链表 if (runBit == 0) { ln = lastRun; hn = null; } //否则lastRun引用的链表为高位链表,hn指向高位链表 else { hn = lastRun; ln = null; } //遍历链表,把hash&n为0的放在低位链表中,不为0的放在高位链表中 //循环跳出条件:当前循环结点!=lastRun for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; //如果与运算结果是0,那么就还在低位 if ((ph & n) == 0)// 如果是0 ,那么创建低位节点 ln = new Node (ph, pk, pv, ln); else//则创建高位 hn = new Node (ph, pk, pv, hn); } //设置低位链表的位置不变 setTabAt(nextTab, i, ln); //设置高位链表,在原有长度上 +n setTabAt(nextTab, i + n, hn); //标记当前桶已完成迁移 setTabAt(tab, i, fwd); //继续向后推进 advance = true; } //这里是红黑树结构的迁移,逻辑与链表差不多 else if (f instanceof TreeBin) { TreeBin t = (TreeBin )f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); //以下的 *** 作和链表相同,结果为0的放在低位 if ((h & n) == 0) { //为空的话,说明当前低位链表还没有数据 if ((p.prev = loTail) == null) lo = p; //否则低位链表存在数据了,把数据追加到低位链表的末尾 else loTail.next = p; //将低位链表尾指针指向p节点 loTail = p; ++lc; } else {//否则放在高位 if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //如果树的节点数小于等于6就转成链表,否则创建新的树 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin (lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin (hi) : t; //设置低位树的位置不变 setTabAt(nextTab, i, ln); //设置高位树,在原有长度上 +n setTabAt(nextTab, i + n, hn); // 标记该桶已迁移 setTabAt(tab, i, fwd); //继续向后推进 advance = true; } } } } } } 以 runBit =0 为例,迁移后的新数组链表示意图:
addCount方法
addCount 方法的主要作用:
对table的长度+1。无论是通过修改 baseCount,还是通过使用CounterCell。当CounterCell被初始化了,就优先使用他,不再使用 baseCount。
检查是否需要扩容,或者是否正在扩容。如果需要扩容,就调用扩容方法,如果正在扩容,就帮助其扩容。
//x为1,check表示链表上的元素个数 private final void addCount(long x, int check) { CounterCell[] as; long b, s; //判断counterCells是否为空. 为空有可能竞争的线程非常少,可通过cas *** 作尝试修改 baseCount 变量,对这个变量进行原子累加 *** 作 //如果cas失败说明存在竞争,这个时候不能再采用baseCount来累加,而是通过counterCells来记录 if ((as = counterCells) != null || !U.compareAndSwapLong(this, baseCOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; //是否冲突标识,标记为true,默认没有冲突 boolean uncontended = true; //条件一:as == null || (m = as.length - 1) < 0 //若数组为空,进fullAddCount方法 //条件二:(a = as[ThreadLocalRandom.getProbe() & m]) == null //表示方法会给当前线程生成一个随机数,然后用随机数与数组长度取模,计算它所在的表格。若当前线程所分配到的表格为空,需要当前线程进入fullAddCount方法 //条件三:!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) //若CounterCell不为空,且线程所在表格不为空,则尝试CAS修改表格对应的value值加1。 //若修改成功,则执行下一步,否则把uncontended值设为fasle,说明产生了竞争,然后进fullAddCount方法 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //执行循环让当前线程一定把1加成功,这个会在稍后讲 fullAddCount(x, uncontended); return; } //这里若是返回了,那后边怎么判断扩容?搞不懂 if (check <= 1) return; //统计容量大小,主要用于下边的逻辑,是否达到了扩容阈值。 s = sumCount(); } //这里用于检查是否需要扩容 if (check >= 0) { //tab:表示table //nt:表示nextTable //n:表示数组的长度 //sc:表示sizeCtl的临时值 Node
fullAddCount方法[] tab, nt; int n, sc; //sc此时为阈值,容量达到阈值了且table不为空,且table数组长度小于最大长度(可以扩容) //条件一:s >= (long)(sc = sizeCtl) while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //根据数组长度得到一个标识 int rs = resizeStamp(n); //表示当前table正在扩容 if (sc < 0) { //这里是有bug的,感兴趣的小伙伴可以看一下: // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427 //sc == rs + 1想表达的是sc == (rs << RESIZE_STAMP_SHIFT) + 1 //sc == rs + MAX_RESIZERS想表达的是sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS //sc的高16位是数据校验标识,低16位代表当前有几个线程正在帮助扩容,RESIZE_STAMP_SHIFT=16 //如果sc == rs + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1) //nextTable=null 说明需要扩容的新数组还未创建完成 //transferIndex这个参数小于等于0,说明已经不需要其它线程帮助扩容了 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //到这里说明当前线程可以帮助扩容,因此sc值+1,代表扩容的线程数加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //rs << RESIZE_STAMP_SHIFT + 2,这个其实就是将rs的值赋值到高16位 //当sc大于0,说明sc代表扩容阈值,因此第一次扩容之前肯定走这个分支,用于初始化数组nextTable //rs<<16 1000 0000 0001 1011 0000 0000 0000 0000 +2 => 1000 0000 0001 1011 0000 0000 0000 0010 //这个值,转为十进制就是 -2145714174,用于标识,这是扩容时,初始化新数组的状态, //扩容时,需要用到这个参数校验是否所有线程都全部帮助扩容完成。 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //帮助扩容,第二个参数传入null,则说明是第一次初始化新数组 transfer(tab, null); s = sumCount(); } } } 该方法负责初始化CounterCells和更新计数,方法中第二个参数wasUncontended则表示是否不存在竞争,CAS失败之后调用该方法说明存在竞争所以传false。
private final void fullAddCount(long x, boolean wasUncontended) { int h; ///如果当前线程的随机数为0,则强制初始化一个值 if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization// 初始化 //得到probe,用于counterCells数组寻址 h = ThreadLocalRandom.getProbe(); //把wasUncontended 设置为 true 表示不存在竞争 wasUncontended = true; } 冲突标志,若为true,表示可能需要扩容,以减少碰撞冲突 boolean collide = false; // True if last slot nonempty //死循环,表示一定要计数成功 for (;;) { CounterCell[] as; CounterCell a; int n; long v; //如果counterCells不为空,说明已经初始化了 if ((as = counterCells) != null && (n = as.length) > 0) { //当前线程所在的CounterCell对象为空 if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell //先创建一个CounterCell对象,把x保存进去 CounterCell r = new CounterCell(x); // Optimistic create //当前线程占用锁 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; //条件三:rs[j = (m - 1) & h] == null //这里又做了一层判断,如果为空才进入 //因为存在一种情况就是上面第一次判断cellsBusy为0时, //可能有一个线程在CAS修改cellsBusy并且初始化完成了, //然后复位cellsBusy //此时另一个线程进入,已经是初始化过了,所以需要再次判断 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { //把新创建的对象r的元素放入对应下标的位置 rs[j] = r; //created=true表示创建成功 created = true; } } finally { //手动释放锁 cellsBusy = 0; } //创建成功,且上边的赋值成功,说明加1成功,退出循环 if (created) break; //说明存在竞争该位置已被其他线程放入了CounterCell,继续下次循环 continue; // Slot is now non-empty } } //若cellsBusy=1,说明有其它线程抢锁成功。或者若抢锁的CAS *** 作失败, //都会走到这里,到下面的h = ThreadLocalRandom.advanceProbe(h)中 //重新生成随机数,进行下次循环判断 collide = false; } //如果走到这,说明当前方法在被调用之前已经 CAS 失败过一次, //重新获取线程随机数,并把wasUncontended 设置为true,再循环一次 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //若wasUncontended为true表示不存在竞争,尝试cas修改,成功则退出,否则继续往下走 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; //如果发生过扩容或者数组长度已经达到虚拟机最大可以核心数 //因为长度超限,则说明已经无法扩容,只能认为无碰撞。 else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale //数组长度小于CPU核心数并且collide为false,就把collide改为true, //说明下次循环可能需要扩容 else if (!collide) collide = true; //到了这里,说明竞争激烈,数组容量不够大,需要进行扩容了 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale //创建一个容量为原来两倍的数组,再通过for循环进行数据迁移。 CounterCell[] rs = new CounterCell[n << 1]; //转移旧数组的值 for (int i = 0; i < n; ++i) rs[i] = as[i]; //把扩容后的对象赋值给counterCells。 counterCells = rs; } } finally { // 释放锁 cellsBusy = 0; } //认为扩容后,下次不会产生冲突了 collide = false; //当次扩容后,就不需要重新生成随机数了 continue; // Retry with expanded table } //上面失败都会到这里来 //这里重新生成一个随机数,下一次循环判断 h = ThreadLocalRandom.advanceProbe(h); } //若cellsBusy为0,说明不存在锁,线程都可以抢锁,若为1,表示已经有线程拿到了锁, //则其它线程不能抢占锁 //上面是counterCells数组已初始化的情况,下面是未初始化情况的处理 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //这一步是初始化CounterCells数组 boolean init = false; try { // Initialize table //重新检查下counterCells数组引用是否有变化 if (counterCells == as) { //初始化一个长度为2的数组 CounterCell[] rs = new CounterCell[2]; //根据当前线程的随机数值,计算下标,只有两个结果0或1,并初始化对象 rs[h & 1] = new CounterCell(x); //赋值给counterCells counterCells = rs; //初始化成功的标志 init = true; } } finally { //释放锁 cellsBusy = 0; } //若初始化成功,则说明当前加1的操作也已经完成了,则退出整个循环 if (init) break; } //到这里标志着线程竞争激烈,其它线程占据着counterCells数组, //尝试更新baseCount的值,若成功,也说明加1操作成功,则退出循环。。 else if (U.compareAndSwapLong(this, baseCOUNT, v = baseCount, v + x)) // 更新成功直接返回 break; // Fall back on using base } }
最后到这里ConcurrentHashMap的put方法插入最主要的逻辑基本上都讲完了,相对于jdk1.7来说,jdk1.8ConcurrentHashMap代码实现相对复杂,但是锁的粒度降低了,效率也提高了。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)