In addition to the console, you can also submit Spark jobs through the API. Alibaba Cloud provides SDKs in multiple languages that wrap the API. This topic describes how to submit a Spark job through the API by using the Python SDK.
Prerequisites
An AccessKey pair has been created. For more information, see Create an AccessKey pair.
Note: To prevent the security risks caused by leaking the AccessKey pair of an Alibaba Cloud account, we recommend that you create a RAM user, grant the RAM user the required permissions on EMR Serverless Spark, and then call the SDK with the RAM user's AccessKey pair. For more information, see the following topics:
To create a RAM user and its AccessKey pair, see Create a RAM user or Create an AccessKey pair.
To grant permissions to a RAM user, see Grant permissions to a RAM user.
A Python 3 environment is available.
Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set in the runtime environment (a quick check is shown after this list). For more information, see Configure environment variables on Linux, macOS, and Windows.
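Before you call the SDK, you can verify in Python that both variables are visible to the current process. This is a minimal sanity check, not part of the official sample:
import os

for name in ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET"):
    # os.environ.get returns None when the variable is not set
    print(name, "is set" if os.environ.get(name) else "is MISSING")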
Install the EMR Serverless Spark Python SDK
Run the following command to install the Python SDK:
pip install alibabacloud_emr_serverless_spark20230808==1.0.0
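To confirm that the expected version was installed, you can query the package metadata with the Python standard library (a minimal sketch; importlib.metadata is available in Python 3.8 and later):
from importlib.metadata import version

# Prints the installed SDK version; it should match the pinned 1.0.0.
print(version("alibabacloud_emr_serverless_spark20230808"))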
Sample code
The complete sample code used in this topic is shown below. You can modify it based on your actual scenario.
For more information about the service endpoints of EMR Serverless Spark, see Service endpoints.
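The sample below hardcodes the China (Hangzhou) endpoint. If your workspace is in a different region, substitute that region's ID into the same endpoint pattern. A minimal sketch (the region ID cn-beijing is only an illustration; see Service endpoints for the supported list):
# Endpoint pattern: emr-serverless-spark.<regionId>.aliyuncs.com
region_id = "cn-beijing"  # example value; replace with your workspace's region
endpoint = f"emr-serverless-spark.{region_id}.aliyuncs.com"
In the sample's create_client function, this value is what gets assigned to config.endpoint.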
# -*- coding: utf-8 -*-
import os
from typing import List
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_emr_serverless_spark20230808.client import Client
from alibabacloud_emr_serverless_spark20230808.models import (
    StartJobRunRequest,
    Tag,
    JobDriver,
    JobDriverSparkSubmit,
)
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
# Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set in the runtime environment.
# Leaked project code can expose the AccessKey pair and endanger all resources in the account. Reading the AccessKey pair from environment variables, as this sample does, is for reference only; the more secure STS method is recommended.
# Replace the variable part of the endpoint with a region ID supported by EMR Serverless Spark.
def create_client() -> Client:
    config = open_api_models.Config(
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    )
    config.endpoint = 'emr-serverless-spark.cn-hangzhou.aliyuncs.com'
    return Client(config)

def example_jar():
    """Submit a JAR job that runs the SparkPi example."""
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
    # Entry point JAR, entry point arguments, and spark-submit parameters.
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        ["1"],
        "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )
    job_driver = JobDriver(job_driver_spark_submit)
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="emr-spark-task",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        # The first argument is the workspace ID.
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
    except Exception as error:
        # Tea SDK errors carry a message and diagnostic data; printed here for demonstration only.
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)

def example_sql():
    """Submit a SQL job; the SQL file is passed to SparkSQLCLIDriver with -f."""
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql",
        ["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        "--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )
    job_driver = JobDriver(job_driver_spark_submit)
    # configuration_overrides = StartJobRunRequestConfigurationOverrides([StartJobRunRequestConfigurationOverridesConfigurations("test", "test", "test")])
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-sql-test",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver,
        # configuration_overrides=configuration_overrides
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)

def example_py():
    """Submit a PySpark job that runs the pi.py example with 50 partitions."""
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        ["50"],
        "--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )
    job_driver = JobDriver(job_driver_spark_submit)
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="emr-spark-task",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)

if __name__ == "__main__":
    example_jar()
    # example_sql()
    # example_py()
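The StartJobRun response printed above includes the ID of the new job run. If you need to wait for the run to finish, the service also provides a GetJobRun operation. The following sketch assumes the SDK wraps it as get_job_run_with_options with a GetJobRunRequest model, that the response exposes the run state as response.body.job_run.state, and that Success, Failed, and Cancelled are the terminal states; verify these names against the SDK reference before relying on them:
import time
from alibabacloud_emr_serverless_spark20230808.models import GetJobRunRequest  # assumed model name

def wait_for_job_run(client: Client, workspace_id: str, job_run_id: str) -> str:
    # Poll GetJobRun until the run reaches an (assumed) terminal state.
    runtime = util_models.RuntimeOptions()
    while True:
        response = client.get_job_run_with_options(
            workspace_id, job_run_id, GetJobRunRequest(region_id="cn-hangzhou"), {}, runtime
        )
        state = response.body.job_run.state  # assumed response layout
        print("Job run state:", state)
        if state in ("Success", "Failed", "Cancelled"):
            return state
        time.sleep(30)  # poll every 30 seconds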