2021SC@SDUSC
本文来研究如何选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是如何发起的。
上文中我们说到了flush处理线程时可能会调用flushoneForGlobalPressure()方法,按照一定策略,flush一个HRegion的MemStore,来降低MemStore的大小,从而预防一些异常情况的发生。这次我们重点分析一下flushoneForGlobalPressure()方法:
private boolean flushoneForGlobalPressure() { SortedMapregionsBySize = server.getCopyOfonlineRegionsSortedBySize(); Set excludedRegions = new HashSet (); boolean flushedOne = false; while (!flushedOne) { // Find the biggest region that doesn't have too many storefiles // (might be null!) HRegion bestFlushableRegion = getBiggestMemstoreRegion( regionsBySize, excludedRegions, true); // Find the biggest region, total, even if it might have too many flushes. HRegion bestAnyRegion = getBiggestMemstoreRegion( regionsBySize, excludedRegions, false); if (bestAnyRegion == null) { LOG.error("Above memory mark but there are no flushable regions!"); return false; } HRegion regionToFlush; if (bestFlushableRegion != null && bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) { // Even if it's not supposed to be flushed, pick a region if it's more than twice // as big as the best flushable one - otherwise when we're under pressure we make // lots of little flushes and cause lots of compactions, etc, which just makes // life worse! if (LOG.isDebugEnabled()) { LOG.debug("Under global heap pressure: " + "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " + "store files, but is " + StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) + " vs best flushable region's " + StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) + ". Choosing the bigger."); } regionToFlush = bestAnyRegion; } else { if (bestFlushableRegion == null) { regionToFlush = bestAnyRegion; } else { regionToFlush = bestFlushableRegion; } } Preconditions.checkState(regionToFlush.memstoreSize.get() > 0); LOG.info("Flush of region " + regionToFlush + " due to global heap pressure"); flushedOne = flushRegion(regionToFlush, true); if (!flushedOne) { LOG.info("Excluding unflushable region " + regionToFlush + " - trying to find a different region to flush."); excludedRegions.add(regionToFlush); } } return true; }
此方法的处理过程大概如下:
- 获取RegionServer上的在线Region,然后根据Region的memstoreSize倒序排列,得到regionsBySize。
- 构造被排除的Region集合excludedRegions。
- flushedOne设置为false。
- 循环regionBySize,选择一个Menstore最大且不含太多storefiles的region作为bestFlushableRegion:有以下几种情况时直接跳过:当前region在excludedRegions列表中;当前region的写状态为正在flush;当前region的写状态不是写启用;需要检查StoreFile数目,且包含太多StoreFiles。其余情况返回该region。
- 循环regionsBySize,选择一个Memstore最大的region,即便是它包含太多storefiles,作为bestAnyRegion:有以下情况时直接跳过:当前region在excludedRegions列表中;当前region的写状态为正在flush,或者当前region的写状态不是写启用。其余情况返回该region。
- 在内存上阈值之上但是没有能够flush的region的话,直接返回false。
- 选择需要flush的region。
- 检测被选中region的memstoreSize是否大于零。
- 调用flushRegion(),针对单个region进行memstore的flush。
- flush失败则添加到excludedRegions集合中,避免在被选中。
以上是flushoneForGlobalPressure()方法,即按照一定策略选择一个HRegion进行memstore的flush以缓解memstore压力的方法。接下来是HRegion的flush如何发起的问题,首先看一下带一个参数的flushRegion()方法:
private boolean flushRegion(final FlushRegionEntry fqe) { HRegion region = fqe.region; if (!region.getRegionInfo().ismetaRegion() && isTooManyStoreFiles(region)) { if (fqe.isMaximumWait(this.blockingWaitTime)) { LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) + "ms on a compaction to clean up 'too many store files'; waited " + "long enough... proceeding with flush of " + region.getRegionNameAsString()); } else { // If this is first time we've been put off, then emit a log message. if (fqe.getRequeueCount() <= 0) { // Note: We don't impose blockingStoreFiles constraint on meta regions LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + "store files; delaying flush up to " + this.blockingWaitTime + "ms"); if (!this.server.compactSplitThread.requestSplit(region)) { try { this.server.compactSplitThread.requestSystemCompaction( region, Thread.currentThread().getName()); } catch (IOException e) { LOG.error( "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()), RemoteExceptionHandler.checkIOException(e)); } } } // Put back on the queue. Have it come back out of the queue // after a delay of this.blockingWaitTime / 100 ms. this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); // Tell a lie, it's not flushed but it's ok return true; } } return flushRegion(region, false); }
方法流程如下:
- 如果region不是mataRegion且region上有太多storeFiles:
- isMaximumWait()判断阻塞时间,已阻塞达到或超过指定时间,记录日志并执行flush,跳到2,结束。
- 如果是第一次推迟,记录一条日志信息,然后对该HRegion请求分裂Split,分裂不成功的话再请求系统合并SystemCompaction。
- 将fqe放回到队列flushQueue,增加延迟时间900ms,到期后再从队列中取出来进行处理。
- 如果该Region被推迟进行flush,结果还不确定,应返回true。
- 调用2个参数的flushRegion()方法,通知HRegion执行flush。
接下来是带有两个参数的flushRegion()方法:
private boolean flushRegion(final HRegion region, final boolean emergencyFlush) { long startTime = 0; synchronized (this.regionsInQueue) { FlushRegionEntry fqe = this.regionsInQueue.remove(region); // Use the start time of the FlushRegionEntry if available if (fqe != null) { startTime = fqe.createTime; } if (fqe != null && emergencyFlush) { // Need to remove from region from delay queue. When NOT an // emergencyFlush, then item was removed via a flushQueue.poll. flushQueue.remove(fqe); } } if (startTime == 0) { // Avoid getting the system time unless we don't have a FlushRegionEntry; // shame we can't capture the time also spent in the above synchronized // block startTime = EnvironmentEdgeManager.currentTime(); } lock.readLock().lock(); try { notifyFlushRequest(region, emergencyFlush); HRegion.FlushResult flushResult = region.flushcache(); boolean shouldCompact = flushResult.isCompactionNeeded(); // We just want to check the size boolean shouldSplit = region.checkSplit() != null; if (shouldSplit) { this.server.compactSplitThread.requestSplit(region); } else if (shouldCompact) { server.compactSplitThread.requestSystemCompaction( region, Thread.currentThread().getName()); } if (flushResult.isFlushSucceeded()) { long endTime = EnvironmentEdgeManager.currentTime(); server.metricsRegionServer.updateFlushTime(endTime - startTime); } } catch (DroppedSnapshotException ex) { // Cache flush can fail in a few places. If it fails in a critical // section, we get a DroppedSnapshotException and a replay of wal // is required. Currently the only way to do this is a restart of // the server. Abort because hdfs is probably bad (Hbase-644 is a case // where hdfs was bad but passed the hdfs check). server.abort("Replay of WAL required. Forcing server shutdown", ex); return false; } catch (IOException ex) { LOG.error("Cache flush failed" + (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""), RemoteExceptionHandler.checkIOException(ex)); if (!server.checkFileSystem()) { return false; } } finally { lock.readLock().unlock(); wakeUpIfBlocking(); } return true; }
方法流程如下:
- 从regionsInQueue中移除对应的HRegion信息
- 获取flush的开始时间
- 如果是紧急刷新,需要从flushQueue队列中移除对应的fqe否则,fqe将通过flushQueue.poll()移除
- 如果开始时间为null,获取flush的开始时间
- 上读锁
- 通过监听器Listener通知flush请求者flush的type
- 调用HRegion的flushcache()方法执行MemStore的flush,获得flush结果
- 根据flush结果判断是否应该进行合并compact(标志位shouldCompact)
- 调用HRegion的checkSplit()方法检测是否应该进行分裂split(标志位shouldSplit)
- 通过两个标志位判断,必要的情况下,先进行split,再进行system compact
- 若flush成功,获取flush结束时间,计算耗时,记录HRegion上的度量信息
- 释放读锁,唤醒阻塞的其他线程。
以上。
如有错误,欢迎指正。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)