本文以订单结算为例,详细说明如何通过流数据分析的维表Join功能,将物联网设备产生的数据(识别器识别的商品及订单编号)和数据库中的数据(商品价格表)进行连接。

示意图如下所示。


订单结算

前提条件

在开始操作本章内容前,请您确保已根据环境搭建内容创建完成边缘实例。

背景信息

物联网设备的产生的数据大多都是一些动态的属性信息,很多时候我们需要将设备的动态属性和更多维度的数据(如设备产地、型号,商品的颜色、价格等)进行关联,才能计算出我们想要的结果。

例如,某无人小店有一台商品自助结算机,包含商品识别器及价格显示屏。识别器每识别到一个商品会将商品及订单编号通过事件的方式上报,显示屏实时显示当前累积总价,而当前累积总价需要将识别器上报的数据和数据库里的商品价格表关联后计算得出。

一、添加设备

参考设备模拟器章节中使用设备模拟器驱动部分的内容,完成如下步骤。

  • 添加商品识别器设备:
    1. 新建商品识别器产品,在产品自定义功能处添加名为 recognizedGoods的事件,事件包含两个int型参数,商品编号goodsId和订单编号。
    2. 为商品识别器产品添加设备,并分配进边缘实例中。
    3. 将商品识别器设备的驱动设为设备模拟器驱动。
  • 添加显示器设备:
    1. 新建显示器产品,在产品自定义功能处添加名为 content的字符类型属性。
    2. 为显示器产品添加设备,并分配进边缘实例中。
    3. 将显示器设备的驱动设为设备模拟器驱动。
说明 将设备分配到边缘实例后,请先不要部署边缘实例,待完成其他操作后统一部署。

二、添加流数据分析任务

  1. 参考分配流数据分析到边缘实例,创建、设置并发布订单结算的流数据分析任务。

    其中,任务类型选择SQL执行任务选择边缘端


    创建任务
  2. 任务列表中,已创建的SQL任务名称右侧单击查看,进入SQL编辑器

    复制如下SQL内容到SQL编辑器编辑框中。

    create table event (
        eventCode varchar,
        params varchar,
        ts varchar,
        tstamp as to_timestamp (cast (ts as bigint)),
        WATERMARK wk FOR tstamp as withOffset (tstamp, 2000)
    ) with (
        type = 'edgebus',
        jsonParser = 'device_event'
    );
    
    CREATE TABLE goods_info(
        id bigint,
        name varchar,
        price double,
        PRIMARY KEY(id),
        PERIOD FOR SYSTEM_TIME)
    WITH(
        type= 'rds',
        url='jdbc:mysql://30.43.83.73:1234/iot_data', -- 请将your_db_host、your_db_port、your_db_name替换成真实值
        userName='config://local_db_username', -- 通过./tool_config -s local_db_username xxx 设置
        password='config://local_db_password', -- 通过./tool_config -s local_db_password xxx 设置
        tableName='table_case_06_goods_info',
        cache='LRU',
        cacheSize='1000',
        cacheTTLMs='600000'
    );
    
    CREATE TABLE edgebus_sink (
        orderId bigint,
        orderPrice double
    ) WITH (
        type='edgebus_sink',
        topic = '/sys/streamCompute/orderPrice'
    );
    
    create view scan_goods as
    select
        cast (json_value(params, '$.goodsId') as bigint) as goodsId,
        cast (json_value(params, '$.orderId') as bigint) as orderId
    from
        event
    where
        eventCode = 'recognizedGoods';
    
    insert into edgebus_sink
    select
        orderId,
        sum(goods_info.price)
    from
        scan_goods
    join
        LATERAL goods_info FOR SYSTEM_TIME AS OF PROCTIME()
    ON
        goods_info.id = scan_goods.goodsId
    group by
        orderId;
  3. 保存任务并发布。
  4. 参考分配流数据分析到边缘实例内容,将该SQL任务分配到边缘实例中。

三、添加函数

  1. 参考函数计算使用流程内容创建服务,并在该服务下创建名为DisplayPrice的使用Nodejs8运行环境的函数。
  2. 创建函数完成后,在线编辑代码,添加如下内容。
    说明 请将your_displaydevice_productKey以及your_displaydevice_deviceName换成实际网关设备的ProductKey和DeviceName。
    const leSdk = require('linkedge-core-sdk');
    
    const iotData = new leSdk.IoTData();
    
    module.exports.handler = function(event, context, callback) {
        const payload = JSON.parse(JSON.parse(event.toString()).payload);
        const time = payload.time;
        const params = payload.params;
        iotData.setThingProperties({
            productKey: 'your_displaydevice_productKey',
            deviceName: 'your_displaydevice_deviceName',
            payload: {
                content: `orderId: ${params.orderId}, orderPrice: ${params.orderPrice}, time: ${time}`
            }
        }, (err) => {
            if (err) {
                console.log(`display price err: ${err}`);
            }
        });
    };
  3. 物联网平台控制台边缘实例的实例详情页面,选择函数计算,单击分配函数
  4. 根据界面提示设置参数。
    函数信息配置说明如下 :
    参数 描述
    地域 选择您创建的服务所在的地域。
    服务 选择DisplayPrice函数所在的服务名称。
    函数 选择DisplayPrice函数。
    授权 选择AliyunIOTAccessingFCRole
    函数配置说明如下:
    参数 说明
    运行模式 选择按需运行
    内存限制(MB) 设置为50 MB。
    超时限制(秒) 设置为5秒。
    定时运行 使用默认配置:关闭

    其余参数无需配置。

