2021SC@SDUSC
目录
compact源码分析
2021SC@SDUSC
接上一篇接续分析
compact源码分析在前几篇中,分析了HRegion.compact()方法和HRegion.compact()中调用的HStore.compact()方法,现在再来分析一下HStore.compact()中调用的CompactionContext.compact()方法。
首先来看一下CompactionContext类。CompactionContext是一个抽象类,表示compact的上下文,有两个类继承了CompactionContext,分别是DefaultCompactionContext类和StripeCompaction类,我们主要分析DefaultCompactionContext类。
查看DefaultCompactionContext.compact()方法:在该方法中调用了compactor.compact()方法,其中的compactor默认实现是DefaultCompactor,可以根据参数hbase.hstore.defaultengine.compactor.class配置
public Listcompact(CompactionThroughputController throughputController, User user) throws IOException { return compactor.compact(request, throughputController, user); }
查看DefaultCompactor.compact()方法:
- 调用getFileDetails()方法获取文件详情FileDetails,FileDetails包含a)合并之后总的keyvalue数目:maxKeyCount;b)如果是major合并,最早的Put时间戳earliestPutTs;c)合并时文件中最大的序列号maxSeqId;d)相关文件中最新的MemStore数据读取点maxMVCCReadpoint;e)最大的tag长度maxTagsLength;d)在major合并期间需要保持的最小序列号minSeqIdToKeep;
- 初始化合并过程追踪器CompactionProgress;
- 调用getSmallestReadPoint()方法找到所有scanners中的最小的可读点;
- 根据参数hbase.regionserver.compaction.private.readers判断是否使用私有readers,默认为false。如果为true,则在storeFiles、HFileFiles和readers的独立副本上做compact;如果为false,在直接在待合并文件本身上做compact;
- 确定scan类型scanType:a)如果compact请求是MAJOR或ALL_FILES合并,则scanType为COMPACT_DROP_DELETES;b)如果compact请求是MINOR合并,则scanType为COMPACT_RETAIN_DELETES;
- 调用preCreateCoprocScanner()方法,获得scanner,如果协处理器未创建scanner,则调用createScanner()完成创建;
- 调用HStore的createWriterInTmp()方法,获取writer;
- 调用performCompaction()方法,执行合并;
- 如果compact没有完成,则关闭writer、删除writer中的临时文件并抛出异常;
- 关闭scanner;
- 如果没有异常,则关闭writer;如果有异常,写入元数据,关闭writer,并将写入地址加入newFiles;
- 关闭readersToClose中StoreFile的Reader;
- 返回newFiles。
public Listcompact(final CompactionRequest request, CompactionThroughputController throughputController, User user) throws IOException { //在CompactionRequest获取文件详情 FileDetails fd = getFileDetails(request.getFiles(), request.isMajor()); //初始化合并过程追踪器CompactionProgress this.progress = new CompactionProgress(fd.maxKeyCount); //找到scanners中的最小的可读点 long smallestReadPoint = getSmallestReadPoint(); List scanners; Collection readersToClose; //根据参数hbase.regionserver.compaction.private.readers判断是否使用私有readers if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) { //克隆所有的StoreFiles,以便在StoreFiles、HFileFiles和readers的独立副本上做compact //根据CompactionRequest中待合并文件的数目创建一个StoreFile列表:readersToClose readersToClose = new ArrayList (request.getFiles().size()); //将待合并文件加入readersToClose列表,此处加入的是待合并文件的副本,并非待合并文件本身 for (StoreFile f : request.getFiles()) { readersToClose.add(new StoreFile(f)); } //根据readersToClose创建文件浏览器FileScanners scanners = createFileScanners(readersToClose, smallestReadPoint, store.throttleCompaction(request.getSize())); } else { //创建空列表 readersToClose = Collections.emptyList(); //根据CompactionRequest中的待合并文件列表创建文件浏览器FileScanners //此处要注意的是:上面FileScanners是通过待合并文件的副本创建的,而此处的FileScanners //是根据待合并文件本身创建的 scanners = createFileScanners(request.getFiles(), smallestReadPoint, store.throttleCompaction(request.getSize())); } StoreFile.Writer writer = null; List newFiles = new ArrayList (); IOException e = null; try { InternalScanner scanner = null; try { //确定scan类型: //1.如果compact请求是MAJOR或ALL_FILES合并,则scanType为COMPACT_DROP_DELETES; //2.如果compact请求是MINOR合并,则scanType为COMPACT_RETAIN_DELETES ScanType scanType = request.isMajor() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES; //调用preCreateCoprocScanner()方法,获取scanner scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user); if (scanner == null) { //如果协处理器未创建scanner,调用createScanner()完成创建 scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs); } scanner = postCreateCoprocScanner(request, scanType, scanner, user); if (scanner == null) { return newFiles; } //调用HStore的createWriterInTmp()方法,获取writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0, store.throttleCompaction(request.getSize())); //调用performCompaction()方法,执行合并 boolean finished = performCompaction(scanner, writer, smallestReadPoint, throughputController); if (!finished) { writer.close(); store.getFileSystem().delete(writer.getPath(), false); writer = null; throw new InterruptedIOException("Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); } } finally { if (scanner != null) { scanner.close(); } } } catch (IOException ioe) { e = ioe; throw ioe; } finally { try { if (writer != null) { if (e != null) { writer.close(); } else { writer.appendmetadata(fd.maxSeqId, request.isMajor()); writer.close(); newFiles.add(writer.getPath()); } } } finally { for (StoreFile f : readersToClose) { try { f.closeReader(true); } catch (IOException ioe) { LOG.warn("Exception closing " + f, ioe); } } } } return newFiles; }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)