2021SC@SDUSC HBase (9) Project Code Analysis: LruBlockCache


Contents

1. Introduction

2. Cache priorities

3. LruBlockCache implementation analysis

4. Eviction implementation analysis


1. Introduction

        If every read had to go all the way to the HFile, performance would be very poor, especially for small random reads. To improve I/O performance, HBase provides a block-caching mechanism, BlockCache, and LruBlockCache is one of its implementations.

2. Cache priorities

        There are three cache priorities, defined in the BlockPriority enum:

public enum BlockPriority {
  SINGLE,
  MULTI,
  MEMORY
}
  1. SINGLE: assigned on a block's first access (e.g. from a scan), so that a large one-pass read does not evict the whole cache
  2. MULTI: assigned to blocks that have been accessed more than once
  3. MEMORY: blocks kept resident in the cache
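A block is inserted with SINGLE priority (or MEMORY for in-memory column families), and a later cache hit promotes it from SINGLE to MULTI. The standalone class below is a minimal sketch of that promotion rule; everything except the BlockPriority names is hypothetical, not HBase code.

```java
// Hypothetical sketch of the SINGLE -> MULTI promotion that the LRU cache
// applies on a cache hit. Only the BlockPriority names come from HBase.
public class PrioritySketch {
  public enum BlockPriority { SINGLE, MULTI, MEMORY }

  // A block cached once starts as SINGLE; a repeat access promotes it
  // to MULTI. MEMORY blocks stay resident regardless of access count.
  public static BlockPriority access(BlockPriority current) {
    if (current == BlockPriority.SINGLE) {
      return BlockPriority.MULTI;
    }
    return current;
  }

  public static void main(String[] args) {
    BlockPriority p = BlockPriority.SINGLE;
    p = access(p);          // first re-access: SINGLE -> MULTI
    System.out.println(p);  // MULTI
    p = access(p);          // further accesses keep it MULTI
    System.out.println(p);  // MULTI
  }
}
```

Keeping scan-loaded blocks at SINGLE priority is what prevents a single large scan from flushing frequently re-read (MULTI) blocks out of the cache.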
3. LruBlockCache implementation analysis

        Caching a block is implemented by cacheBlock():

  // BlockCache implementation

  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
      final boolean cacheDataInL1) {
    if (buf.heapSize() > maxBlockSize) {
      // If there are a lot of blocks that are too
      // big this can make the logs way too noisy.
      // So we log 2%
      if (stats.failInsert() % 50 == 0) {
        LOG.warn("Trying to cache too large a block "
            + cacheKey.getHfileName() + " @ "
            + cacheKey.getOffset()
            + " is " + buf.heapSize()
            + " which is larger than " + maxBlockSize);
      }
      return;
    }

    LruCachedBlock cb = map.get(cacheKey);
    if (cb != null) {
      // compare the contents, if they are not equal, we are in big trouble
      if (compare(buf, cb.getBuffer()) != 0) {
        throw new RuntimeException("Cached block contents differ, which should not have happened."
          + "cacheKey:" + cacheKey);
      }
      String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
      msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
      LOG.warn(msg);
      return;
    }

    long currentSize = size.get();
    long currentAcceptableSize = acceptableSize();
    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
    if (currentSize >= hardLimitSize) {
      stats.failInsert();
      if (LOG.isTraceEnabled()) {
        LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "  too many."
          + " the hard limit size is " + StringUtils.byteDesc(hardLimitSize) + ", failed to put cacheKey:"
          + cacheKey + " into LruBlockCache.");
      }
      if (!evictionInProgress) {
        runEviction();
      }
      return;
    }

    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
    long newSize = updateSizeMetrics(cb, false);
    map.put(cacheKey, cb);
    long val = elements.incrementAndGet();
    if (LOG.isTraceEnabled()) {
      long size = map.size();
      assertCounterSanity(size, val);
    }
    if (newSize > currentAcceptableSize && !evictionInProgress) {
      runEviction();
    }
  }

        The method proceeds as follows:

  1. Check whether the block to cache is larger than maxBlockSize; if so, log a warning (throttled to about one in fifty failed inserts, roughly 2%) and return
  2. Look up cacheKey in the cache map to see whether the block is already cached
  3. If it is already cached: if the contents differ, throw a RuntimeException; otherwise log a warning and return
  4. Read the current cache size and the acceptable size, and compute the hard limit as hardCapacityLimitFactor * acceptableSize()
  5. If the current size has reached the hard limit, record the failed insert, trigger eviction if none is in progress, and return
  6. Construct an LruCachedBlock instance cb
  7. Put cb into the cache map
  8. Atomically increment the element count
  9. If the new cache size exceeds the acceptable size and no eviction is in progress, trigger eviction
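The size check in steps 4-5 is plain arithmetic. The sketch below mirrors it with assumed values; the class name, the 100 MB capacity, and the 0.99/1.2 factors are illustrative stand-ins for acceptableSize() and hardCapacityLimitFactor, not HBase's actual configuration.

```java
// Illustrative sketch of the hard-limit check in cacheBlock().
// All constants here are assumptions for the example, not HBase defaults.
public class SizeCheckSketch {
  static final long MAX_SIZE = 100L * 1024 * 1024; // assumed 100 MB cache
  static final float ACCEPTABLE_FACTOR = 0.99f;    // assumed acceptable-size factor
  static final float HARD_LIMIT_FACTOR = 1.2f;     // assumed hardCapacityLimitFactor

  // Stand-in for LruBlockCache.acceptableSize()
  static long acceptableSize() {
    return (long) (MAX_SIZE * ACCEPTABLE_FACTOR);
  }

  // Mirrors steps 4-5: refuse the insert once the current size reaches
  // hardCapacityLimitFactor * acceptableSize().
  static boolean wouldRejectInsert(long currentSize) {
    long hardLimitSize = (long) (HARD_LIMIT_FACTOR * acceptableSize());
    return currentSize >= hardLimitSize;
  }

  public static void main(String[] args) {
    System.out.println(wouldRejectInsert(50L * 1024 * 1024));  // well under: false
    System.out.println(wouldRejectInsert(130L * 1024 * 1024)); // over the limit: true
  }
}
```

Note that the hard limit sits above the acceptable size: between the two thresholds an insert still succeeds but kicks off a background eviction (step 9), while beyond the hard limit the insert itself is refused (step 5).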
4. Eviction implementation analysis

        Eviction can run in one of two ways:

  1. Inline, on the calling thread
  2. On a dedicated eviction thread, which holds a WeakReference to the enclosing LruBlockCache

        Which mode is used is decided by the evictionThread flag passed to the constructor:

    if(evictionThread) {
      this.evictionThread = new EvictionThread(this);
      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
    } else {
      this.evictionThread = null;
    }

        The runEviction() method dispatches to whichever mode is in use:

  private void runEviction() {
    if(evictionThread == null) {
      evict();
    } else {
      evictionThread.evict();
    }
  }

        The eviction thread's evict() is implemented as follows:

    public void evict() {
      synchronized(this) {
        this.notifyAll();
      }
    }

        That is, the caller takes the eviction thread object's monitor via synchronized, then wakes the eviction thread, which is blocked in wait() on that same monitor, by calling notifyAll().
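This handshake can be reproduced in a small self-contained sketch: a worker thread blocks in wait() on its own monitor, and callers wake it with notifyAll(). The Evictor class below is hypothetical and shows the mechanism only; the real EvictionThread additionally holds a WeakReference to the LruBlockCache and has more careful start-up/shutdown handling, which are omitted here.

```java
// Minimal sketch of the wait()/notifyAll() wake-up used by the eviction
// thread. The Evictor class is hypothetical; it demonstrates the
// handshake, not HBase's actual eviction logic.
public class EvictionWakeupSketch {

  public static class Evictor extends Thread {
    volatile boolean stopped = false;
    volatile int evictions = 0;

    @Override
    public void run() {
      while (!stopped) {
        synchronized (this) {
          try {
            this.wait(); // sleep until evict() notifies us
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
        if (!stopped) {
          evictions++; // stand-in for the real cache eviction pass
        }
      }
    }

    // Same shape as EvictionThread.evict(): take the monitor, wake the thread.
    public void evict() {
      synchronized (this) {
        this.notifyAll();
      }
    }

    public void shutdown() {
      stopped = true;
      evict();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Evictor evictor = new Evictor();
    evictor.start();
    Thread.sleep(200);  // give the thread time to reach wait()
    evictor.evict();    // wake it once, as runEviction() would
    Thread.sleep(200);
    evictor.shutdown();
    evictor.join();
    System.out.println("eviction passes run: " + evictor.evictions);
  }
}
```

The point of this design is that cacheBlock() returns immediately after the notifyAll(): the expensive eviction pass runs on the dedicated thread instead of blocking the read/write path.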

        To be continued…

Original article: http://outofmemory.cn/zaji/5605284.html
