本文介绍如何在E-MapReduce的Hadoop集群,运行Spark Streaming作业消费DataHub数据、统计数据个数并打印出来。

Spark接入DataHub

  • 准备工作

    使用DataHub的订阅功能订阅Topic,详细信息请参见DataHub订阅功能介绍

  • 消费DataHub数据
    运行Spark Streaming作业消费DataHub数据有两种使用方式:
    • 指定特定的ShardId,消费该ShardId的数据。
      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub Project
                topic, // DataHub topic
                subId, // DataHub的订阅ID
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub endpoint
                shardId, // DataHub Topic中的一个ShardId。
                read, // 处理DataHub数据的RecordEntry。
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // 取出RecordEntry中第一个Field的数据。
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    • 消费所有Shard的数据。
      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub Project
                topic, // DataHub topic
                subId, // DataHub的订阅ID
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub endpoint
                read, // 处理DataHub数据的RecordEntry
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // 取出RecordEntry中第一个Field的数据。
      def read(record: RecordEntry): String = {
        record.getString(0)
      }

附录

完整示例代码,请参见Spark对接DataHub