离线数据集成(DataWorks+MaxCompute)

物联网平台数据服务中的平台系统表、产品属性时序表、产品属性快照表、产品事件表和自定义存储表等数据,通过大数据开发治理平台DataWorks集成到云原生大数据计算服务MaxCompute中构建数据仓库,以提升数据应用效率。

背景信息

  • DataWorks的数据集成是稳定高效、弹性伸缩的数据同步平台,致力于提供复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动及同步能力。详细说明,请参见数据集成概述

  • 使用DataWorks中数据集成的IoT Reader插件可以读取物联网平台企业版实例中数据,IoT Reader支持的数据类型、字段映射和数据源等参数与配置的详细说明,请参见IoT数据源

  • MaxCompute是适用于数据分析场景的企业级SaaS(Software as a Service)模式云数据仓库,以Serverless架构提供快速、全托管的在线数据仓库服务,消除了传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您可以经济并高效地分析处理海量数据。基于DataWorks实现一站式的数据同步、业务流程设计、数据开发、管理和运维功能。

使用限制

限制项

说明

地域

华东2(上海)、华南1(深圳)、华北2(北京)、美国(弗吉尼亚)。

物联网平台企业版实例

标准型、尊享型。实例类型说明,请参见企业版实例类型说明

集成的物联网平台数据源

平台系统表、产品属性时序表、产品属性快照表、产品事件表自定义存储表

重要

如果需要将物联网平台的产品属性时序表、产品属性快照表和产品事件表的数据集成到MaxCompute,需先在物联网平台的数据服务中开启该产品的备份。具体操作,请参见备份设备数据源

独享资源组

DataWorks中数据集成的Reader插件仅支持独享数据集成资源组。详细说明,请参见独享数据集成资源组

读取时序数据

支持指定数据的时间范围。

数据过滤

不支持。

计费说明

物联网平台数据集成流出会消耗数据处理单元(CU)。详细说明,请参见数据服务计费说明的数据集成

DataWorks中数据集成的计费逻辑,请参见计费逻辑说明。MaxCompute的计费信息,请参见计费概述

使用流程

前提条件

如果集成产品属性时序表、产品属性快照表或产品事件表数据,需在物联网平台的数据服务中,开启对应产品的设备数据源备份。具体操作,请参见备份设备数据源

DataWorks中配置数据集成的离线同步任务

通过向导模式配置离线同步任务

  1. 步骤一:新建离线同步节点:根据界面提示创建离线同步节点。

  2. 步骤二:配置同步网络链接:选择离线同步任务的数据来源、用于执行同步任务的我的资源组,以及数据去向,并测试连通性。

    重要

    数据来源选择IoT我的资源组只支持独享数据集成资源组,数据去向由您自定义。

    image

  3. 步骤三:配置数据来源与去向:配置同步任务的数据来源数据去向的详情信息。数据去向可以配置为MaxCompute

    image

    参数

    说明

    数据存储表的表标识符。

    导出日期数据(时序表)

    日期格式为yyyyMMdd,例如20150101,表示导出该日的数据。

  4. 步骤四:配置字段映射关系:左侧的源头表字段和右侧的目标表字段为一一对应的关系。

    参数

    说明

    备注

    同名映射

    单击同名映射,可以根据名称建立相应的映射关系。

    重要

    数据类型需保证一一对应匹配。

    同行映射

    单击同行映射,可以在同行建立相应的映射关系。

    取消映射

    单击取消映射,可以取消建立的映射关系。

    您可以单击添加一行可以增加单个字段,您也可以将鼠标移动至需要删除的字段上,单击删除图标删除字段。

  5. 步骤五:配置通道:通过通道配置,控制数据同步过程相关属性。

    image

  6. 步骤六:配置调度属性

  7. 步骤七:提交并发布任务

通过脚本模式配置离线同步任务

  1. 步骤一:新建离线同步节点:根据界面提示创建离线同步节点。

  2. 步骤二:配置同步网络链接:选择离线同步任务的数据来源、用于执行同步任务的我的资源组,以及数据去向,并测试连通性。

  3. 步骤三:转脚本模式并导入模板

  4. 步骤四:编辑脚本,配置同步任务:编辑JSON脚本,指定IoT Reader以及对应Writer的参数。

    以下为一个完整的示例脚本,示例中从IoT Reader数据源的产品时序存储表中读取数据,写入到一张MaxCompute表中。

    重要

    实际场景中,需根据代码中注释修改对应参数值,且在运行时,删除代码中的注释。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "iot", // 请指定 "iot",表示IoT Reader数据源。
                "parameter": {
                    "accessId": "LT********", // 用于访问物联网企业实例的AccessKey ID。
                    "accessKey": "******", // 用于访问物联网企业实例的AccessKey Secret。
                    "regionId": "cn-shanghai", // 物联网平台企业版实例的地域ID。
                    "instanceId": "iot-*******", // 物联网平台企业版实例ID。
                    "column": [ // 读取IoT数据存储表的列信息。
                        "product_key",
                        "device_name",
                        "iot_id",
                        "event_time",
                        "event_date",
                        "items"
                    ],
                    "table": "product.h******", // 数据存储表的表标识符。
                    "date": "${bizdate}" // 日期格式为yyyyMMdd,例如“20151111”,表示导出该日的数据。
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "ds=${bizdate}",
                    "truncate": true,
                    "indexes": [],
                    "datasource": "odps_first",
                    "envType": 0,
                    "isSupportThreeModel": false,
                    "column": [
                        "product_key",
                        "device_name",
                        "iot_id",
                        "event_time",
                        "event_date",
                        "items"
                    ],
                    "emptyAsNull": false,
                    "table": "ods_product_timeline_dd"
                },
                "name": "Writer",
                "category": "writer"
            },
            {
                "copies": 1,
                "parameter": {
                    "nodes": [],
                    "edges": [],
                    "groups": [],
                    "version": "2.0"
                },
                "name": "Processor",
                "category": "processor"
            }
        ],
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "locale": "zh",
            "speed": {
                "throttle": false,
                "concurrent": 2
            }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  5. 步骤五:配置调度属性

  6. 步骤六:提交并发布任务