全部产品

PySpark

更新时间:2020-11-20 16:17:48

本文展示如何提交PySpark作业。

1. 准备测试数据

生成一个如下格式的CSV文件,命名为staff.csv,并上传到OSS存储中。

这张表反映了每个员工的信息和收入情况。

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200

2. 开发一个依赖方法

本示例的目标是计算这些人员的税后收入,创建一个文件func.py, 在其中写一个tax方法,并注册为Spark UDF, 以方便后续的操作。

示例代码如下:

def tax(salary):
    """
    convert string to int
    then cut 15% tax from the salary
    return a float number

    :param salary: The salary of staff worker
    :return:
    """
    return 0.15 * int(salary)

为了能够使用pyFiles 引入这个方法, 我们需要将这个方法打包在一个zip格式的压缩包中。

压缩包的目录结构如下图所示:

压缩文件夹

按照Python的规则, 这里我们创建了名叫tools的module, 在这个module下有func.tax这个方法。

将tools文件夹压缩为tools.zip后上传到OSS中。

注意

不同操作系统平台的ZIP压缩工具会略有区别,请保证解压后可以看到顶层目录是tools文件夹。

3. 开发主程序

开发一个spark的python程序,将测试中的CSV从OSS中读取出来,注册为一个DataFrame。同时将依赖包中的tax方法注册为一个Spark UDF。然后使用该UDF对刚刚生成的DataFrame进行计算并打印结果。

示例代码如下, 需要注意的是请替换代码中的{your bucket} 为您的OSS Bucket名称

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# import third part file
from tools import func

if __name__ == "__main__":
    # init pyspark context
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    # read csv from oss to a dataframe, show the table
    df = spark.read.csv('oss://{your bucket}/staff.csv', mode="DROPMALFORMED",inferSchema=True, header = True)
    # print schema and data to the console
    df.printSchema()
    df.show()

    # create an udf
    taxCut = udf(lambda salary: func.tax(salary), FloatType())

    # cut tax from salary and show result
    df.select("name", taxCut("salary").alias("final salary")).show()
    spark.stop()

将代码写入到example.py中,并上传到OSS中。

4. 提交任务

进入作业控制台,新建一个作业,并提交以下作业信息:

{
    "name": "Spark Python",
    "file": "oss://{your bucket name}/example.py",
    "pyFiles": ["oss://{your bucket name}/tools.zip"],
    "conf": {
        "spark.dla.roleArn": "acs:ram::11111111111:role/sparkramroletest",
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "small",
        "spark.dla.connectors": "oss",
        "spark.kubernetes.pyspark.pythonVersion": "3"
    }
}

请将{your bucket name}替换为您使用的OSS的Bucket名称, 这里我们使用Python 3来执行这个任务,和社区版的Spark相同,通过spark.kubernetes.pyspark.pythonVersion这个配置信息来控制使用的Python版本,如果不进行配置默认使用的版本为Python 2.7。

说明

spark.dla.roleArn

5. 示例中主要参数说明

参数

说明

是否必选

name

Spark 任务的名称。

file

Spark 任务所在的Python文件,需要注意的是这里必须是一个OSS的地址。

pyFiles

Spark任务依赖的第三方module的压缩包,可以为多个,使用逗号隔开。

conf

Spark任务用到的配置参数,需要的配置项:

"spark.dla.connectors": "oss"

指定了这个任务需要有连接OSS的能力

"spark.kubernetes.pyspark.pythonVersion": "3"

指定了这个任务需要使用Python 3来执行。