本文介绍在E-MapReduce的Hadoop集群运行Spark Streaming作业消费DataHub数据,统计数据个数并打印出来。

Spark接入DataHub

  • 准备工作

    使用 DataHub 的订阅功能订阅Topic,详细步骤请参见DataHub 订阅功能介绍

  • 消费DataHub 数据
    运行Spark Streaming作业消费DataHub数据有两种使用方式:
    • 指定特定的Shard ID,消费该Shard ID的数据:
      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub Project
                topic, // DataHub topic
                subId, // DataHub的订阅ID
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub endpoint
                shardId, // DataHub topic其中的一个ShardId
                read, // 对DataHub数据RecordEntry的处理
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // 对DataHub数据RecordEntry的处理函数,取出RecordEntry中第一个field的数据
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    • 消费所有Shard的数据:
      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub Project
                topic, // DataHub topic
                subId, // DataHub的订阅ID
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub endpoint
                read, // 对DataHub数据RecordEntry的处理
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // 对DataHub数据RecordEntry的处理函数,取出RecordEntry中第一个field的数据
      def read(record: RecordEntry): String = {
        record.getString(0)
      }

附录

完整示例代码请参见Spark 对接 DataHub