flink1.14上传jar包源码

flink1.14上传jar包源码,第1张

flink1.14上传jar包源码

水一篇 flink 1.14 上传文件的源码流程

为了方便查找,用 ⭐️⭐️⭐️ 标注了。

上传文件核心源码就是 io 流的读写

1.10 和 1.14 大致相同,只有细微细节不同。

从网上借个图。


//  YarnClusterDescriptor 类里面,有启动 startAppMaster 方法。
private ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification)
        throws Exception {
            ...

            fileUploader.registerSingleLocalResource(
                    jobGraphFilename,
                    new Path(tmpJobGraphFile.toURI()),
                    "",
                    LocalResourceType.FILE,
                    true,
                    false);
            classPathBuilder.append(jobGraphFilename).append(File.pathSeparator); // ⭐️⭐️⭐️

            ...
}

YarnLocalResourceDescriptor registerSingleLocalResource(
        final String key,
        final Path resourcePath,
        final String relativeDstPath,
        final LocalResourceType resourceType,
        final boolean whetherToAddToRemotePaths,
        final boolean whetherToAddToEnvShipResourceList)
        throws IOException {

    addToRemotePaths(whetherToAddToRemotePaths, resourcePath);

    if (Utils.isRemotePath(resourcePath.toString())) {
        final FileStatus fileStatus = fileSystem.getFileStatus(resourcePath);
        LOG.debug("Using remote file {} to register local resource", fileStatus.getPath());

        final YarnLocalResourceDescriptor descriptor =
                YarnLocalResourceDescriptor.fromFileStatus(
                        key, fileStatus, LocalResourceVisibility.APPLICATION, resourceType);
        addToEnvShipResourceList(whetherToAddToEnvShipResourceList, descriptor);
        localResources.put(key, descriptor.toLocalResource());
        return descriptor;
    }

    final File localFile = new File(resourcePath.toUri().getPath());
    final Tuple2 remoteFileInfo =
            uploadLocalFileToRemote(resourcePath, relativeDstPath); // ⭐️⭐️⭐️
    final YarnLocalResourceDescriptor descriptor =
            new YarnLocalResourceDescriptor(
                    key,
                    remoteFileInfo.f0,
                    localFile.length(),
                    remoteFileInfo.f1,
                    LocalResourceVisibility.APPLICATION,
                    resourceType);
    addToEnvShipResourceList(whetherToAddToEnvShipResourceList, descriptor);
    localResources.put(key, descriptor.toLocalResource());
    return descriptor;
}


private Path copyToRemoteApplicationDir(
        final Path localSrcPath, final String relativeDstPath, final int replicationFactor)
        throws IOException {

    final Path applicationDir = getApplicationDirPath(homeDir, applicationId);
    final String suffix =
            (relativeDstPath.isEmpty() ? "" : relativeDstPath + "/") + localSrcPath.getName();
    final Path dst = new Path(applicationDir, suffix);

    LOG.debug(
            "Copying from {} to {} with replication factor {}",
            localSrcPath,
            dst,
            replicationFactor);

    fileSystem.copyFromLocalFile(false, true, localSrcPath, dst); // ⭐️⭐️⭐️
    fileSystem.setReplication(dst, (short) replicationFactor);
    return dst;
}


public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
                            Path src, Path dst)
throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
}

public static boolean copy(FileSystem srcFS, Path src, 
                            FileSystem dstFS, Path dst, 
                            boolean deleteSource,
                            boolean overwrite,
                            Configuration conf) throws IOException {
    FileStatus fileStatus = srcFS.getFileStatus(src);
    return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf);
}


public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
                            FileSystem dstFS, Path dst,
                            boolean deleteSource,
                            boolean overwrite,
                            Configuration conf) throws IOException {
    Path src = srcStatus.getPath();
    dst = checkDest(src.getName(), dstFS, dst, overwrite);
    if (srcStatus.isDirectory()) {
        checkDependencies(srcFS, src, dstFS, dst);
        if (!dstFS.mkdirs(dst)) {
        return false;
        }
        FileStatus contents[] = srcFS.listStatus(src);
        for (int i = 0; i < contents.length; i++) {
        copy(srcFS, contents[i], dstFS,
                new Path(dst, contents[i].getPath().getName()),
                deleteSource, overwrite, conf);
        }
    } else {
        InputStream in=null;
        OutputStream out = null;
        try {
        in = srcFS.open(src);
        out = dstFS.create(dst, overwrite);
        IOUtils.copyBytes(in, out, conf, true); // ⭐️⭐️⭐️
        } catch (IOException e) {
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
        throw e;
        }
    }
    if (deleteSource) {
        return srcFS.delete(src, true);
    } else {
        return true;
    }

}

  public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
    throws IOException {
    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096),  close);
  }

public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
    throws IOException {
    try {
      copyBytes(in, out, buffSize); // ⭐️⭐️⭐️
      if(close) {
        out.close();
        out = null;
        in.close();
        in = null;
      }
    } finally {
      if(close) {
        closeStream(out);
        closeStream(in);
      }
    }
}


public static void copyBytes(InputStream in, OutputStream out, int buffSize) throws IOException {
    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
    byte buf[] = new byte[buffSize];
    int bytesRead = in.read(buf);
    while (bytesRead >= 0) {
        out.write(buf, 0, bytesRead);
        if ((ps != null) && ps.checkError()) {
        throw new IOException("Unable to write to output stream.");
        }
        bytesRead = in.read(buf);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5684365.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存