PySpark可直接调用Python的API运行Spark作业,PySpark作业需在特定Python环境中运行。EMR默认支持使用Python,若EMR支持的Python版本无法运行PySpark作业,则您可参考本实践配置可用的Python环境并在DataWorks上运行PySpark作业。
前提条件
执行本实践所使用的DataWorks及E-MapReduce(简称EMR)需部署在相同地域。产品各自需执行的前提条件如下:
DataWorks侧
在DataWorks运行PySpark作业时,需创建EMR Spark节点,并使用
spark-submit
命令提交作业。EMR侧
需准备如下EMR环境:
准备EMR实例。本实践示例使用EMR on ECS实例。
本实践需使用一个Python包进行示例验证,您可在本地或ECS进行自主打包;也可直接下载本实践的示例包(Python3.7)。使用自主打包时,本地或ECS需安装Docker运行环境及Python运行环境。
说明本实践仅以Python3.7演示相关操作,实际使用中可选择所需Python版本。EMR支持的Python版本可能和您使用的Python版本存在差异,建议使用本实践的Python3.7版本。
操作步骤
准备运行Python程序需要的虚拟环境。
您可选择直接下载本实践的示例包python3.7使用(推荐);或通过如下步骤自主打包Python环境。
制作Docker镜像。
您可选择直接下载本实践的示例Dockerfile文件至本地或ECS;或在安装了Docker环境的宿主机上新建一个Dockerfile文件。Dockerfile文件的内容如下。
FROM centos:centos7.9.2009 RUN set -ex \ # 预安装所需组件。 && yum install -y wget tar libffi-devel zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make initscripts zip\ && wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tgz \ && tar -zxvf Python-3.7.0.tgz \ && cd Python-3.7.0 \ && ./configure prefix=/usr/local/python3 \ && make \ && make install \ && make clean \ && rm -rf /Python-3.7.0* \ && yum install -y epel-release \ && yum install -y python-pip # 设置默认为python3。 RUN set -ex \ # 备份旧版本python。 && mv /usr/bin/python /usr/bin/python27 \ && mv /usr/bin/pip /usr/bin/pip-python27 \ # 配置默认为python3。 && ln -s /usr/local/python3/bin/python3.7 /usr/bin/python \ && ln -s /usr/local/python3/bin/pip3 /usr/bin/pip # 修复因修改python版本导致yum失效问题。 RUN set -ex \ && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/bin/yum \ && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/libexec/urlgrabber-ext-down \ && yum install -y deltarpm # 更新pip版本。 RUN pip install --upgrade pip
构建镜像并运行容器。
在Dockerfile文件所在路径下,执行如下命令。
sudo docker build -t python-centos:3.7 . sudo docker run -itd --name python3.7 python-centos:3.7
进入安装容器所需的Python依赖库并打包Python环境。
sudo docker exec -it python3.7 bash pip install [所需依赖库] # vi requirements.txt # pip install -r requirements.txt # numpy # pandas cd /usr/local/ zip -r python3.7.zip python3/
拷贝容器中的Python环境到宿主机。
# 在宿主机运行命令将虚拟环境拷贝到宿主机。 sudo docker cp python3.7:/usr/local/python3.7.zip .
上传虚拟环境。
您可根据需要,选择上传Python虚拟环境至OSS或HDFS。
说明本实践以上传至HDFS示例。如果您选择上传至OSS,操作详情请参见上传文件。
上传Python环境至HDFS命令如下。
# 上传至HDFS中。 hdfs dfs -copyFromLocal python3.7.zip /tmp/pyspark
测试并上传Python代码。
您可在本地或ECS中创建一个
py
文件,按照下述方法测试Python代码是否正确。本实践示例使用pyspark_test.py
文件测试。# -*- coding: utf-8 -*- import os from pyspark.sql import SparkSession def noop(x): import socket import sys host = socket.gethostname() + ' '.join(sys.path) + ' '.join(os.environ) print('host: ' + host) print('PYTHONPATH: ' + os.environ['PYTHONPATH']) print('PWD: ' + os.environ['PWD']) print(os.listdir('.')) return host if __name__ == '__main__': spark = SparkSession \ .builder \ .appName("test_pyspark") \ .enableHiveSupport() \ .getOrCreate() sc = spark.sparkContext # 验证系统当前环境变量。 rdd = sc.parallelize(range(10), 2) hosts = rdd.map(noop).distinct().collect() print(hosts) # 验证UDF。 # https://docs.databricks.com/spark/latest/spark-sql/udf-python.html# # spark.udf.register("udf_squared", udf_squared) # spark.udf.register("udf_numpy", udf_numpy) tableName = "store" df = spark.sql("""select count(*) from %s """ % tableName) print("rdf count, %s\n" % df.count()) df.show()
说明您需将示例表名
store
替换成数据仓库中实际存在的表名。上传Python代码至HDFS中。
参考如下命令,在EMR实例中上传Python代码至HDFS。
说明本实践以上传至HDFS示例。如果您选择上传至OSS,操作详情请参见上传文件。
hdfs dfs -copyFromLocal pyspark_test.py /tmp/pyspark
在DataWorks中通过
spark-submit
命令提交作业。在创建的EMR Spark节点中,使用如下命令提交作业。
说明如果您选择上传Python代码至OSS,则需替换为实际使用的OSS路径。
spark-submit --master yarn \ --deploy-mode cluster \ --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./PYTHONENV/python3/bin/python3.7 \ --conf spark.executorEnv.PYTHONPATH=./PYTHONENV/python3/lib/python3.7/site-packages \ --conf spark.yarn.appMasterEnv.PYTHONPATH=./PYTHONENV/python3/lib/python3.7/site-packages \ --conf spark.yarn.appMasterEnv.JOBOWNER=LiuYuQuan \ --archives hdfs://hdfs-cluster/tmp/pyspark/python3.7.zip#PYTHONENV \ ## --py-files hdfs://hdfs-cluster/tmp/pyspark/mc_pyspark-0.1.0-py3-none-any.zip \ --driver-memory 4g \ --driver-cores 1 \ --executor-memory 4g \ --executor-cores 1 \ --num-executors 3 \ --name TestPySpark \ hdfs://hdfs-cluster/tmp/pyspark/pyspark_test.py