Spark对接DataHub

本文介绍如何在E-MapReduce的Hadoop集群,运行Spark作业消费DataHub数据、统计数据个数并打印出来。

Spark Streaming消费DataHub

  • 准备工作

    使用DataHub的订阅功能订阅Topic,详细信息请参见创建订阅

  • 消费DataHub数据

    运行Spark Streaming作业消费DataHub数据有两种使用方式:

    • 指定特定的ShardId,消费该ShardId的数据。

      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub的项目名。
                topic, // DataHub的topic名称。
                subId, // DataHub的订阅ID。
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub endpoint。
                shardId, // DataHub Topic中的一个ShardId。
                read, // 处理DataHub数据的RecordEntry。
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // 取出RecordEntry中第一个Field的数据。
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    • 消费所有Shard的数据。

      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub的项目名。
                topic, // DataHub的topic名称。
                subId, // DataHub的订阅ID。
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub endpoint。
                read, // 处理DataHub数据的RecordEntry。
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // 取出RecordEntry中第一个Field的数据。
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    说明

    完整示例代码,请参见SparkDatahubDemo.scala

Spark Structured Streaming消费DataHub

  • Maven依赖

    • Spark2

       <dependency>
              <groupId>com.aliyun.emr</groupId>
              <artifactId>emr-datahub_2.11</artifactId>
              <version>2.0.0</version>
       </dependency>
    • Spark3

      请在集群/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/目录,下载emr-datasources_shaded_***.jar作为依赖。

      说明
      • 如果您的集群中没有以上目录,则使用/usr/lib/emrsdk-current/目录。

      • emr-datasources_shaded_***.jar,请根据您实际集群目录下的JAR包来替换。

  • 消费示例

    val spark =  SparkSession
      .builder()
      .appName("test datahub")
      .getOrCreate()
    
    
    //创建readstream。
    val datahubRows = spark
      .readStream
      .format("datahub")
      .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com")
      .option("project", "project_test")
      .option("startingoffsets", "earliest")
      .option("topic", "topic_test")
      .load
    
    //DataFrame处理逻辑。
    datahubRows.printSchema() // 当前实例中,schema有key和value两个字段。
    println("print schema" + datahubRows.schema.toString())
    val df = datahubRows.groupBy("key").count()
    
    
    //创建writestream,输出数据。
    val query = df
      .writeStream
      .format("console")
      .outputMode("complete")
      .start()
    
    //结束流任务。
    query.awaitTermination(100000)
    spark.close()

    核心流程如下:

    1. 创建readstream读取DataHub DataFrame数据。

    2. 自定义数据源DataFrame处理逻辑。

    3. 创建writestream输出数据。

    说明

    运行代码示例前必须先配置环境变量。关于如何配置环境变量,请参见配置环境变量

  • DataHub核心配置参数

    重要

    Structured Streaming消费DataHub不需要填写参数subId。主要是因为DataHub的消费offset由Spark管理,不需要DataHub管理,所以不需要提供subId

    参数

    描述

    是否必选

    access.key.id

    创建DataHub的阿里云AccessKey ID。

    access.key.secret

    创建DataHub的阿里云AccessKey Secret。

    endpoint

    DataHub API Endpoint。您可以在DataHub页面查看。

    project

    DataHub的项目名。

    topic

    DataHub的topic名称。

    decimal.precision

    当topic字段中包含decimal字段时,需要指定该参数。

    decimal.scale

    当topic字段中包含decimal字段时,需要指定该参数。

    startingoffsets

    开始消费点位。取值如下:

    • latest:表示最晚。

    • earliest:表示最早。

    • json字符串:json结构如下所示。

      {
         "project名称#topic名称" : {
                 "shardId" : "startingoffsets的值"
            }
      }

      示例如下。

      {
          "project_test#topic_test" : {
                 "0" : "100"
         }
      }

    endingoffsets

    结束消费点位。取值如下:

    • latest:表示最晚。

    • json字符串:json结构如下所示。

      {
         "project名称#topic名称" : {
                 "shardId" : "endingoffsets的值"
            }
      }