Hbase源码分析(六)Region定位(中)2021SC@SDUSC

Hbase源码分析(六)Region定位(中)2021SC@SDUSC,第1张

Hbase源码分析(六)Region定位(中)2021SC@SDUSC

文章目录
  • 前言
  • 介绍
  • 总结


前言

此篇文章将继续叙述Region的定位过程


介绍

在分析locateRegion()方法前,我们先折回去看看使用缓存情况的处理,它也是调用的locateRegion()方法,只不过传入的使用缓存标志位useCache为true这一个区别而已,好了,殊途同归,这里我们就只研究locateRegion()方法就行了,代码如下:

@Override
  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
      boolean retry) throws IOException {
    return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
  }

  @Override
  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
      boolean retry, int replicaId) throws IOException {
    checkClosed();
    if (tableName == null || tableName.getName().length == 0) {
      throw new IllegalArgumentException("table name cannot be null or zero length");
    }
    if (tableName.equals(TableName.meta_TABLE_NAME)) {
      return locatemeta(tableName, useCache, replicaId);
    } else {
      // Region not in the cache - have to go to the meta RS
      return locateRegionInmeta(tableName, row, useCache, retry, replicaId);
    }
  }

locateRegion()方法上来先做一些必要的检查:
1、判断连接是否已关闭的标志位closed,为true则直接抛出IOException异常;

    2、判断表名tableName,表名为空的话直接抛出IllegalArgumentException异常。

    然后,根据表是否为meta表,做以下处理:

    1、如果是meta表,直接调用locatemeta()方法进行定位;

    2、如果不是meta表,cache中没有,需要访问meta RS,调用locateRegionInmeta()方法进行定位;

    我们今天先看非meta表,进入locateRegionInmeta()方法,代码如下:

