DataScience集群自带Spark大数据处理框架,但如果您希望加快特征工程处理的速度,可以将Spark任务提交到Hadoop集群。本文为您介绍如何在DataScience集群的Notebook中提交Spark任务到Hadoop集群。

前提条件

  • 已创建DataScience集群和Hadoop集群,详情请参见创建集群
  • 下载dsdemo代码:请已创建DataScience集群的用户,使用钉钉搜索钉钉群号32497587加入钉钉群以获取dsdemo代码。

操作步骤

  1. 获取目录并打包。
    1. 通过SSH方式连接Hadoop集群,详情请参见登录集群
    2. 执行以下命令,获取以下三个目录。
      cp -rf -L $SPARK_CONF_DIR .
      cp -rf -L $HADOOP_CONF_DIR .
      cp -rf -L $SPARK_HOME .
    3. 执行以下命令,将获取的目录打包。
      tar zcvf hadoop_envs.tar.gz spark-conf spark-current hadoop-conf
  2. hadoop_envs.tar.gz传送到DataScience集群的ml_on_ds目录,或者由root管理员进行统一管理。
    1. 上传dsdemo代码至DataScience集群的header节点。
    2. 使用root账号或者子账号通过SSH方式连接DataScience集群,详情请参见登录集群
    3. 解压dsdemo。
    4. 连接Hadoop集群,上传hadoop_envs.tar.gz至DataScience集群的ml_on_ds目录。
      scp hadoop_envs.tar.gz root@192.168.**.**:/home/ml_on_ds/notebook/testfile/
      说明 命令中的192.168.**.**为DataScience集群的内网IP地址。
    5. 连接DataScience集群,进入testfile目录。
      cd /home/ml_on_ds/notebook/testfile
    6. 执行以下命令,解压缩hadoop_envs.tar.gz
      tar zxvf hadoop_envs.tar.gz
  3. 创建镜像。
    1. 执行以下命令,进入ml_on_ds目录。
      cd /home/ml_on_ds/
      说明 本文示例中的ml_on_ds目录,请根据您实际获取到的dsdemo代码修改。
    2. 执行以下命令,创建镜像。
      sh allinone.sh -c bnt
  4. 修改ml_on_ds目录下的config文件。
    文件代码片段如下所示。
    #实验名称
    EXP=dsexp1
    
    #K8s命名空间,如果是子账号,则表示用户名称。
    KUBERNETES_NAMESPACE=dstest
    
    #For some users who are running pyspark & meachine learning jobs in jupyter notebook.
    #ports for mapping when notebook enabled, multi-users will conflict on same node.
    HOSTNETWORK=true
    MAPPING_JUPYTER_NOTEBOOK_PORT=8888
    MAPPING_NNI_PORT=38080
    MAPPING_TENSORBOARD_PORT=6006
    因为要在Notebook中使用Spark,所以Notebook pod使用了localnetwork。当两个子用户的Notebook实例被调度到同一个node上时,Jupyter、NNI和TensorBoard会存在冲突,您可以按照每新增一个用户,port加1的原则区分。例如:
    • user1
      MAPPING_JUPYTER_NOTEBOOK_PORT=8889
      MAPPING_NNI_PORT=38081
      MAPPING_TENSORBOARD_PORT=6007
    • user2
      MAPPING_JUPYTER_NOTEBOOK_PORT=8890
      MAPPING_NNI_PORT=38082
      MAPPING_TENSORBOARD_PORT=6008
  5. 创建Notebook实例。
    #使用相应的账号root或者子账号, 登录DataScience集群。本示例使用root账号。
    cd /home/ml_on_ds/
    sh allinone.sh -c dnt # 删除已有的notebook实例。
    sh allinone.sh -c cnt # 创建新的notebook实例。
    sh allinone.sh -c snt # 查看notebook nni tensorboard等端口。
  6. 在Notebook中使用PySpark。
    #手工把Hadoop集群的hosts写到DataScience环境里。
    !echo "192.168.**.** emr-header-1.cluster-613**" >>/etc/hosts
    !echo "192.168.**.** emr-worker-2.cluster-613**" >>/etc/hosts
    !echo "192.168.**.** emr-worker-1.cluster-613**" >>/etc/hosts
    !cat /etc/hosts | tail
    
    %env HADOOP_CONF_DIR=/train/hadoop-conf
    %env SPARK_CONF_DIR=/train/spark-conf
    %env SPARK_HOME=/train/spark-current
    
    # -*- coding: utf-8 -*
    #!/usr/bin/env python3
    
    import sys
    import os
    import findspark
    findspark.init()
    
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType
    from pyspark.sql import SparkSession
    from pyspark.sql.types import *
    from pyspark import SparkConf
    import pandas as pd
    
    SparkSession.builder.enableHiveSupport()
    
    # 因为使用了localnetwork,所以spark.driver.host必须设置为Notebook的IP地址。
    # 此IP是VPC内IP地址,可以通过!ifconfig获取。
    
    spark = SparkSession\
            .builder\
            .appName("PySpark-Test")\
            .master("yarn")\
            .config("spark.driver.host", "192.168.**.**")\
            .enableHiveSupport()\
            .getOrCreate()
    
    spark.sql("drop database if exists testdb CASCADE")
    spark.sql("create database if not exists testdb location 'oss://test**-huhehaote.oss-cn-huhehaote-internal.aliyuncs.com/testdb'")
    spark.sql("use testdb")
    spark.sql("create table table1(userid string, itemid string, score float)")
    spark.sql("insert into table1 values('user1','item1',0.1)")
    spark.sql("select * from table1").show()
    spark.stop()