JDK1.7 ConcurrentHashMap源码QA解析

JDK1.7 ConcurrentHashMap源码QA解析,第1张

JDK1.7 ConcurrentHashMap源码QA解析

前面说到JDK1.7 HashMap是数组+链表的数据结构,但是这是线程不安全的。而ConcurrentHashMap则是HashMap的升级版解决了线程安全问题,而且底层大部分逻辑和HashMap是相似的。接下来还是通过QA方式解析源码。

1.ConcurrentHashMap的数据结构是怎么样的?初始化过程是怎么样的?

(1)首先,ConcurrentHashMap设计是作为线程安全的Map集合,那么就需要考虑怎么支持并发 *** 作,我们用HashMap进行对比,就可以知道HashMap要想并发安全 *** 作就不能不对整个HashMap进行加锁,这样大大影响了整个容器的性能。

作为解决这个问题的设计者们,在ConcurrentHashMap就想到了基于HashMap的数组+链表的思想,将每个数组都拆分成一个个小型各自独立的Segment(每个Segment相当于一个个小型HashMap),每个线程加锁只对各自独立的Segment去加锁 *** 作,这样就大大提升了性能。于是就引入了“分段Segment”的思想,每个分段还集成了ReentrantLock可以对内部HashEntry数据进行CAS *** 作,这样就保证了线程安全。基于此,ConcurrentHashMap的数据结构图大致可以如下:

上图可知:在JDK7的ConcurrentHashMap中,首先有一个Segment数组,存的是Segment对象,Segment相当于一个小HashMap,而Segment内部有一个HashEntry的数组用于存储实际数据。

 (2)ConcurrentHashMap有哪些核心属性,数据结构是怎么初始化的?

从上面知道ConcurrentHashMap与HashMap的属性相似,不同的是ConcurrentHashMap内部有一个Segment数组,而每个Segment内部存储HashEntry数组。

public class ConcurrentHashMap extends AbstractMap
        implements ConcurrentMap, Serializable {


    //默认初始化容量大小16
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    
    static final int MAXIMUM_CAPACITY = 1 << 30;

    
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    
    static final int MAX_SEGMENTS = 1 << 16;

    
    static final int RETRIES_BEFORE_LOCK = 2;

    
    private transient final int hashSeed = randomHashSeed(this);

    
    final int segmentMask;

    
    final int segmentShift;

    
    final Segment[] segments;


    
    static final class HashEntry {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry next;

        HashEntry(int hash, K key, V value, HashEntry next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        
        final void setNext(HashEntry n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

        // UNSAFE工具类
        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }


 
  static final class Segment extends ReentrantLock implements Serializable {
        
        
        transient volatile HashEntry[] table;

        
        transient int count;

        
        transient int modCount;

        
        transient int threshold;

        
        final float loadFactor;

  }

}

从源码上看ConcurrentHashMap底层是由两层嵌套数组来实现的:

     1.ConcurrentHashMap对象中有一个属性segments,类型为Segment[];

     2.Segment对象中有一个属性table,类型为HashEntry[];

接下来看下数据结构的初始化过程。ConcurrentHashMap也有以下多种构造函数:

ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel); ConcurrentHashMap(int initialCapacity, float loadFactor);
ConcurrentHashMap(int initialCapacity); 
ConcurrentHashMap();
ConcurrentHashMap(Map m);

可以看出主要也是对初始化容量,加载因子,初始Segment长度进行初始化。这里要特别注意concurrencyLevel这个属性的作用,下面是构造函数核心代码和相关注释:

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    //检查参数的合法性
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;


    //ssize是Segment数组的长度,
    //ssize通过位移 *** 作获得一个刚好比concurrencyLevel大一点的2幂次方整数
    int sshift = 0;
    int ssize = 1;  
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }

    
    this.segmentShift = 32 - sshift;
    //初始化segmentMask=segments数组最大的数组下标,用于后面计算角标位置
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;

    //根据初始化容量和segments数组长度进行均分,
    //通过均分方式获得一个大于等于均分值的整数,作为每个segment内部的table数组长度,
    //且每个segment内部table数组长度最小为2,最终值为cap用于初始化Segment对象
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY; //cap最小值为2
    while (cap < c)
        cap <<= 1;

    //创建一个Segment[0]对象,这个对象可以用于当其它Segment位置需要初始化时,
    //可以通过Segment[0]对象原型模式直接复制过去
    Segment s0 =
        new Segment(loadFactor, (int)(cap * loadFactor),
                         (HashEntry[])new HashEntry[cap]);

    //初始化Segment数组,数组长度就是取一个刚好比concurrencyLevel大一点的2幂次方整数
    Segment[] ss = (Segment[])new Segment[ssize];

    //将上面创建的Segment[0]通过UNSAFE使用CAS方式写入Segment数组内存
    UNSAFE.putOrderedObject(ss, Sbase, s0);

    //初始化完成的segment数组赋值到segments属性中
    this.segments = ss;
}

通过上述构造函数,一个ConcurrentHashMap对象就产生了,这个对象有Segment数组对象,且Segment[0]初始化了一个table数组长度固定的小型HashMap。

2.ConcurrentHashMap的put流程是怎么样的?

