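We hope to have a more thorough guide in the official documentation soon, but to get started, see the following API overview: https://developers.google.com/api-client-library/java/apis/dataproc/v1

It includes a link to the Dataproc javadocs. If your server makes calls on behalf of your own project rather than on behalf of end users' Google projects, you will probably want the key-file-based service account authentication described here to create the Credential object that you use to initialize the Dataproc client stub.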

As for the Dataproc-specific parts, if you use Maven this just means adding the following dependency to your Maven pomfile:

    <project>
      <dependencies>
        <dependency>
          <groupId>com.google.apis</groupId>
          <artifactId>google-api-services-dataproc</artifactId>
          <version>v1-rev4-1.21.0</version>
        </dependency>
      </dependencies>
    </project>

Then you will have code like:
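
    import com.google.api.client.http.javanet.NetHttpTransport;
    import com.google.api.client.json.jackson2.JacksonFactory;
    import com.google.api.services.dataproc.Dataproc;
    import com.google.api.services.dataproc.model.Job;
    import com.google.api.services.dataproc.model.JobPlacement;
    import com.google.api.services.dataproc.model.SparkJob;
    import com.google.api.services.dataproc.model.SubmitJobRequest;
    import com.google.common.collect.ImmutableList;

    // credential and projectId are assumed to be defined elsewhere.
    Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
        .setApplicationName("my-webabb/1.0")
        .build();

    // Submit a Spark job to an existing cluster.
    dataproc.projects().regions().jobs().submit(
        projectId, "global", new SubmitJobRequest()
            .setJob(new Job()
                .setPlacement(new JobPlacement()
                    .setClusterName("my-spark-cluster"))
                .setSparkJob(new SparkJob()
                    .setMainClass("FooSparkJobMain")
                    .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar"))
                    .setArgs(ImmutableList.of(
                        "arg1", "arg2", "arg3")))))
        .execute();
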
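Because various intermediate servers may perform low-level retries, or your request may throw an IOException that leaves you unsure whether the job submission succeeded, one further step you may want to take is to generate your own jobId. That way you know which jobId to poll to find out whether the job was submitted, even if your request timed out or threw some unknown exception: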

    import java.util.UUID;

    ...

    Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
        .setApplicationName("my-webabb/1.0")
        .build();

    // Choose the job ID on the client so the job can be looked up after a failed request.
    String curJobId = "json-agg-job-" + UUID.randomUUID().toString();
    Job jobSnapshot = null;
    try {
      jobSnapshot = dataproc.projects().regions().jobs().submit(
          projectId, "global", new SubmitJobRequest()
              .setJob(new Job()
                  .setReference(new JobReference()
                      .setJobId(curJobId))
                  .setPlacement(new JobPlacement()
                      .setClusterName("my-spark-cluster"))
                  .setSparkJob(new SparkJob()
                      .setMainClass("FooSparkJobMain")
                      .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar"))
                      .setArgs(ImmutableList.of("arg1", "arg2", "arg3")))))
          .execute();
    } catch (IOException ioe) {
      try {
        jobSnapshot = dataproc.projects().regions().jobs().get(
            projectId, "global", curJobId).execute();
        logger.info(ioe, "Despite exception, job was verified submitted");
      } catch (IOException ioe2) {
        // Handle differently; if it's a GoogleJsonResponseException you can inspect the error
        // code, and if it's a 404, then it means the job didn't get submitted; you can add retry
        // logic in that case.
      }
    }

    // We can poll on dataproc.projects().regions().jobs().get(...) until the job reports being
    // completed or failed now.

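The closing comment in the snippet above leaves the polling loop unwritten. Below is a minimal sketch of one way to do it, using only the Java standard library. `JobPoller`, `pollUntilTerminal`, and `TERMINAL_STATES` are hypothetical names, not part of the Dataproc client library, and the set of terminal state names (`DONE`, `ERROR`, `CANCELLED`) is an assumption taken from the Dataproc v1 job status values that you should verify against the javadocs.

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.Callable;

/** Hypothetical polling helper; not part of the Dataproc client library. */
final class JobPoller {
    // Assumed terminal job states; check these against the Dataproc v1 javadocs.
    static final Set<String> TERMINAL_STATES = Set.of("DONE", "ERROR", "CANCELLED");

    private JobPoller() {}

    /**
     * Calls fetchState until it returns a terminal state, sleeping for
     * interval between calls, and gives up after maxAttempts calls.
     */
    static String pollUntilTerminal(Callable<String> fetchState, Duration interval, int maxAttempts)
            throws Exception {
        String state = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            state = fetchState.call();
            if (state != null && TERMINAL_STATES.contains(state)) {
                return state;
            }
            // Do not sleep after the final attempt.
            if (attempt < maxAttempts) {
                Thread.sleep(interval.toMillis());
            }
        }
        throw new IllegalStateException(
            "Job still in state " + state + " after " + maxAttempts + " polls");
    }
}
```

With the client from the snippet above, `fetchState` would presumably be a lambda such as `() -> dataproc.projects().regions().jobs().get(projectId, "global", curJobId).execute().getStatus().getState()`, assuming the generated model exposes the state as a String. Capping the number of attempts keeps a stuck job from blocking the caller indefinitely.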