全部产品

Spark对接DataHub快速入门

更新时间:2019-04-26 10:11:29

简介

阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布(Publish),订阅(Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。这里主要介绍通过“数据工作台”使用SparkStreaming对接DataHub的方法。

前置条件

  1. Datahub项目已创建。
    本例中DataHub的区域为“华南1(深圳)”,project名称为:project_test,topic名称为:topic01。
    Topic的属性如下图:
    注意:目前内置的SparkOnDataHub Connectors仅支持Topic类型TUPLE。
  2. 发送数据到Topic01。
    下载jar包到本地(本地具备外网的访问权限):
    1. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/common_test/common-test-0.0.1-SNAPSHOT-shaded.jar
    jar包存放到本地目录:/opt/jars/common-test-0.0.1-SNAPSHOT-shaded.jar。然后运行如下命令,向Topic01发送数据。
    1. java -cp /opt/jars/common-test-0.0.1-SNAPSHOT-shaded.jar com.aliyun.datahub.DatahubWrite_java project_test topic01 xxx1 xxx2 https://dh-cn-shenzhen.aliyuncs.com
    显示如下内容说明发送成功:
    1. finish write the 1th record
    2. finish write the 2th record
    命令参数说明:
    参数 说明
    project_test topic01 分别是DataHub的project名称和topic名称。
    xxx1 xxx2 分别是访问阿里云API的AccessKey ID和AccessKey Secret。
    https://dh-cn-shenzhen.aliyuncs.com DataHub访问域名中“华南1(深圳)”的“外网Endpoint”。

使用“数据工作台”>“作业管理”运行样例

步骤 1:通过“资源管理”上传样例代码Jar包

下载样例代码jar包“sparkstreaming-0.0.1-SNAPSHOT.jar”以及依赖的jar包到本地目录。

  1. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_example/sparkstreaming-0.0.1-SNAPSHOT.jar
  2. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/aliyun-sdk-datahub-2.9.2-public.jar
  3. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/datahub-spark-2.9.2-public_2.3.2-1.0.1.jar

在“数据工作台”>“资源管理”中添加文件夹“spark_on_datahub”。
上传下载的jar包到此文件夹。如下图:

步骤 2:通过“作业管理”创建并编辑作业内容

在“数据工作台”>“作业管理”中创建Spark作业,作业内容如下:

  1. --class com.aliyun.spark.SparkStreamingOnDataHub
  2. --jars /spark_on_datahub/aliyun-sdk-datahub-2.9.2-public.jar,/spark_on_datahub/datahub-spark-2.9.2-public_2.3.2-1.0.1.jar
  3. --driver-memory 1G
  4. --driver-cores 1
  5. --executor-cores 2
  6. --executor-memory 2G
  7. --num-executors 1
  8. --name spark_on_datahub
  9. /spark_on_datahub/sparkstreaming-0.0.1-SNAPSHOT.jar
  10. http://dh-cn-shenzhen-int-vpc.aliyuncs.com xxx1 xxx2 xxx3 project_test topic01

作业内容参数说明:

参数 说明
http://dh-cn-shenzhen-int-vpc.aliyuncs.com DataHub访问域名中“华南1(深圳)”的“VPC ECS Endpoint”。
xxx1 xxx2 分别是访问阿里云API的AccessKey ID和AccessKey Secret。
xxx3 DataHub中topic01的“订阅ID”。
project_test topic01 分别是DataHub的project名称和topic名称。

如下图:

步骤 3:通过“作业管理”运行作业并查看结果

作业编辑完成后点击“运行”,选择Spark集群。运行状态会在下侧显示,如图:


运行成功后点击“YarnUI”,点击“stdout”日志链接。如下图:

日志显示类似如下内容说明SparkStreming访问DataHub成功。

  1. name_01191,value_01191
  2. name_01192,value_01192
  3. name_01193,value_01193
  4. name_01194,value_01194
  5. name_01195,value_01195

小结