EMQX数据接入

将模拟温湿度数据通过MQTT协议上报到EMQX Platform后,使用EMQX Platform数据集成将数据转存到表格存储(Tablestore)。

前提条件

  • EMQX Platform

    EMQX PlatformEMQ推出的一款面向物联网领域的 MQTT 消息中间件产品。作为全球首个 MQTT 5.0 消息云服务,EMQX Platform 提供了一站式运维代管、独有隔离环境的 MQTT 消息服务。在万物互联的时代,EMQX Platform 可以帮助您快速构建面向物联网领域的行业应用,轻松实现物联网数据的采集、传输、计算和持久化。
    • 已在EMQX Platform上创建专有版EMQX集群。

    • 已完成与阿里云平台的VPC对等连接的创建,用于VPC之间的网络连接。

      说明

      如果要使用公网IP地址连接资源,请开通NAT网关

    • 已下载MQTT X工具,用于后续模拟数据上报。

  • 表格存储

    • 已开通表格存储服务,并创建表格存储实例、时序表(时序表名为temp_hum)和时间线(度量名称为cloud)。具体操作,请参见通过控制台使用时序模型

    • 已为实例绑定与EMQX Platform建立了对等连接的VPC。具体操作,请参见绑定VPC

    • 已为阿里云账号或者具有表格存储访问权限的RAM用户创建AccessKey(包括AccessKey IDAccessKey Secret)。

步骤一:通过EMQX控制台配置Tablestore资源

  1. 进入数据集成页面。

    1. 登录EMQX Platform控制台

    2. 单击项目。

    3. 在左侧导航栏中,单击数据集成

  2. 新建连接器。

    1. 数据集成页面的数据持久化区域,单击Alibaba表格存储

    2. 新建连接器页面,根据下表说明配置要连接的资源参数。

      参数

      是否必选

      示例值

      描述

      连接器名称

      c-f0****

      由系统自动生成,不可修改。

      备注

      测试Tablestore

      连接器的备注信息。

      端点

      https://myinstance.cn-hangzhou.vpc.tablestore.aliyuncs.com/

      表格存储的服务地址。当使用VPC对等连接进行对接,请使用表格存储的VPC地址;当使用公网进行对接,请使用表格存储的公网地址。

      实例名称

      myinstance

      Tablestore实例名称。

      访问密钥 ID

      LTAI********************

      阿里云账号或者RAM用户的AccessKey ID。

      访问密钥

      ********************************

      阿里云账号或者RAM用户的AccessKey Secret。

      存储模型类型

      时序

      固定取值为时序,不可修改。

      启用TLS

      未启用

      默认未启用TLS。如需通过TLS协议进行数据传输,请打开启用TLS开关并配置TLS信息等。

      高级设置

      -

      一般情况下保持默认配置即可。如需修改连接池大小、启动超时时间、健康检查间隔的配置,请单击高级设置并修改相应配置。

    3. 单击测试连接,检查连接是否正常。

      说明

      如果测试连接时出现错误,请及时检查数据库配置是否正确。

    4. 测试通过后,单击新建

    系统弹出成功新建连接器对话框。

  3. 新建规则。

    完成资源的新建后,您可以直接配置关联的规则来定义如何处理数据。

    1. 成功新建连接器对话框,单击新建规则

      说明

      您也可以在数据集成页面,单击目标连接器操作列的fig_20220914_rule图标来新建规则。

    2. 新建规则页面,输入规则匹配SQL语句。

      如下SQL示例语句表示从temp_hum/emqx主题读取客户端ID、消息体(Payload),并从消息体中分别读取温度和湿度。

      SELECT
      
      payload.temp as temp, 
      payload.hum as hum,
      clientid
      
      FROM
      "temp_hum/emqx"

      您可以打开启用调试开关并根据上述SQL中的字段修改测试数据,然后单击测试,查询结果。

    3. 单击下一步

  4. 新建输出动作。

    1. 根据下表说明配置动作参数。

      image

      参数

      是否必选

      示例值

      描述

      使用连接器

      c-****

      选择新建的Tablestore连接器。

      动作类型

      Alibaba 表格存储

      固定取值为Alibaba 表格存储,不可修改。

      动作名称

      a-******

      固定取值为新建的动作名称,不可修改。

      备注

      输出动作测试

      备注信息。

      数据源

      device_001

      产生时间线的数据源标识,可以为空。

      表名

      temp_hum

      表格存储时序表名称。

      度量名称

      cloud

      时序表的度量名称。时间线数据所度量的物理量或者监控指标的名称,例如temphum,用于表示该时间线记录的是温度或者湿度等。

      标签

      键:region

      值:hangzhou

      数据标签,所有数据都以字符串形式处理。

      字段

      键:clientid

      值:${clientid}

      字段的键值对,系统会自动识别字段的数据类型。表格存储列值支持多种数据类型,包括布尔、整型、浮点数、字符串和二进制。其中字符串数据默认按照二进制数据处理以保证字符集最佳兼容性。

      单击字段区域的添加,并配置字段列、消息的值、是否为整数和是否为二进制

      时间戳

      1663298365000000

      该行时序数据对应的时间,格式为时间戳,单位为微秒(us)。

      默认值为消息到达EMQX的微秒时间戳。

      重要

      EMQXMQTT消息中默认带有毫秒精度的时间戳,不可直接使用。

      元数据更新模式

      MUM_NORMAL

      时序元数据的更新模式。取值范围如下:

      • MUM_NORMAL(默认):正常模式。

      • MUM_IGNORE:不更新元数据。

      高级设置

      -

      一般情况下保持默认配置即可。

      如需修改缓存池大小、请求超期时间、健康检查间隔、缓存队列最大长度等配置,请单击高级设置并按需修改相应配置。

    2. 单击确认

      系统弹出成功新建规则对话框。

  5. 测试规则。

    规则创建完成后,测试规则是否正确。

    1. 成功新建规则对话框,单击前往测试规则

      说明

      您也可以在数据集成页面,单击目标规则名称,然后在设置页签下打开启用调试开关进行规则测试。

    2. 如果未启用调试,请打开启用调试开关。

    3. 测试规则页签,单击开始测试

    4. 单击输入模拟数据,然后在模拟数据面板配置客户端ID、主题和Payload。

      image

    5. 单击提交测试

      系统会在测试规则页签中显示测试结果信息。您可以在表格存储控制台查看存储到表格存储中的数据。

      image

