Spark MLlib

本文介绍如何在DLA Serverless Spark中运行Spark MLlib任务。

场景

本示例将在DLA Serverless Spark中通过K-Means聚类算法,将以下数据分成两个族类,然后判断测试数据是否在族类中。

0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2

前提条件

在DLA Serverless Spark中运行Spark MLlib任务前,您需要完成以下准备工作。

操作步骤

  1. 登录Data Lake Analytics管理控制台

  2. 在页面左上角,选择DLA所在地域。

  3. 单击左侧导航栏中的Serverless Spark > 作业管理

  4. 作业编辑页面,单击创建作业

  5. 创建作业页面,按照页面提示进行参数配置。

  6. 完成上述参数配置后,单击确定创建Spark作业。

  7. 单击Spark作业名,在Spark作业编辑框中输入Spark MLlib任务内容。

            {
                    "name": "spark-mllib-test",
                    "file": "oss://${your oss bucket}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
                    "className": "com.aliyun.spark.SparkMLlib",
                    "args": ["oss://${your oss bucket}/data/rawdata.csv"],
                    "conf": {
                            "spark.driver.resourceSpec": "medium",
                            "spark.executor.instances": 2,
                            "spark.executor.resourceSpec": "medium",
                             "spark.dla.connectors": "oss"
                    }
            }

    注意:如果是子账号提交,还需要配置spark.dla.roleArn参数,参考文档使用RAM子账号开发Spark作业。

示例代码

以下为主类SparkMLlib对应的源代码。

package com.aliyun.spark
import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SparkSession
object SparkMLlib {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark MLlib")
    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    val rawDataPath = args(0)
    val data = spark.sparkContext.textFile(rawDataPath)
    val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
    val numClusters = 2
    val numIterations = 20
    val model = KMeans.train(parsedData, numClusters, numIterations)
    for (c <- model.clusterCenters) {
      println(s"cluster center: ${c.toString}")
    }
    val cost = model.computeCost(parsedData)
    //预测数据
    println("Vectors 0.2 0.2 0.2 is belongs to clusters:" +
      model.predict(Vectors.dense("0.2 0.2 0.2".split(' ').map(_.toDouble))))
    println("Vectors 0.25 0.25 0.25 is belongs to clusters:" +
      model.predict(Vectors.dense("0.25 0.25 0.25".split(' ').map(_.toDouble))))
    println("Vectors 8 8 8 is belongs to clusters:" +
      model.predict(Vectors.dense("8 8 8".split(' ').map(_.toDouble))))
  }
}

上述代码的运行结果如下所示,前两个值属于族0,后面一个值属于族1。

Vectors 0.2 0.2 0.2 is belongs to clusters:0
Vectors 0.25 0.25 0.25 is belongs to clusters:0
Vectors 8 8 8 is belongs to clusters:1