您如何使用Google DataProc Java Client通过关联的GS存储桶中的jar文件和类来提交Spark作业?

您如何使用Google DataProc Java Client通过关联的GS存储桶中的jar文件和类来提交Spark作业?,第1张

您如何使用Google DataProc Java Client通过关联的GS存储桶中的jar文件和类来提交Spark作业

我们希望很快能获得有关正式文档的更全面的指南,但要开始使用,请访问以下API概述:https : //developers.google.com/api-client-
library/java/apis/dataproc/v1

它包括到Dataproc javadocs的链接;如果您的服务器代表自己的项目而不是最终用户的Google项目进行调用,则您可能希望此处说明的基于密钥文件的服务帐户身份验证创建

Credential
用于初始化
Dataproc
客户端存根的对象。

对于特定于dataproc的部分,这仅意味着如果使用Maven,则将以下依赖项添加到Maven pomfile中:

<project>  <dependencies>    <dependency>      <groupId>com.google.apis</groupId>      <artifactId>google-api-services-dataproc</artifactId>      <version>v1-rev4-1.21.0</version>    </dependency>  </dependencies></project>

然后,您将获得如下代码:

Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)    .setApplicationName("my-webabb/1.0")    .build();dataproc.projects().regions().jobs().submit(    projectId, "global", new SubmitJobRequest()        .setJob(new Job() .setPlacement(new JobPlacement()     .setClusterName("my-spark-cluster")) .setSparkJob(new SparkJob()     .setMainClass("FooSparkJobMain")     .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar"))     .setArgs(ImmutableList.of(         "arg1", "arg2", "arg3")))))    .execute();

由于不同的中间服务器可能会进行低级重试,或者您的请求可能会引发IOException,而您不知道提交作业是否成功,因此您可能要执行的另一步骤是生成自己的作业

jobId
;那么您知道要轮询哪个jobId,以弄清它是否已提交,即使您的请求超时或引发一些未知的异常:

import java.util.UUID;...Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)    .setApplicationName("my-webabb/1.0")    .build();String curJobId = "json-agg-job-" + UUID.randomUUID().toString();Job jobSnapshot = null;try {  jobSnapshot = dataproc.projects().regions().jobs().submit(      projectId, "global", new SubmitJobRequest()          .setJob(new Job()   .setReference(new JobReference()        .setJobId(curJobId))   .setPlacement(new JobPlacement()       .setClusterName("my-spark-cluster"))   .setSparkJob(new SparkJob()       .setMainClass("FooSparkJobMain")       .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar"))       .setArgs(ImmutableList.of("arg1", "arg2", "arg3")))))      .execute();} catch (IOException ioe) {  try {    jobSnapshot = dataproc.projects().regions().jobs().get(        projectId, "global", curJobId).execute();    logger.info(ioe, "Despite exception, job was verified submitted");  } catch (IOException ioe2) {    // Handle differently; if it's a GoogleJsonResponseException you can inspect the error    // pre, and if it's a 404, then it means the job didn't get submitted; you can add retry    // logic in that case.  }}// We can poll on dataproc.projects().regions().jobs().get(...) until the job reports being// completed or failed now.


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5501770.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-13
下一篇 2022-12-13

发表评论

登录后才能评论

评论列表(0条)

保存