物联网平台将设备上报的数据流转至消息队列RocketMQ的Topic中,然后,RocketMQ再将数据流转到企业服务器。本文介绍数据流转的操作步骤。
背景信息
数据流转流程图:

方案优势:
- 设备使用MQTT协议接入物联网平台,数据传输链路支持TLS加密,保障数据不被篡改。MQTT协议说明,请参见MQTT协议规范。
- 通过消息队列RocketMQ削峰填谷,缓冲消息,减轻服务器同时接收大量设备消息的压力。
操作步骤
- 登录物联网控制台,创建产品和设备。
- 在实例概览页面,找到对应的实例,单击实例进入实例详情页面。
注意 在中国地域,目前仅华东2(上海)地域开通了公共实例服务。

- 在左侧导航栏选择,单击创建产品,配置参数,单击确认。
本示例中,产品名称为MQ_test,所属品类为自定义品类,节点类型为直连设备,其他参数使用默认值。
- 单击查看产品详情,在产品详情页面,单击,然后单击自定义Topic类,定义一个用于设备上报数据的Topic。
本示例中,定义的Topic类:/{YourProductKey}/${YourDeviceName}/user/data
。
- 在左侧导航栏选择,单击添加设备,为产品MQ_test创建设备。
本示例中,创建了一个名称为MQdevice的设备。
- 在消息队列RocketMQ控制台,创建Topic和消费者。
- 登录消息队列RocketMQ控制台。
- 单击创建实例,创建一个标准版实例,地域选择华东2(上海)。
注意 RocketMQ实例所在地域必须与物联网平台实例所在地域保持一致。
- 单击创建 Group,如下图所示。
- 单击创建Topic,消息类型选择普通消息。
- 创建消息消费者,然后在RocketMQ控制台查看消费者状态,确保消费者处于在线状态,订阅关系一致。
本文以调用TCP协议的SDK为例,进行收发消息。SDK获取和使用的详细内容,请参见
调用TCP协议的SDK收发普通消息。
Java语言的SDK代码示例如下:
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制台创建的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, "${AccessKey}");
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, "${SecretKey}");
// 设置 TCP 接入域名,到控制台的实例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
// 集群订阅方式 (默认)
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// 广播订阅方式
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("iot_to_mq", "*", new MessageListener() { //订阅多个 Tag
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
- 登录物联网平台控制台,在对应实例下,设置数据流转规则,将设备上报的数据转发至消息队列(RocketMQ)。
- 单击,然后单击创建规则,创建一条数据流转规则MQ流转,数据格式选择为JSON。
- 单击编写SQL,设置数据处理SQL,如下图所示。
- 单击添加操作设置数据转发目的地。
- 所有设置完成后,返回至云产品流转页面,单击MQ流转规则对应的启动。
规则启动后,物联网平台会将规则SQL中定义的设备上报消息转发至消息队列(RocketMQ)的Topic中。
- 使用Java SDK模拟设备,上报消息。
- 下载Java SDK Demo,然后解压。
- 在IntelliJ IDEA中,导入Demo包中的示例工程JavaLinkKitDemo。
- 在文件device_id.json中输入MQdevice的设备证书信息:productKey、deviceName和deviceSecret。
- 在文件src\main\java\com.aliyun.alink.devicesdk.demo\MqttSample.java中修改MQTT Topic为设备上报数据的Topic。本示例中,使用的Topic是
/{YourProductKey}/${YourDeviceName}/user/data
。
- 运行src\main\java\com.aliyun.alink.devicesdk.demo\HelloWorld.java文件,启动设备。
登录
物联网平台控制台,
在对应的实例下,选择,查看该设备的日志信息,发现设备数据成功转发至RocketMQ。

- 在RocketMQ控制台查看消息。
- 在本地运行订阅消息队列RocketMQ资源的代码。
- 在消息队列RocketMQ控制台,消息查询页面,按Topic或者Message ID查询消息,您可下载流转至消息队列RocketMQ中的消息。

消息内容如下:
{"deviceName":"MQdevice"}