高薪程序员&面试题精讲系列49之说说ConcurrentHashMap#put方法的源码及数据添加流程

高薪程序员&面试题精讲系列49之说说ConcurrentHashMap#put方法的源码及数据添加流程,第1张

高薪程序员&面试题精讲系列49之说说ConcurrentHashMap#put方法源码及数据添加流程 一. 面试题及剖析 1. 今日面试题

ConcurrentHashMap的底层原理是什么?

ConcurrentHashMap中的key、value可以为空吗?为什么?

说说ConcurrentHashMap是怎么添加数据的?

ConcurrentHashMap#put添加数据的流程是怎么样的?

.......

2. 题目剖析

壹哥在前面4篇文章中,给大家介绍了ConcurrentHashMap的通用功能、特点,以及JDK 7、8中ConcurrentHashMap的底层数据结构,还有ConcurrentHashMap中的核心属性等内容,文章链接如下:

高薪程序员&面试题精讲系列45之你熟悉ConcurrentHashMap吗?

高薪程序员&面试题精讲系列46之说说JDK7中ConcurrentHashMap的底层原理,有哪些数据结构

高薪程序员&面试题精讲系列47之说说JDK8中ConcurrentHashMap的底层原理,HashMap与ConcurrentHashMap有什么区别?

高薪程序员&面试题精讲系列48之说说JDK8中ConcurrentHashMap的sizeCtl是什么意思?最大容量、负载因子是多少? 从本文开始,壹哥给大家重点分析JDK 8中ConcurrentHashMap#put()方法的源码,重点讲解数据的添加流程,这一块是咱们面试时的高频考点。

二. put(k,v)方法实现详解【重点】 1. put(k,v)源码

首先我们先把put(k,v)方法的源码贴出来,看看该方法是怎么定义的。

public V put(K key, V value){
    return putVal(key, value, false);
}


final V putVal(K key, V value, boolean onlyIfAbsent){
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node[] tab = table;;) {
        Node f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                new Node(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;                            }
                                Node pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node p;
                            binCount = 2;
                            if ((p = ((TreeBin)f).putTreeval(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
}

注意:

我们仔细看会发现,put()方法中,代码中加锁用的是 synchronized 关键字,而不是JDK 7 中的 ReentrantLock!

2. put源码分拆解读【重点】

我们会发现put()方法的源码很复杂,第一次看看你会有点懵圈,所以壹哥就把上面的源码,一步步拆分出来进行详细的解读。

2.1 对key、value进行非空验证

在ConcurrentHashMap的put(k,v)方法中,是不允许我们的key和value为空的,否则直接就会产生空指针异常!这是怎么实现的呢?看看put()方法的定义即可。put(k,v)方法内部实际上使用的是putVal()方法进行真正的添加 *** 作,这一点和HashMap是相同的。

所以我们进入到putVal()方法中,第一行代码就是对key、value进行非空判断,如果为空直接抛出空指针异常。

if (key == null || value == null) throw new NullPointerException();

有的小伙伴可能会很好奇,为啥它的key和value就不能为null,而HashMap中的key和value就可以为null?

这是因为ConcurrentHashMap是并发集合,如果key或value为null,就可能会产生这样一个问题。当我们通过get(key)方法获取对应的value时,如果获取到的是null,我们就无法判断,它是在put(k,v)时就把value置为了null,还是这个key本来就没有实现映射?这个就没办法判断了。而HashMap是非并发的,可以通过contains(key)方法来进行校验,但支持并发的Map在调用m.contains(key) 和 m.get(key)时,这个m可能已经不同了,所以为了保持一致性,就必须把key与value都设置成非空的!

2.2 计算key的hash值

接下来是计算要插入的key对应的hash值。

int hash = spread(key.hashCode());

这里使用了一个spread()方法对hashCode进行按位异或 *** 作,其源码如下:

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

我们发现这个spread()方法实际上和HashMap是类似的,只是多加了HASH_BITS进行与运算,这样就减少了哈希冲突,HASH_BITS的值如下所示:

static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

HASH_BITS是为了消除最高位上的负号,负的哈希值在ConcurrentHashMap中有特殊意义,表示正在扩容或者是树节点

2.3 初始化table数组

接下来会进入到一个死循环中,循环的第一步是判断table数组是否为空,为空则进行数组初始化。这一点和HashMap一样,这是一种lazy initialize的方法,直到插入数据时才会进行初始化。

if (tab == null || (n = tab.length) == 0)
    tab = initTable();
2.4 当前位置为空时插入新节点

接着会判断当前key所对应的位桶上是否有数据存在,如果为null的话,就会新建一个Node节点,采用CAS的方式插入到当前位桶上。

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,
        new Node(hash, key, value, null)))
        break;  // no lock when adding to empty bin
}

注意这里出现了两个新的方法,tabAt()和casTabAt(),其源码如下:

static final  Node tabAt(Node[] tab, int i) {
    return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + Abase);
}

static final  boolean casTabAt(Node[] tab, int i,Node c, Node v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + Abase, c, v);
}

tabAt()方法中使用了getObjectVolatile()方法,该方法可以直接获取指定内存中的数据,这保证了每次拿到的数据都是最新的。

casTabAt()方法中会插入一个Node节点,其底层调用了Unsafe包中的compareAndSwapObject函数,这是利用CAS乐观锁实现并发同步的方法。

由于compareAndSwapObject()方法的第2个参数是内存地址,因此使用
(long)i <进行操作,这是通过 起始地址+段内偏移量 来计算内存地址的方式。我们来看看为什么这样可以算出偏移地址,其源码如下:

Class ak = Node[].class;

Abase = U.arraybaseOffset(ak);

int scale = U.arrayIndexScale(ak);

