Develop Spark applications by using the Python SDK
This topic describes how to use the Python SDK to submit a Spark job, query the status and log information of a Spark job, terminate a Spark job, and query historical Spark jobs.
Prerequisites
A Python environment is installed, and the Python version is 3.7 or later.
The AnalyticDB for MySQL cluster is of the Enterprise Edition, Basic Edition, or Data Lakehouse Edition.
A Job resource group is created in the AnalyticDB for MySQL cluster. For more information, see Create a resource group.
The Python SDK is installed. For more information, see AnalyticDB MySQL SDK for Python.
The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set. For more information, see Configure environment variables in Linux, macOS, and Windows.
The storage path for Spark logs is configured.
Note: You can configure the storage path for Spark logs in either of the following ways:
On the Spark JAR Development page of the AnalyticDB for MySQL console, click Log Settings in the upper-right corner and set the storage path for Spark logs.
Use the spark.app.log.rootPath configuration item to specify an OSS path for storing the execution logs of Spark jobs, as shown in the sketch below.
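For the second method, one way to pass the configuration item is through the conf section of a batch job before it is submitted, mirroring the JSON configuration used in the example later in this topic. The following is a minimal sketch: the OSS path oss://<yourBucketName>/spark-logs/ is a placeholder you must replace with a path you own, and the other values are copied from the SparkPi example below.

import json

# A sketch of a batch job configuration that writes execution logs to a custom OSS path.
# "oss://<yourBucketName>/spark-logs/" is a placeholder, not a real path.
job_conf = {
    "file": "local:///tmp/spark-examples.jar",
    "name": "SparkPi",
    "className": "org.apache.spark.examples.SparkPi",
    "args": ["1000"],
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "medium",
        "spark.app.log.rootPath": "oss://<yourBucketName>/spark-logs/"
    }
}
json_conf = json.dumps(job_conf)  # pass this string to submit_spark_jar() in the example below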
Example
The following sample code shows how to submit a Spark job, query the status and log information of a Spark job, terminate a Spark job, and query historical Spark jobs.
from alibabacloud_adb20211201.models import SubmitSparkAppRequest, SubmitSparkAppResponse, GetSparkAppStateRequest, \
GetSparkAppStateResponse, GetSparkAppLogResponse, GetSparkAppLogRequest, KillSparkAppRequest, \
KillSparkAppResponse, ListSparkAppsRequest, ListSparkAppsResponse
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
import os


def submit_spark_sql(client: Client, cluster_id, rg_name, sql):
    """
    Submit a Spark SQL job
    :param client: Alibaba Cloud client
    :param cluster_id: cluster ID
    :param rg_name: resource group name
    :param sql: SQL statements
    :return: Spark application ID
    :rtype: str
    :exception ClientException
    """
    # Build the request
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=sql,
        app_type="SQL",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )
    # Submit the SQL job and get the response
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # Print the response and return the Spark application ID
    print(response)
    return response.body.data.app_id


def submit_spark_jar(client: Client, cluster_id: str, rg_name: str, json_conf: str):
    """
    Submit a Spark batch (JAR) job
    :param client: Alibaba Cloud client
    :param cluster_id: cluster ID
    :param rg_name: resource group name
    :param json_conf: job configuration in JSON
    :return: Spark application ID
    :rtype: str
    :exception ClientException
    """
    # Build the request
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=json_conf,
        app_type="BATCH",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )
    # Submit the job and get the response
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # Print the response and return the Spark application ID
    print(response)
    return response.body.data.app_id


def get_status(client: Client, app_id):
    """
    Query the state of a Spark job
    :param client: Alibaba Cloud client
    :param app_id: Spark application ID
    :return: state of the Spark job
    :rtype: str
    :exception ClientException
    """
    # Build the request
    print(app_id)
    request = GetSparkAppStateRequest(app_id=app_id)
    # Query the state of the Spark job
    response: GetSparkAppStateResponse = client.get_spark_app_state(request)
    print(response)
    return response.body.data.state


def get_log(client: Client, app_id):
    """
    Query the log of a Spark job
    :param client: Alibaba Cloud client
    :param app_id: Spark application ID
    :return: log content of the Spark job
    :rtype: str
    :exception ClientException
    """
    # Build the request
    request = GetSparkAppLogRequest(app_id=app_id)
    # Query the log of the Spark job
    response: GetSparkAppLogResponse = client.get_spark_app_log(request)
    print(response)
    return response.body.data.log_content


def kill_app(client: Client, app_id):
    """
    Terminate a Spark job
    :param client: Alibaba Cloud client
    :param app_id: Spark application ID
    :return: state of the Spark job
    :exception ClientException
    """
    # Build the request
    request = KillSparkAppRequest(app_id=app_id)
    # Terminate the Spark job and get its state
    response: KillSparkAppResponse = client.kill_spark_app(request)
    print(response)
    return response.body.data.state


def list_apps(client: Client, cluster_id: str, page_number: int, page_size: int):
    """
    Query historical Spark jobs
    :param client: Alibaba Cloud client
    :param cluster_id: cluster ID
    :param page_number: page number, a positive integer; default value: 1
    :param page_size: number of entries per page
    :return: details of the Spark jobs
    :exception ClientException
    """
    # Build the request
    request = ListSparkAppsRequest(
        dbcluster_id=cluster_id,
        page_number=page_number,
        page_size=page_size
    )
    # Query the details of the Spark jobs
    response: ListSparkAppsResponse = client.list_spark_apps(request)
    print("Page number:", response.body.data.page_number)
    for app_info in response.body.data.app_info_list:
        print(app_info.app_id)
        print(app_info.state)
        print(app_info.detail)


if __name__ == '__main__':
    # Client configuration
    config = Config(
        # Read the AccessKey ID from the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        # Read the AccessKey secret from the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        # Endpoint of the service. cn-hangzhou is the region ID where the cluster resides
        endpoint="adb.cn-hangzhou.aliyuncs.com"
    )
    # Create the client
    adb_client = Client(config)

    sql_str = """
    -- Here is just an example of SparkSQL. Modify the content and run your spark program.
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=2;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=Spark SQL Test;
    -- Here are your sql statements
    show databases;
    """

    json_str = """
    {
        "comments": [
            "-- Here is just an example of SparkPi. Modify the content and run your spark program."
        ],
        "args": [
            "1000"
        ],
        "file": "local:///tmp/spark-examples.jar",
        "name": "SparkPi",
        "className": "org.apache.spark.examples.SparkPi",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
        }
    }
    """

    # Submit a Spark SQL job
    # cluster_id: cluster ID
    # rg_name: resource group name
    sql_app_id = submit_spark_sql(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", rg_name="test", sql=sql_str)
    print(sql_app_id)

    # Submit a Spark batch (JAR) job
    # cluster_id: cluster ID
    # rg_name: resource group name
    json_app_id = submit_spark_jar(client=adb_client, cluster_id="amv-bp1wo70f0k3c****",
                                   rg_name="test", json_conf=json_str)
    print(json_app_id)

    # Query the states of the Spark jobs
    get_status(client=adb_client, app_id=sql_app_id)
    get_status(client=adb_client, app_id=json_app_id)

    # Query historical Spark jobs
    # cluster_id: cluster ID
    # page_number: page number, a positive integer; default value: 1
    # page_size: number of entries per page
    list_apps(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", page_size=10, page_number=1)

    # Terminate the Spark job
    kill_app(client=adb_client, app_id=json_app_id)
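The calls in the main block return as soon as each request is accepted, so a submitted job may still be running when the script moves on. If you need to wait for a job to finish before reading its log with get_log, you can poll get_status from the example above. The following is a minimal sketch; the terminal state names COMPLETED, FAILED, KILLED, and FATAL are assumptions used for illustration and should be verified against the states that your cluster actually returns.

import time

def wait_for_terminal_state(client, app_id, interval_seconds=10, timeout_seconds=3600):
    # Poll the job state until it reaches an assumed terminal state or the timeout expires.
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        state = get_status(client=client, app_id=app_id)
        # COMPLETED/FAILED/KILLED/FATAL are assumed terminal states; adjust them as needed.
        if state in ("COMPLETED", "FAILED", "KILLED", "FATAL"):
            return state
        time.sleep(interval_seconds)
    raise TimeoutError(f"Spark app {app_id} did not finish within {timeout_seconds} seconds")

Once the job reaches a terminal state, get_log can be called with the same application ID to retrieve its execution log.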