2021SC@SDUSC
1. FileSystem.open()
Similar to reading a local file with Java IO, reading an HDFS file starts by creating a file input stream. In Hadoop, the input stream is created with the FileSystem.open() method.
public static void readFile(String filePath) throws IOException {
    FileSystem fs = getFileSystem(filePath);
    InputStream in = null;
    try {
        in = fs.open(new Path(filePath));
        IOUtils.copyBytes(in, System.out, 4096, false);
    } catch (Exception e) {
        System.out.println(e.getMessage());
    } finally {
        IOUtils.closeStream(in);
    }
}
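The getFileSystem() helper called by readFile() is not shown in the post; a minimal sketch might look like the following (the helper name and behavior are my assumptions, not a Hadoop API; it assumes filePath carries an hdfs:// scheme and the usual org.apache.hadoop.fs imports):

// Hypothetical helper assumed by readFile() above -- not part of Hadoop.
// Assumes filePath includes a scheme, e.g. hdfs://192.168.80.131:9000/...
public static FileSystem getFileSystem(String filePath) throws IOException {
    Configuration conf = new Configuration();
    // FileSystem.get() picks the concrete implementation from the URI
    // scheme; for hdfs:// this is DistributedFileSystem.
    return FileSystem.get(URI.create(filePath), conf);
}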
Creating the FileSystem:
public static void main(String[] args) throws Exception {
    String local = "D:\\word2.txt";  // backslash must be escaped in Java
    String dest = "hdfs://192.168.80.131:9000/user/root/input/word2.txt";
    Configuration cfg = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dest), cfg, "root");
    fs.copyFromLocalFile(new Path(local), new Path(dest));
    fs.close();
}
Entering the open() method:
This method returns an FSDataInputStream object.
public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
}
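Compared with a plain java.io.InputStream, FSDataInputStream also implements Seekable and PositionedReadable, which is what makes random access into HDFS files possible. A small illustration (the path and offsets are arbitrary examples):

// Illustration of the random-access methods FSDataInputStream adds.
FSDataInputStream in = fs.open(new Path("/user/root/input/word2.txt"));
byte[] buf = new byte[128];

in.seek(16);                          // Seekable: jump to byte offset 16
int n = in.read(buf, 0, buf.length);  // read from the current position

// PositionedReadable: read from an absolute offset without moving the
// stream's current position -- convenient for concurrent readers.
int m = in.read(0L, buf, 0, buf.length);

in.close();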
Entering the open(Path f, int bufferSize) method. This overload is abstract; each concrete file system supplies its own implementation:
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;
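The post jumps from the abstract method straight to DFSClient. For HDFS the concrete subclass is DistributedFileSystem, whose dfs field is a DFSClient; simplified from the Hadoop 2.x source (symlink resolution elided), its implementation looks roughly like this:

// Simplified sketch of DistributedFileSystem.open(); the real method
// wraps this logic in a FileSystemLinkResolver to handle symlinks.
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    statistics.incrementReadOps(1);
    DFSInputStream dfsis = dfs.open(getPathName(f), bufferSize, verifyChecksum);
    // Wrap the DFSInputStream in an HdfsDataInputStream (a subclass of
    // FSDataInputStream) before returning it to the caller.
    return dfs.createWrappedInputStream(dfsis);
}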
Entering the dfs.open(String src, int buffersize, boolean verifyChecksum) method:
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
        throws IOException, UnresolvedLinkException {
    checkOpen();
    // Get block info from namenode
    TraceScope scope = getPathTraceScope("newDFSInputStream", src);
    try {
        return new DFSInputStream(this, src, verifyChecksum);
    } finally {
        scope.close();
    }
}
Entering the DFSInputStream constructor:
The constructor calls openInfo(), a thread-safe method whose job is to fetch the block information of the file being opened from the namenode.
public class DFSInputStream extends FSInputStream {
    ...
    DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
            throws IOException, UnresolvedLinkException {
        this.dfsClient = dfsClient;
        this.verifyChecksum = verifyChecksum;
        this.src = src;
        synchronized (infoLock) {
            this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
        }
        openInfo();
    }
    ...
}
Entering the openInfo() method:
If the block information cannot be obtained (the last block length comes back as -1), this method retries up to three more times; the real work is delegated to fetchLocatedBlocksAndGetLastBlockLength().
void openInfo() throws IOException, UnresolvedLinkException {
    synchronized (infoLock) {
        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
        int retriesForLastBlockLength =
            dfsClient.getConf().retryTimesForGetLastBlockLength;
        while (retriesForLastBlockLength > 0) {
            // Getting last block length as -1 is a special case. When cluster
            // restarts, DNs may not report immediately. At this time partial block
            // locations will not be available with NN for getting the length. Lets
            // retry for 3 times to get the length.
            if (lastBlockBeingWrittenLength == -1) {
                DFSClient.LOG.warn("Last block locations not available. "
                    + "Datanodes might not have reported blocks completely."
                    + " Will retry for " + retriesForLastBlockLength + " times");
                waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
                lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
            } else {
                break;
            }
            retriesForLastBlockLength--;
        }
        if (retriesForLastBlockLength == 0) {
            throw new IOException("Could not obtain the last block locations.");
        }
    }
}
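fetchLocatedBlocksAndGetLastBlockLength() itself is not shown in the post. In outline (heavily simplified from the Hadoop 2.x source), it asks the namenode for the file's block list via dfsClient.getLocatedBlocks() and, if the last block is still being written, asks a datanode for that block's current length:

// Heavily simplified outline; the real method also reconciles the new
// block list with the cached one and handles several corner cases.
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
    // Fetch the located blocks from the namenode (via the DFSClient).
    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
    if (newInfo == null) {
        throw new IOException("Cannot open filename " + src);
    }
    locatedBlocks = newInfo;

    long lastBlockLength = 0;
    if (!locatedBlocks.isLastBlockComplete()) {
        // Last block is under construction: ask a datanode for its
        // current length; readBlockLength() yields -1 when no datanode
        // can report it yet (the case openInfo() retries on).
        final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
        if (last != null) {
            lastBlockLength = readBlockLength(last);
        }
    }
    return lastBlockLength;
}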
Entering the getLocatedBlocks(String src, long start) method:
public LocatedBlocks getLocatedBlocks(String src, long start)
        throws IOException {
    return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
}
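The three-argument overload (again a simplified sketch of the Hadoop 2.x source) is where callGetBlockLocations() is finally reached:

// Simplified sketch; tracing follows the same pattern as dfs.open() above.
public LocatedBlocks getLocatedBlocks(String src, long start, long length)
        throws IOException {
    TraceScope scope = getPathTraceScope("getBlockLocations", src);
    try {
        return callGetBlockLocations(namenode, src, start, length);
    } finally {
        scope.close();
    }
}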
Entering the callGetBlockLocations(ClientProtocol namenode, String src, long start, long length) method:
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
        String src, long start, long length) throws IOException {
    try {
        // Invoke the namenode proxy object: this is a remote (RPC) call
        return namenode.getBlockLocations(src, start, length);
    } catch (RemoteException re) {
        throw re.unwrapRemoteException(AccessControlException.class,
            FileNotFoundException.class,
            UnresolvedPathException.class);
    }
}
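namenode.getBlockLocations() is the actual RPC to the namenode, and the LocatedBlocks it returns is what drives the rest of the read. The same information is also visible through the public FileSystem API; a short example using the fs object from earlier (the path is a placeholder):

// Prints where each block of a file is stored; this goes through the
// same getBlockLocations RPC underneath.
FileStatus status = fs.getFileStatus(new Path("/user/root/input/word2.txt"));
BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
for (BlockLocation blk : blocks) {
    System.out.println("offset=" + blk.getOffset()
        + " length=" + blk.getLength()
        + " hosts=" + String.join(",", blk.getHosts()));
}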