2021SC@SDUSC
Contents
1. Introduction
2. Cache Priority Levels
3. Analysis of the LruBlockCache Implementation
4. Analysis of the Cache Eviction Implementation
1. Introduction
If every read had to go to the HFile on disk, performance would be very poor, especially for random reads of small amounts of data. To improve I/O performance, HBase provides a block-caching mechanism called BlockCache, and LruBlockCache is one of its implementations.
2. Cache Priority Levels
There are three cache priority levels, defined in the BlockPriority enum:
```java
public enum BlockPriority { SINGLE, MULTI, MEMORY }
```
- SINGLE: used for blocks accessed only once (e.g. by scans), so that a flood of one-time accesses does not evict the rest of the cache
- MULTI: for blocks that have been accessed more than once
- MEMORY: for blocks that should stay resident in the cache (in-memory column families)
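A block starts out in the SINGLE bucket and is promoted to MULTI the second time it is hit, while MEMORY blocks stay in their own bucket. The sketch below illustrates that promotion rule; it is not the actual HBase LruCachedBlock class, and the field and method names are chosen for illustration only.

```java
// Illustrative sketch only: shows how a block's priority can be promoted
// from SINGLE to MULTI on its second access, mirroring the idea behind
// LruCachedBlock.access() in HBase. Names below are assumptions.
enum BlockPriority { SINGLE, MULTI, MEMORY }

class CachedBlockSketch {
    private BlockPriority priority;
    private long accessTime;

    CachedBlockSketch(boolean inMemory) {
        // Blocks from in-memory column families go straight to the MEMORY
        // bucket; everything else starts as a single-access block.
        this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.SINGLE;
    }

    /** Called on every cache hit. */
    void access(long now) {
        this.accessTime = now;
        // A second hit proves the block is reused, so promote it to MULTI
        // unless it is pinned in the MEMORY bucket.
        if (this.priority == BlockPriority.SINGLE) {
            this.priority = BlockPriority.MULTI;
        }
    }

    BlockPriority getPriority() {
        return priority;
    }
}
```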
3. Analysis of the LruBlockCache Implementation
cacheBlock() is the method that inserts a block into the cache:
```java
// BlockCache implementation
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
    final boolean cacheDataInL1) {
  if (buf.heapSize() > maxBlockSize) {
    // If there are a lot of blocks that are too
    // big this can make the logs way too noisy.
    // So we log 2%
    if (stats.failInsert() % 50 == 0) {
      LOG.warn("Trying to cache too large a block "
          + cacheKey.getHfileName() + " @ " + cacheKey.getOffset()
          + " is " + buf.heapSize()
          + " which is larger than " + maxBlockSize);
    }
    return;
  }

  LruCachedBlock cb = map.get(cacheKey);
  if (cb != null) {
    // compare the contents, if they are not equal, we are in big trouble
    if (compare(buf, cb.getBuffer()) != 0) {
      throw new RuntimeException("Cached block contents differ, which should not have happened."
          + "cacheKey:" + cacheKey);
    }
    String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
    msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
    LOG.warn(msg);
    return;
  }

  long currentSize = size.get();
  long currentAcceptableSize = acceptableSize();
  long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
  if (currentSize >= hardLimitSize) {
    stats.failInsert();
    if (LOG.isTraceEnabled()) {
      LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize)
          + " too many."
          + " the hard limit size is " + StringUtils.byteDesc(hardLimitSize)
          + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache.");
    }
    if (!evictionInProgress) {
      runEviction();
    }
    return;
  }

  cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
  long newSize = updateSizeMetrics(cb, false);
  map.put(cacheKey, cb);
  long val = elements.incrementAndGet();
  if (LOG.isTraceEnabled()) {
    long size = map.size();
    assertCounterSanity(size, val);
  }
  if (newSize > currentAcceptableSize && !evictionInProgress) {
    runEviction();
  }
}
```
The method logic is as follows:
- Check whether the block to be cached is larger than maxBlockSize; if it is, log a warning (only about 2% of the time, to keep the logs quiet) and return
- Try to look up an already-cached block in the cache map by cacheKey
- If one is found: if the contents differ, throw a RuntimeException; otherwise log a warning about the harmless duplicate insert and return
- Read the current cache size and the acceptable cache size, then compute the hard limit hardLimitSize (see the sketch after this list for how the two thresholds relate)
- If the current cache size has reached the hard limit, record the failed insert, trigger eviction if none is in progress, and return
- Construct an LruCachedBlock instance cb
- Put cb into the cache map
- Atomically increment the element count
- If the new cache size exceeds the acceptable size and no eviction is in progress, trigger eviction
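The relationship between the two thresholds is easy to see in isolation. The sketch below is illustrative only: the acceptableSize() formula and the factor values are assumptions based on typical LruBlockCache configuration defaults, not part of the code quoted above; only the hardLimitSize expression is taken from it.

```java
// Minimal sketch of the size thresholds used by cacheBlock().
// The acceptableSize() formula and the default factor values are assumptions
// (illustrative, not copied from the HBase source above).
public class CacheSizeThresholds {
    public static void main(String[] args) {
        long maxSize = 1L << 30;              // e.g. a 1 GB cache
        float acceptableFactor = 0.99f;       // assumed acceptable-occupancy factor
        float hardCapacityLimitFactor = 1.2f; // assumed hard-limit factor

        // acceptableSize(): occupancy above this triggers a normal eviction pass.
        long acceptableSize = (long) Math.floor(maxSize * acceptableFactor);

        // hardLimitSize: occupancy at or above this makes cacheBlock() refuse
        // the insert entirely (stats.failInsert()) and only kick off eviction.
        long hardLimitSize = (long) (hardCapacityLimitFactor * acceptableSize);

        System.out.printf("acceptable=%d bytes, hard limit=%d bytes%n",
            acceptableSize, hardLimitSize);
    }
}
```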
4. Analysis of the Cache Eviction Implementation
Cache eviction can be carried out in two ways:
- eviction runs directly in the main (calling) thread
- eviction runs in a dedicated eviction thread, which holds a WeakReference to the enclosing LruBlockCache
Which approach is used is determined by the evictionThread flag passed to the constructor:
```java
if (evictionThread) {
  this.evictionThread = new EvictionThread(this);
  this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
} else {
  this.evictionThread = null;
}
```
In the runEviction() method that triggers cache eviction:
```java
private void runEviction() {
  if (evictionThread == null) {
    evict();
  } else {
    evictionThread.evict();
  }
}
```
The evictionThread.evict() called above is implemented as follows:
```java
public void evict() {
  synchronized (this) {
    this.notifyAll();
  }
}
```
That is, the calling thread first acquires the eviction thread object's monitor via synchronized, and then wakes the eviction thread by calling notifyAll() on it.
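Putting the pieces together, the pattern is: a long-lived daemon thread holds only a WeakReference to the cache, waits on its own monitor, and each notifyAll() from evict() wakes it to run an eviction pass. The sketch below is a simplified illustration of that pattern, not the actual HBase EvictionThread; the Cache interface and its evictBlocks() method are assumed stand-ins for LruBlockCache and its internal eviction routine.

```java
import java.lang.ref.WeakReference;

// Simplified sketch of the eviction-thread pattern (illustrative, not HBase source).
// Cache and evictBlocks() are assumed names standing in for LruBlockCache.
final class EvictionThreadSketch extends Thread {
    interface Cache {
        void evictBlocks();
    }

    // A weak reference so this long-lived thread does not keep the cache alive.
    private final WeakReference<Cache> cacheRef;
    private volatile boolean go = true;

    EvictionThreadSketch(Cache cache) {
        super("cache-eviction-thread");
        this.cacheRef = new WeakReference<>(cache);
        setDaemon(true);
    }

    @Override
    public void run() {
        while (go) {
            synchronized (this) {
                try {
                    wait(10_000); // sleep until evict() notifies us (or time out)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            Cache cache = cacheRef.get();
            if (cache == null) {
                return; // the cache was garbage-collected; nothing left to do
            }
            cache.evictBlocks();
        }
    }

    /** Called by runEviction(): wake the thread so it performs an eviction pass. */
    void evict() {
        synchronized (this) {
            notifyAll();
        }
    }

    void shutdown() {
        go = false;
        interrupt();
    }
}
```

Holding only a weak reference means the eviction thread never prevents the cache itself from being garbage-collected, which is the point of using WeakReference here.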
To be continued…