if ((scale & (scale - 1)) != 0)

    throw new Error("data type scale not a power of two");

ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);

上面的源码中,有如下代码:

Abase=U.arraybaseOffset(ak);

这里的Abase就是起始地址。然后接下来的scale变量中存储的是Node节点大小,这里比较难理解的是ASHIFT函数,这里我先贴出ASHIFT的计算公式,如下图所示:

该公式是使用numberOfLeadingZeros函数 计算出来的,这个函数的作用是计算从左边开始,到右边为止,为0的元素个数。

然后我们再看看((long)i << ASHIFT) + Abase这个公式,就很好理解了。

当然,对于这个函数简单了解即可。

2.5 判断数组的扩容时机

接下来会通过一个if判断,判断节点是否处于MOVED状态,从而判断数组是否应该进行扩容。

else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f);

这里判断的意思是,如果发现table数组进行正在扩容,会帮助一起进行扩容。helpTransfer函数是一个比较复杂的函数,后面我们再单独讨论,这里先大致了解其功能即可。

2.6 对链表进行加锁处理

接下来会对位桶中的链表进行处理,ConcurrentHashMap中的锁就加在这个代码块上,锁的粒度还是比较小的。

synchronized (f) {
    //如果当前位桶上已经有数据节点了,则利用链表来进行冲突的解决
    if (tabAt(tab, i) == f) {
        if (fh >= 0) {
            binCount = 1;
            for (Node e = f;; ++binCount) {
                K ek;
                if (e.hash == hash &&
                    ((ek = e.key) == key ||
                     (ek != null && key.equals(ek)))) {
                    oldVal = e.val;
                    if (!onlyIfAbsent)
                        e.val = value;
                    break;
                }
                Node pred = e;
                if ((e = e.next) == null) {
                    pred.next = new Node(hash, key,
                                              value, null);
                    break;
                }
            }
}

这里会使用sychronized互斥锁来锁住f节点,并对f节点所连接的链表进行遍历,如果发现它们的key相同,那么可能就会去更新value的值。这里是否会更新value是根据onlyIfAbsent的值来决定的。

2.7 判断是否把当前节点插入到红黑树中

如果判断当前节点是TreeBin类型,属于是红黑树节点类型,此时就会采用红黑树的方式去做插入。

else if (f instanceof TreeBin) {
    Node p;
    binCount = 2;
    if ((p = ((TreeBin)f).putTreeval(hash, key,
                                   value)) != null) {
        oldVal = p.val;
        if (!onlyIfAbsent)
            p.val = value;
    }
}
2.8 判断是否需要进行树化

根据binCount位桶上的节点数量是否>=TREEIFY_THRESHOLD来决定是否进行树化 *** 作。如果链表长度大于等于树化的阈值,则会转化为红黑树。

if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}

以上就是壹哥对put()方法的源码解读,因为该方法的内部实现比较复杂,所以接下来我会把put()方法要点再重新梳理一遍,我们在面试时把下面put()方法的执行流程讲解清楚即可。

3. putVal()方法执行流程【重点】

从上面的源码及解读中,我们可以知道ConcurrentHashMap在put()方法上的实现思路,整体上是和HashMap相同的,但在线程安全方面加了很多保障性的代码,其整体执行流程如下:

  • ①. 先判断table数组是否为空,如果为空,则进行数组的初始化,初始化完成之后,执行第②步;
  • ②. 判断当前位桶上的节点是否为null,如果为空则利用 CAS 创建,如果创建失败则继续自旋(for死循环),直到成功为止。如果当前位桶上的节点不为空,则执行第③步;
  • ③. 如果当前位桶上的节点是转移节点(正在扩容,节点状态是MOVED),就会一直自旋等待扩容完成之后再新增,如果不是转移节点则执行第④步;
  • ④. 如果当前位桶上已有值,则先锁定当前位桶,保证其余线程不能 *** 作。然后判断是否为链表,如果是链表,则将新增的值添加到链表的尾部;如果是红黑树,则使用红黑树的方法进行新增 *** 作;
  • ⑤. 当新增完成之后,要check是否需要扩容,需要的话就进行扩容。
4. putVal()方法执行流程的简化图解【重点】

上面壹哥跟大家说了,putVal()方法的内部实现是比较复杂的,我们如果就是为了应付面试,其实可以结合上面的小节,再结合下图中的伪代码来简化对putVal()的理解,如下图所示:

理解上述put *** 作过程时要注意,只有在执行到break之后才会跳出循环,如果没有遇到break则会重试,而JDK代码中的重试功能几乎全都是用死循环来实现的。

5. put()方法梳理总结【重点】

因为put()方法是在是太重要了,面试时如果问到ConcurrentHashMap的源码,十有八九就是考察这个put()方法,所以壹哥只能不厌其烦的跟大家梳理该方法的执行流程和逻辑。最后我再把put()存值的实现步骤进行简单归纳,方便我们记忆理解。如果面试官不深究put()源码,回答如下答案也行,如下所示:

  • ①. 先进行参数(k-v)校验;
  • ②. 若table[]数组未创建,则初始化;
  • ③. 当table[i]后面无节点时,直接创建Node(以无锁方式 *** 作);
  • ④. 如果当前数组正在扩容,则帮助扩容并返回到最新的table[]中;
  • ⑤. 然后在链表或者红黑树中追加节点;
  • ⑥. 最后再判断是否达到阀值,如果达到则变为红黑树结构。
三. 结语

本篇文章,壹哥 重点讲解了JDK 8版本ConcurrentHashMap#put()添加数据方法的源码。接下来 壹哥 会再通过另一篇文章,给大家讲解 JDK 8中的ConcurrentHashMap#transfer()方法的源码及扩容机制,请做好准备哦。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5686246.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存