本文以一个典型的开源时序型数据库(InfluxDB)为例,描述如何使用边缘函数计算应用,实现时序存储设备数据。
前提条件
- 请您确保已完成边缘实例的创建,具体操作,请参见专业版环境搭建。
- 您已创建光照度传感器产品以及该产品下的LightSensor设备,并将设备分配至边缘实例,具体操作,请参见示例驱动。
- 您已创建容器镜像类型的边缘应用,具体操作,请参见容器镜像应用。
其中,部分参数设置按如下要求配置。
表 1. 应用信息参数说明 参数 说明 应用类型 此处选择容器镜像。 仓库类型 此处选择公共仓库 。 镜像地址 设置为 influxdb:latest
。应用版本 设置为当前使用的数据库版本号,例如1.7.9版本。 表 2. 容器配置参数说明 参数 描述 是否使用宿主机host模式 此处选择否。 网络端口映射 此处按如下说明,设置两条网络端口映射: - 第一条网络端口映射:
- 宿主机端口:8083
- 容器内端口:8083
- 类型:TCP
- 第二条网络端口映射:
- 宿主机端口:8086
- 容器内端口:8086
- 类型:TCP
是否启动特权模式 此处选择是。 卷映射 此处按如下说明,设置卷映射: - 源路径:/tmp/influxdb
- 目的路径:/var/lib/influxdb
- 读写权限:读写
- 第一条网络端口映射:
背景信息
在楼宇、工业、农业等物联网场景中,现场的各类传感器负责实时采集数据(如温、湿度、空气质量等数据)。这些数据会随着时间的变化不断更新,因此也称之为时序数据。若需要对此类数据进行大数据分析和看板展示数据等操作,您首先需要将其存储到时序数据库中。物联网边缘计算提供的函数计算应用能力,可帮助您快速地将模型化的设备数据存储到时序数据库中。在此过程中,只需配置该函数计算应用的消息路由并部署应用,无需二次开发。
本文中以LightSensor传感器为例,将获取到的LightSensor传感器数据,按照时间顺序存储到InfluxDB中。
步骤一:创建应用函数
- 下载应用函数代码包fcApp_influxdb_v1.0.zip。
- 登录函数计算控制台。如尚未开通该服务,请阅读并选中我已阅读并同意,单击立即开通,开通服务。
- (可选)在左侧导航栏单击服务及函数,在服务及函数页面服务列表区域,单击新增服务,创建一个服务。其中,服务名称必须填写,此处设置为EdgeFC,其余参数可根据您的需求设置,也可以不设置。说明
- 若您首次在函数计算中创建服务,请根据配置向导配置参数。
- 若已操作过其他应用场景示例或小程序示例,即已创建EdgeFC服务,则无需重复创建。
- 创建服务成功后,在服务及函数页面的EdgeFC区域,单击新增函数。
- 在新建函数页面,单击事件函数区域中的配置部署。
- 设置应用函数的基础管理配置参数。
参数 描述 函数类型保持默认选项。所在服务选择已创建的EdgeFC服务。函数名称设置为influxdbStore。 运行环境设置函数的运行环境并选择上传代码的方式。此示例中选择Python 3。 在上传代码右侧选择上传代码包,单击上传代码,上传步骤1中下载的fcApp_influxdb_v1.0.zip代码包。
函数入口使用默认值index.handler。其余参数的值请根据您的实际需求设置,也可以不设置。更多信息,请参见函数计算。
确认函数信息后,单击新建完成操作。
- 创建函数完成后,系统自动跳转到函数详情页面。您可在代码执行页签的代码执行管理区域下,选中在线编辑单选框,查看源码。说明 influxdbStore样例代码逻辑分为三步。
- 从收到的设备上报数据(event参数)中解析出productKey、deviceName和attribute。
event_json = json.loads(event) if(event_json["topic"].startswith('/sys')) pk, dn, attribute = parseTopic(event_json['topic'])
- 解析payload(设备的属性或者事件)。
payload_json = json.loads(event_json["payload"])
- 将属性或者事件数据封装成InfluxDB存储的格式,数据格式定义请参见本文下方附录:数据格式定义。此处依赖了Python封装的InfluxDB客户端接口。
- 从收到的设备上报数据(event参数)中解析出productKey、deviceName和attribute。
步骤二:分配函数到边缘实例
- 登录边缘计算控制台。
- 在左侧导航栏单击应用管理。
- 使用本文上方步骤一中已创建的函数,创建函数计算类型的边缘应用。具体操作,请参见函数计算应用。
应用信息参数说明如下:
参数 描述 应用名称 设置您应用的名称,例如influxdbStore。 应用类型 选择函数计算。 地域 选择您创建的服务所在的地域。 服务 选择EdgeFC服务。 函数 选择influxdbStore函数。 授权 选择AliyunIOTAccessingFCRole。 应用版本 设置应用的版本,必须是该应用唯一的版本号,即一个应用不可以设置两个相同的版本号。 函数配置说明如下:
参数 描述 启用默认配置 选择否。 运行模式 运行模式有两种。此处选择持续运行模式。程序部署后会立即执行。 超时限制(秒) 函数收到事件后的最长处理时间,此处使用默认值5秒。如超过该时间函数仍未返回结果,该函数计算程序将会被强制重启。 定时运行 使用默认配置:关闭。 环境变量 单击新增环境变量,添加如下方“环境变量配置”表格所示的环境变量。 表 3. 环境变量配置 名称 值 host InfluxDB的访问地址。 port InfluxDB的访问端口,默认值为8086。 username InfluxDB的用户名,默认值为root。 password InfluxDB的密码,默认值为root。 database InfluxDB的数据库名称,默认值为example。 其余参数无需配置。
- 在左侧导航栏单击边缘实例。
- 在本文“前提条件”中创建的边缘实例右侧,单击查看。
- 在实例详情页面的边缘应用页签,单击分配应用。
- 在分配应用面板,找到已创建的应用函数influxdbStore,单击对应操作栏中的分配,然后单击关闭。
步骤三:配置消息路由
添加消息的详细步骤及各个参数的解释。详细信息,请参见设置消息路由。
- 在实例详情页面,选择消息路由,单击添加路由,添加LightSensor设备到函数计算的消息路由。
- 按照界面提示,设置如下参数,参数设置完成后,单击确认。
参数 描述 路由名称 设置一个消息路由名称。 消息来源 此处选择设备,选择 。消息主题过滤 此处选择全部。 消息目标 此处选择边缘应用和EdgeFC/influxdbStore函数。
步骤四:部署边缘实例
- 在实例详情页面,单击右上角部署后,在弹出对话框中单击确定,将子设备、函数计算等资源下发到边缘端。
- 通过SSH登录您的网关,执行tail -f /linkedge/run/logger/fc-base/influxdbStore/log.INFO命令,可以查看该函数的日志,来观察其实际运行情况。
附录:数据查询
当数据存储到InfluxDB后,可通过InfluxDB API查询已存储的数据。部分数据查询示例如下所示,更多详情,请参见InfluxDB API文档。
- 以产品为维度,查询某个属性所有时间点的值,返回结果会以时间顺序列出所有已存储的属性值。
curl -G 'http://localhost:8086/query?db=example&u=root&p=root' --data-urlencode "epoch=ms" --data-urlencode 'q=SELECT * FROM "属性唯一标识符(产品下唯一)" WHERE "_productKey" = $pk and "_dataType" = $type' --data-urlencode 'params={"pk":"产品ProductKey", "type": "property"}' //响应结果 { "results": [{ "statement_id": 0, "series": [{ "name": "temp", "columns": ["time", "_dataType", "_deviceName", "_productKey", "value"], "values": [ [1570782663561, "property", "0987654321", "产品ProductKey", 100.1], [1570782688527, "property", "0987654321", "产品ProductKey", 100.2], [1570782851708, "property", "0987654321", "产品ProductKey", 104.4] ] }] }] }
- 以设备为维度,查询某个属性所有时间点的值,返回结果会以时间顺序列出所有已存储的属性值。
curl -G 'http://localhost:8086/query?db=example&u=root&p=root' --data-urlencode "epoch=ms" --data-urlencode 'q=SELECT * FROM "属性唯一标识符(产品下唯一)" WHERE "_deviceName" = $dn and "_dataType" = $type' --data-urlencode 'params={"dn":"设备名称", "type": "property"}'
- 以设备为维度,查询某个属性在一定时间范围内的值,返回结果会以时间顺序列出该时间范围内所有已存储的属性值。
curl -G 'http://localhost:8086/query?db=example&u=root&p=root' --data-urlencode "epoch=ms" --data-urlencode 'q=SELECT * FROM "属性唯一标识符(产品下唯一)" WHERE "_productKey" = $pk and "_dataType" = $type and "time" > $timestamp' --data-urlencode 'params={"pk":"产品ProductKey", "type": "property", "timestamp": "2019-10-11T09:03:15.611Z"}'
附录:数据格式定义
设备的属性或事件类型数据在InfluxDB的存储格式如下所示。
- 单值式属性
[ { "measurement": "temp", // 属性名 "tags": { // 属性的标签 "_productKey": "1234567890", // 设备证书信息productKey "_deviceName": "0987654321", // 设备证书信息deviceName "_dataType" : "property" // 类型:property表示属性,event表示事件 }, "time": 1346846400000, // 时间戳,单位:ms "fields": { "value": 123 // 属性的值 } } ... ]
- 结构体式属性
[ { "measurement": "temp", // 属性名 "tags": { // 属性的标签 "_productKey": "1234567890", // 设备证书信息productKey "_deviceName": "0987654321", // 设备证书信息deviceName "_dataType" : "property" // 类型:property表示属性,event表示事件 }, "time": 1346846400000, // 时间戳,单位:ms "fields": { "aa": 12, // 属性的值 "bb": 12, // 属性的值 "cc": 12, // 属性的值 "dd": 12, // 属性的值 } } ... ]
- 事件
[ { "measurement": "event123", // 事件名 "tags": { // 事件的标签 "_productKey": "1234567890", // 设备证书信息productKey "_deviceName": "0987654321", // 设备证书信息deviceName "_dataType" : "event" // 类型:property表示属性,event表示事件 }, "time": 1346846400000, // 时间戳,单位:ms "fields": { "speed" : 20.8, "level" : 4, "direction" : "East", } } ... ]
文档内容是否对您有帮助?