2021SC@SDUSC Hbase(六)项目代码分析- flush

2021SC@SDUSC Hbase(六)项目代码分析- flush,第1张

2021SC@SDUSC Hbase(六)项目代码分析- flush

2021SC@SDUSC

        本文来研究如何选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是如何发起的。

        上文中我们说到了flush处理线程时可能会调用flushoneForGlobalPressure()方法,按照一定策略,flush一个HRegion的MemStore,来降低MemStore的大小,从而预防一些异常情况的发生。这次我们重点分析一下flushoneForGlobalPressure()方法:

  private boolean flushoneForGlobalPressure() {

    SortedMap regionsBySize =
        server.getCopyOfonlineRegionsSortedBySize();
 
    Set excludedRegions = new HashSet();
 
    boolean flushedOne = false;
    while (!flushedOne) {
      
      // Find the biggest region that doesn't have too many storefiles
      // (might be null!)
      HRegion bestFlushableRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, true);
      
      // Find the biggest region, total, even if it might have too many flushes.
      HRegion bestAnyRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, false);
 
      if (bestAnyRegion == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }
 
      HRegion regionToFlush;
      
      if (bestFlushableRegion != null &&
          bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " +
            "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
            "store files, but is " +
            StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
            " vs best flushable region's " +
            StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
            ". Choosing the bigger.");
        }
        regionToFlush = bestAnyRegion;
      } else {
        if (bestFlushableRegion == null) {
          regionToFlush = bestAnyRegion;
        } else {
          regionToFlush = bestFlushableRegion;
        }
      }
 
      Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
 
      LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
      
      flushedOne = flushRegion(regionToFlush, true);
      if (!flushedOne) {
        LOG.info("Excluding unflushable region " + regionToFlush +
          " - trying to find a different region to flush.");
        excludedRegions.add(regionToFlush);
      }
    }
    return true;
  }

此方法的处理过程大概如下:

  • 获取RegionServer上的在线Region,然后根据Region的memstoreSize倒序排列,得到regionsBySize。
  • 构造被排除的Region集合excludedRegions。
  • flushedOne设置为false。
  • 循环regionBySize,选择一个Menstore最大且不含太多storefiles的region作为bestFlushableRegion:有以下几种情况时直接跳过:当前region在excludedRegions列表中;当前region的写状态为正在flush;当前region的写状态不是写启用;需要检查StoreFile数目,且包含太多StoreFiles。其余情况返回该region。
  • 循环regionsBySize,选择一个Memstore最大的region,即便是它包含太多storefiles,作为bestAnyRegion:有以下情况时直接跳过:当前region在excludedRegions列表中;当前region的写状态为正在flush,或者当前region的写状态不是写启用。其余情况返回该region。
  • 在内存上阈值之上但是没有能够flush的region的话,直接返回false。
  • 选择需要flush的region。
  • 检测被选中region的memstoreSize是否大于零。
  • 调用flushRegion(),针对单个region进行memstore的flush。
  • flush失败则添加到excludedRegions集合中,避免在被选中。

以上是flushoneForGlobalPressure()方法,即按照一定策略选择一个HRegion进行memstore的flush以缓解memstore压力的方法。接下来是HRegion的flush如何发起的问题,首先看一下带一个参数的flushRegion()方法:

  private boolean flushRegion(final FlushRegionEntry fqe) {
    HRegion region = fqe.region;
    if (!region.getRegionInfo().ismetaRegion() &&
        isTooManyStoreFiles(region)) {
      
      if (fqe.isMaximumWait(this.blockingWaitTime)) {
        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
          "ms on a compaction to clean up 'too many store files'; waited " +
          "long enough... proceeding with flush of " +
          region.getRegionNameAsString());
      } else {
        // If this is first time we've been put off, then emit a log message.
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
            "store files; delaying flush up to " + this.blockingWaitTime + "ms");
          
          if (!this.server.compactSplitThread.requestSplit(region)) {
            try {
              this.server.compactSplitThread.requestSystemCompaction(
                  region, Thread.currentThread().getName());
            } catch (IOException e) {
              LOG.error(
                "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
                RemoteExceptionHandler.checkIOException(e));
            }
          }
        }
 
        // Put back on the queue.  Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        return true;
      }
    }
    
    return flushRegion(region, false);
  }

        方法流程如下:

  1. 如果region不是mataRegion且region上有太多storeFiles:
    1. isMaximumWait()判断阻塞时间,已阻塞达到或超过指定时间,记录日志并执行flush,跳到2,结束。
    2. 如果是第一次推迟,记录一条日志信息,然后对该HRegion请求分裂Split,分裂不成功的话再请求系统合并SystemCompaction。
    3. 将fqe放回到队列flushQueue,增加延迟时间900ms,到期后再从队列中取出来进行处理。
    4. 如果该Region被推迟进行flush,结果还不确定,应返回true。
  2. 调用2个参数的flushRegion()方法,通知HRegion执行flush。

       接下来是带有两个参数的flushRegion()方法:

  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
    long startTime = 0;
    synchronized (this.regionsInQueue) {

      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null) {

    	startTime = fqe.createTime;
      }
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
     }
    }

    if (startTime == 0) {
      // Avoid getting the system time unless we don't have a FlushRegionEntry;
      // shame we can't capture the time also spent in the above synchronized
      // block
      startTime = EnvironmentEdgeManager.currentTime();
    }

    lock.readLock().lock();
    try {

      notifyFlushRequest(region, emergencyFlush);

      HRegion.FlushResult flushResult = region.flushcache();

      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;

      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }

      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        server.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (Hbase-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" +
        (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();

      wakeUpIfBlocking();
    }
    return true;
  }

        方法流程如下:

  1. 从regionsInQueue中移除对应的HRegion信息
  2. 获取flush的开始时间
  3. 如果是紧急刷新,需要从flushQueue队列中移除对应的fqe否则,fqe将通过flushQueue.poll()移除
  4. 如果开始时间为null,获取flush的开始时间
  5. 上读锁
  6. 通过监听器Listener通知flush请求者flush的type
  7. 调用HRegion的flushcache()方法执行MemStore的flush,获得flush结果
  8. 根据flush结果判断是否应该进行合并compact(标志位shouldCompact)
  9. 调用HRegion的checkSplit()方法检测是否应该进行分裂split(标志位shouldSplit)
  10. 通过两个标志位判断,必要的情况下,先进行split,再进行system compact
  11. 若flush成功,获取flush结束时间,计算耗时,记录HRegion上的度量信息
  12. 释放读锁,唤醒阻塞的其他线程。

        以上。

        如有错误,欢迎指正。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5117994.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-17
下一篇 2022-11-17

发表评论

登录后才能评论

评论列表(0条)

保存