Develop Spark applications using the Python SDK
This topic describes how to use the Python SDK to submit a Spark job, query its status and logs, terminate the job, and list historical Spark jobs.
Prerequisites
- Python 3.7 or later is installed.
- An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster is created.
- A job resource group is created in the AnalyticDB for MySQL cluster.
- The Python SDK is installed. For more information, see AnalyticDB for MySQL SDK for Python.
- The `ALIBABA_CLOUD_ACCESS_KEY_ID` and `ALIBABA_CLOUD_ACCESS_KEY_SECRET` environment variables are configured. For more information, see Configure environment variables on Linux, macOS, and Windows.
- A storage path for Spark logs is configured.
Note: You can use one of the following methods to configure the storage path for Spark logs:
- In the AnalyticDB for MySQL console, go to the Spark JAR Development page and click Log Settings in the upper-right corner to set the log storage path.
- Use the `spark.app.log.rootPath` parameter to specify an OSS path for storing Spark logs.
Example
The following code shows how to submit a Spark job, query its status and logs, terminate the job, and list historical Spark jobs.
from alibabacloud_adb20211201.models import SubmitSparkAppRequest, SubmitSparkAppResponse, GetSparkAppStateRequest, \
GetSparkAppStateResponse, GetSparkAppLogResponse, GetSparkAppLogRequest, KillSparkAppRequest, \
KillSparkAppResponse, ListSparkAppsRequest, ListSparkAppsResponse
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
import os
def submit_spark_sql(client: Client, cluster_id, rg_name, sql):
    """
    Submit one or more Spark SQL statements as a Spark job.

    The statements are sent as the request ``data`` with ``app_type="SQL"``.

    :param client: The AnalyticDB for MySQL client used to send the request.
    :param cluster_id: The ID of the cluster.
    :param rg_name: The name of the job resource group that runs the job.
    :param sql: The SQL statements to run.
    :return: The ID of the submitted Spark job.
    :rtype: str
    :raises: Any exception raised by ``client.submit_spark_app`` propagates
        unchanged; this function does not catch errors.
    """
    sql_request = SubmitSparkAppRequest(
        app_type="SQL",
        data=sql,
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        agent_source="Python SDK",
        agent_version="1.0.0",
    )
    submit_result: SubmitSparkAppResponse = client.submit_spark_app(sql_request)
    # Echo the full response so the caller can inspect it.
    print(submit_result)
    app_data = submit_result.body.data
    return app_data.app_id
def submit_spark_jar(client: Client, cluster_id: str, rg_name: str, json_conf: str):
    """
    Submit a Spark batch job described by a JSON configuration.

    The configuration is sent as the request ``data`` with ``app_type="BATCH"``.

    :param client: The AnalyticDB for MySQL client used to send the request.
    :param cluster_id: The ID of the cluster.
    :param rg_name: The name of the job resource group that runs the job.
    :param json_conf: The job configuration as a JSON string.
    :return: The ID of the submitted Spark job.
    :rtype: str
    :raises: Any exception raised by ``client.submit_spark_app`` propagates
        unchanged; this function does not catch errors.
    """
    batch_request = SubmitSparkAppRequest(
        app_type="BATCH",
        data=json_conf,
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        agent_source="Python SDK",
        agent_version="1.0.0",
    )
    submit_result: SubmitSparkAppResponse = client.submit_spark_app(batch_request)
    # Echo the full response so the caller can inspect it.
    print(submit_result)
    app_data = submit_result.body.data
    return app_data.app_id
def get_status(client: Client, app_id):
    """
    Query the current state of a Spark job.

    Both the job ID and the raw response are printed.

    :param client: The AnalyticDB for MySQL client used to send the request.
    :param app_id: The ID of the Spark job.
    :return: The state of the Spark job, as reported in the response.
    :rtype: str
    :raises: Any exception raised by ``client.get_spark_app_state`` propagates
        unchanged; this function does not catch errors.
    """
    # Echo which job is being queried.
    print(app_id)
    state_response: GetSparkAppStateResponse = client.get_spark_app_state(
        GetSparkAppStateRequest(app_id=app_id)
    )
    print(state_response)
    return state_response.body.data.state
def get_log(client: Client, app_id):
    """
    Fetch the logs of a Spark job.

    The raw response is printed before the log text is returned.

    :param client: The AnalyticDB for MySQL client used to send the request.
    :param app_id: The ID of the Spark job.
    :return: The log content of the Spark job.
    :rtype: str
    :raises: Any exception raised by ``client.get_spark_app_log`` propagates
        unchanged; this function does not catch errors.
    """
    log_response: GetSparkAppLogResponse = client.get_spark_app_log(
        GetSparkAppLogRequest(app_id=app_id)
    )
    print(log_response)
    log_data = log_response.body.data
    return log_data.log_content
