本文以一个典型的开源时序型数据库(InfluxDB)为例,描述如何使用边缘函数计算应用,实现时序存储设备数据。

前提条件

  • 请您确保已根据专业版环境搭建内容完成边缘实例的创建。
  • 根据示例驱动内容创建光照度传感器产品以及该产品下的LightSensor设备,并将设备分配至边缘实例。
  • 请执行如下命令下载并安装Docker版的Influxdb,并获取其IP地址和端口。
    docker run -d -p 8083:8083 -p 8086:8086 --privileged=true -v /tmp/influxdb:/var/lib/influxdb --name influxDbService influxdb:latest

背景信息

在楼宇、工业、农业等物联网场景中,现场的各类传感器负责实时采集数据(如温、湿度、空气质量等数据)。这些数据会随着时间的变化不断更新,因此也称之为时序数据。若需要对此类数据进行大数据分析和看板展示数据等操作,您首先需要将其存储到时序数据库中。

物联网边缘计算提供的函数计算应用能力,可帮助您快速地将模型化的设备数据存储到时序数据库中。在此过程中,只需配置该函数计算应用的消息路由并部署应用,无需二次开发。

本文中以LightSensor传感器为例,将获取到的LightSensor传感器数据,按照时间顺序存储到InfluxDB中。

一、创建应用函数

  1. 下载应用函数代码包。
  2. 登录阿里云函数计算控制台
    如尚未开通该服务,请阅读并勾选我已阅读并同意内容,单击立即开通,开通服务。
  3. (可选)左侧导航栏单击服务-函数,在新建函数后的下拉框中选择新建服务,根据界面提示配置参数后,单击确定创建一个服务。
    其中,服务名称必须填写,此处设置为EdgeFC,其余参数可根据您的需求设置,也可以不设置。
    说明 若已操作过其他应用场景示例或小程序示例,即已创建EdgeFC服务,则无需重复创建。
  4. 创建服务成功后,在服务-函数页面单击新建函数,并使用事件函数方式创建函数。
  5. 设置应用函数的基础管理配置参数。
    参数 描述
    所在服务 选择已创建的EdgeFC服务。
    函数名称 设置为influxdbStore
    运行环境 设置函数的运行环境,此示例中选择python3
    函数入口 使用默认值index.handler
    函数执行内存 设置为512 MB。
    超时时间 设置为10秒。

    其余参数的值请根据您的需求,参见函数计算设置,也可以不设置。

    确认函数信息后,单击完成

  6. 创建函数完成后,系统自动跳转到函数详情页面。请在代码执行页签下选择代码包上传,单击选择文件,上传步骤1中下载的fcApp_influxdb_v1.0.zip代码包,然后单击保存
    代码包上传成功后,您可在在线编辑框中查看源码。在线编辑
    说明 influxdbStore样例代码逻辑分为三步。
    1. 从收到的设备上报数据(event参数)中解析出productKey、deviceName和attribute。
      event_json = json.loads(event)
      if(event_json["topic"].startswith('/sys'))
        pk, dn, attribute = parseTopic(event_json['topic'])
    2. 解析payload(设备的属性或者事件)。
      
      payload_json = json.loads(event_json["payload"])
    3. 将属性或者事件数据封装成InfluxDB存储的格式,数据格式定义请参见本文下方数据格式定义。此处依赖了Python封装的InfluxDB客户端接口。

二、分配函数到边缘实例

  1. 物联网平台控制台左侧导航栏,选择边缘计算 > 应用管理
  2. 参考函数计算应用内容,使用本文上方步骤一中已创建的函数,创建函数计算类型的边缘应用。

    应用信息参数说明如下:

    参数 描述
    应用名称 设置您应用的名称,例如influxdbStore
    应用类型 选择函数计算
    地域 选择您创建的服务所在的地域。
    服务 选择EdgeFC服务。
    函数 选择influxdbStore函数。
    授权 选择AliyunIOTAccessingFCRole
    应用版本 设置应用的版本,必须是该应用唯一的版本号,即一个应用不可以设置两个相同的版本号。

    函数配置说明如下:

    参数 描述
    运行模式 运行模式有两种。此处选择持续运行模式。程序部署后会立即执行。
    内存限制(MB) 函数运行可使用的内存资源上限,单位为MB。此处设置为512 MB。当函数使用内存超出该限制时,该函数计算程序会被强制重启。
    超时限制(秒) 函数收到事件后的最长处理时间,此处使用默认值5秒。如超过该时间函数仍未返回结果,该函数计算程序将会被强制重启。
    定时运行 使用默认配置:关闭
    环境变量 单击新增环境变量,添加如下方“环境变量配置”表格所示的5条环境变量。
    表 1. 环境变量配置
    名称
    host InfluxDB的访问地址。
    port InfluxDB的访问端口,默认值为8086。
    username InfluxDB的用户名,默认值为root。
    password InfluxDB的密码,默认值为root。
    database InfluxDB的数据库名称,默认值为example。

    其余参数无需配置。

  3. 左侧导航栏选择边缘计算 > 边缘实例
  4. 在本文“前提条件”中完成的边缘实例右侧,单击查看
  5. 实例详情页面,选择边缘应用,单击分配应用
  6. 将已创建的应用函数influxdbStore分配到边缘实例中,单击关闭

