水一篇 Flink 1.14 上传文件的源码流程
为了方便查找,关键的调用处都用 ⭐️⭐️⭐️ 标注了。
上传文件的核心源码就是 IO 流的读写。
1.10 和 1.14 的流程大致相同,只有少量细节不同。
// YarnClusterDescriptor 类里面,有启动 startAppMaster 方法。 private ApplicationReport startAppMaster( Configuration configuration, String applicationName, String yarnClusterEntrypoint, JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication, ClusterSpecification clusterSpecification) throws Exception { ... fileUploader.registerSingleLocalResource( jobGraphFilename, new Path(tmpJobGraphFile.toURI()), "", LocalResourceType.FILE, true, false); classPathBuilder.append(jobGraphFilename).append(File.pathSeparator); // ⭐️⭐️⭐️ ... } YarnLocalResourceDescriptor registerSingleLocalResource( final String key, final Path resourcePath, final String relativeDstPath, final LocalResourceType resourceType, final boolean whetherToAddToRemotePaths, final boolean whetherToAddToEnvShipResourceList) throws IOException { addToRemotePaths(whetherToAddToRemotePaths, resourcePath); if (Utils.isRemotePath(resourcePath.toString())) { final FileStatus fileStatus = fileSystem.getFileStatus(resourcePath); LOG.debug("Using remote file {} to register local resource", fileStatus.getPath()); final YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor.fromFileStatus( key, fileStatus, LocalResourceVisibility.APPLICATION, resourceType); addToEnvShipResourceList(whetherToAddToEnvShipResourceList, descriptor); localResources.put(key, descriptor.toLocalResource()); return descriptor; } final File localFile = new File(resourcePath.toUri().getPath()); final Tuple2remoteFileInfo = uploadLocalFileToRemote(resourcePath, relativeDstPath); // ⭐️⭐️⭐️ final YarnLocalResourceDescriptor descriptor = new YarnLocalResourceDescriptor( key, remoteFileInfo.f0, localFile.length(), remoteFileInfo.f1, LocalResourceVisibility.APPLICATION, resourceType); addToEnvShipResourceList(whetherToAddToEnvShipResourceList, descriptor); localResources.put(key, descriptor.toLocalResource()); return descriptor; } private Path copyToRemoteApplicationDir( final Path localSrcPath, final String relativeDstPath, 
final int replicationFactor) throws IOException { final Path applicationDir = getApplicationDirPath(homeDir, applicationId); final String suffix = (relativeDstPath.isEmpty() ? "" : relativeDstPath + "/") + localSrcPath.getName(); final Path dst = new Path(applicationDir, suffix); LOG.debug( "Copying from {} to {} with replication factor {}", localSrcPath, dst, replicationFactor); fileSystem.copyFromLocalFile(false, true, localSrcPath, dst); // ⭐️⭐️⭐️ fileSystem.setReplication(dst, (short) replicationFactor); return dst; } public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException { Configuration conf = getConf(); FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf); } public static boolean copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, boolean deleteSource, boolean overwrite, Configuration conf) throws IOException { FileStatus fileStatus = srcFS.getFileStatus(src); return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf); } public static boolean copy(FileSystem srcFS, FileStatus srcStatus, FileSystem dstFS, Path dst, boolean deleteSource, boolean overwrite, Configuration conf) throws IOException { Path src = srcStatus.getPath(); dst = checkDest(src.getName(), dstFS, dst, overwrite); if (srcStatus.isDirectory()) { checkDependencies(srcFS, src, dstFS, dst); if (!dstFS.mkdirs(dst)) { return false; } FileStatus contents[] = srcFS.listStatus(src); for (int i = 0; i < contents.length; i++) { copy(srcFS, contents[i], dstFS, new Path(dst, contents[i].getPath().getName()), deleteSource, overwrite, conf); } } else { InputStream in=null; OutputStream out = null; try { in = srcFS.open(src); out = dstFS.create(dst, overwrite); IOUtils.copyBytes(in, out, conf, true); // ⭐️⭐️⭐️ } catch (IOException e) { IOUtils.closeStream(out); IOUtils.closeStream(in); throw e; } } if (deleteSource) { return srcFS.delete(src, true); } else { return true; } } public static void copyBytes(InputStream in, 
OutputStream out, Configuration conf, boolean close) throws IOException { copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close); } public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException { try { copyBytes(in, out, buffSize); // ⭐️⭐️⭐️ if(close) { out.close(); out = null; in.close(); in = null; } } finally { if(close) { closeStream(out); closeStream(in); } } } public static void copyBytes(InputStream in, OutputStream out, int buffSize) throws IOException { PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; byte buf[] = new byte[buffSize]; int bytesRead = in.read(buf); while (bytesRead >= 0) { out.write(buf, 0, bytesRead); if ((ps != null) && ps.checkError()) { throw new IOException("Unable to write to output stream."); } bytesRead = in.read(buf); } }