Hadoop Source Code Analysis (12)

2021SC@SDUSC

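1. FileSystem.open()

Much like reading a local file with Java IO, reading an HDFS file starts by creating a file input stream. In Hadoop, that input stream is created with the FileSystem.open() method.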

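    public static void readFile(String filePath) throws IOException {
        // getFileSystem() is a helper whose definition is not shown here
        FileSystem fs = getFileSystem(filePath);
        InputStream in = null;
        try {
            in = fs.open(new Path(filePath));
            // Copy the stream to stdout with a 4096-byte buffer;
            // false = do not close the streams when the copy finishes
            IOUtils.copyBytes(in, System.out, 4096, false);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            IOUtils.closeStream(in);
        }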
    }
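
For comparison, here is a minimal sketch of the local-file counterpart using only the JDK. The class name LocalReadSketch and its two methods are illustrative assumptions, not part of Hadoop; copyBytes() here only imitates the buffered copy that IOUtils.copyBytes() performs in the example above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

// Illustrative helper class (not part of Hadoop).
class LocalReadSketch {
    // Copy everything from in to out through a fixed-size buffer
    // and return the number of bytes copied.
    static long copyBytes(InputStream in, OutputStream out, int bufferSize) throws IOException {
        byte[] buf = new byte[bufferSize];
        long total = 0;
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
            total += n;
        }
        out.flush();
        return total;
    }

    // Local-file counterpart of readFile(): open a stream, copy it, close it.
    static long readLocalFile(String filePath, OutputStream out) throws IOException {
        try (InputStream in = Files.newInputStream(Paths.get(filePath))) {
            return copyBytes(in, out, 4096);
        }
    }
}
```

The shape is the same in both cases: obtain an InputStream, copy it, and close it. Only the way the stream is obtained differs (Files.newInputStream() here, fs.open() for HDFS).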

Creating the FileSystem (this example obtains one with FileSystem.get() and uses it to upload a local file):

public static void main(String[] args) throws Exception{
        String local="D:\\word2.txt";
        String dest="hdfs://192.168.80.131:9000/user/root/input/word2.txt";
        Configuration cfg=new Configuration();
        FileSystem fs=  FileSystem.get(URI.create(dest),cfg,"root");
        fs.copyFromLocalFile(new Path(local), new Path(dest));
        fs.close();
    } 

Stepping into the open() method:

This method returns an FSDataInputStream object.

  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }

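Stepping into open(Path f, int bufferSize):

This is an abstract method. For HDFS the implementing subclass is DistributedFileSystem, whose open() delegates to the open() method of its DFSClient (the dfs field).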

 public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;
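
The relationship between the two open() overloads can be sketched with a small stand-in. MiniFileSystem and InMemoryFileSystem are hypothetical classes invented for this illustration, and java.util.Properties stands in for Hadoop's Configuration; only the key "io.file.buffer.size" and the default 4096 come from the code above.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical stand-in for FileSystem (not the Hadoop class).
abstract class MiniFileSystem {
    private final Properties conf = new Properties();

    Properties getConf() {
        return conf;
    }

    // Convenience overload: look up the buffer size, falling back to 4096,
    // then delegate to the abstract two-argument method.
    InputStream open(String path) throws IOException {
        int bufferSize = Integer.parseInt(
                getConf().getProperty("io.file.buffer.size", "4096"));
        return open(path, bufferSize);
    }

    // Each concrete file system decides how the stream is actually created.
    abstract InputStream open(String path, int bufferSize) throws IOException;
}

// Hypothetical in-memory implementation that records the buffer size it received.
class InMemoryFileSystem extends MiniFileSystem {
    int lastBufferSize = -1;

    @Override
    InputStream open(String path, int bufferSize) throws IOException {
        lastBufferSize = bufferSize;
        byte[] data = ("contents of " + path).getBytes(StandardCharsets.UTF_8);
        return new ByteArrayInputStream(data);
    }
}
```

Callers only ever use the one-argument overload; which subclass supplies the two-argument method determines where the bytes come from.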

Stepping into dfs.open(String src, int buffersize, boolean verifyChecksum):

 public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
      throws IOException, UnresolvedLinkException {
    checkOpen();
    //    Get block info from namenode
    TraceScope scope = getPathTraceScope("newDFSInputStream", src);
    try {
      return new DFSInputStream(this, src, verifyChecksum);
    } finally {
      scope.close();
    }
  }

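Stepping into the DFSInputStream constructor:

The constructor calls openInfo(). openInfo() is thread-safe (its body is synchronized on infoLock), and its job is to fetch from the NameNode the block information of the file being opened.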

  // Constructor of DFSInputStream (the class extends FSInputStream)
  DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
                 ) throws IOException, UnresolvedLinkException {
    this.dfsClient = dfsClient;
    this.verifyChecksum = verifyChecksum;
    this.src = src;
    synchronized (infoLock) {
      this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
    }
    openInfo();
  }

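Stepping into openInfo():

The block information is read by fetchLocatedBlocksAndGetLastBlockLength(). If that call returns -1, meaning the length of the last block is not yet available, openInfo() waits and tries again. The number of retries comes from retryTimesForGetLastBlockLength (3 according to the comment in the code).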

void openInfo() throws IOException, UnresolvedLinkException {
    synchronized(infoLock) {
      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
      while (retriesForLastBlockLength > 0) {
        // Getting last block length as -1 is a special case. When cluster
        // restarts, DNs may not report immediately. At this time partial block
        // locations will not be available with NN for getting the length. Lets
        // retry for 3 times to get the length.
        if (lastBlockBeingWrittenLength == -1) {
          DFSClient.LOG.warn("Last block locations not available. "
              + "Datanodes might not have reported blocks completely."
              + " Will retry for " + retriesForLastBlockLength + " times");
          waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
          lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
        } else {
          break;
        }
        retriesForLastBlockLength--;
      }
      if (retriesForLastBlockLength == 0) {
        throw new IOException("Could not obtain the last block locations.");
      }
    }
  }
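
To make the control flow of this loop easier to follow, here is a self-contained sketch that mirrors it without any Hadoop dependency. RetrySketch, fetchWithRetry() and failingFirst() are hypothetical names; the LongSupplier stands in for fetchLocatedBlocksAndGetLastBlockLength() and Thread.sleep() for waitFor().

```java
import java.io.IOException;
import java.util.function.LongSupplier;

// Illustrative class (not part of Hadoop).
class RetrySketch {
    // One initial fetch, then up to `retries` further fetches
    // for as long as the result is -1.
    static long fetchWithRetry(LongSupplier fetch, int retries, long waitMillis)
            throws IOException, InterruptedException {
        long length = fetch.getAsLong();
        int remaining = retries;
        while (remaining > 0) {
            if (length == -1) {
                Thread.sleep(waitMillis);   // stand-in for waitFor(...)
                length = fetch.getAsLong();
            } else {
                break;
            }
            remaining--;
        }
        if (remaining == 0) {
            throw new IOException("Could not obtain the last block locations.");
        }
        return length;
    }

    // Hypothetical fetch that returns -1 for the first `failures` calls,
    // then returns `value`.
    static LongSupplier failingFirst(int failures, long value) {
        int[] calls = {0};
        return () -> calls[0]++ < failures ? -1 : value;
    }
}
```

With retries set to 3, a fetch that fails twice and then succeeds returns normally. One detail of this control flow is worth noticing: when the first successful fetch happens on the final retry, the counter has already reached 0, so the sketch still throws the exception.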

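fetchLocatedBlocksAndGetLastBlockLength() in turn calls dfsClient.getLocatedBlocks(src, 0). Stepping into getLocatedBlocks(String src, long start):
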
 public LocatedBlocks getLocatedBlocks(String src, long start)
      throws IOException {
    return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
  }

The three-argument getLocatedBlocks() then calls callGetBlockLocations(). Stepping into callGetBlockLocations(ClientProtocol namenode, String src, long start, long length):

static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) 
      throws IOException {
    try {
      // Invoke the namenode proxy object, which performs the remote call
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    }
  }
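
The catch block above turns a generic remote exception back into a specific local one. The idea can be sketched as follows; MiniRemoteException and its unwrap() method are hypothetical stand-ins written for this illustration and are not the Hadoop RemoteException class.

```java
import java.io.IOException;

// Hypothetical stand-in for a remote exception wrapper (not the Hadoop class).
class MiniRemoteException extends IOException {
    // Class name of the exception that was raised on the server side.
    private final String className;

    MiniRemoteException(String className, String message) {
        super(message);
        this.className = className;
    }

    // Return a local exception of the matching lookup type,
    // or this wrapper itself if no lookup type matches.
    IOException unwrap(Class<?>... lookupTypes) {
        for (Class<?> type : lookupTypes) {
            if (!type.getName().equals(className)) {
                continue;
            }
            try {
                return type.asSubclass(IOException.class)
                        .getConstructor(String.class)
                        .newInstance(getMessage());
            } catch (ReflectiveOperationException | ClassCastException e) {
                return this;   // cannot rebuild it locally; keep the wrapper
            }
        }
        return this;
    }
}
```

In this sketch, a wrapper carrying the class name java.io.FileNotFoundException unwraps to a real FileNotFoundException when that class is among the lookup types, so the caller can catch the specific type instead of inspecting the wrapper.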

Original article: http://outofmemory.cn/zaji/5683245.html