JUC重点

JUC重点,第1张

JUC重点 ConcurrentHashMap  1.7 特点 

图片来源:ConcurrentHashMap中有十个提升性能的细节,你都知道吗?ConcurrentHashMap的源码常读常新!https://mp.weixin.qq.com/s?__biz=MzkyMjIzOTQ3NA==&mid=2247484633&idx=1&sn=87cb3d1f0670f598e4c6b72f0e188132&source=41#wechat_redirect

1.使用分段锁 Segment,继承自  ReentrantLock;只对某一段进行加锁,不会进行全局加锁;锁之间互不影响,提高并行度。

2.Segment 内部为 HashEntry 数组,等同于 HashMap 内部结构。

3.发生哈希冲突,连接成链表,采用头插法。

初始化

1. 重要属性 

// 默认 map 大小,即 HashEntry 的默认数量
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 默认负载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 默认并行度,即 segment 数组大小(分段锁数量)
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
// 每个分段最小容量,即最少 HashEntry 数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// 每个分段最大容量
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
// 默认自旋次数,当自旋次数超过此值,则阻塞等待锁释放;
// 避免执行 size() 时,map 被频繁更新,导致无限进行自旋,影响性能
static final int RETRIES_BEFORE_LOCK = 2;
// 用于索引 segment 的掩码值,key 哈希值的高位用于选择 segment
final int segmentMask;
// 用于索引 segment 时,使用的偏移值,以此得到 key 哈希值的高位
final int segmentShift;
// Segment 数组 
final Segment[] segments;

2. 构造方法

// initialCapacity HashEntry 的数量;
// loadFactor 负载因子;
// concurrencyLevel 并行度(segment 数量,分段锁数量)
public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // 幂次
    int sshift = 0;
    // 并行度(segment 数量,分段锁数量),为 2 ^ sshift 
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 索引 segment 时,使用的偏移值;
    // 右移 segmentShift 位,得到 key 哈希值高 sshift 位
    this.segmentShift = 32 - sshift;
    // 索引 segment时,使用的掩码
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 每个 segment 里,HashEntry 的数量;
    // 等于,大于 initialCapacity / ssize 的,2 的幂次
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // 创建第一个 segment;其他均为懒加载,并以 segments[0] 为模板
    Segment s0 =
        new Segment(loadFactor, (int)(cap * loadFactor),
                         (HashEntry[])new HashEntry[cap]);
    // 初始化 segments
    Segment[] ss = (Segment[])new Segment[ssize];
    // 保证可见性
    UNSAFE.putOrderedObject(ss, Sbase, s0); // ordered write of segments[0]
    this.segments = ss;
}

3.Segment 内重要属性

// scanAndLockForPut中自旋循环获取锁的最大自旋次数
static final int MAX_SCAN_RETRIES =
    Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
// 键值对,存储结构
transient volatile HashEntry[] table;
// 实际键值对个数;无 volatile 修饰,不保证可见性
transient int count;
// segment元素修改次数记录
transient int modCount;
// 扩容阈值
transient int threshold;
// 负载因子
final float loadFactor;

4.HashEntry 内重要属性

final int hash;
final K key;
// 确保可见性
volatile V value;
// 确保可见性
volatile HashEntry next;
get()
// 没有加锁 *** 作,弱一致性换取性能
public V get(Object key) {
    Segment s; // manually integrate access methods to reduce overhead
    HashEntry[] tab;
    // 获取 key 对应的 hash 值
    int h = hash(key);
    // 通过 hash 值高位计算得到 Segment 的偏移量
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + Sbase;
    // 使用 volatile 读,获取对应的 Segment
    if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
        // HashEntry[] 不为空
        (tab = s.table) != null) {
        // 1. 通过 hash 值低位计算得到 HashEntry 的偏移量;
        // 2. 使用 volatile 读,获取 HashEntry 的链表头结点;进行遍历
        // 3. happens-before 原则:①volatile 写在读之前 ②解锁在加锁之前 
        // 4. 即使第一时间获取到的是新值,但是并发环境下,可能刚获取完,就被更新了
        //    (先读后写了);弱一致性,体现在这
        for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + Tbase);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

没有加锁 *** 作,弱一致性换取性能

put()

