IoT数据源

DataWorks数据集成支持使用IoT Reader插件读取物联网企业实例数据服务中提供的系统表、产品表、自定义存储表,本文为您介绍DataWorks的IoT数据读取能力。

支持的版本

仅华东2(上海)、华北2(北京)、华南1(深圳)地域的标准型和尊享型实例下支持IoT Reader。

使用限制

  • IoT Reader不支持使用数据过滤功能。

  • IoT Reader支持使用Serverless资源组(推荐)独享数据集成资源组

  • 目前仅华东2(上海)、华南1(深圳)、华北2(北京)地域支持IoT Reader插件。

  • 读取时序存储表时,支持指定读取数据的时间范围。

支持的字段类型

IoT Reader支持物联网企业实例存储表所有的数据类型,如下表所示。

类型分类

物联网平台存储表数据类型

数据集成配置类型

整数类

BIGINT

BIGINT

布尔类

BOOLEAN

BOOLEAN

日期时间类

TIMESTAMP

BIGINT

浮点类

DOUBLE

DOUBLE

字符串类

VARCHAR

STRING

支持的存储表

数据表

表标识符

描述

产品表

system.iotx_product

存储了物联网平台产品元信息,包括产品的ProductKey、名称、创建时间、修改时间等。详情请参见查看平台系统表和产品存储表

设备表

system.device

存储了物联网平台设备元信息,包括所属产品的设备唯一标识符IotId、激活时间、状态、物理地址、设备类型等。

设备分组表

system.device_group

存储了设备分组信息,包括分组类型、分组名称、描述等。

设备分组关系表

system.device_group_relation

存储了设备和分组的关系。

产品属性时序表

product.***********

设备上报的物模型属性历史数据。详情请参见查看平台系统表和产品存储表

产品事件表

event.***********

设备上报的物模型事件历史数据。详情请参见查看平台系统表和产品存储表

自定义存储表

******(用户自定义)

详情请参见创建和管理自定义存储表

产品属性时序表结构

产品属性时序表标识符为product.***********(***********为产品的产品Key)。存储已上报的物模型属性数据。

字段标识符

字段类型

样例

备注

product_key

STRING

al12345****

产品Key。

device_name

STRING

deviceName1234

设备名。

iot_id

STRING

4z819VQHk6VSLmmBJfrf00107e****

设备唯一标识。

event_time

BIGINT

1510799670074

消息时间戳。

event_date

STRING

20220101

消息上报日期,格式为yyyyMMdd。

items

STRING

{"Power":{"value":"on","time":1510799670074},"Position":{"time":1510292697470,"value":{"latitude":39.9,"longitude":116.38}}}

设备上报的属性。

产品事件表结构

产品事件表标识符为event.***********(***********为产品的产品Key)。存储已上报的物模型事件数据。

字段标识符

字段类型

样例

备注

product_key

STRING

al12345****

产品Key。

device_name

STRING

deviceName1234

设备名。

iot_id

STRING

4z819VQHk6VSLmmBJfrf00107e****

设备唯一标识。

event_time

BIGINT

1524448722000

消息时间戳。

event_date

STRING

20220101

消息上报日期,格式为yyyyMMdd。

event_code

STRING

Alarm

设备上报的事件标识符。

items

STRING

{ "value": { "Power": "on", "WF": "2" }, "time": 1524448722000 }

设备上报事件的输出参数。

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

从IoT Reader数据源的产品时序存储表中读取数据,写入到一张MaxCompute表中的脚本示例如下。

{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "iot", // 请指定 "iot",表示IoT Reader数据源
            "parameter": {
                "accessId": "LT********", // 用于访问物联网企业实例的accessId。
                "accessKey": "******", // 用于访问物联网企业实例的accessKey
                "regionId": "cn-shanghai", // 物联网企业实例的地域ID
                "instanceId": "iot-*******", // 物联网企业实例ID
                "column": [ // 读取IoT数据存储表的列信息
                    "product_key",
                    "device_name",
                    "iot_id",
                    "event_time",
                    "event_date",
                    "items"
                ],
                "table": "product.h******", // 数据存储表的表标识符
                "date": "${bizdate}" // 日期格式为yyyyMMdd,如"20151111",表示导出该日的数据
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "ds=${bizdate}",
                "truncate": true,
                "indexes": [],
                "datasource": "odps_source",
                "envType": 0,
                "isSupportThreeModel": false,
                "column": [
                    "product_key",
                    "device_name",
                    "iot_id",
                    "event_time",
                    "event_date",
                    "items"
                ],
                "emptyAsNull": false,
                "table": "ods_product_timeline_dd"
            },
            "name": "Writer",
            "category": "writer"
        },
        {
            "copies": 1,
            "parameter": {
                "nodes": [],
                "edges": [],
                "groups": [],
                "version": "2.0"
            },
            "name": "Processor",
            "category": "processor"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "locale": "zh",
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Reader脚本参数

参数

描述

是否必选

默认值

accessId

用于访问物联网企业实例的AccessKey ID。

您可以在控制台AccessKey管理页面查看您的阿里云账号的AccessKey ID和AccessKey Secret。

说明

如果使用RAM用户,您需授予该RAM用户管理物联网平台的权限(AliyunIOTFullAccess),否则将连接失败。授权方法请参见授权RAM用户访问物联网平台

accessKey

用于访问物联网企业实例的AccessKey Secret。

您可以在控制台AccessKey管理页面查看您的阿里云账号的AccessKey ID和AccessKey Secret。

regionId

物联网企业实例的地域ID。必须与DataWorks所在的地域保持一致。

仅华东2(上海)、华北2(北京)、华南1(深圳)地域的标准型和尊享型实例下支持IoT Reader。

instanceId

物联网企业实例ID。您可在物联网平台控制台的实例概览页面,查看当前实例的ID。实例的更多信息请参见实例概述

table

数据存储表的表标识符。IoT Reader支持的数据存储表的更多信息请参见支持的存储表

说明

IoT Reader的表标识符不包含“${”与“}”。

column

读取IoT数据存储表的列信息。例如自定义存储表test的字段为idnameage

  • 如果您需要依次读取idnameage,则应该配置为"column":["id","name","age"]或者配置为"column":["*"]

    说明

    不推荐您配置抽取字段为(*),因为它表示依次读取表的每个字段。如果您的表字段顺序调整、类型变更或者个数增减,您的任务会存在源头表列和目的表列不能对齐的风险,则直接导致您的任务运行结果不正确甚至运行失败。

  • 如果您想依次读取nameid,则应该配置为"column":["name","id"]

date

日期格式为yyyyMMdd,例如20151111,表示导出该日的数据。

说明
  • 对于产品时序表,必须指定date参数。

  • 对于非产品时序表,date参数会被忽略。