本文以订单结算为例,详细说明如何通过流数据分析的维表Join功能,将物联网设备产生的数据(识别器识别的商品及订单编号)和数据库中的数据(商品价格表)进行连接。
示意图如下所示。
背景信息
物联网设备的产生的数据大多都是一些动态的属性信息,很多时候我们需要将设备的动态属性和更多维度的数据(如设备产地、型号,商品的颜色、价格等)进行关联,才能计算出我们想要的结果。
例如,某无人小店有一台商品自助结算机,包含商品识别器及价格显示屏。识别器每识别到一个商品会将商品及订单编号通过事件的方式上报,显示屏实时显示当前累积总价,而当前累积总价需要将识别器上报的数据和数据库里的商品价格表关联后计算得出。
一、添加设备
参考设备模拟器章节中使用设备模拟器驱动部分的内容,添加如下两个设备。
- 添加商品识别器设备:
- 新建商品识别器产品,添加如下表格自定义功能。
参数 |
描述 |
功能类型 |
选择事件。
|
标识符 |
设置为recognizedGoods 。
|
事件类型 |
选择信息。
|
输出参数 |
增加如下两个数据类型为int32(整数型)的输出参数。
- 商品编号
goodsId 。
- 订单编号
orderId 。
|
- 为商品识别器产品添加设备,并分配进边缘实例中。
- 将商品识别器设备的驱动设为设备模拟器驱动。
- 添加显示器设备:
- 新建显示器产品,添加如下表格自定义功能。
参数 |
描述 |
功能类型 |
选择属性。
|
标识符 |
设置为content 。
|
数据类型 |
选择text(字符串)。
|
- 为显示器产品添加设备,并分配进边缘实例中。
- 将显示器设备的驱动设为设备模拟器驱动。
说明 将设备分配到边缘实例后,请先不要部署边缘实例,待完成其他操作后统一部署边缘实例。
二、添加流数据分析任务
- 创建、设置并发布流数据分析任务,具体操作请参见流数据任务开发。
其中,开发类型选择SQL,运行环境选择边缘。
- 系统自动进入流数据任务开发SQL工作台。
复制如下SQL内容到编辑框中。
create table event (
eventCode varchar,
params varchar,
ts varchar,
tstamp as to_timestamp (cast (ts as bigint)),
WATERMARK wk FOR tstamp as withOffset (tstamp, 2000)
) with (
type = 'edgebus',
jsonParser = 'device_event'
);
CREATE TABLE goods_info(
id bigint,
name varchar,
price double,
PRIMARY KEY(id),
PERIOD FOR SYSTEM_TIME)
WITH(
type= 'rds',
url='jdbc:mysql://your_db_host:your_db_port/your_db_name', -- 请将your_db_host、your_db_port、your_db_name替换成真实值
userName='config://local_db_username', -- 通过./tool_config -s local_db_username xxx 设置
password='config://local_db_password', -- 通过./tool_config -s local_db_password xxx 设置
tableName='table_case_06_goods_info',
cache='LRU',
cacheSize='1000',
cacheTTLMs='600000'
);
CREATE TABLE edgebus_sink (
orderId bigint,
orderPrice double
) WITH (
type='edgebus_sink',
topic = '/sys/streamCompute/orderPrice'
);
create view scan_goods as
select
cast (json_value(params, '$.goodsId') as bigint) as goodsId,
cast (json_value(params, '$.orderId') as bigint) as orderId
from
event
where
eventCode = 'recognizedGoods';
insert into edgebus_sink
select
orderId,
sum(goods_info.price)
from
scan_goods
join
LATERAL goods_info FOR SYSTEM_TIME AS OF PROCTIME()
ON
goods_info.id = scan_goods.goodsId
group by
orderId;
- 保存任务并发布。
- 将该SQL任务分配到边缘实例中,具体操作请参见分配流数据分析到边缘实例。
三、添加函数
- 创建服务,并在该服务下创建名为DisplayPrice的使用Nodejs8运行环境的函数,具体操作请参见使用控制台创建函数。
- 创建函数完成后,在线编辑代码,添加如下内容。
说明 请将your_displaydevice_productKey以及your_displaydevice_deviceName换成实际显示器设备的ProductKey和DeviceName。
const leSdk = require('linkedge-core-sdk');
const iotData = new leSdk.IoTData();
module.exports.handler = function(event, context, callback) {
const payload = JSON.parse(JSON.parse(event.toString()).payload);
const time = payload.time;
const params = payload.params;
iotData.setThingProperties({
productKey: 'your_displaydevice_productKey',
deviceName: 'your_displaydevice_deviceName',
payload: {
content: `orderId: ${params.orderId}, orderPrice: ${params.orderPrice}, time: ${time}`
}
}, (err) => {
if (err) {
console.log(`display price err: ${err}`);
}
});
};
- 创建边缘应用,具体操作请参见函数计算应用。
应用信息部分参数说明如下 :
参数 |
描述 |
应用类型 |
此处选择函数计算。
|
地域 |
选择您创建的服务所在的地域。 |
服务 |
选择DisplayPrice函数所在的服务名称。
|
函数 |
选择DisplayPrice函数。
|
授权 |
选择AliyunIOTAccessingFCRole。
|
函数配置部分参数说明如下:
参数 |
说明 |
启用默认配置 |
选择否。
|
运行模式 |
选择按需运行。
|
超时限制(秒) |
设置为5秒。 |
定时运行 |
使用默认配置:关闭。
|
其余参数无需配置。
- 将应用分配到边缘实例,具体操作请参见分配边缘应用到边缘实例。
四、添加消息路由
- 在边缘实例的实例详情页面,选择消息路由,将商品识别器的数据路由到订单结算的流数据分析任务中,具体操作请参见设置消息路由。
- 将订单结算的流数据分析任务路由到DisplayPrice函数计算中。消息主题过滤设置为
/sys/streamCompute/orderPrice
。
- 将显示器的属性变换数据路由到IoTHub中,方便在云端查看显示器设备的状态。
五、创建商品价格表
若您已经操作过温度平均值计算示例并完成了该章节中五、(可选)保存温度平均值计算结果到数据库这一步内容,则在本文步骤中可以直接跳到子步骤5. 创建商品价格表操作。
- (可选)新建一个RDS实例,或者在搭建边缘环境的设备上安装一个MySQL数据库(也可以跨设备安装,但必须保证两台设备网络互通)。安装命令如下所示。
说明 若已有MySQL数据库(本地数据库或RDS),可直接跳过本步骤。
sudo docker run --name mysql -p your_db_port:3306 -e MYSQL_ROOT_PASSWORD=your_db_password -d mysql:5.6.35
其中,your_db_port为在宿主机(即启动mysql docker的机器)访问数据库的端口号,your_db_password为数据库root账户的密码,请根据实际情况更改。例如,端口号为1234,密码为xxxxxxxx,则实例执行的命令如下。
sudo docker run --name mysql -p 1234:3306 -e MYSQL_ROOT_PASSWORD=xxxxxxxx -d mysql:5.6.35
- 为了保证数据库用户名密码的安全性,您需要把数据库用户名密码保存在配置中心(会加密后再存储)。按如下方式设置。
- 登录物联网平台控制台,在左侧导航栏中选择。
- 在相应的边缘实例右侧单击查看,进入实例详情页面,选择网关页签,单击网关右侧的远程SSH终端,登录远程控制台,执行如下命令。
说明 需要您先在网关页签下打开远程访问按钮,远程SSH终端才可以使用。
cd /linkedge/gateway/build/bin
./tool_config -s local_db_username root
./tool_config -s local_db_password xxxxxxxx
- 查看数据库host,即安装MySQL的宿主机IP。在宿主机上执行
ifconfig
命令可看到类似如下结果。
lo0: flags=8049<UP,LOOPBACK,RUNNING,MULTICAST> mtu 16384
options=1203<RXCSUM,TXCSUM,TXSTATUS,SW_TIMESTAMP>
inet 127.0.0.1 netmask 0xff000000
inet6 ::1 prefixlen 128
inet6 fe80::1%lo0 prefixlen 64 scopeid 0x1
nd6 options=201<PERFORMNUD,DAD>
gif0: flags=8010<POINTOPOINT,MULTICAST> mtu 1280
stf0: flags=0<> mtu 1280
XHC0: flags=0<> mtu 0
XHC20: flags=0<> mtu 0
en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500
ether f0:18:98:37:81:b4
inet6 fe80::1c6e:7a87:1c27:724d%en0 prefixlen 64 secured scopeid 0x6
inet 192.168.83.169 netmask 0xfffff000 broadcast 192.168.95.255
nd6 options=201<PERFORMNUD,DAD>
media: autoselect
status: active
其中,inet参数后的192.168.83.169
为宿主机对外IP,即数据库host(用于流数据分析任务中替换your_db_host
)。
- 下载mysql-client命令行工具或者mysql-client图形化工具(如Navicat、Sequel Pro),进行MySQL数据库的访问,并创建
iot_data
数据库,用于在流数据分析任务中替换your_db_name
参数 。
CREATE DATABASE iot_data;
USE iot_data;
- 创建商品价格表。
CREATE TABLE table_case_06_goods_info(
id INT UNSIGNED NOT NULL PRIMARY KEY,
name varchar(128),
price double
);
INSERT INTO table_case_06_goods_info
(id, name, price)
VALUES
(1, 'Coke', 3.00),
(2, 'Sprite', 4.00),
(3, 'Juice', 5.00),
(4, 'Coffee', 6.00),
(5, 'Tea', 7.00);
六、部署边缘实例
- 在边缘实例的实例详情页面,单击部署,将子设备、流数据分析作业及消息路由下发到边缘端。
- 在实例详情页面,选择网关页签,单击远程SSH终端,登录远程控制台。
说明 需要您先打开网关名称右侧的远程访问按钮,远程SSH终端才可以使用。
- 进入/linkedge/gateway/build/bin目录,执行如下命令上报识别到的商品识别器事件。
./ds_ctrl event a1ivc15**** goodsRecognizer01 recognizedGoods '{"orderId": 12, "goodsId": 1}'
- 返回物联网平台控制台,左侧导航栏选择,在相应设备名称右侧单击查看。
在设备详情页面,选择,查看显示器设备的运行状态以及数据。