1. Segment -> put()

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 尝试获取锁,获取成功则 node = null;
    // 获取失败,则进入 scanAndLockForPut 方法,自旋获取锁
    HashEntry node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    // 因为加了锁,同一时刻都只有一个线程执行下面的 *** 作;
    // 并且解锁前,数据都会同步到主存,所以无需使用 volatile 修饰变量;
    // 减小 volatile 对性能的影响
    V oldValue;
    try {
        HashEntry[] tab = table;
        int index = (tab.length - 1) & hash;
        // --- 标记 ---
        HashEntry first = entryAt(tab, index);
        for (HashEntry e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                // 表示其他线程在自旋过程中,已经创建了新结点;
                // 得到锁时,不需要再创建一遍,减少了锁的占用时间
                if (node != null)
                    // 将 node 使用头插法
                    node.setNext(first);
                else
                    node = new HashEntry(hash, key, value, first);
                // 先判断是否扩容,再插入新结点;
                // HashMap 是先插入,再判断扩容,可能导致扩容后再无新结点插入;
                // 造成无效扩容
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    // 已经加锁,确保扩容安全
                    rehash(node);
                else
                    // --- 标记 ---
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解锁
        unlock();
    }
    return oldValue;
}

2. Segment -> scanAndLockForPut()

private HashEntry scanAndLockForPut(K key, int hash, V value) {
    HashEntry first = entryForHash(this, hash);
    HashEntry e = first;
    HashEntry node = null;
    int retries = -1; // negative while locating node
    // 获取锁失败,则遍历链表,将遍历过的 HashEntry 放入 CPU 高速缓存中;
    // 获取锁之后,再次定位速度会十分快,等待过程中的一种预热方式
    while (!tryLock()) {
        HashEntry f; // to recheck first below
        // 首次进入,retries = -1 恒成立
        if (retries < 0) {
            // e = null 的原因:①HashEntry[] 对应位置为 null②遍历表到最后,没有指定 key
            if (e == null) {
                // 再次进入循环,避免重复符值
                if (node == null) // speculatively create node
                    node = new HashEntry(hash, key, value, null);
                retries = 0;
            }
            // 找到指定 key
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        // 自旋次数超过最大次数,则直接进入阻塞;
        // 无限制的自旋,性能损耗比阻塞更大
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        // 遍历过程中,发现链表已经被改变,则重新进行遍历
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
rehash()
private void rehash(HashEntry node) {
    HashEntry[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    HashEntry[] newTable =
        (HashEntry[]) new HashEntry[newCapacity];
    // 新掩码,比原来的多一位 1;00001111 -> 00011111
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry e = oldTable[i];
        if (e != null) {
            HashEntry next = e.next;
            // 生成新下标
            int idx = e.hash & sizeMask;
            // 没有后继节点,只有单个节点
            if (next == null)   //  Single node on list 
                // 新数组可以直接引用
                newTable[idx] = e;
            else { 
                // 非单节点
                // Reuse consecutive sequence at same slot
                HashEntry lastRun = e;
                int lastIdx = idx;
                for (HashEntry last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        // 第一个后续所有节点在新数组中下标保持不变的节点 lastRun
                        lastRun = last;
                    }
                }
                //这个for循环就是找到第一个后续节点新的index不变的节点。
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                //lastRun 之前的结点均需复制一遍
                for (HashEntry p = e; p != lastRun; p = p.next) {
                    int h = p.hash;
                    // 生成新下标
                    int k = h & sizeMask;
                    // 头插法
                    HashEntry n = newTable[k];
                    newTable[k] = new HashEntry(h, p.key, p.value, n);
                }
            }
        }
    }
    // 为新加入的节点,在新数组生成下标
    int nodeIndex = node.hash & sizeMask; // add the new node
    // 头插法
    node.setNext(newTable[nodeIndex]);
    // 新数组下标指向新节点
    newTable[nodeIndex] = node;
    // 赋值给原数组
    table = newTable;
}

1.put 时,会判断 Segment 里的 HashEntry 数量是否超过阈值(threshold);超过则进行扩容。

2.rehash() 是没有加锁的,因为外层已经加了锁,确保是单线程 *** 作。

3.过程:新建容量为原来两倍的新数组;每个桶,找到第一个后续所有节点在新数组中下标保持不变的节点 lastRun,新数组可直接引用 lastRun,之前的结点全部需要复制。

size() 
public int size() {
    final Segment[] segments = this.segments;
    // HashEntry 数量
    int size;
    // 是否溢出
    boolean overflow;
    // 本次的 modcount 值
    long sum;
    // 上次的 modcount 值
    long last = 0L;
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            // 当 retries = RETRIES_BEFORE_LOCK 时,全部 Segment 加锁
            // 会导致本来延迟加载的 Segment 全部创建
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    // ensureSegment 确保该 Segment 已经创建;
                    // segments[0] 为模板,懒加载其他结点
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment seg = segmentAt(segments, j);
                if (seg != null) {
                    // 本轮 modcount
                    sum += seg.modCount;
                    // 本 Segment 元素数量
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            // modcount 没变,则计算结果准确
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        // 只有当 retries = RETRIES_BEFORE_LOCK 时,才加锁;防止未加锁,进行解锁
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
1.8 特点 

1.使用 synchronized + CAS 保证并发安全。 

2.发生哈希冲突,链表 + 红黑树。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5672960.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存