本文介绍如何在E-MapReduce的Hadoop集群,运行Spark作业消费DataHub数据、统计数据个数并打印出来。
Spark Streaming消费DataHub
- 准备工作
使用DataHub的订阅功能订阅Topic,详细信息请参见创建订阅。
- 消费DataHub数据 运行Spark Streaming作业消费DataHub数据有两种使用方式:
- 指定特定的ShardId,消费该ShardId的数据。
datahubStream = DatahubUtils.createStream( ssc, project, // DataHub的项目名。 topic, // DataHub的topic名称。 subId, // DataHub的订阅ID。 accessKeyId, accessKeySecret, endpoint, // DataHub endpoint。 shardId, // DataHub Topic中的一个ShardId。 read, // 处理DataHub数据的RecordEntry。 StorageLevel.MEMORY_AND_DISK) datahubStream.foreachRDD(rdd => println(rdd.count())) // 取出RecordEntry中第一个Field的数据。 def read(record: RecordEntry): String = { record.getString(0) }
- 消费所有Shard的数据。
datahubStream = DatahubUtils.createStream( ssc, project, // DataHub的项目名。 topic, // DataHub的topic名称。 subId, // DataHub的订阅ID。 accessKeyId, accessKeySecret, endpoint, // DataHub endpoint。 read, // 处理DataHub数据的RecordEntry。 StorageLevel.MEMORY_AND_DISK) datahubStream.foreachRDD(rdd => println(rdd.count())) // 取出RecordEntry中第一个Field的数据。 def read(record: RecordEntry): String = { record.getString(0) }
说明 完整示例代码,请参见SparkDatahubDemo.scala。 - 指定特定的ShardId,消费该ShardId的数据。
Spark Structured Streaming消费DataHub
- Maven依赖
- Spark2
<dependency> <groupId>com.aliyun.emr</groupId> <artifactId>emr-datahub_2.11</artifactId> <version>2.0.0</version> </dependency>
- Spark3请在集群
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/
目录,下载emr-datasources_shaded_***.jar作为依赖。说明- 如果您的集群中没有以上目录,则使用
/usr/lib/emrsdk-current/
目录。 - emr-datasources_shaded_***.jar,请根据您实际集群目录下的JAR包来替换。
- 如果您的集群中没有以上目录,则使用
- Spark2
- 消费示例
val spark = SparkSession .builder() .appName("test datahub") .getOrCreate() //创建readstream。 val datahubRows = spark .readStream .format("datahub") .option("access.key.id", "LTA*") .option("access.key.secret", "HF*") .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com") .option("project", "project_test") .option("startingoffsets", "earliest") .option("topic", "topic_test") .load //DataFrame处理逻辑。 datahubRows.printSchema() // 当前实例中,schema有key和value两个字段。 println("print schema" + datahubRows.schema.toString()) val df = datahubRows.groupBy("key").count() //创建writestream,输出数据。 val query = df .writeStream .format("console") .outputMode("complete") .start() //结束流任务。 query.awaitTermination(100000) spark.close()
核心流程如下:- 创建readstream读取DataHub DataFrame数据。
- 自定义数据源DataFrame处理逻辑。
- 创建writestream输出数据。
- DataHub核心配置参数重要 Structured Streaming消费DataHub不需要填写参数subId。主要是因为DataHub的消费offset由Spark管理,不需要DataHub管理,所以不需要提供subId。
参数 描述 是否必选 access.key.id 创建DataHub的阿里云AccessKey ID。 是 access.key.secret 创建DataHub的阿里云AccessKey Secret。 是 endpoint DataHub API Endpoint。您可以在DataHub页面查看。 是 project DataHub的项目名。 是 topic DataHub的topic名称。 是 decimal.precision 当topic字段中包含decimal字段时,需要指定该参数。 否 decimal.scale 当topic字段中包含decimal字段时,需要指定该参数。 否 startingoffsets 开始消费点位。取值如下: - latest:表示最晚。
- earliest:表示最早。
- json字符串:json结构如下所示。
{ "project名称#topic名称" : { "shardId" : "startingoffsets的值" } }
示例如下。{ "project_test#topic_test" : { "0" : "100" } }
否 endingoffsets 结束消费点位。取值如下: - latest:表示最晚。
- json字符串:json结构如下所示。
{ "project名称#topic名称" : { "shardId" : "endingoffsets的值" } }
否