private RegionLocations locateRegionInmeta(TableName tableName, byte[] row, boolean useCache,
boolean retry, int replicaId) throws IOException {
// If we are supposed to be using the cache, look in the cache to see if we already have the
// region.
if (useCache) {
RegionLocations locations = getCachedLocation(tableName, row);
if (locations != null && locations.getRegionLocation(replicaId) != null) {
return locations;
}
}
// build the key of the meta region we should be looking for.
// the extra 9’s on the end are necessary to allow “exact” matches
// without knowing the precise region names.
byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
byte[] metaStopKey =
RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, “”, false);
Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
.addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
.setReadType(ReadType.PREAD);

   switch (this.metaReplicaMode) {
      case LOAD_BALANCE:
        int metaReplicaId = this.metaReplicaSelector.select(tableName, row,
          RegionLocateType.CURRENT);
        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
          // If the selector gives a non-primary meta replica region, then go with it.
          // Otherwise, just go to primary in non-hedgedRead mode.
          s.setConsistency(Consistency.TIMELINE);
          s.setReplicaId(metaReplicaId);
        }
        break;
      case HEDGED_READ:
        s.setConsistency(Consistency.TIMELINE);
        break;
      default:
        // do nothing
    }
    int maxAttempts = (retry ? numTries : 1);
    boolean relocatemeta = false;
    for (int tries = 0; ; tries++) {
      if (tries >= maxAttempts) {
        throw new NoServerForRegionException("Unable to find region for "
            + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
      }
      if (useCache) {
        RegionLocations locations = getCachedLocation(tableName, row);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      } else {
        // If we are not supposed to be using the cache, delete any existing cached location
        // so it won't interfere.
        // We are only supposed to clean the cache for the specific replicaId
        metaCache.clearCache(tableName, row, replicaId);
      }
      // Query the meta region
      long pausebase = this.pause;
      takeUserRegionLock();
      try {
        // We don't need to check if useCache is enabled or not. Even if useCache is false
        // we already cleared the cache for this row before acquiring userRegion lock so if this
        // row is present in cache that means some other thread has populated it while we were
        // waiting to acquire user region lock.
        RegionLocations locations = getCachedLocation(tableName, row);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
        if (relocatemeta) {
          relocateRegion(TableName.meta_TABLE_NAME, HConstants.EMPTY_START_ROW,
            RegionInfo.DEFAULT_REPLICA_ID);
        }
        s.resetMvccReadPoint();
        try (ReversedClientScanner rcs =
          new ReversedClientScanner(conf, s, TableName.meta_TABLE_NAME, this, rpcCallerFactory,
            rpcControllerFactory, getmetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
          boolean tableNotFound = true;
          for (;;) {
            Result regionInfoRow = rcs.next();
            if (regionInfoRow == null) {
              if (tableNotFound) {
                throw new TableNotFoundException(tableName);
              } else {
                throw new IOException(
                  "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
              }
            }
            tableNotFound = false;
            // convert the row result into the HRegionLocation we need!
            locations = metaTableAccessor.getRegionLocations(regionInfoRow);
            if (locations == null || locations.getRegionLocation(replicaId) == null) {
              throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
            }
            RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
            if (regionInfo == null) {
              throw new IOException("RegionInfo null or empty in " + TableName.meta_TABLE_NAME +
                ", row=" + regionInfoRow);
            }
            // See Hbase-20182. It is possible that we locate to a split parent even after the
            // children are online, so here we need to skip this region and go to the next one.
            if (regionInfo.isSplitParent()) {
              continue;
            }
            if (regionInfo.isOffline()) {
              throw new RegionOfflineException("Region offline; disable table call? " +
                  regionInfo.getRegionNameAsString());
            }
            // It is possible that the split children have not been online yet and we have skipped
            // the parent in the above condition, so we may have already reached a region which does
            // not contains us.
            if (!regionInfo.containsRow(row)) {
              throw new IOException(
                "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
            }
            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
            if (serverName == null) {
              throw new NoServerForRegionException("No server address listed in " +
                TableName.meta_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
                " containing row " + Bytes.toStringBinary(row));
            }
            if (isDeadServer(serverName)) {
              throw new RegionServerStoppedException(
                "hbase:meta says the region " + regionInfo.getRegionNameAsString() +
                  " is managed by the server " + serverName + ", but it is dead.");
            }
            // Instantiate the location
            cacheLocation(tableName, locations);
            return locations;
          }
        }
      } catch (TableNotFoundException e) {
        // if we got this error, probably means the table just plain doesn't
        // exist. rethrow the error immediately. this should always be coming
        // from the HTable constructor.
        throw e;
      } catch (LocalConnectionClosedException cce) {
        // LocalConnectionClosedException is specialized instance of DoNotRetryIOE.
        // Thrown when we check if this connection is closed. If it is, don't retry.
        throw cce;
      } catch (IOException e) {
        ExceptionUtil.rethrowIfInterrupt(e);
        if (e instanceof RemoteException) {
          e = ((RemoteException)e).unwrapRemoteException();
        }
        if (e instanceof CallQueueTooBigException) {
          // Give a special check on CallQueueTooBigException, see #Hbase-17114
          pausebase = this.pauseForCQTBE;
        }
        if (tries < maxAttempts - 1) {
          LOG.debug("locateRegionInmeta parentTable='{}', attempt={} of {} failed; retrying " +
            "after sleep of {}", TableName.meta_TABLE_NAME, tries, maxAttempts, maxAttempts, e);
        } else {
          throw e;
        }
        // only relocate the parent region if necessary
        relocatemeta =
          !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
      } finally {
        userRegionLock.unlock();
      }
      try{
        Thread.sleep(ConnectionUtils.getPauseTime(pausebase, tries));
      } catch (InterruptedException e) {
        throw new InterruptedIOException("Giving up trying to location region in " +
          "meta: thread is interrupted.");
      }
    }
  }

locateRegionInmeta()方法是对非meta表中特定行row所在Region位置信息的检索,它本质上是通过检索Hbase中meta表数据来获取对应非meta表中行row对应的Region位置信息的,其处理逻辑如下:
1、根据标志位useCache确定:如果我们支持在缓存中查找,先在缓存中看看是否我们已经有该Region,调用的是getCachedLocation()方法,传入tableName和row即可,存在即返回,否则继续;

2、缓存中没有,构造一个scan,先根据表名tableName、行row、字符串"99999999999999",调用HRegionInfo的createRegionName()方法,创建一个Region Name:metaKey;

3、构造一个Scan,scan的起始行为上述metaKey,并且是一个反向小scan,即reversed small Scan;

4、确定重试上限次数localNumRetries:如果标志位retry为true的话,重试上限次数localNumRetries取numTries,即取参数hbase.client.retries.number,参数未配置的话默认为31;

5、在一个循环内,当重试次数tries未达到上限localNumRetries且未定位到对应Region位置信息时:

5.1、先判断重试次数tries是否达到上限localNumRetries,达到的话,直接抛出NoServerForRegionException异常;
.2、根据是否支持从缓存中取来判断:

5.2.1、如果支持使用缓存的话,每次再从缓存中取一遍,存在即返回,否则继续;

5.2.2、如果我们不支持使用缓存,删除任何存在的相关缓存,以确保它不会干扰我们的查询,调用metaCache的clearCache()方法,根据tableName和row来删除;

5.3、构造ClientSmallReversedScanner实例rcs,从meta表中查找,而meta表的表名固定为hbase:meta,它的namespace为"meta",qualifier为"meta",获取scanner,注意,这一步实际上是一个内嵌的scan,它也需要根据表和行进行Region的定位,而这个表就是Hbase中的meta表,既然从meta表中查找数据,那么就又折回到上面针对meta表和非meta标的的if…else…判断了,关于meta表的定位我们稍等再讲;

5.4、通过scanner的next()方法,获取唯一的结果regionInfoRow;

5.5、关闭ClientSmallReversedScanner;

5.6、如果regionInfoRow为空,直接抛出TableNotFoundException异常;

5.7、将Result转换为我们需要的RegionLocations,即regionInfoRow->locations;

5.8、从locations中获取Region信息HRegionInfo;

5.9、做一些必要的数据和状态校验,比如:

5.9.1、验证表名是否一致;

5.9.2、验证Rgion是否已分裂;

5.9.3、验证Rgion是否已下线;

5.9.4、从locations中根据replicaId获取ServerName,验证ServerName是否已死亡;

5.10、调用cacheLocation()方法缓存获得的位置信息locations,并返回;

5.11、如果中间出现异常,则当前线程休眠一段时间,再次重试,休眠的时间与pause和tries有关,越往后,停顿时间一般越长(波动时间除外)。

总结

以上就是今天要讲的内容,本文介绍了将需要读写的行Row准确的定位到其所在Region和RegionServer上,Hbase是如何实现数据的检索的。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5154117.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-18
下一篇 2022-11-18

发表评论

登录后才能评论

评论列表(0条)

保存