步骤二:(可选)通过MQTT X工具模拟温湿度数据上报

通过MQTT X工具模拟温湿度数据上报,以测试数据存储效果。

  1. 创建认证信息。

    创建认证信息后,您可以通过MQTT X工具使用该用户连接EMQX Platform。

    1. 登录EMQX Platform控制台

    2. 单击项目。

    3. 在左侧导航栏,选择访问控制 > 客户端认证

    4. 默认认证页签,单击添加

    5. 添加认证对话框,设置用户名和密码,单击确认

  2. 获取EMQX集群的部署地址和端口。

    获取部署地址用于连接部署的EMQX集群。

    1. 登录EMQX Platform控制台

    2. 单击项目。

    3. 概览页面,获取并记录连接地址和连接端口信息。

  3. 使用MQTT X工具模拟温湿度数据上报。

    1. 下载并安装MQTT X工具。

    2. 打开MQTT X配置工具,单击New Connection

    3. 根据下表说明配置基础连接参数,其他保持默认即可。

      image

      参数

      是否必选

      示例值

      描述

      Name

      emqxtest

      自定义MQTT X工具与EMQX Platform建立的连接名称。

      Host

      p*******.***.*********.aliyun.emqxcloud.cn

      EMQX集群的部署地址。请填写通过EMQX Platform控制台获取到的连接地址

      Port

      1883

      EMQX集群的MQTT服务端口。请填写通过EMQX Platform控制台获取到的连接端口。

      Username

      test

      连接EMQX Cloud时使用的用户名和密码。请填写通过EMQX Platform控制台创建认证信息时设置的用户名和密码。

      Password

      ********

    4. 单击Connect

  4. 发送测试数据。

    1. 单击New Subscribe

    2. New Subscription对话框,配置TopicQos,单击Confirm

      此处以temp_hum/emqxTopic为例介绍。

    3. 选择TopicPayloadJSON,配置Topic名称和消息体。

      JSON示例如下:

      {
        "temp": "35.6",
        "hum": "99"
      }
    4. 单击fig_send图标,发送消息。

步骤三:通过表格存储控制台验证数据

  1. 进入时序表列表页签。

    1. 登录表格存储控制台

    2. 在页面上方,选择资源组和地域。

    3. 概览页面,单击实例名称。

    4. 实例详情页签,单击时序表列表

  2. 在时序表列表页签,单击时序表名称后选择数据管理页签或在操作列单击数据管理

  3. 数据管理页签,输入度量名称cloud,选择时间范围为1小时,单击查询

    image