2021SC@SDUSC HBase Project Analysis: Compact Flow Analysis (Part 4)

2021SC@SDUSC

Contents

Compact source code analysis



Continuing the analysis from the previous post.

Compact source code analysis

In the previous posts we analyzed HRegion.compact() and the HStore.compact() method it calls. This post turns to the CompactionContext.compact() method that HStore.compact() invokes.

First, a look at the CompactionContext class. CompactionContext is an abstract class representing the context of a compaction. Two classes extend it: DefaultCompactionContext and StripeCompaction. We focus on DefaultCompactionContext here.
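For orientation, a simplified sketch of CompactionContext's shape follows. It is condensed from the HBase source rather than quoted verbatim; the essential contract is file selection plus the compact() call analyzed below:

  // Simplified sketch of the CompactionContext contract (condensed, not verbatim).
  public abstract class CompactionContext {
    protected CompactionRequest request = null;

    // Choose which store files will participate in this compaction.
    public abstract boolean select(List<StoreFile> filesCompacting,
        boolean isUserCompaction, boolean mayUseOffPeak, boolean forceMajor)
        throws IOException;

    // Run the compaction itself, returning the paths of the newly written files.
    public abstract List<Path> compact(CompactionThroughputController throughputController,
        User user) throws IOException;

    public boolean hasSelection() {
      return this.request != null;
    }
  }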

DefaultCompactionContext.compact() simply delegates to compactor.compact(). The compactor defaults to DefaultCompactor and can be configured via the hbase.hstore.defaultengine.compactor.class parameter:

    public List<Path> compact(CompactionThroughputController throughputController,
        User user) throws IOException {
      // Delegate to the configured compactor (DefaultCompactor by default)
      return compactor.compact(request, throughputController, user);
    }

Next, the DefaultCompactor.compact() method. It proceeds through the following steps:

  1. Call getFileDetails() to gather the file details (FileDetails), which include: a) maxKeyCount, the total number of KeyValues after the merge; b) earliestPutTs, the earliest Put timestamp, set only for a major compaction; c) maxSeqId, the largest sequence id among the files being compacted; d) maxMVCCReadpoint, the newest MemStore read point across the files; e) maxTagsLength, the largest tag length; f) minSeqIdToKeep, the smallest sequence id that must be kept during a major compaction (a sketch of this structure follows the list);
  2. Initialize the compaction progress tracker, CompactionProgress;
  3. Call getSmallestReadPoint() to find the smallest read point across all scanners;
  4. Use the hbase.regionserver.compaction.private.readers parameter to decide whether to use private readers; the default is true, as the getBoolean() call in the code shows. If true, the compaction runs on independent copies of the StoreFiles, HFiles, and readers; if false, it runs directly on the files being compacted;
  5. Determine the scan type scanType: a) if the compact request is a MAJOR or ALL_FILES compaction, scanType is COMPACT_DROP_DELETES; b) if it is a MINOR compaction, scanType is COMPACT_RETAIN_DELETES;
  6. Call preCreateCoprocScanner() to obtain a scanner; if no coprocessor created one, call createScanner() to create it;
  7. Call HStore's createWriterInTmp() method to obtain the writer;
  8. Call performCompaction() to perform the merge;
  9. If the compaction did not finish, close the writer, delete the writer's temporary file, and throw an exception;
  10. Close the scanner;
  11. If an exception occurred, just close the writer; otherwise append the metadata, close the writer, and add the writer's path to newFiles;
  12. Close the Readers of the StoreFiles in readersToClose;
  13. Return newFiles.
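
Step 1 references the FileDetails holder; here is a sketch of it, with field meanings taken from the list above (the class placement and default values are illustrative, not quoted from the source):

  // Sketch of the FileDetails holder from step 1; field meanings per the list
  // above, default values illustrative.
  protected static class FileDetails {
    public long maxKeyCount = 0;        // total KeyValue count after the merge
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP; // earliest Put ts (major only)
    public long maxSeqId = 0;           // largest sequence id among the input files
    public long maxMVCCReadpoint = 0;   // newest MemStore read point in the files
    public int maxTagsLength = 0;       // largest tag length seen
    public long minSeqIdToKeep = 0;     // smallest seq id to keep during a major compaction
  }

The full DefaultCompactor.compact() method, with its comments translated, follows: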
  public List<Path> compact(final CompactionRequest request,
      CompactionThroughputController throughputController, User user) throws IOException {
    // Gather the file details from the CompactionRequest
    FileDetails fd = getFileDetails(request.getFiles(), request.isMajor());
    // Initialize the compaction progress tracker, CompactionProgress
    this.progress = new CompactionProgress(fd.maxKeyCount);

    // Find the smallest read point across all scanners
    long smallestReadPoint = getSmallestReadPoint();

    List<StoreFileScanner> scanners;
    Collection<StoreFile> readersToClose;

    // Use the hbase.regionserver.compaction.private.readers parameter to decide
    // whether to use private readers (default: true)
    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
      // Clone all StoreFiles so the compaction runs on independent copies of the
      // StoreFiles, HFiles, and readers.
      // Create a StoreFile list, readersToClose, sized to the number of files
      // to be compacted in the CompactionRequest
      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
      // Add the files to readersToClose; note these are copies of the files to
      // be compacted, not the files themselves
      for (StoreFile f : request.getFiles()) {
        readersToClose.add(new StoreFile(f));
      }
      // Create the file scanners from readersToClose
      scanners = createFileScanners(readersToClose, smallestReadPoint,
          store.throttleCompaction(request.getSize()));
    } else {
      // Empty list
      readersToClose = Collections.emptyList();
      // Create the file scanners from the file list in the CompactionRequest.
      // Note the contrast with the branch above: there the scanners were created
      // over copies of the files, while here they are created over the files
      // themselves
      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
          store.throttleCompaction(request.getSize()));
    }
    StoreFile.Writer writer = null;
    List<Path> newFiles = new ArrayList<Path>();
    IOException e = null;
    try {
      InternalScanner scanner = null;
      try {
        // Determine the scan type:
        // 1. for a MAJOR or ALL_FILES compaction, scanType is COMPACT_DROP_DELETES;
        // 2. for a MINOR compaction, scanType is COMPACT_RETAIN_DELETES
        ScanType scanType =
            request.isMajor() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
        // Call preCreateCoprocScanner() to obtain the scanner
        scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
        if (scanner == null) {
          // If no coprocessor created a scanner, call createScanner() to create one
          scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
        }
        scanner = postCreateCoprocScanner(request, scanType, scanner, user);
        if (scanner == null) {
          return newFiles;
        }
        // Call HStore's createWriterInTmp() method to obtain the writer
        writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
            fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0,
            store.throttleCompaction(request.getSize()));
        // Call performCompaction() to perform the merge
        boolean finished =
            performCompaction(scanner, writer, smallestReadPoint, throughputController);
        if (!finished) {
          writer.close();
          store.getFileSystem().delete(writer.getPath(), false);
          writer = null;
          throw new InterruptedIOException("Aborting compaction of store " + store +
              " in region " + store.getRegionInfo().getRegionNameAsString() +
              " because it was interrupted.");
        }
      } finally {
        if (scanner != null) {
          scanner.close();
        }
      }
    } catch (IOException ioe) {
      e = ioe;
      throw ioe;
    } finally {
      try {
        if (writer != null) {
          if (e != null) {
            writer.close();
          } else {
            writer.appendMetadata(fd.maxSeqId, request.isMajor());
            writer.close();
            newFiles.add(writer.getPath());
          }
        }
      } finally {
        for (StoreFile f : readersToClose) {
          try {
            f.closeReader(true);
          } catch (IOException ioe) {
            LOG.warn("Exception closing " + f, ioe);
          }
        }
      }
    }
    return newFiles;
  }
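
To close, a note on the progress tracker initialized in step 2. As I read the source, CompactionProgress is seeded with fd.maxKeyCount, performCompaction() advances a counter as cells are written, and monitoring code derives a percentage from the two. The sketch below is a simplified rendering under that reading, not a verbatim quote:

  // Simplified sketch of CompactionProgress (member names simplified).
  public class CompactionProgress {
    public long totalCompactingKVs;      // seeded with fd.maxKeyCount
    public long currentCompactedKVs = 0; // advanced during performCompaction()

    public CompactionProgress(long totalCompactingKVs) {
      this.totalCompactingKVs = totalCompactingKVs;
    }

    // Fraction of the estimated total KeyValues compacted so far.
    public float getProgressPct() {
      return (float) currentCompactedKVs / totalCompactingKVs;
    }
  }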
