Start a Spark job

In addition to using the console, you can submit Spark jobs by calling API operations. Alibaba Cloud provides SDKs in multiple languages that encapsulate the API. This topic describes how to submit a Spark job through the API by using the Python SDK.

Prerequisites

  • An AccessKey pair is created. For more information, see Create an AccessKey pair.

    Note

    To prevent the security risks caused by a leaked AccessKey pair of your Alibaba Cloud account (root account), we recommend that you create a RAM user, grant the RAM user the permissions to access EMR Serverless Spark, and then use the AccessKey pair of the RAM user to call the SDK.

  • A Python 3 environment is available.

  • Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set in the environment in which the code runs. For more information, see Configure environment variables in Linux, macOS, and Windows systems. A quick sanity check is sketched after this list.
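The following snippet is a minimal sketch (a suggestion, not part of the official sample) that fails fast if either credential variable is missing:

import os

# Verify that both credential environment variables are set before calling the SDK.
for name in ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET"):
    if not os.environ.get(name):
        raise EnvironmentError(f"Environment variable {name} is not set")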

Install the EMR Serverless Spark Python SDK

Run the following command to install the Python SDK:

pip install alibabacloud_emr_serverless_spark20230808==1.0.0
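
To confirm that the installation succeeded, you can try importing the package. This one-line check is a suggestion, not part of the official sample:

# Optional sanity check: the import fails if the SDK is not installed.
import alibabacloud_emr_serverless_spark20230808 as emr_spark
print("EMR Serverless Spark SDK imported:", emr_spark.__name__)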

Sample code

The complete sample code in this topic is shown below. You can modify it based on your business requirements.

For more information about the endpoints of EMR Serverless Spark, see Service endpoints.
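
The endpoint in the sample below follows the pattern emr-serverless-spark.<regionId>.aliyuncs.com. As a minimal sketch (assuming this pattern holds for your region), the endpoint can be derived from a region ID like this:

# Build the endpoint from a region ID.
# Assumption: the region supports EMR Serverless Spark; check Service endpoints to confirm.
region_id = "cn-hangzhou"
endpoint = f"emr-serverless-spark.{region_id}.aliyuncs.com"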

# -*- coding: utf-8 -*-
import os

from typing import List
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_emr_serverless_spark20230808.client import Client
from alibabacloud_emr_serverless_spark20230808.models import (
    StartJobRunRequest,
    Tag,
    JobDriver,
    JobDriverSparkSubmit,
)

from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


# Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set in the runtime environment.
# If your project code is leaked, your AccessKey pair may be exposed, which threatens the security of all resources in your account. The following sample obtains the AccessKey pair from environment variables for reference only; we recommend that you use the more secure STS method.
# Replace the region ID in the endpoint with the ID of a region supported by EMR Serverless Spark.
def create_client() -> Client:
    config = open_api_models.Config(
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    )
    config.endpoint = 'emr-serverless-spark.cn-hangzhou.aliyuncs.com'
    return Client(config)


def example_jar():
    print("Let's run a simple test...")
    client = create_client()
    # Tags attached to the job run.
    tags: List[Tag] = [Tag(key="environment", value="production"), Tag(key="workflow", value="true")]
    job_driver_spark_submit = JobDriverSparkSubmit(
        # Entry point: the JAR package in OSS. Replace <YourBucket> with your bucket name.
        "oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        # Arguments passed to the main class.
        ["1"],
        # spark-submit parameters.
        "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )

    job_driver = JobDriver(job_driver_spark_submit)
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="emr-spark-task",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        # Replace 'w-ae42e9c92927****' with the ID of your workspace.
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
    except Exception as error:
        # This sample only prints the error. The message and data attributes are available on
        # exceptions raised by the SDK (Tea exceptions); handle errors appropriately in production.
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)

def example_sql():
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag(key="environment", value="production"), Tag(key="workflow", value="true")]
    job_driver_spark_submit = JobDriverSparkSubmit(
        # Entry point: the SQL file in OSS. Replace <YourBucket> with your bucket name.
        "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql",
        # The -f argument passes the same SQL file to the Spark SQL CLI driver.
        ["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        # spark-submit parameters.
        "--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )

    job_driver = JobDriver(job_driver_spark_submit)
    # Optional: override Spark configurations for this run. Uncommenting the next line also requires
    # importing StartJobRunRequestConfigurationOverrides and StartJobRunRequestConfigurationOverridesConfigurations
    # from the models module and replacing the "test" placeholders with real values.
    # configuration_overrides = StartJobRunRequestConfigurationOverrides([StartJobRunRequestConfigurationOverridesConfigurations("test", "test", "test")])
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-sql-test",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver,
        # configuration_overrides=configuration_overrides
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)

def example_py():
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag(key="environment", value="production"), Tag(key="workflow", value="true")]
    job_driver_spark_submit = JobDriverSparkSubmit(
        # Entry point: the PySpark script in OSS. Replace <YourBucket> with your bucket name.
        "oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        # Argument passed to the script: the number of partitions used to estimate Pi.
        ["50"],
        # spark-submit parameters.
        "--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )

    job_driver = JobDriver(job_driver_spark_submit)
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="emr-spark-task",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    example_jar()
    # Uncomment to run the SQL or PySpark example instead.
    # example_sql()
    # example_py()
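
After a successful submission, the printed response body contains the ID of the new job run. The fragment below is a minimal sketch of how you might capture it inside the try block; the "JobRunId" key name is an assumption, so verify it against the actual output of response.body.to_map().

# Minimal sketch, placed inside the try block after start_job_run_with_options.
# Assumption: the response map exposes the run ID under a "JobRunId" key; verify against your output.
body = response.body.to_map()
job_run_id = body.get("JobRunId")
print(f"Submitted job run: {job_run_id}")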