通过PySpark开发Spark应用
本文介绍了如何开发AnalyticDB for MySQL Spark Python作业,以及如何通过VirtualEnv技术打包Python作业的运行环境。
前提条件
集群的产品系列为企业版、基础版或湖仓版。
集群与OSS存储空间位于相同地域。
已在企业版、基础版或湖仓版集群中创建Job型资源组。具体操作,请参见新建资源组。
已创建AnalyticDB for MySQL集群的数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
PySpark的基本用法
编写如下示例程序,并将示例程序存储为
example.py
。from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.getOrCreate() df = spark.sql("SELECT 1+1") df.printSchema() df.show()
将
example.py
程序上传到OSS中。具体操作,请参见控制台上传文件。进入Spark开发编辑器。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表。在集群列表上方,选择产品系列,然后单击目标集群ID。
在左侧导航栏,单击作业开发>Spark Jar 开发。
在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。
在编辑器中执行以下作业内容。
{ "name": "Spark Python Test", "file": "oss://{your oss bucket path}/example.py", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" } }
参数说明请参见参数说明。
使用Python依赖
使用方法
如果您使用自行开发或第三方开发的依赖开发Python程序时,需将使用的依赖上传至OSS中,并在提交Spark作业时配置pyFiles
参数。
示例
本文示例以引入自定义函数计算员工的税后收入为例。示例将数据文件staff.csv
上传至OSS中。staff.csv
中的示例数据如下:
name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
开发依赖并上传至OSS中。
创建名为
tools
的文件夹,并在该文件夹下创建名为func.py
的程序。def tax(salary): """ convert string to int and cut 15% tax from the salary :param salary: The salary of staff worker :return: """ return 0.15 * int(salary)
将
tools
文件夹压缩后上传至OSS中。本文示例为tools.tar.gz
。说明如果依赖多个Python文件,建议您使用gz压缩包进行压缩。您可以在Python代码中以module方式引用Python文件。
编写名为
example.py
的示例程序。from __future__ import print_function from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import FloatType import sys # import third party file from tools import func if __name__ == "__main__": # init pyspark context spark = SparkSession.builder.appName("Python Example").getOrCreate() # read csv from oss to a dataframe, show the table cvs_file = sys.argv[1] df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True) # print schema and data to the console df.printSchema() df.show() # create an udf taxCut = udf(lambda salary: func.tax(salary), FloatType()) # cut tax from salary and show result df.select("name", taxCut("salary").alias("final salary")).show() spark.stop()
将
example.py
程序上传到OSS中。具体操作,请参见控制台上传文件。进入Spark开发编辑器。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表。在集群列表上方,选择产品系列,然后单击目标集群ID。
在左侧导航栏,单击作业开发>Spark Jar 开发。
在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。
在编辑器中执行以下作业内容。
{ "name": "Spark Python", "file": "oss://<bucket name>/example.py", "pyFiles": ["oss://<bucket name>/tools.tar.gz"], "args": [ "oss://<bucket name>/staff.csv" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small" } }
参数说明:
file:Python程序所在的OSS路径。
pyFiles:PySpark依赖的Python文件所在的OSS路径,后缀可以是tar或tar.gz。多个压缩包使用英文逗号(,)分隔。
说明PySpark应用所依赖的所有Python文件必须存储在OSS中。
args:使用JAR包时需要使用的参数。本文为
staff.csv
示例数据所在的OSS路径。
更多参数,请参见参数说明。
使用Virtual Environments打包依赖环境
开发Python作业时,如果您遇到复杂的依赖环境,可以通过Python的Virtual Environments技术进行环境管理和隔离。AnalyticDB for MySQL Spark支持使用Virtual Environments将本地依赖的环境打包并上传到OSS中。关于Virtual Environments的更多信息,请参见Python官方社区文档。
AnalyticDB for MySQL Spark使用的glibc-devel版本为2.28,若Virtual Environments不兼容2.28版本,PySpark任务可能无法正常执行。
使用方法
使用Virtual Environments打包Python环境,需将压缩包上传至OSS中,并在提交Spark作业时配置相关参数,以指定Python环境压缩包所在的OSS路径和使用的Python解释器的本地路径。
指定Python环境压缩包所在的OSS路径:
若Python环境的压缩包较小,您可配置
archives
参数。若Python环境的压缩包较大,您可配置
spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES
和spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES
参数。
指定使用的Python解释器的本地路径:
spark.pyspark.python
参数。
示例
准备Linux环境。
Virtual Environments需在Linux操作系统中打包Python环境,您可以通过以下三种方式准备Linux环境。本文以购买阿里云ECS实例为例。
购买操作系统为Centos 7或AnolisOS 8的阿里云ECS实例。具体操作,请参见自定义购买实例。
在本地安装Centos 7或者AnolisOS 8以上版本的操作系统。
使用Centos或AnolisOS的官方Docker镜像,在镜像内部打包Python环境。
使用Virtual Environments打包Python运行环境,并将压缩包上传至OSS中。
使用Virtualenv或Conda打包项目依赖的Python环境,打包时可自定义Python的版本。此处以Virtualenv打包为例。
# Create directory venv at current path with python3 # MUST ADD --copies ! virtualenv --copies --download --python python3.7 venv # active environment source venv/bin/activate # install third party modules pip install scikit-spark==0.4.0 # check the result pip list # compress the environment tar -czvf venv.tar.gz venv
说明如果您想通过Conda打包项目依赖,请参见Conda管理虚拟环境。
进入Spark开发编辑器。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表。在集群列表上方,选择产品系列,然后单击目标集群ID。
在左侧导航栏,单击作业开发>Spark Jar 开发。
在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。
在编辑器中执行以下作业内容。
{ "name": "venv example", "archives": [ "oss://testBucketname/venv.tar.gz#PY3" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" }, "file": "oss://testBucketname/example.py" }
或
说明Python环境的压缩包过大时,可参考如下代码。
{ "name": "venv example", "conf": { "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3", "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://atestBucketname/venv_py36.tar.gz#PY3,", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" }, "file": "oss://testBucketname/example.py" }
参数说明:
archives:Python环境压缩包所在的OSS路径。本文示例为
venv.tar.gz
压缩包所在的OSS路径。spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES:Spark Executor节点参数,用于指定Python环境压缩包所在的OSS路径。
spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES:Spark Driver节点参数,用于指定Python环境压缩包所在的OSS路径。
spark.pyspark.python:指定要使用的Python解释器的本地路径。
其他参数,请参见参数说明。