2. 题目剖析ConcurrentHashMap的底层原理是什么?
ConcurrentHashMap中的key、value可以为空吗?为什么?
说说ConcurrentHashMap是怎么添加数据的?
ConcurrentHashMap#put添加数据的流程是怎么样的?
.......
壹哥在前面4篇文章中,给大家介绍了ConcurrentHashMap的通用功能、特点,以及JDK 7、8中ConcurrentHashMap的底层数据结构,还有ConcurrentHashMap中的核心属性等内容,文章链接如下:
高薪程序员&面试题精讲系列45之你熟悉ConcurrentHashMap吗?
高薪程序员&面试题精讲系列46之说说JDK7中ConcurrentHashMap的底层原理,有哪些数据结构
高薪程序员&面试题精讲系列47之说说JDK8中ConcurrentHashMap的底层原理,HashMap与ConcurrentHashMap有什么区别?
高薪程序员&面试题精讲系列48之说说JDK8中ConcurrentHashMap的sizeCtl是什么意思?最大容量、负载因子是多少? 从本文开始,壹哥给大家重点分析JDK 8中ConcurrentHashMap#put()方法的源码,重点讲解数据的添加流程,这一块是咱们面试时的高频考点。
二. put(k,v)方法实现详解【重点】 1. put(k,v)源码首先我们先把put(k,v)方法的源码贴出来,看看该方法是怎么定义的。
public V put(K key, V value){ return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent){ if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node[] tab = table;;) { Node f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node (hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; if ((e = e.next) == null) { pred.next = new Node (hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin )f).putTreeval(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
注意:
我们仔细看会发现,put()方法中,代码中加锁用的是 synchronized 关键字,而不是JDK 7 中的 ReentrantLock!
2. put源码分拆解读【重点】我们会发现put()方法的源码很复杂,第一次看看你会有点懵圈,所以壹哥就把上面的源码,一步步拆分出来进行详细的解读。
2.1 对key、value进行非空验证在ConcurrentHashMap的put(k,v)方法中,是不允许我们的key和value为空的,否则直接就会产生空指针异常!这是怎么实现的呢?看看put()方法的定义即可。put(k,v)方法内部实际上使用的是putVal()方法进行真正的添加 *** 作,这一点和HashMap是相同的。
所以我们进入到putVal()方法中,第一行代码就是对key、value进行非空判断,如果为空直接抛出空指针异常。
if (key == null || value == null) throw new NullPointerException();
有的小伙伴可能会很好奇,为啥它的key和value就不能为null,而HashMap中的key和value就可以为null?
这是因为ConcurrentHashMap是并发集合,如果key或value为null,就可能会产生这样一个问题。当我们通过get(key)方法获取对应的value时,如果获取到的是null,我们就无法判断,它是在put(k,v)时就把value置为了null,还是这个key本来就没有实现映射?这个就没办法判断了。而HashMap是非并发的,可以通过contains(key)方法来进行校验,但支持并发的Map在调用m.contains(key) 和 m.get(key)时,这个m可能已经不同了,所以为了保持一致性,就必须把key与value都设置成非空的!
2.2 计算key的hash值接下来是计算要插入的key对应的hash值。
int hash = spread(key.hashCode());
这里使用了一个spread()方法对hashCode进行按位异或 *** 作,其源码如下:
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
我们发现这个spread()方法实际上和HashMap是类似的,只是多加了HASH_BITS进行与运算,这样就减少了哈希冲突,HASH_BITS的值如下所示:
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
HASH_BITS是为了消除最高位上的负号,负的哈希值在ConcurrentHashMap中有特殊意义,表示正在扩容或者是树节点。
2.3 初始化table数组接下来会进入到一个死循环中,循环的第一步是判断table数组是否为空,为空则进行数组初始化。这一点和HashMap一样,这是一种lazy initialize的方法,直到插入数据时才会进行初始化。
if (tab == null || (n = tab.length) == 0) tab = initTable();2.4 当前位置为空时插入新节点
接着会判断当前key所对应的位桶上是否有数据存在,如果为null的话,就会新建一个Node节点,采用CAS的方式插入到当前位桶上。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin }
注意这里出现了两个新的方法,tabAt()和casTabAt(),其源码如下:
static finalNode tabAt(Node [] tab, int i) { return (Node )U.getObjectVolatile(tab, ((long)i << ASHIFT) + Abase); } static final boolean casTabAt(Node [] tab, int i,Node c, Node v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + Abase, c, v); }
tabAt()方法中使用了getObjectVolatile()方法,该方法可以直接获取指定内存中的数据,这保证了每次拿到的数据都是最新的。
casTabAt()方法中会插入一个Node节点,其底层调用了Unsafe包中的compareAndSwapObject函数,这是利用CAS乐观锁实现并发同步的方法。
由于compareAndSwapObject()方法的第2个参数是内存地址,因此使用
(long)i <进行操作,这是通过 起始地址+段内偏移量 来计算内存地址的方式。我们来看看为什么这样可以算出偏移地址,其源码如下:
Class> ak = Node[].class; Abase = U.arraybaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
上面的源码中,有如下代码:
Abase=U.arraybaseOffset(ak);
这里的Abase就是起始地址。然后接下来的scale变量中存储的是Node节点大小,这里比较难理解的是ASHIFT函数,这里我先贴出ASHIFT的计算公式,如下图所示:
该公式是使用numberOfLeadingZeros函数 计算出来的,这个函数的作用是计算从左边开始,到右边为止,为0的元素个数。
然后我们再看看((long)i << ASHIFT) + Abase这个公式,就很好理解了。
当然,对于这个函数简单了解即可。
2.5 判断数组的扩容时机接下来会通过一个if判断,判断节点是否处于MOVED状态,从而判断数组是否应该进行扩容。
else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);
这里判断的意思是,如果发现table数组进行正在扩容,会帮助一起进行扩容。helpTransfer函数是一个比较复杂的函数,后面我们再单独讨论,这里先大致了解其功能即可。
2.6 对链表进行加锁处理接下来会对位桶中的链表进行处理,ConcurrentHashMap中的锁就加在这个代码块上,锁的粒度还是比较小的。
synchronized (f) { //如果当前位桶上已经有数据节点了,则利用链表来进行冲突的解决 if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Nodee = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; if ((e = e.next) == null) { pred.next = new Node (hash, key, value, null); break; } } }
这里会使用sychronized互斥锁来锁住f节点,并对f节点所连接的链表进行遍历,如果发现它们的key相同,那么可能就会去更新value的值。这里是否会更新value是根据onlyIfAbsent的值来决定的。
2.7 判断是否把当前节点插入到红黑树中如果判断当前节点是TreeBin类型,属于是红黑树节点类型,此时就会采用红黑树的方式去做插入。
else if (f instanceof TreeBin) { Node2.8 判断是否需要进行树化p; binCount = 2; if ((p = ((TreeBin )f).putTreeval(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } }
根据binCount位桶上的节点数量是否>=TREEIFY_THRESHOLD来决定是否进行树化 *** 作。如果链表长度大于等于树化的阈值,则会转化为红黑树。
if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }
以上就是壹哥对put()方法的源码解读,因为该方法的内部实现比较复杂,所以接下来我会把put()方法要点再重新梳理一遍,我们在面试时把下面put()方法的执行流程讲解清楚即可。
3. putVal()方法执行流程【重点】从上面的源码及解读中,我们可以知道ConcurrentHashMap在put()方法上的实现思路,整体上是和HashMap相同的,但在线程安全方面加了很多保障性的代码,其整体执行流程如下:
4. putVal()方法执行流程的简化图解【重点】
- ①. 先判断table数组是否为空,如果为空,则进行数组的初始化,初始化完成之后,执行第②步;
- ②. 判断当前位桶上的节点是否为null,如果为空则利用 CAS 创建,如果创建失败则继续自旋(for死循环),直到成功为止。如果当前位桶上的节点不为空,则执行第③步;
- ③. 如果当前位桶上的节点是转移节点(正在扩容,节点状态是MOVED),就会一直自旋等待扩容完成之后再新增,如果不是转移节点则执行第④步;
- ④. 如果当前位桶上已有值,则先锁定当前位桶,保证其余线程不能 *** 作。然后判断是否为链表,如果是链表,则将新增的值添加到链表的尾部;如果是红黑树,则使用红黑树的方法进行新增 *** 作;
- ⑤. 当新增完成之后,要check是否需要扩容,需要的话就进行扩容。
上面壹哥跟大家说了,putVal()方法的内部实现是比较复杂的,我们如果就是为了应付面试,其实可以结合上面的小节,再结合下图中的伪代码来简化对putVal()的理解,如下图所示:
理解上述put *** 作过程时要注意,只有在执行到break之后才会跳出循环,如果没有遇到break则会重试,而JDK代码中的重试功能几乎全都是用死循环来实现的。
5. put()方法梳理总结【重点】因为put()方法是在是太重要了,面试时如果问到ConcurrentHashMap的源码,十有八九就是考察这个put()方法,所以壹哥只能不厌其烦的跟大家梳理该方法的执行流程和逻辑。最后我再把put()存值的实现步骤进行简单归纳,方便我们记忆理解。如果面试官不深究put()源码,回答如下答案也行,如下所示:
三. 结语
- ①. 先进行参数(k-v)校验;
- ②. 若table[]数组未创建,则初始化;
- ③. 当table[i]后面无节点时,直接创建Node(以无锁方式 *** 作);
- ④. 如果当前数组正在扩容,则帮助扩容并返回到最新的table[]中;
- ⑤. 然后在链表或者红黑树中追加节点;
- ⑥. 最后再判断是否达到阀值,如果达到则变为红黑树结构。
本篇文章,壹哥 重点讲解了JDK 8版本ConcurrentHashMap#put()添加数据方法的源码。接下来 壹哥 会再通过另一篇文章,给大家讲解 JDK 8中的ConcurrentHashMap#transfer()方法的源码及扩容机制,请做好准备哦。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)