启动Spark任务

本文展示了如何通过调用阿里云EMR Serverless Spark Java SDK来启动Spark任务。

前提条件

  • 已创建AccessKey,详情请参见创建AccessKey

    说明

    为避免阿里云账号(主账号)泄露AccessKey带来安全风险,建议您创建RAM用户,授予RAM用户EMR Serverless Spark相关的访问权限,再使用RAM用户的AccessKey调用SDK。相关文档请参见:

  • 已安装Java环境。Alibaba Cloud SDK for Java要求使用JDK 8或更高版本。

  • 请确保代码运行环境设置了环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体配置方法,请参见在Linux、macOS和Windows系统配置环境变量

参考示例

添加EMR Serverless Spark SDK依赖

添加以下依赖到您项目的pom.xml中以引入EMR Serverless Spark SDK。

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>emr_serverless_spark20230808</artifactId>
    <version>1.0.0</version>
</dependency>

确保artifactIdversion与EMR Serverless Spark SDK的最新版本一致。

示例代码

import java.util.Arrays;

import com.aliyun.emr_serverless_spark20230808.Client;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.Tag;
import com.aliyun.tea.TeaException;

public class AliyunServerlessSparkTaskExample {
    public static void main(String[] args) throws Exception {
        com.aliyun.emr_serverless_spark20230808.Client client = AliyunServerlessSparkTaskExample.createClient();
        com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest startJobRunRequest = new com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest();
        startJobRunRequest.setRegionId("cn-hangzhou");
        startJobRunRequest.setResourceQueueId("root_queue");
        startJobRunRequest.setCodeType("JAR");
        startJobRunRequest.setName("spark-task");
        startJobRunRequest.setReleaseVersion("esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)");
        Tag envTag = new Tag();
        envTag.setKey("environment");
        String envType = "production";
        envTag.setValue(envType);
        Tag workflowTag = new Tag();
        workflowTag.setKey("workflow");
        workflowTag.setValue("true");
        startJobRunRequest.setTags(Arrays.asList(envTag, workflowTag));
        com.aliyun.emr_serverless_spark20230808.models.JobDriver.JobDriverSparkSubmit jobDriverSparkSubmit = new com.aliyun.emr_serverless_spark20230808.models.JobDriver.JobDriverSparkSubmit()
        .setEntryPoint("oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar")
        .setEntryPointArguments(java.util.Arrays.asList(
            "1"
        ))
        .setSparkSubmitParameters("--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1");
        com.aliyun.emr_serverless_spark20230808.models.JobDriver jobDriver = new com.aliyun.emr_serverless_spark20230808.models.JobDriver()
        .setSparkSubmit(jobDriverSparkSubmit);
        startJobRunRequest.setJobDriver(jobDriver);
        com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
        java.util.Map<String, String> headers = new java.util.HashMap<>();
        try {
            StartJobRunResponse startJobRunResponse = client.startJobRunWithOptions("w-f7b841e8c732****", startJobRunRequest, headers, runtime);
            System.out.println(startJobRunResponse.getBody().getJobRunId());
        } catch (TeaException error) {
            System.out.println(error.getMessage());
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        } catch (Exception _error) {
            TeaException error = new TeaException(_error.getMessage(), _error);
            System.out.println(error.getMessage());
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        }
    }


//  请确保代码运行环境设置了环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。
//  工程代码泄露可能会导致AccessKey泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取AccessKey的方式进行调用,仅供参考,建议使用更安全的STS方式。
//  将endpoint中的变量替换为EMR Serverless Spark支持的地域ID。
    public static Client createClient() throws Exception {
        com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
            .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
            .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        config.endpoint = "emr-serverless-spark.cn-hangzhou.aliyuncs.com";
        return new com.aliyun.emr_serverless_spark20230808.Client(config);
    }
}