三、配置消息路由

添加消息的详细步骤及各个参数的解释,请参见设置消息路由

  1. 实例详情页面,选择消息路由,单击添加路由,添加LightSensor设备到函数计算的消息路由。
  2. 按照界面提示,设置如下参数,参数设置完成后,单击确定
    参数 描述
    路由名称 设置一个消息路由名称。
    消息来源 此处选择设备,选择光照度传感器 > LightSensor
    消息主题过滤 此处选择全部
    消息目标 此处选择函数计算EdgeFC/influxdbStore函数。

四、部署边缘实例

  1. 实例详情页面,单击右上角部署后在弹出框中单击确定,将子设备、函数计算等资源下发到边缘端。
    您可以通过单击部署详情来查看部署进度及结果。
  2. 登录您的网关,执行tail -f /linkedge/run/logger/fc-base/influxdbStore/log.INFO命令,可以查看该函数的日志,来观察其实际运行情况。
    代码片段

数据查询

当数据存储到InfluxDB后,可通过InfluxDB API查询已存储的数据。部分数据查询示例如下所示,更多详情请参见InfluxDB API文档

  • 以产品为维度,查询某个属性所有时间点的值,返回结果会以时间顺序列出所有已存储的属性值。
    curl -G 'http://localhost:8086/query?db=example&u=root&p=root' --data-urlencode "epoch=ms" --data-urlencode 'q=SELECT * FROM "属性唯一标识符(产品下唯一)" WHERE "_productKey" = $pk and "_dataType" = $type' --data-urlencode 'params={"pk":"产品ProductKey", "type": "property"}'
    
    //响应结果
    {
    	"results": [{
    		"statement_id": 0,
    		"series": [{
    			"name": "temp",
    			"columns": ["time", "_dataType", "_deviceName", "_productKey", "value"],
    			"values": [
    				[1570782663561, "property", "0987654321", "产品ProductKey", 100.1],
    				[1570782688527, "property", "0987654321", "产品ProductKey", 100.2],
    				[1570782851708, "property", "0987654321", "产品ProductKey", 104.4]
    			]
    		}]
    	}]
    }
  • 以设备为维度,查询某个属性所有时间点的值,返回结果会以时间顺序列出所有已存储的属性值。
    curl -G 'http://localhost:8086/query?db=example&u=root&p=root' --data-urlencode "epoch=ms" --data-urlencode 'q=SELECT * FROM "属性唯一标识符(产品下唯一)" WHERE "_deviceName" = $dn and "_dataType" = $type' --data-urlencode 'params={"dn":"设备名称", "type": "property"}'
  • 以设备为维度,查询某个属性在一定时间范围内的值,返回结果会以时间顺序列出该时间范围内所有已存储的属性值。
    curl -G 'http://localhost:8086/query?db=example&u=root&p=root' --data-urlencode "epoch=ms" --data-urlencode 'q=SELECT * FROM "属性唯一标识符(产品下唯一)" WHERE "_productKey" = $pk and "_dataType" = $type and "time" > $timestamp' --data-urlencode 'params={"pk":"产品ProductKey", "type": "property", "timestamp": "2019-10-11T09:03:15.611Z"}'

数据格式定义

设备的属性或事件类型数据在InfluxDB的存储格式如下所示。

  • 单值式属性
    [
        {
          "measurement": "temp",            // 属性名
          "tags": {                         // 属性的标签
            "_productKey": "1234567890",    // 设备证书信息productKey
            "_deviceName": "0987654321",    // 设备证书信息deviceName
            "_dataType"  : "property"       // 类型:property表示属性,event表示事件
          },
          "time": 1346846400000,            // 时间戳,单位:ms
          "fields": {
            "value": 123                    // 属性的值
          }
        }
        ...
    ]
  • 结构体式属性
    [
        {
          "measurement": "temp",            // 属性名
          "tags": {                         // 属性的标签
            "_productKey": "1234567890",    // 设备证书信息productKey
            "_deviceName": "0987654321",    // 设备证书信息deviceName
            "_dataType"  : "property"       // 类型:property表示属性,event表示事件
          },
          "time": 1346846400000,            // 时间戳,单位:ms
          "fields": {
            "aa": 12,                       // 属性的值
            "bb": 12,                       // 属性的值
            "cc": 12,                       // 属性的值
            "dd": 12,                       // 属性的值
          }
        }
        ...
    ]
  • 事件
    [
        {
          "measurement": "event123",        // 事件名
          "tags": {                         // 事件的标签
            "_productKey": "1234567890",    // 设备证书信息productKey
            "_deviceName": "0987654321",    // 设备证书信息deviceName
            "_dataType"  : "event"          // 类型:property表示属性,event表示事件
          },
          "time": 1346846400000,            // 时间戳,单位:ms
          "fields": {
            "speed" : 20.8,
            "level" : 4,
            "direction" : "East",
          }
        }
        ...
    ]