通过Python SDK开发Spark应用

本文主要介绍如何通过Python SDK提交Spark作业、查询Spark作业的状态和日志信息、结束Spark作业以及查询Spark历史作业。

前提条件

  • 已安装Python环境,且Python版本为3.7及以上版本。

  • AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版

  • 已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组

  • 已安装Python SDK。具体操作,请参见AnalyticDB MySQL SDK for Python

  • 已设置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见在Linux、macOS和Windows系统配置环境变量

  • 已配置Spark日志的存储地址。

    说明

    配置Spark日志存储地址的两种方法如下:

    • AnalyticDB for MySQL控制台的Spark Jar开发页面,单击页面右上角的日志配置,设置Spark日志的存储地址。

    • 使用配置项spark.app.log.rootPath指定一个OSS路径来存储Spark作业的执行日志。

示例

以下为提交Spark作业、查询Spark作业的状态和日志信息、结束Spark作业以及查询Spark历史作业的完整示例代码。

from alibabacloud_adb20211201.models import SubmitSparkAppRequest, SubmitSparkAppResponse, GetSparkAppStateRequest, \
    GetSparkAppStateResponse, GetSparkAppLogResponse, GetSparkAppLogRequest, KillSparkAppRequest, \
    KillSparkAppResponse, ListSparkAppsRequest, ListSparkAppsResponse
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client

import os


def submit_spark_sql(client: Client, cluster_id, rg_name, sql):
    """
    提交Spark SQL作业

    :param client:             阿里云客户端
    :param cluster_id:         集群ID
    :param rg_name:            资源组名称
    :param sql:                SQL
    :return:                   Spark作业ID
    :rtype:                    basestring
    :exception                 ClientException
    """

    # 初始化请求内容
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=sql,
        app_type="SQL",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )

    # 提交SQL获取结果
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # 获取Spark作业ID
    print(response)
    return response.body.data.app_id


def submit_spark_jar(client: Client, cluster_id: str, rg_name: str, json_conf: str):
    """
    提交Spark作业

    :param client:             阿里云客户端
    :param cluster_id:         集群ID
    :param rg_name:            资源组名称
    :param json_conf:          JSON配置
    :return:                   Spark作业ID
    :rtype:                    basestring
    :exception                 ClientException
    """

    # 初始化请求内容
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=json_conf,
        app_type="BATCH",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )

    # 提交SQL获取结果
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # 获取Spark作业ID
    print(response)
    return response.body.data.app_id


def get_status(client: Client, app_id):
    """
    查询Spark作业的状态

    :param client:             阿里云客户端
    :param app_id:             Spark作业ID
    :return:                   Spark作业的状态
    :rtype:                    basestring
    :exception                 ClientException
    """

    # 初始化请求内容
    print(app_id)
    request = GetSparkAppStateRequest(app_id=app_id)
    # 获取Spark作业的状态
    response: GetSparkAppStateResponse = client.get_spark_app_state(request)
    print(response)
    return response.body.data.state


def get_log(client: Client, app_id):
    """
    查询Spark作业的日志信息

    :param client:             阿里云客户端
    :param app_id:             Spark作业ID
    :return:                   Spark作业的日志信息
    :rtype:                    basestring
    :exception                 ClientException
    """

    # 初始化请求内容
    request = GetSparkAppLogRequest(app_id=app_id)

    # 获取Spark作业的日志信息
    response: GetSparkAppLogResponse = client.get_spark_app_log(request)
    print(response)
    return response.body.data.log_content


def kill_app(client: Client, app_id):
    """
    结束Spark作业

    :param client:             阿里云客户端
    :param app_id:             Spark作业ID
    :return:                   Spark作业的状态
    :exception                 ClientException
    """

    # 初始化请求内容
    request = KillSparkAppRequest(app_id=app_id)

    # 获取Spark作业的状态
    response: KillSparkAppResponse = client.kill_spark_app(request)
    print(response)
    return response.body.data.state


def list_apps(client: Client, cluster_id: str, page_number: int, page_size: int):
    """
    查询Spark历史作业

    :param client:             阿里云客户端
    :param cluster_id:         集群ID
    :param page_number:        页码,取值为正整数,默认值为1
    :param page_size:          每页记录数
    :return:                   Spark作业详细信息
    :exception                 ClientException
    """

    # 初始化请求内容
    request = ListSparkAppsRequest(
        dbcluster_id=cluster_id,
        page_number=page_number,
        page_size=page_size
    )

    # 获取Spark作业详细信息
    response: ListSparkAppsResponse = client.list_spark_apps(request)
    print("Total App Number:", response.body.data.page_number)
    for app_info in response.body.data.app_info_list:
        print(app_info.app_id)
        print(app_info.state)
        print(app_info.detail)


if __name__ == '__main__':
    # client config
    config = Config(
        # 从环境变量ALIBABA_CLOUD_ACCESS_KEY_ID中获取AccessKey ID
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        # 从环境变量ALIBABA_CLOUD_ACCESS_KEY_SECRET中获取AccessKey Secret
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        # 连接地址,cn-hangzhou为集群所在的地域ID
        endpoint="adb.cn-hangzhou.aliyuncs.com"
    )

    # new client
    adb_client = Client(config)

    sql_str = """
        -- Here is just an example of SparkSQL. Modify the content and run your spark program.
        set spark.driver.resourceSpec=medium;
        set spark.executor.instances=2;
        set spark.executor.resourceSpec=medium;
        set spark.app.name=Spark SQL Test;
        -- Here are your sql statements
        show databases;
    """

    json_str = """
    {
        "comments": [
            "-- Here is just an example of SparkPi. Modify the content and run your spark program."
        ],
        "args": [
            "1000"
        ],
        "file": "local:///tmp/spark-examples.jar",
        "name": "SparkPi",
        "className": "org.apache.spark.examples.SparkPi",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
        }
    }
    """
    """
    提交Spark SQL作业

    cluster_id:    集群ID
    rg_name:       资源组名称
    """

    sql_app_id = submit_spark_sql(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", rg_name="test", sql=sql_str)
    print(sql_app_id)

    """
    提交Spark作业

    cluster_id:    集群ID
    rg_name:       资源组名称
    """

    json_app_id = submit_spark_jar(client=adb_client, cluster_id="amv-bp1wo70f0k3c****",
                                   rg_name="test", json_conf=json_str)
    print(json_app_id)

    # 查询Spark作业的状态
    get_status(client=adb_client, app_id=sql_app_id)
    get_status(client=adb_client, app_id=json_app_id)

    """
    查询Spark历史作业
    cluster_id:      集群ID     
    page_number:     页码,取值为正整数。默认值为1
    page_size:       每页记录数
    """

    list_apps(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", page_size=10, page_number=1)

    # 结束Spark作业
    kill_app(client=adb_client, app_id=json_app_id)