本文介绍在 E-MapReduce 的 Hadoop 集群运行 Spark Streaming 作业消费 DataHub 数据,统计数据个数并打印出来。

Spark 接入 DataHub

  • 准备工作

    使用 DataHub 的订阅功能订阅 Topic,详细步骤请参见 DataHub 订阅功能介绍

  • 消费 DataHub 数据
    运行 Spark Streaming 作业消费 DataHub 数据有两种使用方式:
    • 指定特定的 Shard ID,消费该 Shard ID 的数据:
      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub Project
                topic, // DataHub topic
                subId, // DataHub的订阅ID
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub endpoint
                shardId, // DataHub topic其中的一个ShardId
                read, // 对DataHub数据RecordEntry的处理
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // 对 DataHub 数据 RecordEntry 的处理函数,取出 RecordEntry 中第一个 field 的数据
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    • 消费所有 Shard 的数据:
      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub Project
                topic, // DataHub topic
                subId, // DataHub的订阅ID
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub endpoint
                read, // 对DataHub数据RecordEntry的处理
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // 对DataHub数据RecordEntry的处理函数,取出RecordEntry中第一个field的数据
      def read(record: RecordEntry): String = {
        record.getString(0)
      }

附录

完整示例代码请参见: