本文将介绍如何使用数据管理DMS任务编排调度Spark MLLib任务。

前提条件

  • 您的账号为主账号。
  • 您已开通DMS服务
  • 您已开通数据湖分析DLA(Data Lake Analytics)服务,详情请参见开通数据湖分析服务
  • 您已开通对象存储OSS(Object Storage Service)服务,详情请参见开通OSS服务

背景信息

近年来,随着大数据的兴起与算力的提升,机器学习和深度学习得到了广泛的应用,如千人千面的推荐系统、人脸支付、自动驾驶汽车等等。MLlib是Spark的机器学习库,包括分类、回归、聚类、协同过滤、降维等算法,本文介绍的是Kmeans聚类算法。您可以通过DMS任务编排中推出的Serverless Spark快速训练机器学习模型。

创建Spark虚拟集群

  1. 登录Data Lake Analytics管理控制台
  2. 创建虚拟集群,详情请参见创建虚拟集群
  3. 授予DLA删除OSS文件的权限,详情请参见写入数据

上传数据和代码

  1. 登录OSS管理控制台
  2. 本示例将准备如下数据,并保存至data.txt文件。
    0.0 0.0 0.0
    0.1 0.1 0.1
    0.2 0.2 0.2
    9.0 9.0 9.0
    9.1 9.1 9.1
    9.2 9.2 9.2
  3. 准备如下spark MLLib代码,并将该代码打包成FatJar文件。
    说明 示例代码功能:读取data.txt文件中的数据,训练Kmeans模型。
    package com.aliyun.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.sql.SparkSession
    
    object SparkMLlib {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Spark MLlib Kmeans Demo")
        val spark = SparkSession
          .builder()
          .config(conf)
          .getOrCreate()
        val rawDataPath = args(0)
    
        val data = spark.sparkContext.textFile(rawDataPath)
        val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
        val numClusters = 2
        val numIterations = 20
        val model = KMeans.train(parsedData, numClusters, numIterations)
        for (c <- model.clusterCenters) {
          println(s"cluster center: ${c.toString}")
        }
        val modelOutputPath = args(1)
        model.save(spark.sparkContext, modelOutputPath)
      }
    }
  4. 将以上步骤的data.txt文件和FatJar文件上传至OSS中,操作详情可参见上传文件

使用DMS任务编排调度Spark任务

  1. 登录DMS控制台
  2. 在顶部菜单栏中,单击数据工厂 > 任务编排
  3. 任务编排页面的自由编排任务区域,单击新建任务流
  4. 新建任务流对话框,将任务流名称设置为Just_Spark,将描述设置为Just_Spark demo.,单击确认
    just_spark
  5. 任务编排页面中,将左侧任务类型中的Serverless Spark拖拽到页面中的空白区域。
    Serverless Spark按钮
  6. 单击Serverless Spark节点,在右侧面板中选中内容设置页签,并配置以下信息。
    1. 地域列表中,选择目标Spark集群所在的地域。
    2. Spark 集群列表中,选择目标Spark集群。
    3. 作业配置文本框中,输入以下代码。
      {
          "name": "spark-mllib-test",
          "file": "oss://oss-bucket-name/kmeans_demo/spark-mllib-1.0.0-SNAPSHOT.jar",
          "className": "com.aliyun.spark.SparkMLlib",
          "args": [
              "oss://oss-bucket-name/kmeans_demo/data.txt",
              "oss://oss-bucket-name/kmeans_demo/model/"
          ],
          "conf": {
              "spark.driver.resourceSpec": "medium",
              "spark.executor.instances": 2,
              "spark.executor.resourceSpec": "medium",
              "spark.dla.connectors": "oss"
          }
      }
      说明
      • file为FatJar文件在OSS中的绝对路径。
      • args为data.txt与model在OSS中的绝对路径。
    4. 完成以上配置后,单击保存按钮。
      保存Serverless Spark
  7. 单击页面左上方的试运行按钮进行测试。

执行结果

您可以在运维中心查看任务运行结果。查看Serverless Spark结果