在PySpark程序中使用Python第三方库

PySpark作业往往需要借助Python第三方库来增强数据处理和分析能力。本文档详细介绍了如何利用Conda和PEX这两种方法,有效地将这些库集成到Serverless Spark环境中,确保作业在分布式计算场景下的稳定性和灵活性。

背景信息

Conda是一个跨平台的包管理和环境管理系统,它允许用户轻松创建、保存、加载和切换多个环境,每个环境都可以拥有独立的Python版本和库依赖。PEX (Python EXecutable) 是一个工具,它可以将Python应用及其所有依赖打包进一个可执行文件中。

前提条件

  • 已创建一台使用Alibaba Cloud Linux 3系统,且开启公网的ECS实例,详情请参见自定义购买实例

    说明

    如果您在EMR on ECS页面已有的EMR集群中有空闲节点,也可以直接利用这些空闲节点。

  • 已创建工作空间,详情请参见创建工作空间

使用限制

已安装Python 3.8及以上版本。本文以Python 3.8为例介绍。

使用Conda

步骤一:Conda环境构建与部署

  1. 通过以下命令安装Miniconda。

    wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
    chmod +x Miniconda3-latest-Linux-x86_64.sh
    
    ./Miniconda3-latest-Linux-x86_64.sh -b
    source miniconda3/bin/activate
  2. 构建使用Python 3.8和numpy的Conda环境。

    conda create -y -n pyspark_conda_env -c conda-forge conda-pack numpy python=3.8
    conda activate pyspark_conda_env
    conda pack -f -o pyspark_conda_env.tar.gz

步骤二:上传资源文件至OSS

  1. 单击kmeans.pykmeans_data.txt,下载所需资源文件。

    您也可以创建示例脚本kmeans.py和数据文件kmeans_data.txt,内容如下所示。

    kmeans.py

    """
    A K-means clustering program using MLlib.
    
    This example requires NumPy (http://www.numpy.org/).
    """
    import sys
    
    import numpy as np
    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans
    
    
    def parseVector(line):
        return np.array([float(x) for x in line.split(' ')])
    
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: kmeans <file> <k>", file=sys.stderr)
            sys.exit(-1)
        sc = SparkContext(appName="KMeans")
        lines = sc.textFile(sys.argv[1])
        data = lines.map(parseVector)
        k = int(sys.argv[2])
        model = KMeans.train(data, k)
        print("Final centers: " + str(model.clusterCenters))
        print("Total Cost: " + str(model.computeCost(data)))
        sc.stop()

    kmeans_data.txt

    0.0 0.0 0.0
    0.1 0.1 0.1
    0.2 0.2 0.2
    9.0 9.0 9.0
    9.1 9.1 9.1
    9.2 9.2 9.2
  2. 上传pyspark_conda_env.tar.gzkmeans.pykmeans_data.txt至OSS,上传操作可以参见简单上传

步骤三:开发并运行任务

  1. 在EMR Serverless Spark页面,单击左侧的数据开发

  2. 单击新建

  3. 输入名称,类型选择Application(批任务) > PySpark,单击确定

  4. 在右上角选择队列。

  5. 在新建的开发页签中,配置以下信息,其余参数无需配置,然后单击运行

    参数

    说明

    主Python资源

    选择OSS资源,填写您上传kmeans.py至OSS的路径。例如,oss://<yourBucketName>/kmeans.py。

    运行参数

    填写数据文件kmeans_data.txt上传到OSS的路径。

    填写格式为oss://<yourBucketName>/kmeans_data.txt 2

    archives资源

    选择OSS资源,填写您上传pyspark_conda_env.tar.gz至OSS的路径。

    填写格式为:oss://<yourBucketName>/pyspark_conda_env.tar.gz#condaenv

    Spark配置

    spark.pyspark.driver.python  ./condaenv/bin/python
    spark.pyspark.python         ./condaenv/bin/python
  6. 运行任务后,在下方的运行记录区域,单击任务操作列的详情

  7. 任务历史中的开发任务页面,您可以查看相关的日志信息。

    image

使用PEX

步骤一:PEX文件打包与执行

  1. 安装PEX与wheel工具。

    pip3.8 install --user pex wheel \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  2. 下载所需库的wheel文件至临时目录。

    pip3.8 wheel -w /tmp/wheel \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  3. 生成PEX文件。

    pex -f /tmp/wheel --no-index \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      -o spark331_pandas153.pex

步骤二:上传PEX文件至OSS

  1. 单击kmeans.pykmeans_data.txt,下载所需资源文件。

  2. 上传spark331_pandas153.pexkmeans.pykmeans_data.txt至OSS,上传操作可以参见简单上传

    说明

    本文示例使用的Spark版本是3.3.1,同时包含pandas、pyarrow以及numpy等第三方Python库。您也可以根据选择的引擎版本打包其他版本的PySpark环境。有关引擎版本的详细信息,请参见引擎版本介绍

步骤三:开发并运行任务

  1. 在EMR Serverless Spark页面,单击左侧的数据开发

  2. 单击新建

  3. 输入名称,类型选择Application(批任务) > PySpark,单击确定

  4. 在右上角选择队列。

  5. 在新建的开发页签中,配置以下信息,其余参数无需配置,然后单击运行

    参数

    说明

    主Python资源

    选择OSS资源,填写您上传kmeans.py至OSS的路径。例如,oss://<yourBucketName>/kmeans.py。

    运行参数

    填写数据文件kmeans_data.txt上传到OSS的路径。

    填写格式为oss://<yourBucketName>/kmeans_data.txt 2

    files资源

    选择OSS资源,填写您上传spark331_pandas153.pex至OSS的路径。例如,oss://<yourBucketName>/spark331_pandas153.pex

    Spark配置

    spark.pyspark.driver.python            ./spark331_pandas153.pex
    spark.pyspark.python                   ./spark331_pandas153.pex
  6. 运行任务后,在下方的运行记录区域,单击任务操作列的详情

  7. 任务历史中的开发任务页面,您可以查看相关的日志信息。

    image

相关文档

本文以PySpark开发为例,如果您想通过其他方式进行开发,可以参见Application开发