全部产品
云市场
云游戏

Spark Python作业示例

更新时间:2020-08-04 10:18:47

当创建集群成功后, 可以提交Spark作业,本文展示一个Spark Python的示例。

注意:Spark Python在创建集群时,选择镜像版本为__spark_2_4_5_dla_0_0_2__。

当前在 华南一(深圳)对 Spark Python进行支持,其它区域将逐步上线并开放。

1. 准备测试数据

生成一个如下格式的CSV文件,命名为staff.csv,并上传到OSS存储中。

这张表反映了每个员工的信息和收入情况。

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200

2. 开发一个依赖方法

本示例的目标是计算这些人员的税后收入,创建一个文件func.py, 在其中写一个tax方法,并注册为Spark UDF, 以方便后续的操作。

示例代码如下:

def tax(salary):
    """
    convert string to int
    then cut 15% tax from the salary
    return a float number

    :param salary: The salary of staff worker
    :return:
    """
    return 0.15 * int(salary)

为了能够使用pyFiles 引入这个方法, 我们需要将这个方法打包在一个zip格式的压缩包中。

压缩包的目录结构如下图所示:

pyFiles压缩包内部结构示例

按照Python的规则, 这里我们创建了名叫tools的module, 在这个module下有func.tax这个方法。

将depend文件夹压缩为depend.zip后上传到OSS中。

3. 开发主程序

开发一个spark的python程序,将测试中的CSV从OSS中读取出来,注册为一个DataFrame。同时将依赖包中的tax方法注册为一个Spark UDF。然后使用该UDF对刚刚生成的DataFrame进行计算并打印结果。

示例代码如下, 需要注意的是请替换代码中的{your bucket} 为您的OSS Bucket名称

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# import third part file
from tools import func

if __name__ == "__main__":
    # init pyspark context
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    # read csv from oss to a dataframe, show the table
    df = spark.read.csv('oss://{your bucket}/staff.csv', mode="DROPMALFORMED",inferSchema=True, header = True)
    # print schema and data to the console
    df.printSchema()
    df.show()

    # create an udf
    taxCut = udf(lambda salary: func.tax(salary), FloatType())

    # cut tax from salary and show result
    df.select("name", taxCut("salary").alias("final salary")).show()
    spark.stop()

将代码写入到example.py中,并上传到OSS中。

4. 提交任务

创建一个新的集群,这里需要注意的是,Spark的镜像版本需要选择最新的__spark_2_4_5_dla_0_0_2__,此Spark镜像支持Spark Python的相关操作。

进入作业控制台,新建一个作业,并提交以下作业信息:

{
    "name": "Spark Python",
    "file": "oss://{your bucket name}/example.py",
    "pyFiles": ["oss://{your bucket name}/depend.zip"],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "small",
        "spark.dla.connectors": "oss",
        "spark.kubernetes.pyspark.pythonVersion": "3"
    }
}

请将{your bucket name}替换为您使用的OSS的Bucket名称, 这里我们使用Python 3来执行这个任务,和社区版的Spark相同,通过spark.kubernetes.pyspark.pythonVersion这个配置信息来控制使用的Python版本,如果不进行配置默认使用的版本为Python 2.7。

5. 示例中主要参数说明

参数说明是否必选
nameSpark 任务的名称。
fileSpark 任务所在的Python文件,需要注意的是这里必须是一个OSS的地址。
pyFilesSpark任务依赖的第三方module的压缩包,可以为多个,使用逗号隔开。
confSpark任务用到的配置参数,需要的配置项:"spark.dla.connectors": "oss"指定了这个任务需要有连接OSS的能力 "spark.kubernetes.pyspark.pythonVersion": "3"指定了这个任务需要使用Python 3来执行。