将模拟温湿度数据通过MQTT协议上报到EMQX Cloud后,使用EMQX Cloud数据集成将数据转存到表格存储(Tablestore)。
前提条件
- EMQX Cloud
- 已在EMQX Cloud上创建专业版EMQX集群。
- 已完成与阿里云平台的VPC对等连接创建,用于VPC之间的网络连接。具体操作,请参见VPC对等连接的创建。
说明 如果要使用公网IP地址连接资源,请开通NAT网关。具体操作,请参见NAT网关。
- 已下载MQTT X工具,用于后续模拟数据上报。具体下载路径请参见MQTT X。
- 表格存储
- 已开通表格存储服务,并创建表格存储实例、时序表(时序表名为temp_hum)和时间线(度量名称为cloud)。具体操作,请参见通过控制台使用时序模型。
- 已为实例绑定与EMQX Cloud建立了对等连接的VPC。具体操作,请参见绑定VPC。
- 已获取阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。具体操作,请参见创建AccessKey。
背景信息
步骤一:通过EMQX控制台配置Tablestore资源
- 进入数据集成页面。
- 登录EMQX Cloud控制台。
- 单击项目。
- 在左侧导航栏中,单击数据集成。
- 新建资源。
- 在数据集成页面的数据持久化区域,单击Tablestore(时序)。
- 在新建资源页面,根据下表说明配置要连接的资源参数。
参数 是否必选 示例值 描述 资源类型 是 Tablestore(时序) 选择的资源类型,不可修改。 备注 否 测试Tablestore 资源的备注信息。 Tablestore服务器 是 https://myinstance.cn-hangzhou.vpc.tablestore.aliyuncs.com/ 表格存储的服务地址。当使用VPC对等连接进行对接,请使用表格存储的VPC地址;当使用公网进行对接,请使用表格存储的公网地址。更多信息,请参见服务地址。 实例 是 myinstance Tablestore实例名称。 AccessKey ID 是 ************************ 阿里云账号或者RAM用户的AccessKey ID。 说明 AccessKey ID是由英文字母和数字组成的字符串,长度为24字节。AccessKey Secret 是 ******************************** 阿里云账号或者RAM用户的AccessKey Secret。 说明 AccessKey Secret是由英文字母和数字组成的字符串,长度为30字节。连接池大小 是 8 支持连接的最大个数。 - 单击测试连接,检查连接是否正常。说明 如果测试连接时出现错误,请及时检查数据库配置是否正确。
- 测试通过后,单击新建。
系统弹出成功新建资源对话框。 - 新建规则。完成资源的新建后,您可以直接配置关联的规则来定义如何处理数据。
- 在成功新建资源对话框,单击新建规则。说明 您也可以在数据集成页面,单击目标资源操作列的图标来新建规则。
- 在新建规则页面,输入规则匹配SQL语句。如下SQL示例语句表示从
temp_hum/emqx
主题读取客户端ID、消息体(Payload),并从消息体中分别读取温度和湿度。SELECT payload.temp as temp, payload.hum as hum, clientid FROM "temp_hum/emqx"
您可以打开SQL测试开关,测试查询结果。
- 单击下一步。
- 在成功新建资源对话框,单击新建规则。
- 新建动作。
- 根据下表说明配置动作参数。
参数 是否必选 示例值 描述 使用资源 是 resource:f2d3**** 选择新建的Tablestore(时序)资源。 动作类型 是 数据持久化 - 保存数据到Tablestore 固定取值为数据持久化 - 保存数据到Tablestore,不可修改。 表名 是 temp_hum 表格存储时序表名称。 度量名称 是 cloud 时序表的度量名称。时间线数据所度量的物理量或者监控指标的名称,例如temp或hum,用于表示该时间线记录的是温度或者湿度等。 数据源 否 device_001 产生时间线的数据源标识,可以为空。 时间戳(微秒) 否 1663298365000000 该行时序数据对应的时间,格式为时间戳,单位为微秒(us)。 默认值为消息到达EMQX的微秒时间戳。
注意 EMQX中MQTT消息中默认带有毫秒精度的时间戳,不可直接使用。时间线缓存 否 true Tablestore识别当前数据是否需要创建或更新时间线元数据。默认值为true。 同步写入 否 false 数据是否同步写入表格存储。默认值为false。 开启批量写入或者单条同步写入。批量写入时,备选动作不会触发。
异步模式批量大小 否 100 批量写入数据到表格存储的最大条数。只有当设置同步写入为false时生效。 异步模式批量间隔(毫秒) 否 100 批量写入最长时间间隔。只有当设置同步写入为false时生效。 Tags 否 键:region 值:hangzhou
数据标签,所有数据都以字符串形式处理。 数据列名 是 键:clientid 值:${clientid}
数据键值对,数据类型自动识别。表格存储列值支持多种数据类型,包括布尔、整型、浮点数、字符串和二进制。其中字符串数据默认按照二进制数据处理以保证字符集最佳兼容性。 字符型数据列名 否 键:clientid 值:${clientid}
字符串键值对,数据会按照字符串类型处理。为了保证最佳兼容性,系统默认将字符串类型按照二进制处理。如果需要指定数据按照字符串处理,请将字段配置在此处。 - 单击确认。系统弹出成功新建规则对话框。如果需要继续添加规则,请单击继续添加。
- 根据下表说明配置动作参数。
步骤二:通过MQTT X工具模拟温湿度数据上报
- 创建认证信息。创建认证信息后,您可以通过MQTT X工具使用该用户连接EMQX Cloud。
- 登录EMQX Cloud控制台。
- 单击项目。
- 在左侧导航栏,选择认证鉴权 > 认证。
- 在认证页面,单击添加。
- 在添加认证对话框,设置用户名和密码,单击确认。
- 获取EMQX集群的部署地址和端口。获取部署地址用于连接部署的EMQX集群。
- 登录EMQX Cloud控制台。
- 单击项目。
- 在概览页面,获取并记录连接地址和连接端口信息。
- 使用MQTT X工具模拟温湿度数据上报。
- 下载并安装MQTT X工具。
- 打开MQTT X配置工具,单击New Connection。
- 根据下表说明配置基础连接参数,其他保持默认即可。
参数 是否必选 示例值 描述 Name 是 emqxtest 自定义MQTT X工具与EMQX Cloud建立的连接名称。 Host 是 192.0.2.110 EMQX集群的部署地址。请填写通过EMQX Cloud控制台获取到的连接地址。 Port 是 1883 EMQX集群的MQTT服务端口。请填写通过EMQX Cloud控制台获取到的连接端口。 Username 是 test 连接在EMQX Cloud时使用的用户名和密码。请填写通过EMQX Cloud控制台创建认证信息时设置的用户名和密码。 Password 是 ******** - 单击Connect。
- 发送测试数据。
- 单击New Subscribe。
- 在New Subscription对话框,配置Topic和Qos,单击Confirm。此处以
temp_hum/emqx
的Topic为例介绍。 - 选择Topic的Payload为JSON,配置Topic名称和消息体。JSON示例如下:
{ "temp": "23.5", "hum": "37.4" }
- 单击图标,发送消息。
步骤三:通过表格存储控制台验证数据
- 登录表格存储控制台。
- 在页面上方,选择地域。
- 在概览页面,单击实例名称。
- 在实例详情页签,单击时序表列表。
- 在时序表列表页签,单击时序表名称后选择数据管理页签或在操作列单击数据管理。
- 在数据管理页签,输入度量名称cloud,选择时间范围为1小时,单击查询。