This topic shows how to start a Spark job by calling the Alibaba Cloud EMR Serverless Spark SDK for Java.
Prerequisites
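An AccessKey pair has been created. For details, see Create an AccessKey.
Note: Leaking the AccessKey of an Alibaba Cloud account (root account) is a security risk. We recommend that you create a RAM user, grant it the permissions required to access EMR Serverless Spark, and call the SDK with that RAM user's AccessKey. For related documentation, see:
To create a RAM user and its AccessKey, see Create a RAM user or Create an AccessKey.
To grant permissions to a RAM user, see Grant permissions to a RAM user.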
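A Java environment is installed. Alibaba Cloud SDK for Java requires JDK 8 or later.
The environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set in the environment where the code runs. For configuration steps, see Configure environment variables in Linux, macOS, and Windows.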
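System.getenv returns null for a variable that is not set, and the resulting error would probably only surface later, during the SDK call. It can therefore help to check both variables explicitly before creating the client. The following is a minimal sketch that uses only the JDK and stays compatible with JDK 8. The class and method names (CredentialEnvCheck, missingVariables, requireCredentials) are hypothetical and are not part of the Alibaba Cloud SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of the Alibaba Cloud SDK): fail fast when the
// AccessKey environment variables are missing, before any SDK client is created.
final class CredentialEnvCheck {

    // Variable names taken from the prerequisites
    static final List<String> REQUIRED = Arrays.asList(
            "ALIBABA_CLOUD_ACCESS_KEY_ID",
            "ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    // Returns the required variables that are absent or blank in the given environment map
    static List<String> missingVariables(Map<String, String> env) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    // Throws IllegalStateException naming every missing variable
    static void requireCredentials(Map<String, String> env) {
        List<String> missing = missingVariables(env);
        if (!missing.isEmpty()) {
            throw new IllegalStateException(
                    "Missing environment variables: " + String.join(", ", missing));
        }
    }
}
```

For example, you could call `CredentialEnvCheck.requireCredentials(System.getenv());` as the first statement of `main`. The method takes a `Map` rather than reading the environment directly, so the check can be exercised with a test map.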
Examples
Add the EMR Serverless Spark SDK dependency
Add the following dependency to your project's pom.xml to include the EMR Serverless Spark SDK:
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>emr_serverless_spark20230808</artifactId>
    <version>1.0.0</version>
</dependency>
Make sure that the artifactId and version match the latest release of the EMR Serverless Spark SDK.
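After you add the dependency, you can confirm that Maven resolved it by checking whether the SDK's Client class loads. The sketch below does this with plain reflection through `Class.forName(String, boolean, ClassLoader)`. The class name SdkClasspathCheck is hypothetical. The checked class name is the `Client` class imported by the sample code in this topic.

```java
// Hypothetical helper: reports whether a class (by default, the EMR Serverless
// Spark SDK client) can be loaded from the current classpath.
final class SdkClasspathCheck {

    // Client class imported by the sample code
    static final String SDK_CLIENT_CLASS = "com.aliyun.emr_serverless_spark20230808.Client";

    // Returns true if the class can be loaded; static initializers are not run
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, SdkClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(SDK_CLIENT_CLASS + " on classpath: " + isOnClasspath(SDK_CLIENT_CLASS));
    }
}
```

Run the check with the same classpath that your project uses, for example from your IDE. A result of `false` most likely means the dependency was not resolved.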
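Sample code
The following sample submits the SparkPi example JAR stored in OSS as a job run in workspace w-f7b841e8c732**** and prints the ID of the job run. Replace the workspace ID, region ID, resource queue, and <YourBucket> with your own values.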
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import com.aliyun.emr_serverless_spark20230808.Client;
import com.aliyun.emr_serverless_spark20230808.models.JobDriver;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.Tag;
import com.aliyun.tea.TeaException;
import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions;

public class AliyunServerlessSparkTaskExample {

    public static void main(String[] args) throws Exception {
        Client client = createClient();

        // Basic job settings: region, resource queue, code type, job name, and engine version
        StartJobRunRequest startJobRunRequest = new StartJobRunRequest();
        startJobRunRequest.setRegionId("cn-hangzhou");
        startJobRunRequest.setResourceQueueId("root_queue");
        startJobRunRequest.setCodeType("JAR");
        startJobRunRequest.setName("spark-task");
        startJobRunRequest.setReleaseVersion("esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)");

        // Tags attached to the job run
        Tag envTag = new Tag();
        envTag.setKey("environment");
        envTag.setValue("production");
        Tag workflowTag = new Tag();
        workflowTag.setKey("workflow");
        workflowTag.setValue("true");
        startJobRunRequest.setTags(Arrays.asList(envTag, workflowTag));

        // spark-submit settings: JAR entry point on OSS, program arguments, and spark-submit parameters
        JobDriver.JobDriverSparkSubmit jobDriverSparkSubmit = new JobDriver.JobDriverSparkSubmit()
                .setEntryPoint("oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar")
                .setEntryPointArguments(Arrays.asList("1"))
                .setSparkSubmitParameters("--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1");
        JobDriver jobDriver = new JobDriver().setSparkSubmit(jobDriverSparkSubmit);
        startJobRunRequest.setJobDriver(jobDriver);

        RuntimeOptions runtime = new RuntimeOptions();
        Map<String, String> headers = new HashMap<>();
        try {
            // The first argument is the workspace ID; replace it with your own
            StartJobRunResponse startJobRunResponse =
                    client.startJobRunWithOptions("w-f7b841e8c732****", startJobRunRequest, headers, runtime);
            // Print the ID of the job run that was started
            System.out.println(startJobRunResponse.getBody().getJobRunId());
        } catch (TeaException error) {
            printError(error);
        } catch (Exception e) {
            printError(new TeaException(e.getMessage(), e));
        }
    }

    // Prints the error message and, if the server returned it, the "Recommend" diagnostic information.
    // getData() may be null (for example, when a non-API exception is wrapped), so it is checked first.
    private static void printError(TeaException error) {
        System.out.println(error.getMessage());
        if (error.getData() != null) {
            System.out.println(error.getData().get("Recommend"));
        }
    }

    // Make sure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set where this code runs.
    // Leaked project code can expose the AccessKey and endanger every resource under the account. This sample reads the AccessKey from environment variables for reference only; the more secure STS approach is recommended.
    // Replace the region ID in the endpoint with a region that EMR Serverless Spark supports.
    public static Client createClient() throws Exception {
        Config config = new Config()
                .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        config.endpoint = "emr-serverless-spark.cn-hangzhou.aliyuncs.com";
        return new Client(config);
    }
}
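The sample hard-codes two strings that are easy to get wrong when you adapt it: the spark-submit parameter string and the regional endpoint. The hypothetical SparkJobStrings helper below is a minimal sketch that builds both using only the JDK. It is not part of the EMR Serverless Spark SDK. The endpoint pattern emr-serverless-spark.<regionId>.aliyuncs.com is an assumption generalized from the cn-hangzhou endpoint in the sample, so confirm the endpoint for your region before you rely on it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helpers (not part of the EMR Serverless Spark SDK) that build the
// spark-submit parameter string and the endpoint used in the sample.
final class SparkJobStrings {

    // ASSUMPTION: endpoint pattern generalized from "emr-serverless-spark.cn-hangzhou.aliyuncs.com"
    static String endpointFor(String regionId) {
        if (regionId == null || regionId.trim().isEmpty()) {
            throw new IllegalArgumentException("regionId must not be empty");
        }
        return "emr-serverless-spark." + regionId.trim() + ".aliyuncs.com";
    }

    // Builds "--class <mainClass> --conf k1=v1 --conf k2=v2 ..." in the map's iteration order.
    // No quoting is done, so keys and values must not contain whitespace.
    static String sparkSubmitParameters(String mainClass, Map<String, String> conf) {
        StringBuilder sb = new StringBuilder("--class ").append(mainClass);
        for (Map.Entry<String, String> e : conf.entrySet()) {
            sb.append(" --conf ").append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Same settings as the sample; LinkedHashMap keeps insertion order
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.executor.cores", "4");
        conf.put("spark.executor.memory", "20g");
        conf.put("spark.driver.cores", "4");
        conf.put("spark.driver.memory", "8g");
        conf.put("spark.executor.instances", "1");
        System.out.println(sparkSubmitParameters("org.apache.spark.examples.SparkPi", conf));
        System.out.println(endpointFor("cn-hangzhou"));
    }
}
```

With these settings, `sparkSubmitParameters` reproduces the exact string that the sample passes to `setSparkSubmitParameters`. `endpointFor("cn-hangzhou")` returns the endpoint used in `createClient`. A `LinkedHashMap` keeps the `--conf` entries in a predictable order, which makes the generated string easy to compare against the hand-written one.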