全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
阿里云物联网套件

IoT对接大数据实践-datahub流计算

更新时间:2017-10-18 14:31:51

场景:实时统计某产品下活跃设备数量,并得到一些实时图表。

这里活跃设备的定义是:设备端在最近N分钟内发送过消息。

为什么要用datahub:对于这种场景,我们需要一个算法,并且需要一定的历史数据累积,比如这个活跃的定义是通过某N分钟内有过多少条消息行为才能算为活跃数,传统做法关系型数据库需要存储很大量的数据,计算复杂度大大增加。而datahub是我们大数据的存储入口,通过把数据流转到大数据平台,结合streamsql即可实现实时流计算,最终再配合大数据可视化产品dataV配置出你想要的大屏。

思路:使用规则引擎,将该产品下所有设备发送的消息转发到datahub,再经过流计算处理,在dataV可视化大屏实时展示设备数量。1

1、开通datahub服务,登录datahub控制台,创建project,创建topic。6

2、开通iot套件,登录控制台,创建产品、设备、规则 (以华东2为例)

2

3

编写规则sql

4

配置规则动作(将步骤1创建的datahub的topic作为规则的转发目的地)

5

启动规则

3、使用模拟设备端,发送消息。具体参见设备端接入手册7

4、套件跟datahub对接检验

检验设备发送的消息是否成功流转至datahub中,可以通过日志服务,也可以直接去datahub控制台对应的topic中,查看Shards中数据量的变化,数据抽样功能可以看到具体的消息内容。

4.1、日志服务检查9

10

这条消息的流转路径,全都能查询出来。(这里有三个datahub的记录,是因为这个规则配置了三条往datahub转发的动作)

4.2、datahub数据验证8

消息内容跟设备发送的消息内容一致。

至此,套件跟datahub对接已经成功。

5、接入流计算

根据datahub中,设备发送消息记录的流式数据表,需要计算出一个指标:截至到当前时间,发送了消息的设备数量。这个指标经过流计算,写入在线rds系统中,并通过dataV大屏展示。Rds表设计如下:

11

5.1、前期准备:登录Rds控制台,登录数据库,创建rds表格12

13

5.2、流计算控制台新建作业14

5.3、注册存储

  • datahub数据存储15

其中:project为自己申请的project名称

  • rds存储

16

5.4、作业开发引用数据存储

DataHub的Topic作为我们数据输入源,点击选择”作为输入表引用”,流计算将自动解析Topic的Schema,并自动添加对应的SQL到IDE:

17

RDS的表作为我们的数据结果输出,点击选择“作为结果表引用”,流计算将自动解析RDS的表Schema,并自动添加对应的SQL到IDE

18

5.5、编写streamSQL

  1. REPLACE INTO activity_device
  2. SELECT
  3. from_unixtime(FLOOR(dm.msgtime/1000), 'yyyy-MM-dd') as d_data,
  4. count(DISTINCT dm.devicename) as device_num
  5. FROM device_message dm
  6. group by from_unixtime(FLOOR(dm.msgtime/1000), 'yyyy-MM-dd');

5.6、调试sql

22

23

得到的结果跟数据预期一致,表示sql没有问题

5.7、上线、启动作业

19

20

5.7、验证

使用不同的设备登录,发送消息,查看rds中数据变化,每使用一个新的设备发送消息,当天rds中设备数量会加1,跟预期结果一致,表示iot-datahub-stream对接成功。

6、dataV可视化接入

开通dataV服务,登录dataV控制台

6.1、新建数据 将rds对应的数据库连接信息配置数据存储

24

6.2、新建可视化,配置数据

dataV提供了很多的大屏模板,样式也都可以自己调节,这里以最简单的折线图为例。具体的配置说明这里不详细说明,可以参看dataV提供的官网文档学习。

配置完成之后,使用不同的设备登录,并发送消息,大屏展示的活跃设备数会实时改变。

25

本文导读目录