通过PySpark开发Spark应用
本文介绍了如何开发AnalyticDB for MySQL Spark Python作业,提供云端环境构建方案。
前提条件
集群的产品系列为企业版、基础版或湖仓版。
集群与OSS存储空间位于相同地域。
已在企业版、基础版或湖仓版集群中创建Job型资源组。
已创建AnalyticDB for MySQL集群的数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。
PySpark的基本用法
-
编写如下示例程序,并将示例程序存储为
example.py。from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.getOrCreate() df = spark.sql("SELECT 1+1") df.printSchema() df.show() -
将
example.py程序上传到OSS中。具体操作,请参见上传文件。 -
进入Spark开发编辑器。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。
-
在左侧导航栏,单击 。
-
在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。
-
在编辑器中执行以下作业内容。
{ "name": "Spark Python Test", "file": "oss://testBucketName/example.py", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" } }参数说明请参见参数说明。
使用Python依赖
在运行 Python 作业(PySpark)时,通常需要安装 Pandas、Numpy 等第三方库。本文将提供两种无需本地打包的云端环境构建方案。
|
方案类型 |
适用场景 |
方案优缺点 |
|
实时安装 |
|
优点:配置简单,即配即用。 缺点:每次运行都要重新下载安装依赖。 |
|
云端构建 |
|
优点:一次打包,永久复用;运行时启动快,稳定性高。 缺点:需要额外运行一次打包任务。 |
前提条件
在配置云端环境之前,请确保满足以下条件:
-
集群状态:集群已初始化,且能够成功运行基础的 Spark Pi 示例。
-
版本要求:
-
Spark 版本:支持 3.5.1。
-
Python 版本:支持 3.9 或 3.11。
-
-
关键约束(Numpy 版本):
-
Apache Spark 必须运行在 numpy < 2.0.0 的环境下。
-
系统会强制固定安装 numpy==1.26.0。
-
操作注意:请确保您要安装的其他依赖包(如 Pandas、Scipy 等)能兼容 Numpy 1.26.0,否则任务会报错。
-
操作示例
实时安装
-
准备业务代码
编写 Python 脚本(例如 job.py),并将其上传到 OSS 路径,例如
oss://your-bucket/scripts/job.py。# 示例脚本,会打印出当前python环境中的所有依赖。 # print all modules in python environment import pkgutil if __name__ == "__main__": for module_info in pkgutil.iter_modules(): print(module_info.name) -
配置作业参数
## 示例作业 { "file": "oss://your-bucket/scripts/job.py", // 代码路径 "name": "Real-time Env Demo", "conf": { "spark.adb.version": "3.5", "spark.driver.resourceSpec": "medium", "spark.executor.instances": 1, "spark.executor.resourceSpec": "medium", // --- 核心配置开始 --- // 1. 设置 Python 版本 "spark.kubernetes.driverEnv.PYTHON_BIN": "python3.11", "spark.executorEnv.PYTHON_BIN": "python3.11", // 2. 设置需要安装的依赖包 (Driver 和 Executor 都要配) "spark.kubernetes.driverEnv.PYTHON_MODULES": "chinesecalendar>=1.10.0,pandas>=1.5.3,lunar_python", "spark.executorEnv.PYTHON_MODULES": "chinesecalendar>=1.10.0,pandas>=1.5.3,lunar_python" // --- 核心配置结束 --- } }重要Spark 分为 Driver(控制节点)和 Executor(执行节点)。为了保证环境一致, 必须同时为这两者配置相同的环境变量 。
参数
参数说明
是否必填
默认值
备注
spark.kubernetes.driverEnv.PYTHON_MODULES
需要安装的包名列表。
是
无
-
多个Python依赖使用逗号隔开。
-
python依赖的格式需要完全符合
PyPI社区要求的格式。 -
没有版本约束的Python依赖必须放置在最后。
示例:
chinesecalendar>=1.10.0,dynaconf>=3.2.10,pandas>=1.5.3,lunar_pythonspark.executorEnv.PYTHON_MODULES
spark.kubernetes.driverEnv.PYTHON_BIN
指定使用的Python版本。
否
python3.11
当前有两个可选项:
-
python3.11
-
python3.9
spark.executorEnv.PYTHON_BIN
spark.kubernetes.driverEnv.INDEX_URL
指定
PyPI仓库的地址。否
http://mirrors.cloud.aliyuncs.com/pypi/simple/
默认值为阿里云内部托管镜像的地址,可以使用阿里云内网访问。如果配置为公网才能访问的地址,如清华大学的PyPI镜像源,需配置公网访问能力,详情请参见Spark应用访问公网配置说明。
spark.executorEnv.INDEX_URL
spark.kubernetes.driverEnv.TRUSTED_HOST
指定
PyPI仓库的域名为可信任域名。否
mirrors.cloud.aliyuncs.com
当Python从PyPI仓库安装依赖包时,会对仓库的证书进行可信验证。如果指定的仓库证书没有被证书服务托管,可以通过此参数指定此仓库源是一个可信源。
重要在使用此参数时,请务必确认配置的PyPI源是可信的。PyPI源注入污染依赖包攻击是非常常见的攻击方式。
spark.executorEnv.TRUSTED_HOST
-
-
执行示例作业。可以查看日志信息中打印出来Python环境中,包含了声明的依赖及其级联依赖。
xxlimited_35 zlib numpy pandas _distutils_hack _virtualenv chinese_calendar dateutil lunar_python pip pkg_resources pytz setuptools six tzdata wheel >>>>>>>> stderr: 25/12/25 17:01:56 INFO ShutdownHookManager: Shutdown hook called
云端构建
此方案会启动一个特殊任务:将依赖包打成压缩文件上传到 OSS,供后续任务重复使用。
-
规划 OSS 路径
确定一个 OSS 路径用于存放打好的包,例如:
oss://your-bucket/envs/my_custom_env。 -
提交打包作业
重要-
请勿修改内置打包脚本路径
local:///opt/tools/build_venv.py。 -
args中填入需要安装的依赖。
## 示例作业 { // 1. 这里填写需要打包的所有依赖 "args": [ "chinesecalendar>=1.10.0", "pandas>=1.5.3", "pyarrow>=19.0.1", "lunar_python" ], // 2. 调用内置打包脚本 (不要修改) "file": "local:///opt/tools/build_venv.py", "name": "Build VirtualEnv Job", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 1, "spark.executor.resourceSpec": "medium", // 3. 指定 Python 版本 "spark.kubernetes.driverEnv.PYTHON_BIN": "python3.11", // 4. 指定打包结果上传到哪里(按实际路径修改) "spark.kubernetes.driverEnv.VENV_OSS_PATH": "oss://your-bucket/envs/my_custom_env", // 5. 指定构建时的临时目录 "spark.kubernetes.driverEnv.VENV_DIR": "/tmp/build_test" } }参数
参数说明
是否必填
默认值
备注
spark.kubernetes.driverEnv.VENV_OSS_PATH
环境包存储路径。
是
无
示例:
oss://your-bucket/envs/my_custom_env。spark.kubernetes.driverEnv.VENV_DIR
临时构建目录。
否
/tmp/venv
若环境包体积较大,建议挂载数据盘并修改此路径为:
/user_data_dir。spark.kubernetes.driverEnv.PYTHON_BIN
指定使用的Python版本。
否
python3.11
当前有两个可选项:
-
python3.11
-
python3.9
spark.kubernetes.driverEnv.INDEX_URL
指定
PyPI仓库的地址。否
http://mirrors.cloud.aliyuncs.com/pypi/simple/
默认值为阿里云内部托管镜像的地址,可以使用阿里云内网访问。如果配置为公网才能访问的地址,如清华大学的PyPI镜像源,需配置公网访问能力,详情请参见Spark应用访问公网配置说明。
spark.kubernetes.driverEnv.TRUSTED_HOST
指定
PyPI仓库的域名为可信任域名。否
mirrors.cloud.aliyuncs.com
当Python从PyPI仓库安装依赖包时,会对仓库的证书进行可信验证。如果指定的仓库证书没有被证书服务托管,可以通过此参数指定此仓库源是一个可信源。
重要在使用此参数时,请务必确认配置的PyPI源是可信的。PyPI源注入污染依赖包攻击是常见的攻击方式。
-
-
执行作业
可以查看日志信息中上传压缩包的信息,以及这个virtualenv中所有安装包的详细的版本说明。
------------------ -------------- chinesecalendar 1.11.0 lunar_python 1.4.8 numpy 1.26.0 pandas 2.3.3 pip 24.2 pyarrow 22.0.0 python-dateutil 2.9.0.post0 pytz 2025.2 setuptools 75.1.0 six 1.17.0 tzdata 2025.3 wheel 0.44.0 Uploading archive to oss://xxx/envs/my_custom_env/venv_20251225174458.tar.gz + ossutil cp /tmp/venv_20251225174458.tar.gz xxx xxx xxx xxx xxx xxx xxx xxx xxx xxx Succeed: Total num: 1, size: 115,655,687. OK num: 1(upload 1 files). 0.788886(s) elapsed Upload completed: oss://xxx/envs/my_custom_env/venv_20251225174458.tar.gz -
使用环境包
后续您在运行普通 PySpark 任务时,即可引用
oss://your-bucket/envs/my_custom_env中的压缩包。通过配置
pyFiles参数进行使用。## 使用示例 { "name": "Spark Python", "file": "oss://testBucketName/example.py", "pyFiles": ["oss://your-bucket/envs/my_custom_env/venv_*****.tar.gz"], "args": [ "oss://testBucketName/staff.csv" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small" } }
常见问题排查
-
报错 ModuleNotFoundError:
-
检查是否同时配置了
driverEnv和executorEnv。 -
检查包名拼写是否符合 PyPi 规范。
-
-
报错与 numpy 相关:
检查您的依赖包版本是否要求 numpy >= 2.0.0。如果是,请降级您的依赖包版本以适配 numpy 1.26.0。
-
下载超时:
如果使用默认内网源仍然超时,请检查是否配置了公网源但未开通公网访问权限。