本文介绍如何在E-MapReduce的Hadoop集群,运行Spark作业消费DataHub数据、统计数据个数并打印出来。
Spark Streaming消费DataHub
准备工作
使用DataHub的订阅功能订阅Topic,详细信息请参见创建订阅。
消费DataHub数据
运行Spark Streaming作业消费DataHub数据有两种使用方式:
指定特定的ShardId,消费该ShardId的数据。
datahubStream = DatahubUtils.createStream( ssc, project, // DataHub的项目名。 topic, // DataHub的topic名称。 subId, // DataHub的订阅ID。 accessKeyId, accessKeySecret, endpoint, // DataHub endpoint。 shardId, // DataHub Topic中的一个ShardId。 read, // 处理DataHub数据的RecordEntry。 StorageLevel.MEMORY_AND_DISK) datahubStream.foreachRDD(rdd => println(rdd.count())) // 取出RecordEntry中第一个Field的数据。 def read(record: RecordEntry): String = { record.getString(0) }
消费所有Shard的数据。
datahubStream = DatahubUtils.createStream( ssc, project, // DataHub的项目名。 topic, // DataHub的topic名称。 subId, // DataHub的订阅ID。 accessKeyId, accessKeySecret, endpoint, // DataHub endpoint。 read, // 处理DataHub数据的RecordEntry。 StorageLevel.MEMORY_AND_DISK) datahubStream.foreachRDD(rdd => println(rdd.count())) // 取出RecordEntry中第一个Field的数据。 def read(record: RecordEntry): String = { record.getString(0) }
说明完整示例代码,请参见SparkDatahubDemo.scala。
Spark Structured Streaming消费DataHub
Maven依赖
Spark2
<dependency> <groupId>com.aliyun.emr</groupId> <artifactId>emr-datahub_2.11</artifactId> <version>2.0.0</version> </dependency>
Spark3
请在集群
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/
目录,下载emr-datasources_shaded_***.jar作为依赖。说明如果您的集群中没有以上目录,则使用
/usr/lib/emrsdk-current/
目录。emr-datasources_shaded_***.jar,请根据您实际集群目录下的JAR包来替换。
消费示例
val spark = SparkSession .builder() .appName("test datahub") .getOrCreate() //创建readstream。 val datahubRows = spark .readStream .format("datahub") .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com") .option("project", "project_test") .option("startingoffsets", "earliest") .option("topic", "topic_test") .load //DataFrame处理逻辑。 datahubRows.printSchema() // 当前实例中,schema有key和value两个字段。 println("print schema" + datahubRows.schema.toString()) val df = datahubRows.groupBy("key").count() //创建writestream,输出数据。 val query = df .writeStream .format("console") .outputMode("complete") .start() //结束流任务。 query.awaitTermination(100000) spark.close()
核心流程如下:
创建readstream读取DataHub DataFrame数据。
自定义数据源DataFrame处理逻辑。
创建writestream输出数据。
说明运行代码示例前必须先配置环境变量。关于如何配置环境变量,请参见配置环境变量。
DataHub核心配置参数
重要Structured Streaming消费DataHub不需要填写参数subId。主要是因为DataHub的消费offset由Spark管理,不需要DataHub管理,所以不需要提供subId。
参数
描述
是否必选
access.key.id
创建DataHub的阿里云AccessKey ID。
是
access.key.secret
创建DataHub的阿里云AccessKey Secret。
是
endpoint
DataHub API Endpoint。您可以在DataHub页面查看。
是
project
DataHub的项目名。
是
topic
DataHub的topic名称。
是
decimal.precision
当topic字段中包含decimal字段时,需要指定该参数。
否
decimal.scale
当topic字段中包含decimal字段时,需要指定该参数。
否
startingoffsets
开始消费点位。取值如下:
latest:表示最晚。
earliest:表示最早。
json字符串:json结构如下所示。
{ "project名称#topic名称" : { "shardId" : "startingoffsets的值" } }
示例如下。
{ "project_test#topic_test" : { "0" : "100" } }
否
endingoffsets
结束消费点位。取值如下:
latest:表示最晚。
json字符串:json结构如下所示。
{ "project名称#topic名称" : { "shardId" : "endingoffsets的值" } }
否