在PySpark程序中使用Python第三方库

PySpark任务往往需要借助Python第三方库来增强数据处理和分析能力。本文通过示例详细介绍了如何通过运行环境、Conda环境隔离与PEX轻量化打包方式,有效地将这些库集成到Serverless Spark环境中,确保任务在分布式计算场景下的稳定性和灵活性。

背景信息

在交互式PySpark开发过程中,可以使用Python第三方库以提升数据处理与分析的灵活性及易用性。以下三种方式均能帮助您实现这一目标,建议根据实际情况选择最适合的方式。

方式

适用场景

方式一:通过运行环境使用Python第三方库

在阿里云控制台配置包含所需库的标准化环境(如 numpypandas),系统会自动构建环境,新增任务时使用创建的运行环境即可。

方式二:通过Conda管理Python环境

Conda是一个跨平台的包管理和环境管理系统,它允许用户轻松创建、保存、加载和切换多个环境,每个环境都可以拥有独立的Python版本和库依赖。

方式三:通过PEX打包Python依赖

PEX (Python EXecutable) 是一个工具,它可以将Python应用及其所有依赖打包进一个可执行文件中。

前提条件

已创建工作空间,详情请参见创建工作空间

使用限制

已安装Python 3.8及以上版本。本文以Python 3.8为例介绍。

操作流程

方式一:通过运行环境使用Python第三方库

步骤一:创建运行环境

  1. 进入运行环境管理页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,选择左侧导航栏中的运行环境管理

  2. 单击创建运行环境

  3. 创建运行环境页面,单击添加库

    更多参数信息,请参见管理运行环境

  4. 新建库中,使用PyPI来源类型,配置PyPI Package参数,然后单击确定

    PyPI Package中填写库的名称及版本,不指定版本时,默认安装最新版本。

    本文示例添加的库为fakergeopy

  5. 单击创建

    创建后将开始初始化环境。

步骤二:上传资源文件至OSS

  1. 单击pyspark_third_party_libs_demo.py,下载所需资源文件。

    本文示例展示了如何使用PySpark和第三方库生成模拟数据并进行地理分析。其中,faker库用于生成包含用户信息和随机地理位置的模拟数据,geopy库用于计算每个用户位置与埃菲尔铁塔之间的地理距离,最终筛选出距离在10公里范围内的用户。

    您也可以创建示例脚本pyspark_third_party_libs_demo.py,内容如下所示。

    pyspark_third_party_libs_demo.py

    from pyspark.sql import SparkSession   
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    from faker import Faker
    import random
    from geopy.distance import geodesic
    
    spark = SparkSession.builder \
            .appName("PySparkThirdPartyLibsDemo") \
            .getOrCreate()
    
    # 使用第三方库faker生成模拟数据
    fake = Faker()
    landmark = (48.8584, 2.2945)  # 埃菲尔铁塔坐标
    
    # 创建模拟数据函数
    def generate_fake_data(num_records):
        data = []
        for _ in range(num_records):
            # 在巴黎附近生成随机坐标
            lat = 48.85 + random.uniform(-0.2, 0.2)
            lon = 2.30 + random.uniform(-0.2, 0.2)
            data.append((
                fake.uuid4(),        # 用户ID
                fake.name(),         # 姓名
                lat,                 # 纬度
                lon                  # 经度
            ))
        return data
    
    # 生成100条模拟记录
    fake_data = generate_fake_data(100)
    
    # 创建Spark DataFrame
    columns = ["user_id", "name", "latitude", "longitude"]
    df = spark.createDataFrame(fake_data, schema=columns)
    
    # 打印生成的样本数据
    print("生成的样本数据:")
    df.show(5)
    
    # 使用第三方库geopy计算距离
    def calculate_distance(lat, lon, landmark=landmark):
        """计算两点之间的地理距离(公里)"""
        user_location = (lat, lon)
        return geodesic(user_location, landmark).kilometers
    
    # 注册UDF(用户定义函数)
    distance_udf = udf(calculate_distance, FloatType())
    
    # 添加距离列
    df_with_distance = df.withColumn(
        "distance_km", 
        distance_udf("latitude", "longitude")
    )
    
    # 找出10公里范围内的用户
    nearby_users = df_with_distance.filter("distance_km <= 10")
    
    # 打印结果
    print(f"\n找到 {nearby_users.count()} 个在10公里范围内的用户:")
    nearby_users.select("name", "latitude", "longitude", "distance_km").show(10)
    
  2. 上传pyspark_third_party_libs_demo.pyOSS,上传操作可以参见简单上传

