全部产品
云市场

Spark MLlib 示例

更新时间:2020-05-25 17:12:10

本文介绍如何在DLA Serverless Spark中运行Spark MLlib任务。

场景

本示例将在DLA Serverless Spark中通过K-Means聚类算法,将以下数据分成两个族类,然后判断测试数据是否在族类中。

  1. 0.0 0.0 0.0
  2. 0.1 0.1 0.1
  3. 0.2 0.2 0.2
  4. 9.0 9.0 9.0
  5. 9.1 9.1 9.1
  6. 9.2 9.2 9.2

前提条件

在DLA Serverless Spark中运行Spark MLlib任务前,您需要完成以下准备工作。

操作步骤

  1. 登录Data Lake Analytics管理控制台

  2. 在页面左上角,选择DLA所在地域。

  3. 单击左侧导航栏中的Serverless Spark > 作业管理

  4. 作业编辑页面,单击创建作业

  5. 创建作业页面,按照页面提示进行参数配置。

    参数 说明
    文件名称 设置文件夹或者文件的名字。
    文件类型 可以设置为文件或者文件夹。
    父级 设置文件或者文件夹的上层目录。
    • 作业列表相当于根目录,所有的作业都在作业列表下创建。
    • 您可以在作业列表下创建文件夹,然后在文件夹下创建作业;也可以直接在作业列表根目录下创建作业。

    创建作业

  6. 完成上述参数配置后,单击确定创建Spark作业。

  7. 单击Spakr作业名,在Spark作业编辑框中输入Spark MLlib任务内容。

    1. {
    2. "name": "spark-mllib-test",
    3. "file": "oss://${your oss bucket}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
    4. "className": "com.aliyun.spark.SparkMLlib",
    5. "args": ["oss://${your oss bucket}/data/rawdata.csv"],
    6. "conf": {
    7. "spark.driver.resourceSpec": "medium",
    8. "spark.executor.instances": 2,
    9. "spark.executor.resourceSpec": "medium"
    10. }
    11. }
    参数 说明
    args Spark任务传入的参数,多个参数之间以英文逗号(,)分隔。

    示例中args值为OSS中rawdata.csv文件的存储路径。

    name Spark任务名称。
    file Spark任务所在主程序jar包的存储位置。

    注意:Spark任务依赖的所有jar包须存储在OSS中。

    className 入口类(主类)com.aliyun.spark.SparkMLlib,请参见示例代码
    conf
    • "spark.driver.resourceSpec":"medium"表示driver使用的规格为medium,对应的CPU为2vCPU Core,内存为8GB。
    • "spark.executor.resourceSpec":"medium"表示executor使用的规格为medium,对应的CPU为2vCPU Core,内存为8GB。

    若不填写conf,系统使用创建虚拟集群时设置的默认值。

  8. Spark任务编写完成后, 单击执行,执行Spark任务,状态栏中实时显示任务的执行状态。

    • STARTING:任务正在提交。
    • RUNNING:任务运行中。
    • SUCCESS:Spark作业执行成功。
    • DEAD:任务出错,可通过查看日志进行排错处理。
    • KILLED:任务被主动终止。

示例代码

以下为主类SparkMLlib对应的源码。

  1. package com.aliyun.spark
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.mllib.clustering.KMeans
  4. import org.apache.spark.mllib.linalg.Vectors
  5. import org.apache.spark.sql.SparkSession
  6. object SparkMLlib {
  7. def main(args: Array[String]): Unit = {
  8. val conf = new SparkConf().setAppName("Spark MLlib")
  9. val spark = SparkSession
  10. .builder()
  11. .config(conf)
  12. .getOrCreate()
  13. val rawDataPath = args(0)
  14. val data = spark.sparkContext.textFile(rawDataPath)
  15. val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
  16. val numClusters = 2
  17. val numIterations = 20
  18. val model = KMeans.train(parsedData, numClusters, numIterations)
  19. for (c <- model.clusterCenters) {
  20. println(s"cluster center: ${c.toString}")
  21. }
  22. val cost = model.computeCost(parsedData)
  23. //预测数据
  24. println("Vectors 0.2 0.2 0.2 is belongs to clusters:" +
  25. model.predict(Vectors.dense("0.2 0.2 0.2".split(' ').map(_.toDouble))))
  26. println("Vectors 0.25 0.25 0.25 is belongs to clusters:" +
  27. model.predict(Vectors.dense("0.25 0.25 0.25".split(' ').map(_.toDouble))))
  28. println("Vectors 8 8 8 is belongs to clusters:" +
  29. model.predict(Vectors.dense("8 8 8".split(' ').map(_.toDouble))))
  30. }
  31. }

上述代码的运行结果如下所示,前两个值属于族0,后面一个值属于族1。

  1. Vectors 0.2 0.2 0.2 is belongs to clusters:0
  2. Vectors 0.25 0.25 0.25 is belongs to clusters:0
  3. Vectors 8 8 8 is belongs to clusters:1