数据转发到DataHub

通过物联网平台云产品流转功能将设备数据流转到数据总线DataHub后,用户可以编写应用程序或者使用流计算引擎来处理写入到数据总线DataHub的流式数据(例如再流转至实时计算、MaxCompute等服务中),产出各种实时的数据处理结果,例如实时图表、报警信息、实时统计等。本文以物模型数据上报Topic为例,介绍流转消息数据的完整流程。

工作原理

image

云产品流转将设备的Topic消息转发到DataHub Project中的Topic,使用DataHub服务进行清洗、分析、归档数据等操作。详细内容,请参见数据总线DataHub

在上图中:

应用场景

将物联网平台数据写入DataHub进行统一管理:

  • 可投递到下游的分析、归档等系统,构建清晰的数据流,让您更好地释放数据的价值。

  • 可结合实时计算,把多种数据源的异构数据实时清洗成统一的结构化数据,为进一步分析做准备。

  • 可搭建原始数据层、实时明细层和实时汇总层,打造实时数据仓库。

详细说明,请参见DataHub的应用场景

使用限制

  • 物联网平台实例及所在地域支持将数据转发到DataHub。支持的地域详细信息,请参见各地域功能说明

  • 新版和旧版云产品流转功能均支持将数据流转到DataHub。旧版云产品流转使用示例,请参见数据转发到DataHub(旧版)

  • 物联网平台实例下设备消息转发的更多使用限制,请参见使用限制

前提条件

  • 已添加待转发的设备Topic数据源。例如:创建数据源DataSource,添加指定设备的物模型数据上报Topic。具体步骤,请参见添加待流转的数据源

  • 已创建DataHub Project和用于接收数据的Topic。DataHub使用方法,请参见Project操作

    重要

    企业版实例中,DataHub Project所在地域必须与企业版实例所在地域一致。

创建数据目的

  1. 登录物联网平台控制台

  2. 实例概览页签的全部环境下,找到对应的实例,单击实例卡片。

  3. 在左侧导航栏,选择消息转发 > 云产品流转

  4. 云产品流转页面,单击右上角体验新版,进入新版功能页面。

    说明

    如果您已执行过此操作,再次进入云产品流转页面,会直接进入新版功能页面。

  5. 单击数据目的页签,然后单击创建数据目的
  6. 创建数据目的对话框,输入数据目的名称,例如DataPurpose,按照以下参数说明,完成配置,然后单击确定

    数据流转到DataHub

    参数

    说明

    选择操作

    选择发送数据到DataHub中

    地域

    选择DataHub所在地域。

    Project

    选择DataHub Project。

    您可以单击创建Project,跳转到DataHub控制台,创建DataHub Project,请参见Project操作

    Topic

    选择接收数据的DataHub Topic。

    选择Topic后,规则引擎会自动获取Topic中的Schema,规则引擎筛选出来的数据将会映射到对应的Schema中。

    说明
    • 将数据映射到Schema时,需使用${},否则存入表中的将会是一个常量。

    • Schema与规则引擎的数据类型必须保持一致,否则无法存储。

    您可以单击创建Topic,跳转到DataHub控制台,创建DataHub Topic。

    角色

    授权物联网平台将数据写入DataHub。

    如您还未创建相关角色,单击创建RAM角色,跳转到RAM控制台,创建角色和授权策略,请参见创建RAM角色

配置并启动解析器

  1. 创建解析器,例如DataParser。具体操作,请参见步骤一:创建解析器
  2. 解析器详情页面,关联数据源。
    1. 在配置向导的数据源下,单击关联数据源
    2. 在弹出的对话框中,单击数据源下拉列表,选择已创建的数据源DataSource,单击确定
  3. 解析器详情页面,关联数据目的。
    1. 单击配置向导的数据目的,然后单击数据目的列表右上方的关联数据目的
    2. 在弹出的对话框中,单击数据目的下拉列表,选择已创建的数据目的DataPurpose,单击确定
    3. 在数据目的列表,查看并保存数据目的ID,例如为1000
      后续解析脚本中,需使用此处的数据目的ID
  4. 解析器详情页面,单击解析器
  5. 在脚本输入框,输入解析脚本。

    解析脚本类似JavaScript语言,编辑脚本的语法参考JavaScript语法,详细的编辑方法,请参见脚本语法

    函数参数说明,请参见函数列表

    //通过payload函数,获取设备上报的消息内容,并按照JSON格式转换。
    var data = payload("json");
    //直接流转物模型上报数据。
    writeDatahub(1000, data);
  6. 单击调试,根据页面提示,选择产品和设备,输入Topic和Payload数据,验证脚本可执行。

    参数示例如下:调试示例

    运行结果如下,表示脚本执行成功。

    调试结果

  7. 单击发布
  8. 回到云产品流转页面的解析器页签,单击解析器DataParser对应的启动按钮,启动解析器。

后续操作

  • 您可在物联网平台控制台对应实例下监控运维 > 日志服务页面的云端运行日志页签,查看设备到云消息云产品流转的运行日志。具体操作,请参见云端运行日志

  • 您可登录DataHub服务控制台,查看写入DataHub的数据。具体内容,请参见快速入门

  • 您可将对应Topic中的数据实时/准实时的同步到第三方阿里云产品中,打通阿里云产品间的数据流通。目前支持MaxCompute(原ODPS)、分析型数据库MySQL(ADS)、云数据库RDS、表格存储TableStore、对象存储OSS、ElasticSearch以及函数计算服务等。详细内容,请参见数据同步

相关文档