步骤三:开发并运行任务

  1. EMR Serverless Spark页面,单击左侧的数据开发

  2. 开发目录页签下,单击image图标。

  3. 新建对话框中,输入名称,类型选择批任务 > PySpark,单击确定

  4. 在右上角选择队列。

  5. 在新建的开发页签中,配置以下信息,其余参数无需配置,然后单击运行

    参数

    说明

    Python资源

    选择OSS资源,填写您上传pyspark_third_party_libs_demo.pyOSS的路径。例如,oss://<yourBucketName>/pyspark_third_party_libs_demo.py。

    运行环境

    在下拉框中选择您创建的运行环境。

  6. 运行任务后,在下方的运行记录区域,单击任务操作列的日志探查

  7. 日志探查页签,您可以查看相关的日志信息。

    例如,在Driver日志Stdout页签,可以查看到返回以下信息。

    生成的样本数据:
    +--------------------+-------------------+------------------+------------------+
    |             user_id|               name|          latitude|         longitude|
    +--------------------+-------------------+------------------+------------------+
    |73d4565c-8cdf-4bc...|  Garrett Robertson| 48.81845614776422|2.4087517234236064|
    |0fc364b1-6759-416...|      Dawn Gonzalez| 48.68654896170054|2.4708555780468013|
    |2ab1f0aa-5552-4e1...|Alexander Gallagher| 48.87603770688707|2.1209399987431246|
    |1cabbdde-e703-4a8...|       David Morris|48.656356532418116|2.2503952330408175|
    |8b7938a0-b283-401...|    Shannon Perkins| 48.82915001905855| 2.410743969589327|
    +--------------------+-------------------+------------------+------------------+
    only showing top 5 rows
    
    
    找到 24 个在10公里范围内的用户:
    +-----------------+------------------+------------------+-----------+
    |             name|          latitude|         longitude|distance_km|
    +-----------------+------------------+------------------+-----------+
    |Garrett Robertson| 48.81845614776422|2.4087517234236064|   9.490705|
    |  Shannon Perkins| 48.82915001905855| 2.410743969589327|   9.131355|
    |      Alex Harris| 48.82547383207313|2.3579336032430027|   5.923493|
    |      Tammy Ramos| 48.84668267431606|2.3606455536493574|   5.026109|
    |   Ivan Christian| 48.89224239228342|2.2811025348668195|  3.8897192|
    |  Vernon Humphrey| 48.93142188723839| 2.306957802222233|   8.171813|
    |  Shawn Rodriguez|48.919907710882654|2.2270993307836044|   8.439087|
    |    Robert Fisher|48.794216103154646|2.3699024070507906|   9.033209|
    |  Heather Collier|48.822957591865205|2.2993033803043454|   3.957171|
    |       Dawn White|48.877816307255586|2.3743880390928878|   6.246059|
    +-----------------+------------------+------------------+-----------+
    only showing top 10 rows

方式二:通过Conda管理Python环境

步骤一:Conda环境构建与部署

  1. 创建一台使用Alibaba Cloud Linux 3系统,且开启公网的ECS实例(需为x86架构),详情请参见自定义购买实例

    说明

    如果您在EMR on ECS页面已有的EMR集群中有空闲节点,也可以直接利用这些空闲节点(需确保节点为x86架构)。

  2. 通过以下命令安装Miniconda。

    wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
    chmod +x Miniconda3-latest-Linux-x86_64.sh
    
    ./Miniconda3-latest-Linux-x86_64.sh -b
    source miniconda3/bin/activate
  3. 构建使用Python 3.8numpyConda环境。

    conda create -y -n pyspark_conda_env -c conda-forge conda-pack numpy python=3.8
    conda activate pyspark_conda_env
    conda pack -f -o pyspark_conda_env.tar.gz

步骤二:上传资源文件至OSS

  1. 单击kmeans.pykmeans_data.txt,下载所需资源文件。

    您也可以创建示例脚本kmeans.py和数据文件kmeans_data.txt,内容如下所示。

    kmeans.py

    """
    A K-means clustering program using MLlib.
    
    This example requires NumPy (http://www.numpy.org/).
    """
    import sys
    
    import numpy as np
    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans
    
    
    def parseVector(line):
        return np.array([float(x) for x in line.split(' ')])
    
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: kmeans <file> <k>", file=sys.stderr)
            sys.exit(-1)
        sc = SparkContext(appName="KMeans")
        lines = sc.textFile(sys.argv[1])
        data = lines.map(parseVector)
        k = int(sys.argv[2])
        model = KMeans.train(data, k)
        print("Final centers: " + str(model.clusterCenters))
        print("Total Cost: " + str(model.computeCost(data)))
        sc.stop()

    kmeans_data.txt

    0.0 0.0 0.0
    0.1 0.1 0.1
    0.2 0.2 0.2
    9.0 9.0 9.0
    9.1 9.1 9.1
    9.2 9.2 9.2
  2. 上传pyspark_conda_env.tar.gzkmeans.pykmeans_data.txtOSS,上传操作可以参见简单上传

