Develop Spark applications using the Python SDK

Last updated:
Copy as Markdown

This topic describes how to use the Python SDK to submit a Spark job, query its status and logs, terminate the job, and list historical Spark jobs.

Prerequisites

  • Python 3.7 or later is installed.

  • An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster is created.

  • A job resource group is created in the AnalyticDB for MySQL cluster.

  • The Python SDK is installed. For more information, see AnalyticDB for MySQL SDK for Python.

  • The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables on Linux, macOS, and Windows.

  • A storage path for Spark logs is configured.

    Note

    You can use one of the following methods to configure the storage path for Spark logs:

    • In the AnalyticDB for MySQL console, go to the Spark JAR Development page and click Log Settings in the upper-right corner to set the log storage path.

    • Use the spark.app.log.rootPath parameter to specify an OSS path to store Spark logs.

Example

The following code shows how to submit a Spark job, query its status and logs, terminate the job, and list historical Spark jobs.

from alibabacloud_adb20211201.models import SubmitSparkAppRequest, SubmitSparkAppResponse, GetSparkAppStateRequest, \
    GetSparkAppStateResponse, GetSparkAppLogResponse, GetSparkAppLogRequest, KillSparkAppRequest, \
    KillSparkAppResponse, ListSparkAppsRequest, ListSparkAppsResponse
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client

import os


def submit_spark_sql(client: Client, cluster_id, rg_name, sql):
    """
    Submit a script of Spark SQL statements as a Spark application.

    :param client:             The AnalyticDB for MySQL API client.
    :param cluster_id:         The ID of the AnalyticDB for MySQL cluster.
    :param rg_name:            The name of the job resource group that runs the statements.
    :param sql:                The Spark SQL script, including any ``set`` configuration statements.
    :return:                   The ID of the submitted Spark application.
    :rtype:                    str
    :exception                 Propagates any error raised by ``client.submit_spark_app``.
    """
    # Collect the request fields first; app_type "SQL" marks the payload as a SQL script.
    request_fields = dict(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=sql,
        app_type="SQL",
        agent_source="Python SDK",
        agent_version="1.0.0",
    )
    submit_response: SubmitSparkAppResponse = client.submit_spark_app(
        SubmitSparkAppRequest(**request_fields)
    )
    # Echo the raw response so the example output shows what the service returned.
    print(submit_response)
    submitted = submit_response.body.data
    return submitted.app_id


def submit_spark_jar(client: Client, cluster_id: str, rg_name: str, json_conf: str):
    """
    Submit a Spark batch application described by a JSON configuration.

    :param client:             The AnalyticDB for MySQL API client.
    :param cluster_id:         The ID of the AnalyticDB for MySQL cluster.
    :param rg_name:            The name of the job resource group that runs the application.
    :param json_conf:          The application definition as a JSON string, for example with
                               ``file``, ``className``, ``args`` and ``conf`` keys.
    :return:                   The ID of the submitted Spark application.
    :rtype:                    str
    :exception                 Propagates any error raised by ``client.submit_spark_app``.
    """
    # app_type "BATCH" tells the service the payload is a JSON job definition.
    batch_request = SubmitSparkAppRequest(
        agent_source="Python SDK",
        agent_version="1.0.0",
        app_type="BATCH",
        data=json_conf,
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
    )
    batch_response: SubmitSparkAppResponse = client.submit_spark_app(batch_request)
    # Echo the raw response so the example output shows what the service returned.
    print(batch_response)
    new_app_id = batch_response.body.data.app_id
    return new_app_id


def get_status(client: Client, app_id):
    """
    Look up the current state of a Spark application.

    :param client:             The AnalyticDB for MySQL API client.
    :param app_id:             The ID of the Spark application to inspect.
    :return:                   The state reported by the service for that application.
    :rtype:                    str
    :exception                 Propagates any error raised by ``client.get_spark_app_state``.
    """
    # Echo which application is being queried before the call.
    print(app_id)
    state_response: GetSparkAppStateResponse = client.get_spark_app_state(
        GetSparkAppStateRequest(app_id=app_id)
    )
    print(state_response)
    state_data = state_response.body.data
    return state_data.state


def get_log(client: Client, app_id):
    """
    Fetch the log content of a Spark application.

    :param client:             The AnalyticDB for MySQL API client.
    :param app_id:             The ID of the Spark application whose log is requested.
    :return:                   The log text returned by the service.
    :rtype:                    str
    :exception                 Propagates any error raised by ``client.get_spark_app_log``.
    """
    log_request = GetSparkAppLogRequest(app_id=app_id)
    log_response: GetSparkAppLogResponse = client.get_spark_app_log(log_request)
    # Echo the raw response so the example output shows what the service returned.
    print(log_response)
    log_data = log_response.body.data
    return log_data.log_content