def kill_app(client: Client, app_id):
    """
    Request termination of a Spark job.

    The raw response is printed before the job state is returned.

    :param client: The AnalyticDB for MySQL client used to send the request.
    :param app_id: The ID of the Spark job to terminate.
    :return: The state of the Spark job, as reported in the kill response.
    :raises: Any exception raised by ``client.kill_spark_app`` propagates
        unchanged; this function does not catch errors.
    """
    kill_response: KillSparkAppResponse = client.kill_spark_app(
        KillSparkAppRequest(app_id=app_id)
    )
    print(kill_response)
    killed_data = kill_response.body.data
    return killed_data.state
def list_apps(client: Client, cluster_id: str, page_number: int, page_size: int):
    """
    Query one page of historical Spark jobs of a cluster and print them.

    :param client: The AnalyticDB for MySQL client used to send the request.
    :param cluster_id: The cluster ID.
    :param page_number: The page number to return. It must be a positive
        integer; the first page is 1.
    :param page_size: The number of entries per page.
    :return: None. The total number of jobs is printed, followed by the ID,
        state, and details of each job on the requested page.
    :raises: Any exception raised by ``client.list_spark_apps`` propagates
        unchanged; this function does not catch errors.
    """
    # Initialize the request.
    request = ListSparkAppsRequest(
        dbcluster_id=cluster_id,
        page_number=page_number,
        page_size=page_size
    )
    # Obtain the details of the Spark jobs.
    response: ListSparkAppsResponse = client.list_spark_apps(request)
    data = response.body.data
    # The original printed data.page_number here. That field only echoes the
    # requested page, so it was never the job count. total_count is the number
    # of jobs across all pages.
    print("Total App Number:", data.total_count)
    # Treat an unset list as empty, in case the SDK model leaves it as None
    # when no jobs are returned.
    for app_info in data.app_info_list or []:
        print(app_info.app_id)
        print(app_info.state)
        print(app_info.detail)
if __name__ == '__main__':
    # Build the client configuration. Credentials are read from environment
    # variables; os.environ[...] raises KeyError if either one is unset.
    config = Config(
        # Obtain the AccessKey ID from the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable.
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        # Obtain the AccessKey Secret from the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable.
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        # The endpoint. cn-hangzhou is the region ID of the cluster.
        endpoint="adb.cn-hangzhou.aliyuncs.com"
    )
    # Create the client.
    adb_client = Client(config)

    # Replace these placeholders with your cluster ID and the name of a job
    # resource group in that cluster. Each is defined once and reused below.
    cluster_id = "amv-bp1wo70f0k3c****"
    rg_name = "test"

    sql_str = """
-- Here is just an example of SparkSQL. Modify the content and run your spark program.
set spark.driver.resourceSpec=medium;
set spark.executor.instances=2;
set spark.executor.resourceSpec=medium;
set spark.app.name=Spark SQL Test;
-- Here are your sql statements
show databases;
"""
    json_str = """
{
"comments": [
"-- Here is just an example of SparkPi. Modify the content and run your spark program."
],
"args": [
"1000"
],
"file": "local:///tmp/spark-examples.jar",
"name": "SparkPi",
"className": "org.apache.spark.examples.SparkPi",
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "medium"
}
}
"""

    # Submit a Spark SQL job.
    sql_app_id = submit_spark_sql(client=adb_client, cluster_id=cluster_id, rg_name=rg_name, sql=sql_str)
    print(sql_app_id)

    # Submit a Spark batch (JAR) job described by the JSON configuration.
    json_app_id = submit_spark_jar(client=adb_client, cluster_id=cluster_id,
                                   rg_name=rg_name, json_conf=json_str)
    print(json_app_id)

    # Query the status of the Spark jobs.
    get_status(client=adb_client, app_id=sql_app_id)
    get_status(client=adb_client, app_id=json_app_id)

    # Query the logs of the Spark jobs. get_log prints the raw response.
    get_log(client=adb_client, app_id=sql_app_id)
    get_log(client=adb_client, app_id=json_app_id)

    # Query historical Spark jobs: the first page, 10 entries per page.
    list_apps(client=adb_client, cluster_id=cluster_id, page_size=10, page_number=1)

    # Terminate the Spark batch job.
    kill_app(client=adb_client, app_id=json_app_id)