在物联网场景中,经常需要根据设备的状态分析结果及时作出响应。例如某会场需要实时统计最近十秒人流量并立即更新显示屏上的数据。

本文以人流量实时显示为例,详细说明如何使用流数据分析进行人流量的统计,并将计算结果输出到函数计算中,在函数计算中更新显示屏内容。示意图如下所示。

人流量展示

前提条件

请您确保已创建边缘实例,具体操作请参见专业版环境搭建

一、添加设备

参考设备模拟器章节中使用设备模拟器驱动部分的内容,添加如下两个设备。

  • 添加闸口产品和设备:
    1. 新建闸口产品,添加如下表格自定义功能。 添加自定义功能
      参数 描述
      功能类型 选择事件
      标识符 设置为peoplePassed
      事件类型 选择信息
      输出参数 增加一个数据类型int32(整数型)标识符count的关于通过人数统计的输出参数。
    2. 为闸口产品添加gate01设备,并分配到边缘实例中。
    3. 将闸口产品gate01设备的驱动设为设备模拟器驱动。
  • 添加显示器设备:
    1. 新建显示器产品,添加如下表格自定义功能。
      参数 描述
      功能类型 选择属性
      标识符 设置为content
      数据类型 选择text(字符串)
    2. 为显示器产品添加设备,并分配到边缘实例中。
    3. 将显示器设备的驱动设为设备模拟器驱动。
说明 将设备分配到边缘实例后,请先不要部署边缘实例,待完成其他操作后统一部署。

二、添加流数据分析任务

  1. 创建、设置并发布流数据分析任务,具体操作请参见流数据任务开发

    其中,开发类型选择SQL运行环境选择边缘

    创建任务
  2. 系统自动进入流数据任务开发SQL工作台

    复制如下SQL内容到编辑框中。

    -- 计算每个闸口每十秒的人流量并显示到屏幕上
    
    -- 每个闸口检测到人会上报peoplePassed事件,并附带经过的人数count
    create table event (
        eventCode varchar,
        params varchar,
        productKey varchar,
        deviceName varchar,
        gmtCreate varchar,
        ts varchar,
        tstamp as to_timestamp (cast (ts as bigint)),
        WATERMARK wk FOR tstamp as withOffset (tstamp, 2000)
    ) with (
        type = 'edgebus',
        jsonParser = 'device_event'
    );
    
    create table edgebus_sink (
        productKey varchar,
        deviceName varchar,
        flowrate int,
        t_start timestamp,
        t_end timestamp
    ) with (
        type = 'edgebus_sink',
        topic = '/sys/streamCompute/peopleFlowrate'
    );
    
    insert into
        edgebus_sink
    select
        productKey,
        deviceName,
        sum (cast (json_value (params, '$.count') as int)),
        tumble_start (tstamp, interval '10' second),
        tumble_end (tstamp, interval '10' second)
    from
        event
    where
        eventCode = 'peoplePassed'
    group by
        tumble (tstamp, interval '10' second),
        productKey,
        deviceName;
  3. 保存任务并发布。
  4. 将该SQL任务分配到边缘实例中,具体操作请参见分配流数据分析到边缘实例

三、添加函数

  1. 登录阿里云函数计算控制台
  2. 创建服务、并在该服务下创建名为DisplayFlowrate的使用Nodejs8运行环境的函数,具体操作请参见使用控制台创建函数
  3. 创建函数完成后,在线编辑代码,添加如下内容。
    说明 请将your_displaydevice_productKey以及your_displaydevice_deviceName换成实际网关设备的ProductKey和DeviceName。
    const leSdk = require('linkedge-core-sdk');
    
    const iotData = new leSdk.IoTData();
    
    module.exports.handler = function(event, context, callback) {
        const payload = JSON.parse(JSON.parse(event.toString()).payload);
        const params = payload.params;
        iotData.setThingProperties({
            productKey: 'your_display_device_productKey',
            deviceName: 'your_display_device_deviceName',
            payload: {
                content: `flowrate: ${params.flowrate}`
            }
        }, (err) => {
            if (err) {
                console.log(`display flowrate err: ${err}`);
            }
        });
    };
  4. 登录物联网平台控制台,创建边缘应用,具体操作请参见函数计算应用
    应用信息部分参数说明如下 :
    参数 描述
    应用类型 此处选择函数计算
    地域 选择您创建的服务所在的地域。
    服务 选择DisplayFlowrate函数所在的服务名称。
    函数 选择DisplayFlowrate函数。
    授权 选择AliyunIOTAccessingFCRole
    函数配置部分参数说明如下:
    参数 说明
    启用默认设置 选择
    运行模式 选择按需运行
    超时限制(秒) 设置为5秒。
    定时运行 选择关闭

    其余参数无需配置。

  5. 将应用分配到边缘实例,具体操作请参见分配边缘应用到边缘实例

四、添加消息路由

  1. 在边缘实例的实例详情页面,选择消息路由,将闸口设备的数据路由到人流量计算的流数据分析任务中。
    添加消息路由
  2. 将人流量计算的流数据分析任务路由到DisplayFlowrate函数计算中。消息主题过滤设置为/sys/streamCompute/peopleFlowrate
    添加消息路由
  3. 将显示器的属性变换数据路由到IoTHub中,方便在云端查看显示器设备的状态。
    添加消息路由

五、部署边缘实例

  1. 在边缘实例的实例详情页面,单击部署,将子设备、流数据分析作业及消息路由下发到边缘端。
  2. 实例详情页面,选择网关页签,单击远程SSH终端,登录远程控制台。
    说明 需要您先打开网关名称右侧的远程访问按钮,远程SSH终端才可以使用。
  3. 进入/linkedge/gateway/build/bin目录,执行如下命令上报有人员通过的事件。
    ./ds_ctrl event a1Wn0t8**** gate01 peoplePassed '{"count": 1}'
  4. 返回物联网平台控制台,左侧导航栏选择设备管理 > 设备,在相应设备名称右侧单击查看
    设备详情页面,选择物模型数据 > 运行状态,查看显示器设备的运行状态以及数据。