DataScience集群自带Spark大数据处理框架,但如果您希望加快特征工程处理的速度,可以将Spark任务提交到Hadoop集群。本文为您介绍如何在DataScience集群的Notebook中提交Spark任务到Hadoop集群。
前提条件
- 已创建DataScience集群和Hadoop集群,详情请参见创建集群。
- 下载dsdemo代码:请已创建DataScience集群的用户,使用钉钉搜索钉钉群号32497587加入钉钉群以获取dsdemo代码。
操作步骤
- 获取目录并打包。
- 通过SSH方式连接Hadoop集群,详情请参见登录集群。
- 执行以下命令,获取以下三个目录。
cp -rf -L $SPARK_CONF_DIR . cp -rf -L $HADOOP_CONF_DIR . cp -rf -L $SPARK_HOME .
- 执行以下命令,将获取的目录打包。
tar zcvf hadoop_envs.tar.gz spark-conf spark-current hadoop-conf
- 将hadoop_envs.tar.gz传送到DataScience集群的ml_on_ds目录,或者由root管理员进行统一管理。
- 上传dsdemo代码至DataScience集群的header节点。
- 使用root账号或者子账号通过SSH方式连接DataScience集群,详情请参见登录集群。
- 解压dsdemo。
- 连接Hadoop集群,上传hadoop_envs.tar.gz至DataScience集群的ml_on_ds目录。
scp hadoop_envs.tar.gz root@192.168.**.**:/home/ml_on_ds/notebook/testfile/
说明命令中的192.168.**.**
为DataScience集群的内网IP地址。 - 连接DataScience集群,进入testfile目录。
cd /home/ml_on_ds/notebook/testfile
- 执行以下命令,解压缩hadoop_envs.tar.gz。
tar zxvf hadoop_envs.tar.gz
- 创建镜像。
- 执行以下命令,进入ml_on_ds目录。
cd /home/ml_on_ds/
说明本文示例中的ml_on_ds目录,请根据您实际获取到的dsdemo代码修改。 - 执行以下命令,创建镜像。
sh allinone.sh -c bnt
- 执行以下命令,进入ml_on_ds目录。
- 修改ml_on_ds目录下的config文件。文件代码片段如下所示。
#实验名称 EXP=dsexp1 #K8s命名空间,如果是子账号,则表示用户名称。 KUBERNETES_NAMESPACE=dstest #For some users who are running pyspark & meachine learning jobs in jupyter notebook. #ports for mapping when notebook enabled, multi-users will conflict on same node. HOSTNETWORK=true MAPPING_JUPYTER_NOTEBOOK_PORT=8888 MAPPING_NNI_PORT=38080 MAPPING_TENSORBOARD_PORT=6006
因为要在Notebook中使用Spark,所以Notebook pod使用了localnetwork。当两个子用户的Notebook实例被调度到同一个node上时,Jupyter、NNI和TensorBoard会存在冲突,您可以按照每新增一个用户,port加1的原则区分。例如:- user1
MAPPING_JUPYTER_NOTEBOOK_PORT=8889 MAPPING_NNI_PORT=38081 MAPPING_TENSORBOARD_PORT=6007
- user2
MAPPING_JUPYTER_NOTEBOOK_PORT=8890 MAPPING_NNI_PORT=38082 MAPPING_TENSORBOARD_PORT=6008
- user1
- 创建Notebook实例。
#使用相应的账号root或者子账号, 登录DataScience集群。本示例使用root账号。 cd /home/ml_on_ds/ sh allinone.sh -c dnt # 删除已有的notebook实例。 sh allinone.sh -c cnt # 创建新的notebook实例。 sh allinone.sh -c snt # 查看notebook nni tensorboard等端口。
- 在Notebook中使用PySpark。
#手工把Hadoop集群的hosts写到DataScience环境里。 !echo "192.168.**.** emr-header-1.cluster-613**" >>/etc/hosts !echo "192.168.**.** emr-worker-2.cluster-613**" >>/etc/hosts !echo "192.168.**.** emr-worker-1.cluster-613**" >>/etc/hosts !cat /etc/hosts | tail %env HADOOP_CONF_DIR=/train/hadoop-conf %env SPARK_CONF_DIR=/train/spark-conf %env SPARK_HOME=/train/spark-current # -*- coding: utf-8 -* #!/usr/bin/env python3 import sys import os import findspark findspark.init() from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType from pyspark.sql import SparkSession from pyspark.sql.types import * from pyspark import SparkConf import pandas as pd SparkSession.builder.enableHiveSupport() # 因为使用了localnetwork,所以spark.driver.host必须设置为Notebook的IP地址。 # 此IP是VPC内IP地址,可以通过!ifconfig获取。 spark = SparkSession\ .builder\ .appName("PySpark-Test")\ .master("yarn")\ .config("spark.driver.host", "192.168.**.**")\ .enableHiveSupport()\ .getOrCreate() spark.sql("drop database if exists testdb CASCADE") spark.sql("create database if not exists testdb location 'oss://test**-huhehaote.oss-cn-huhehaote-internal.aliyuncs.com/testdb'") spark.sql("use testdb") spark.sql("create table table1(userid string, itemid string, score float)") spark.sql("insert into table1 values('user1','item1',0.1)") spark.sql("select * from table1").show() spark.stop()