public V put(K key, V value) {
    Segment s;
    //ConcurrentHashMap是不能存储value=null的数据的
    if (value == null)
        throw new NullPointerException();
    //计算hash散列值,如果hashseed种子有值也会参与计算,和hashmap类似
    int hash = hash(key);
    //和segmentMask对应的segments长度与 *** 作计算要存入哪个segments数组下标。
    //这里使用hash值的高(32-segmentShift)位参与计算segments下标
    int j = (hash >>> segmentShift) & segmentMask;
    //判断segments[j]如果为空,则自旋执行ensureSegment生成该位置一个新segment对象
    //UNSAFE.getObject(segments, (j << SSHIFT) + Sbase)实际上是获取segments数组的第j个位置数据,SSHIFT是数组对象头,Sbase是数组的起始位置,两个变量都在static中定义了
    if ((s = (Segment)UNSAFE.getObject(segments, (j << SSHIFT) + Sbase)) == null)
        s = ensureSegment(j);
    //通过获取到的segment,将数据写入该segment对应的table数组中
    return s.put(key, hash, value, false);
}
private Segment ensureSegment(int k) {
    final Segment[] ss = this.segments;
    long u = (k << SSHIFT) + Sbase; // raw offset
    Segment seg;
    //检查此时segments的u位置处是不是已经有初始化好的segment对象,
    //因为可能这时已经被其它线程初始化出来了,通过UNSAFE *** 作内存对象并返回
    if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
        //还没被初始化过,则通过原型模式复制方式得到该新位置segment对象,
        //根据segment[0]位置的原型对象初始化一个HashEntry数组待用
        Segment proto = ss[0]; //use segment 0 as prototype使用segment[0]作为原型
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry[] tab = (HashEntry[])new HashEntry[cap];
        //初始化后UNSAFE再判断此刻内存中该位置是不是已经有segment被初始化出来了
        if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
            //此刻还是没有被其它线程初始化segment出来,接下来传入属性构造出新的segment对象
            Segment s = new Segment(lf, threshold, tab);
            //通过自旋方式,假如始终没有其他线程初始化该位置segment对象,那么就通过CAS方式
            //写入到segments[u]位置的内存中,退出并返回该新segment对象。
            while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

得到一个Segment之后就调用Segment对象,执行复杂的加锁s.put(key, hash, value, false) *** 作

当tryLock()得不到锁,则继续尝试加锁,当加锁次数达到最大尝试次数,则转换为lock阻塞等待加锁:

private HashEntry scanAndLockForPut(K key, int hash, V value) {
    HashEntry first = entryForHash(this, hash);  //first记录当前链表的头结点,用于辅助判断本链表是否已经被改变了
    HashEntry e = first;
    HashEntry node = null;
    int retries = -1; // negative while locating node
    //不断尝试tryLock获取锁,在尝试期间会先遍历这个要加锁的链表,看是否有匹配的k-V数据,如果没有就提前将数据封装成HashEntry对象。
    //但是在尝试加锁遍历期间,很可能这个要加锁的segnemt内部链表数据已经被其它线程修改了,那么本线程就要重新检查遍历这条链表数据
    while (!tryLock()) {
        HashEntry f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) //本链表没有匹配k-v数据,则提前创建出hashEntry
                    node = new HashEntry(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) { //当超过尝试次数则阻塞lock方式等待锁
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; //当本链表的数据已经被其它线程改变了,那么最新本链表的头结点肯定就改变了不再等于方法入口声明的first结点,此时需要重新对最新改变后的链表进行遍历
            retries = -1;  //重置重试次数
        }
    }
    return node;
}

当加锁成功之后,后面的新增HashEntry逻辑和HashMap一样了,都是头插法添加到table后解锁。

总结:当调用ConcurrentHashMap的put方法时,先根据key计算出对应的Segment[]的数组下标j,确定好当前key,value应该插入到哪个Segment对象中,如果segments[j]为空,则利用自旋锁的方式在j位置生成一个Segment对象。

然后调用Segment对象的put方法。Segment对象的put方法会先加锁,然后也根据key计算出对应的HashEntry[]的数组下标i,然后将key,value封装为HashEntry对象放入该位置,此过程和JDK7的HashMap的put方法一样,然后解锁。在加锁的过程中逻辑比较复杂,先通过自旋加锁,如果超过一定次数就会直接阻塞等等加锁。

3.ConcurrentHashMap怎么保证线程安全?

主要利用Unsafe *** 作+ReentrantLock+分段思想。

主要使用了Unsafe *** 作中的:

  1. compareAndSwapObject:通过cas的方式修改对象的属性
  2. putOrderedObject:并发安全的给数组的某个位置赋值
  1. getObjectVolatile:并发安全的获取数组某个位置的元素

分段思想是为了提高ConcurrentHashMap的并发量,分段数越高则支持的最大并发量越高,程序员可以通过concurrencyLevel参数来指定并发量。ConcurrentHashMap的内部类Segment就是用来表示某一个段的。

每个Segment就是一个小型的HashMap的,当调用ConcurrentHashMap的put方法是,最终会调用到Segment的put方法,而Segment类继承了ReentrantLock,所以Segment自带可重入锁,当调用到Segment的put方法时,会先利用可重入锁加锁,加锁成功后再将待插入的key,value插入到小型HashMap中,插入完成后解锁。

具体代码体现可参考第2点源码注释。

4.ConcurrentHashMap扩容过程是怎么样的?

JDK7中的ConcurrentHashMap和JDK7的HashMap的扩容是不太一样的。

首先JDK7中也是支持多线程扩容的,原因是JDK7中的ConcurrentHashMap分段了,每一段叫做Segment对象,每个Segment对象相当于一个HashMap。分段之后,对于ConcurrentHashMap而言,能同时支持多个线程进行 *** 作,前提是这些 *** 作的是不同的Segment,而ConcurrentHashMap中的扩容是仅限于本Segment,也就是对应的小型HashMap进行扩容,换言之就是将每个Segement作为一把独立的可重入锁提供给不同线程竞争,所以是可以多线程扩容的。

对于每个Segment内部的扩容逻辑和HashMap中一样,都是创建新table,再将源数据转移到新table中。由于同一时间扩容时只会有一个线程得到锁进行扩容,就不会存在多线程循环链表的问题。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5722218.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存