def kill_app(client: Client, app_id):
    """
    Ask the service to terminate a Spark application.

    :param client:             The AnalyticDB for MySQL API client.
    :param app_id:             The ID of the Spark application to terminate.
    :return:                   The application state reported in the kill response.
    :exception                 Propagates any error raised by ``client.kill_spark_app``.
    """
    kill_response: KillSparkAppResponse = client.kill_spark_app(
        KillSparkAppRequest(app_id=app_id)
    )
    # Echo the raw response so the example output shows what the service returned.
    print(kill_response)
    killed = kill_response.body.data
    return killed.state


def list_apps(client: Client, cluster_id: str, page_number: int, page_size: int):
    """
    Query historical Spark jobs of a cluster and print one page of results.

    Prints the total number of jobs, then the ID, state and detail of each
    job on the requested page.

    :param client:             The AnalyticDB for MySQL API client.
    :param cluster_id:         The ID of the AnalyticDB for MySQL cluster.
    :param page_number:        The 1-based page number to return. Must be a positive integer.
    :param page_size:          The number of entries per page.
    :return:                   None
    :exception                 Propagates any error raised by ``client.list_spark_apps``.
    """

    # Initialize the request.
    request = ListSparkAppsRequest(
        dbcluster_id=cluster_id,
        page_number=page_number,
        page_size=page_size
    )

    # Obtain the details of the Spark jobs.
    response: ListSparkAppsResponse = client.list_spark_apps(request)
    data = response.body.data
    # total_count is the job count across all pages; page_number only echoes
    # the requested page, so it must not be printed as the total.
    print("Total App Number:", data.total_count)
    # Guard against an unset list (presumably None when the page is empty — confirm).
    for app_info in data.app_info_list or []:
        print(app_info.app_id)
        print(app_info.state)
        print(app_info.detail)


if __name__ == '__main__':
    # Client configuration. Credentials come from environment variables rather
    # than being hard-coded in the source.
    config = Config(
        # Obtain the AccessKey ID from the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable.
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        # Obtain the AccessKey secret from the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable.
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        # The endpoint. cn-hangzhou is the region ID of the cluster.
        endpoint="adb.cn-hangzhou.aliyuncs.com"
    )

    # Create the client.
    adb_client = Client(config)

    # Placeholders: replace with your cluster ID and the name of your job resource group.
    cluster_id = "amv-bp1wo70f0k3c****"
    rg_name = "test"

    sql_str = """
        -- Here is just an example of SparkSQL. Modify the content and run your spark program.
        set spark.driver.resourceSpec=medium;
        set spark.executor.instances=2;
        set spark.executor.resourceSpec=medium;
        set spark.app.name=Spark SQL Test;
        -- Here are your sql statements
        show databases;
    """

    json_str = """
    {
        "comments": [
            "-- Here is just an example of SparkPi. Modify the content and run your spark program."
        ],
        "args": [
            "1000"
        ],
        "file": "local:///tmp/spark-examples.jar",
        "name": "SparkPi",
        "className": "org.apache.spark.examples.SparkPi",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
        }
    }
    """

    # Submit a Spark SQL job.
    sql_app_id = submit_spark_sql(client=adb_client, cluster_id=cluster_id, rg_name=rg_name, sql=sql_str)
    print(sql_app_id)

    # Submit a Spark batch (JAR) job described by the JSON configuration.
    json_app_id = submit_spark_jar(client=adb_client, cluster_id=cluster_id,
                                   rg_name=rg_name, json_conf=json_str)
    print(json_app_id)

    # Query the status of both Spark jobs.
    get_status(client=adb_client, app_id=sql_app_id)
    get_status(client=adb_client, app_id=json_app_id)

    # Query the logs of the Spark batch job. Requires the Spark log storage path
    # from the prerequisites. NOTE(review): a just-submitted job may not have
    # produced logs yet — confirm whether the service returns empty content or an error.
    get_log(client=adb_client, app_id=json_app_id)

    # Query historical Spark jobs: page_number is 1-based, page_size is entries per page.
    list_apps(client=adb_client, cluster_id=cluster_id, page_size=10, page_number=1)

    # Terminate the Spark batch job.
    kill_app(client=adb_client, app_id=json_app_id)