步骤三:开发并运行任务

  1. EMR Serverless Spark页面,单击左侧的数据开发

  2. 开发目录页签下,单击image图标。

  3. 新建对话框中,输入名称,类型选择批任务 > PySpark,单击确定

  4. 在右上角选择队列。

  5. 在新建的开发页签中,配置以下信息,其余参数无需配置,然后单击运行

    参数

    说明

    Python资源

    选择OSS资源,填写您上传kmeans.pyOSS的路径。例如,oss://<yourBucketName>/kmeans.py。

    运行参数

    填写数据文件kmeans_data.txt上传到OSS的路径。

    填写格式为oss://<yourBucketName>/kmeans_data.txt 2

    archives资源

    选择OSS资源,填写您上传pyspark_conda_env.tar.gzOSS的路径。

    填写格式为:oss://<yourBucketName>/pyspark_conda_env.tar.gz#condaenv

    Spark配置

    spark.pyspark.driver.python  ./condaenv/bin/python
    spark.pyspark.python         ./condaenv/bin/python
  6. 运行任务后,在下方的运行记录区域,单击任务操作列的日志探查

  7. 日志探查页签,您可以查看相关的日志信息。

    例如,在Driver日志Stdout页签,可以查看到返回以下信息。

    Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
    Total Cost: 0.11999999999999958

方式三:通过PEX打包Python依赖

步骤一:PEX文件打包与执行

  1. 创建一台使用Alibaba Cloud Linux 3系统,且开启公网的ECS实例(需为x86架构),详情请参见自定义购买实例

    说明

    如果您在EMR on ECS页面已有的EMR集群中有空闲节点,也可以直接利用这些空闲节点(需确保节点为x86架构)。

  2. 安装PEXwheel工具。

    pip3.8 install --user pex wheel \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  3. 下载所需库的wheel文件至临时目录。

    pip3.8 wheel -w /tmp/wheel \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  4. 生成PEX文件。

    pex -f /tmp/wheel --no-index \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      -o spark331_pandas153.pex

步骤二:上传PEX文件至OSS

  1. 单击kmeans.pykmeans_data.txt,下载所需资源文件。

  2. 上传spark331_pandas153.pexkmeans.pykmeans_data.txtOSS,上传操作可以参见简单上传

    说明

    本文示例使用的Spark版本是3.3.1,同时包含pandas、pyarrow以及numpy等第三方Python库。您也可以根据选择的引擎版本打包其他版本的PySpark环境。有关引擎版本的详细信息,请参见引擎版本介绍

步骤三:开发并运行任务

  1. EMR Serverless Spark页面,单击左侧的数据开发

  2. 开发目录页签下,单击image图标。

  3. 新建对话框中,输入名称,类型选择批任务 > PySpark,单击确定

  4. 在右上角选择队列。

  5. 在新建的开发页签中,配置以下信息,其余参数无需配置,然后单击运行

    参数

    说明

    Python资源

    选择OSS资源,填写您上传kmeans.pyOSS的路径。例如,oss://<yourBucketName>/kmeans.py。

    运行参数

    填写数据文件kmeans_data.txt上传到OSS的路径。

    填写格式为oss://<yourBucketName>/kmeans_data.txt 2

    files资源

    选择OSS资源,填写您上传spark331_pandas153.pexOSS的路径。例如,oss://<yourBucketName>/spark331_pandas153.pex

    Spark配置

    spark.pyspark.driver.python            ./spark331_pandas153.pex
    spark.pyspark.python                   ./spark331_pandas153.pex
  6. 运行任务后,在下方的运行记录区域,单击任务操作列的日志探查

  7. 日志探查页签,您可以查看相关的日志信息。

    例如,在Driver日志Stdout页签,可以查看到返回以下信息。

    Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
    Total Cost: 0.11999999999999958

相关文档

本文以PySpark开发为例,如果您想通过其他方式进行开发,可以参见批任务或流任务开发