PySpark任务往往需要借助Python第三方库来增强数据处理和分析能力。本文通过示例详细介绍了如何通过运行环境、Conda环境隔离与PEX轻量化打包方式,有效地将这些库集成到Serverless Spark环境中,确保任务在分布式计算场景下的稳定性和灵活性。
背景信息
在交互式PySpark开发过程中,可以使用Python第三方库以提升数据处理与分析的灵活性及易用性。以下三种方式均能帮助您实现这一目标,建议根据实际情况选择最适合的方式。
方式 | 适用场景 |
在阿里云控制台配置包含所需库的标准化环境(如 | |
Conda是一个跨平台的包管理和环境管理系统,它允许用户轻松创建、保存、加载和切换多个环境,每个环境都可以拥有独立的Python版本和库依赖。 | |
PEX (Python EXecutable) 是一个工具,它可以将Python应用及其所有依赖打包进一个可执行文件中。 |
前提条件
已创建工作空间,详情请参见创建工作空间。
使用限制
已安装Python 3.8及以上版本。本文以Python 3.8为例介绍。
操作流程
方式一:通过运行环境使用Python第三方库
步骤一:创建运行环境
进入运行环境管理页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,选择左侧导航栏中的运行环境管理。
单击创建运行环境。
在创建运行环境页面,单击添加库。
更多参数信息,请参见管理运行环境。
在新建库中,使用PyPI来源类型,配置PyPI Package参数,然后单击确定。
在PyPI Package中填写库的名称及版本,不指定版本时,默认安装最新版本。
本文示例添加的库为
faker
和geopy
。单击创建。
创建后将开始初始化环境。
步骤二:上传资源文件至OSS
单击pyspark_third_party_libs_demo.py,下载所需资源文件。
本文示例展示了如何使用PySpark和第三方库生成模拟数据并进行地理分析。其中,
faker
库用于生成包含用户信息和随机地理位置的模拟数据,geopy
库用于计算每个用户位置与埃菲尔铁塔之间的地理距离,最终筛选出距离在10公里范围内的用户。您也可以创建示例脚本
pyspark_third_party_libs_demo.py
,内容如下所示。上传
pyspark_third_party_libs_demo.py
至OSS,上传操作可以参见简单上传。
步骤三:开发并运行任务
在EMR Serverless Spark页面,单击左侧的数据开发。
在开发目录页签下,单击
图标。
在新建对话框中,输入名称,类型选择
,单击确定。在右上角选择队列。
在新建的开发页签中,配置以下信息,其余参数无需配置,然后单击运行。
参数
说明
主Python资源
选择OSS资源,填写您上传
pyspark_third_party_libs_demo.py
至OSS的路径。例如,oss://<yourBucketName>/pyspark_third_party_libs_demo.py。运行环境
在下拉框中选择您创建的运行环境。
运行任务后,在下方的运行记录区域,单击任务操作列的日志探查。
在日志探查页签,您可以查看相关的日志信息。
例如,在Driver日志的Stdout页签,可以查看到返回以下信息。
生成的样本数据: +--------------------+-------------------+------------------+------------------+ | user_id| name| latitude| longitude| +--------------------+-------------------+------------------+------------------+ |73d4565c-8cdf-4bc...| Garrett Robertson| 48.81845614776422|2.4087517234236064| |0fc364b1-6759-416...| Dawn Gonzalez| 48.68654896170054|2.4708555780468013| |2ab1f0aa-5552-4e1...|Alexander Gallagher| 48.87603770688707|2.1209399987431246| |1cabbdde-e703-4a8...| David Morris|48.656356532418116|2.2503952330408175| |8b7938a0-b283-401...| Shannon Perkins| 48.82915001905855| 2.410743969589327| +--------------------+-------------------+------------------+------------------+ only showing top 5 rows 找到 24 个在10公里范围内的用户: +-----------------+------------------+------------------+-----------+ | name| latitude| longitude|distance_km| +-----------------+------------------+------------------+-----------+ |Garrett Robertson| 48.81845614776422|2.4087517234236064| 9.490705| | Shannon Perkins| 48.82915001905855| 2.410743969589327| 9.131355| | Alex Harris| 48.82547383207313|2.3579336032430027| 5.923493| | Tammy Ramos| 48.84668267431606|2.3606455536493574| 5.026109| | Ivan Christian| 48.89224239228342|2.2811025348668195| 3.8897192| | Vernon Humphrey| 48.93142188723839| 2.306957802222233| 8.171813| | Shawn Rodriguez|48.919907710882654|2.2270993307836044| 8.439087| | Robert Fisher|48.794216103154646|2.3699024070507906| 9.033209| | Heather Collier|48.822957591865205|2.2993033803043454| 3.957171| | Dawn White|48.877816307255586|2.3743880390928878| 6.246059| +-----------------+------------------+------------------+-----------+ only showing top 10 rows
方式二:通过Conda管理Python环境
步骤一:Conda环境构建与部署
创建一台使用Alibaba Cloud Linux 3系统,且开启公网的ECS实例(需为x86架构),详情请参见自定义购买实例。
说明如果您在EMR on ECS页面已有的EMR集群中有空闲节点,也可以直接利用这些空闲节点(需确保节点为x86架构)。
通过以下命令安装Miniconda。
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh chmod +x Miniconda3-latest-Linux-x86_64.sh ./Miniconda3-latest-Linux-x86_64.sh -b source miniconda3/bin/activate
构建使用Python 3.8和numpy的Conda环境。
conda create -y -n pyspark_conda_env -c conda-forge conda-pack numpy python=3.8 conda activate pyspark_conda_env conda pack -f -o pyspark_conda_env.tar.gz
步骤二:上传资源文件至OSS
单击kmeans.py和kmeans_data.txt,下载所需资源文件。
您也可以创建示例脚本
kmeans.py
和数据文件kmeans_data.txt
,内容如下所示。上传
pyspark_conda_env.tar.gz
、kmeans.py
和kmeans_data.txt
至OSS,上传操作可以参见简单上传。
步骤三:开发并运行任务
在EMR Serverless Spark页面,单击左侧的数据开发。
在开发目录页签下,单击
图标。
在新建对话框中,输入名称,类型选择
,单击确定。在右上角选择队列。
在新建的开发页签中,配置以下信息,其余参数无需配置,然后单击运行。
参数
说明
主Python资源
选择OSS资源,填写您上传
kmeans.py
至OSS的路径。例如,oss://<yourBucketName>/kmeans.py。运行参数
填写数据文件
kmeans_data.txt
上传到OSS的路径。填写格式为
oss://<yourBucketName>/kmeans_data.txt 2
。archives资源
选择OSS资源,填写您上传
pyspark_conda_env.tar.gz
至OSS的路径。填写格式为:
oss://<yourBucketName>/pyspark_conda_env.tar.gz#condaenv
。Spark配置
spark.pyspark.driver.python ./condaenv/bin/python spark.pyspark.python ./condaenv/bin/python
运行任务后,在下方的运行记录区域,单击任务操作列的日志探查。
在日志探查页签,您可以查看相关的日志信息。
例如,在Driver日志的Stdout页签,可以查看到返回以下信息。
Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])] Total Cost: 0.11999999999999958
方式三:通过PEX打包Python依赖
步骤一:PEX文件打包与执行
创建一台使用Alibaba Cloud Linux 3系统,且开启公网的ECS实例(需为x86架构),详情请参见自定义购买实例。
说明如果您在EMR on ECS页面已有的EMR集群中有空闲节点,也可以直接利用这些空闲节点(需确保节点为x86架构)。
安装PEX与wheel工具。
pip3.8 install --user pex wheel \ --trusted-host mirrors.cloud.aliyuncs.com \ -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
下载所需库的wheel文件至临时目录。
pip3.8 wheel -w /tmp/wheel \ pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \ --trusted-host mirrors.cloud.aliyuncs.com \ -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
生成PEX文件。
pex -f /tmp/wheel --no-index \ pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \ -o spark331_pandas153.pex
步骤二:上传PEX文件至OSS
单击kmeans.py和kmeans_data.txt,下载所需资源文件。
上传
spark331_pandas153.pex
、kmeans.py
和kmeans_data.txt
至OSS,上传操作可以参见简单上传。说明本文示例使用的Spark版本是3.3.1,同时包含pandas、pyarrow以及numpy等第三方Python库。您也可以根据选择的引擎版本打包其他版本的PySpark环境。有关引擎版本的详细信息,请参见引擎版本介绍。
步骤三:开发并运行任务
在EMR Serverless Spark页面,单击左侧的数据开发。
在开发目录页签下,单击
图标。
在新建对话框中,输入名称,类型选择
,单击确定。在右上角选择队列。
在新建的开发页签中,配置以下信息,其余参数无需配置,然后单击运行。
参数
说明
主Python资源
选择OSS资源,填写您上传
kmeans.py
至OSS的路径。例如,oss://<yourBucketName>/kmeans.py。运行参数
填写数据文件
kmeans_data.txt
上传到OSS的路径。填写格式为
oss://<yourBucketName>/kmeans_data.txt 2
。files资源
选择OSS资源,填写您上传
spark331_pandas153.pex
至OSS的路径。例如,oss://<yourBucketName>/spark331_pandas153.pex
。Spark配置
spark.pyspark.driver.python ./spark331_pandas153.pex spark.pyspark.python ./spark331_pandas153.pex
运行任务后,在下方的运行记录区域,单击任务操作列的日志探查。
在日志探查页签,您可以查看相关的日志信息。
例如,在Driver日志的Stdout页签,可以查看到返回以下信息。
Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])] Total Cost: 0.11999999999999958
相关文档
本文以PySpark开发为例,如果您想通过其他方式进行开发,可以参见批任务或流任务开发。