四、添加消息路由

  1. 物联网平台控制台边缘实例的实例详情页面,选择消息路由,将商品识别器的数据路由到订单结算的流数据分析任务中。

    添加消息路由
  2. 将订单结算的流数据分析任务路由到DisplayPrice函数计算中。消息主题过滤设置为/sys/streamCompute/orderPrice

    添加消息路由
  3. 将显示器的属性变换数据路由到IoTHub中,方便在云端查看显示器设备的状态。

    添加消息路由

五、创建商品价格表

若您已经操作过温度平均值计算示例并完成了该章节中5. 在数据库中创建表这一步内容,则在本步骤中可以直接跳到子步骤5. 创建商品价格表操作。

  1. (可选)新建一个RDS实例,或者在搭建边缘环境的机器上安装一个MySQL数据库(也可以跨机器安装,但必须保证两台机器网络互通)。安装命令如下所示。
    说明 若已有MySQL数据库(本地数据库或RDS),可直接跳过本步骤。
    sudo docker run --name mysql -p your_db_port:3306 -e MYSQL_ROOT_PASSWORD=your_db_password -d mysql:5.6.35

    其中,your_db_port为在宿主机(即启动mysql docker的机器)访问数据库的端口号,your_db_password为数据库root账户的密码,请根据实际情况更改。例如,端口号为1234,密码为xxxxxxxx,则实例执行的命令如下。

    sudo docker run --name mysql -p 1234:3306 -e MYSQL_ROOT_PASSWORD=xxxxxxxx -d mysql:5.6.35
  2. 为了保证数据库用户名密码的安全性,您需要把数据库用户名密码保存在配置中心(会加密后再存储)。按如下方式设置。
    1. 物联网平台控制台左侧导航栏中,单击边缘计算 > 边缘实例
    2. 在相应的边缘实例右侧单击查看,进入实例详情页面,选择网关页签,单击网关右侧的远程连接,登录远程控制台,执行如下命令。
      说明 需要您先在设置页签下打开远程调试远程连接才可以使用。
      cd /linkedge/gateway/build/bin
      ./tool_config -s local_db_username root
      ./tool_config -s local_db_password xxxxxxxx
  3. 查看数据库host,即安装MySQL的宿主机IP。在宿主机上执行ifconfig命令可看到如下结果。
    lo0: flags=8049<UP,LOOPBACK,RUNNING,MULTICAST> mtu 16384
      options=1203<RXCSUM,TXCSUM,TXSTATUS,SW_TIMESTAMP>
      inet 127.0.0.1 netmask 0xff000000
      inet6 ::1 prefixlen 128
      inet6 fe80::1%lo0 prefixlen 64 scopeid 0x1
      nd6 options=201<PERFORMNUD,DAD>
    gif0: flags=8010<POINTOPOINT,MULTICAST> mtu 1280
    stf0: flags=0<> mtu 1280
    XHC0: flags=0<> mtu 0
    XHC20: flags=0<> mtu 0
    en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500
      ether f0:18:98:37:81:b4
      inet6 fe80::1c6e:7a87:1c27:724d%en0 prefixlen 64 secured scopeid 0x6
      inet 30.43.83.169 netmask 0xfffff000 broadcast 30.43.95.255
      nd6 options=201<PERFORMNUD,DAD>
      media: autoselect
      status: active

    其中,inet参数后的30.43.83.169为宿主机对外IP,即数据库host(用于流数据分析任务中替换your_db_host)。

  4. 下载mysql-client命令行工具或者mysql-client图形化工具(如NavicatSequel Pro),进行MySQL数据库的访问,并创建iot_data数据库,用于在流数据分析任务中替换your_db_name参数 。
    CREATE DATABASE iot_data;
    USE iot_data;
  5. 创建商品价格表。
    CREATE TABLE table_case_06_goods_info(
        id INT UNSIGNED  NOT NULL  PRIMARY KEY,
        name varchar(128),
        price double
    );
    INSERT INTO table_case_06_goods_info
    (id, name, price)
    VALUES 
    (1, 'Coke', 3.00),
    (2, 'Sprite', 4.00),
    (3, 'Juice', 5.00),
    (4, 'Coffee', 6.00),
    (5, 'Tea', 7.00);

六、部署边缘实例

  1. 在边缘实例的实例详情页面,单击部署,将子设备、流数据分析作业及消息路由下发到边缘端。
  2. 实例详情页面,选择网关页签,单击远程连接,登录远程控制台。
  3. 进入/linkedge/gateway/build/bin目录,执行如下命令上报识别到的商品识别器事件。
    ./ds_ctrl event a1ivc15wcrn goodsRecognizer01 recognizedGoods '{"orderId": 12, "goodsId": 1}'
  4. 物联网平台控制台,选择设备管理 > 设备,在相应设备名称右侧单击查看
    设备详情页面查看显示器设备的运行状态以及数据。
    查看数据