初始化图片来源:ConcurrentHashMap中有十个提升性能的细节,你都知道吗?ConcurrentHashMap的源码常读常新!https://mp.weixin.qq.com/s?__biz=MzkyMjIzOTQ3NA==&mid=2247484633&idx=1&sn=87cb3d1f0670f598e4c6b72f0e188132&source=41#wechat_redirect
1.使用分段锁 Segment,继承自 ReentrantLock;只对某一段进行加锁,不会进行全局加锁;锁之间互不影响,提高并行度。
2.Segment 内部为 HashEntry 数组,等同于 HashMap 内部结构。
3.发生哈希冲突,连接成链表,采用头插法。
get()1. 重要属性
// 默认 map 大小,即 HashEntry 的默认数量 static final int DEFAULT_INITIAL_CAPACITY = 16; // 默认负载因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; // 默认并行度,即 segment 数组大小(分段锁数量) static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; // 每个分段最小容量,即最少 HashEntry 数量 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; // 每个分段最大容量 static final int MAX_SEGMENTS = 1 << 16; // slightly conservative // 默认自旋次数,当自旋次数超过此值,则阻塞等待锁释放; // 避免执行 size() 时,map 被频繁更新,导致无限进行自旋,影响性能 static final int RETRIES_BEFORE_LOCK = 2; // 用于索引 segment 的掩码值,key 哈希值的高位用于选择 segment final int segmentMask; // 用于索引 segment 时,使用的偏移值,以此得到 key 哈希值的高位 final int segmentShift; // Segment 数组 final Segment[] segments; 2. 构造方法
// initialCapacity HashEntry 的数量; // loadFactor 负载因子; // concurrencyLevel 并行度(segment 数量,分段锁数量) public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 幂次 int sshift = 0; // 并行度(segment 数量,分段锁数量),为 2 ^ sshift int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 索引 segment 时,使用的偏移值; // 右移 segmentShift 位,得到 key 哈希值高 sshift 位 this.segmentShift = 32 - sshift; // 索引 segment时,使用的掩码 this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // 每个 segment 里,HashEntry 的数量; // 等于,大于 initialCapacity / ssize 的,2 的幂次 int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // 创建第一个 segment;其他均为懒加载,并以 segments[0] 为模板 Segments0 = new Segment (loadFactor, (int)(cap * loadFactor), (HashEntry [])new HashEntry[cap]); // 初始化 segments Segment [] ss = (Segment [])new Segment[ssize]; // 保证可见性 UNSAFE.putOrderedObject(ss, Sbase, s0); // ordered write of segments[0] this.segments = ss; } 3.Segment 内重要属性
// scanAndLockForPut中自旋循环获取锁的最大自旋次数 static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; // 键值对,存储结构 transient volatile HashEntry[] table; // 实际键值对个数;无 volatile 修饰,不保证可见性 transient int count; // segment元素修改次数记录 transient int modCount; // 扩容阈值 transient int threshold; // 负载因子 final float loadFactor; 4.HashEntry 内重要属性
final int hash; final K key; // 确保可见性 volatile V value; // 确保可见性 volatile HashEntrynext;
put()// 没有加锁 *** 作,弱一致性换取性能 public V get(Object key) { Segments; // manually integrate access methods to reduce overhead HashEntry [] tab; // 获取 key 对应的 hash 值 int h = hash(key); // 通过 hash 值高位计算得到 Segment 的偏移量 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + Sbase; // 使用 volatile 读,获取对应的 Segment if ((s = (Segment )UNSAFE.getObjectVolatile(segments, u)) != null && // HashEntry[] 不为空 (tab = s.table) != null) { // 1. 通过 hash 值低位计算得到 HashEntry 的偏移量; // 2. 使用 volatile 读,获取 HashEntry 的链表头结点;进行遍历 // 3. happens-before 原则:①volatile 写在读之前 ②解锁在加锁之前 // 4. 即使第一时间获取到的是新值,但是并发环境下,可能刚获取完,就被更新了 // (先读后写了);弱一致性,体现在这 for (HashEntry e = (HashEntry ) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + Tbase); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; } 没有加锁 *** 作,弱一致性换取性能
rehash()1. Segment -> put()
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 尝试获取锁,获取成功则 node = null; // 获取失败,则进入 scanAndLockForPut 方法,自旋获取锁 HashEntrynode = tryLock() ? null : scanAndLockForPut(key, hash, value); // 因为加了锁,同一时刻都只有一个线程执行下面的 *** 作; // 并且解锁前,数据都会同步到主存,所以无需使用 volatile 修饰变量; // 减小 volatile 对性能的影响 V oldValue; try { HashEntry [] tab = table; int index = (tab.length - 1) & hash; // --- 标记 --- HashEntry first = entryAt(tab, index); for (HashEntry e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { // 表示其他线程在自旋过程中,已经创建了新结点; // 得到锁时,不需要再创建一遍,减少了锁的占用时间 if (node != null) // 将 node 使用头插法 node.setNext(first); else node = new HashEntry (hash, key, value, first); // 先判断是否扩容,再插入新结点; // HashMap 是先插入,再判断扩容,可能导致扩容后再无新结点插入; // 造成无效扩容 int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) // 已经加锁,确保扩容安全 rehash(node); else // --- 标记 --- setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 解锁 unlock(); } return oldValue; } 2. Segment -> scanAndLockForPut()
private HashEntryscanAndLockForPut(K key, int hash, V value) { HashEntry first = entryForHash(this, hash); HashEntry e = first; HashEntry node = null; int retries = -1; // negative while locating node // 获取锁失败,则遍历链表,将遍历过的 HashEntry 放入 CPU 高速缓存中; // 获取锁之后,再次定位速度会十分快,等待过程中的一种预热方式 while (!tryLock()) { HashEntry f; // to recheck first below // 首次进入,retries = -1 恒成立 if (retries < 0) { // e = null 的原因:①HashEntry[] 对应位置为 null②遍历表到最后,没有指定 key if (e == null) { // 再次进入循环,避免重复符值 if (node == null) // speculatively create node node = new HashEntry (hash, key, value, null); retries = 0; } // 找到指定 key else if (key.equals(e.key)) retries = 0; else e = e.next; } // 自旋次数超过最大次数,则直接进入阻塞; // 无限制的自旋,性能损耗比阻塞更大 else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } // 遍历过程中,发现链表已经被改变,则重新进行遍历 else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
size()private void rehash(HashEntrynode) { HashEntry [] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); HashEntry [] newTable = (HashEntry []) new HashEntry[newCapacity]; // 新掩码,比原来的多一位 1;00001111 -> 00011111 int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry e = oldTable[i]; if (e != null) { HashEntry next = e.next; // 生成新下标 int idx = e.hash & sizeMask; // 没有后继节点,只有单个节点 if (next == null) // Single node on list // 新数组可以直接引用 newTable[idx] = e; else { // 非单节点 // Reuse consecutive sequence at same slot HashEntry lastRun = e; int lastIdx = idx; for (HashEntry last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; // 第一个后续所有节点在新数组中下标保持不变的节点 lastRun lastRun = last; } } //这个for循环就是找到第一个后续节点新的index不变的节点。 newTable[lastIdx] = lastRun; // Clone remaining nodes //lastRun 之前的结点均需复制一遍 for (HashEntry p = e; p != lastRun; p = p.next) { int h = p.hash; // 生成新下标 int k = h & sizeMask; // 头插法 HashEntry n = newTable[k]; newTable[k] = new HashEntry (h, p.key, p.value, n); } } } } // 为新加入的节点,在新数组生成下标 int nodeIndex = node.hash & sizeMask; // add the new node // 头插法 node.setNext(newTable[nodeIndex]); // 新数组下标指向新节点 newTable[nodeIndex] = node; // 赋值给原数组 table = newTable; } 1.put 时,会判断 Segment 里的 HashEntry 数量是否超过阈值(threshold);超过则进行扩容。
2.rehash() 是没有加锁的,因为外层已经加了锁,确保是单线程 *** 作。
3.过程:新建容量为原来两倍的新数组;每个桶,找到第一个后续所有节点在新数组中下标保持不变的节点 lastRun,新数组可直接引用 lastRun,之前的结点全部需要复制。
1.8 特点public int size() { final Segment[] segments = this.segments; // HashEntry 数量 int size; // 是否溢出 boolean overflow; // 本次的 modcount 值 long sum; // 上次的 modcount 值 long last = 0L; int retries = -1; // first iteration isn't retry try { for (;;) { // 当 retries = RETRIES_BEFORE_LOCK 时,全部 Segment 加锁 // 会导致本来延迟加载的 Segment 全部创建 if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) // ensureSegment 确保该 Segment 已经创建; // segments[0] 为模板,懒加载其他结点 ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment seg = segmentAt(segments, j); if (seg != null) { // 本轮 modcount sum += seg.modCount; // 本 Segment 元素数量 int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } // modcount 没变,则计算结果准确 if (sum == last) break; last = sum; } } finally { // 只有当 retries = RETRIES_BEFORE_LOCK 时,才加锁;防止未加锁,进行解锁 if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
1.使用 synchronized + CAS 保证并发安全。
2.发生哈希冲突,链表 + 红黑树。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)