AMQP(Advanced Message Queuing Protocol,高级消息队列协议)转储功能适用于生活物联网平台与企业服务器之间的消息流转。通过集成和使用AMQP
SDK,即可实现身份认证、消息接收的能力。我们推荐使用AMQP的方式推送设备数据(如设备状态数据、设备控制记录等),用户信息数据等。
前提条件
开启设备数据同步,并配置要同步数据的产品。详细参见
设置数据同步。

说明 当开启数据同步后,集成AMQP客户端SDK来订阅数据。如果通过控制台关闭数据同步再开启时,客户端SDK需要重新进行连接流程,否则无法正常接收数据。如果切换不同的AMQP客户端,需要把之前的客户端断开,再连接新的客户端。否则新客户端无法正常接收数据。
AMQP SDK使用
- 引入依赖。
AMQP SDK为开源SDK。如果您使用Java开发语言,推荐使用Apache Qpid JMS客户端。在项目中添加Maven依赖,Maven信息如下。
说明 目前生活物联网平台仅支持Java语言开发,暂未提供其他语言版本,请以生活物联网平台官网为主。
<!-- amqp 1.0 qpid client -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.47.0</version>
</dependency>
<!-- util for base64-->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
- 认证身份信息。
- 接收云端消息。
首先需要创建消息接收的客户端对象client,并传入上面身份认证的profile信息。当消息接收的客户端和服务端建立连接后,服务端会立即向消息接收的客户端推送已订阅的消息,因此建立连接时需要提供默认消息接收的回调接口,用于处理云端推送的消息。
完整代码示例如下。
import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpJavaClientDemo {
private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);
//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50000));
public static void main(String[] args) throws Exception {
String appKey = "${YourAppkey}"; //在控制台自有App,单击密钥对应的查看,可显示App的App Key和App Secret 创建App时系统
String appSecret = "${YourAppSecret}";
String consumerGroupId = "${YourAppkey}";
long random = xxxxx;
//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
String clientId = "${YourClientId}";
String userName = clientId + "|authMode=appkey"
+ ",signMethod=" + "SHA256"
+ ",random=" + random
+ ",appKey=" + appKey
+ ",groupId=" + consumerGroupId + "|";
String signContent = "random=" + random;
String password = doSign(signContent, appSecret, "HmacSHA256");
String connectionUrlTemplate = "failover:(${AMQPEndPointUrl}?amqp.idleTimeout=80000)"
+ "?failover.maxReconnectAttempts=10&failover.reconnectDelay=30";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrlTemplate);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
// 创建连接。
Connection connection = cf.createConnection(userName, password);
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// 创建会话。
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// 创建Receiver连接。
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}
private static MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
//1.收到消息之后一定要ACK。
// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
// message.acknowledge();
//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
executorService.submit(() -> processMessage(message));
} catch (Exception e) {
logger.error("submit task occurs exception ", e);
}
}
};
/**
* 在这里处理您收到消息后的具体业务逻辑。
*/
private static void processMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
logger.info("receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
} catch (Exception e) {
logger.error("processMessage occurs error ", e);
}
}
private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}
/**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error) {
logger.error("onConnectionFailure, {}", error.getMessage());
}
/**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}
/**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
@Override
public void onSessionClosed(Session session, Throwable cause) {}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {}
};
/**
* 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Hex.encodeHexString(rawHmac);
}
}
说明 消息推送失败时,平台会重新推送,重试策略如下。
- 如果对端不在线或未回复ack消息,则会造成消息堆积,堆积的消息转为离线消息。
- 离线消息每隔5min重试推送一次(每次推送10条)。对端如果成功接收了消息,则重试策略会继续推送剩余的离线消息(推送失败的消息,下一次继续推送)。
- 离线消息最多会保存 1 天,如果 1 天后仍然无法推送成功,则会被删除。
- 离线消息会进入单独的队列,不会影响后续消息的实时推送。
消息格式
- 物的属性变更消息
topic:/${productKey}/${deviceName}/thing/event/property/post
消息字段说明如下。
参数 |
子参数 |
子参数 |
类型 |
含义 |
deviceType |
|
|
String |
设备所属品类 |
gmtCreate |
|
|
Long |
数据流转消息产生时间,自1970-1-1起流逝的毫秒值 |
iotId |
|
|
String |
设备的唯一ID |
productKey |
|
|
String |
设备所属产品的唯一标识符 |
deviceName |
|
|
String |
设备名称 |
items |
|
|
JSON |
变更的状态列表 |
|
attribute |
|
String |
发生变更的属性,具体取值由具体情况确定 |
|
|
value |
具体数据类型由具体情况确定 |
变更值 |
|
|
time |
Long |
设备属性发生变化的时间,自1970-1-1起流逝的毫秒值 |
消息示例如下。
{
"deviceType": "SmartDoor",
"iotId": "Xzf15db9xxxxxxxxWR001046b400",
"productKey": "a17xxxxTYNA",
"gmtCreate": 153xxxx145304,
"deviceName": "Xzf15xxxxucTHBgUo6WR",
"items": {
"WIFI_Rx_Rate": {
"value": 74274,
"time": 1534299145344
}
}
}
- 物的事件变更消息
topic:/${productKey}/${deviceName}/thing/event/{tsl.event.identifier}/post
消息字段说明如下。
参数 |
子参数 |
类型 |
含义 |
deviceType |
|
String |
设备所属品类 |
iotId |
|
String |
设备的唯一ID |
productKey |
|
String |
设备所属产品的唯一标识符 |
deviceName |
|
String |
设备名称 |
identifier |
|
String |
事件标识符,对应事件的identifier |
name |
|
String |
事件名称 |
type |
|
String |
事件类型 |
time |
|
Long |
设备上报value对应的时间,自1970-1-1起流逝的毫秒值 |
value |
|
JSON |
变更的事件属性列表:key-value键值对 |
|
key |
String |
属性key |
|
value |
具体数据类型由具体情况确定 |
属性取值 |
消息示例如下。
{
"deviceType": "SmartDoor",
"identifier": "Doorxxxxication",
"iotId": "Xzf15db9xxxxxxxxx01046b400",
"name": "开门通知",
"time": 1534319108982,
"type": "info",
"productKey": "a17xxxxTYNA",
"deviceName": "Xzf15xxxxucTHBgUo6WR",
"value": {
"KeyID": "x8xxxxxkDY",
"LockType": 3
}
}
- 设备服务返回消息
topic:/${productKey}/${deviceName}/thing/downlink/reply/message
消息字段说明如下。
参数 |
类型 |
含义 |
gmtCreate |
Long |
数据流转消息产生时间,自1970-1-1起流逝的毫秒值 |
iotId |
String |
设备的唯一ID |
productKey |
String |
设备所属产品的唯一标识符 |
deviceName |
String |
设备名称 |
requestId |
String |
阿里云产生和设备通信的信息ID |
code |
Integer |
调用的结果信息 |
message |
String |
结果信息说明 |
topic |
String |
服务调用下行时使用的topic |
data |
Object |
设备返回的结果,非透传之间返回设备结果,透传则需要经过脚本转换 |
消息示例如下。
{
"gmtCreate": 151xxxx39881,
"iotId": "4z819VQHxxxxxxxxxxxx7ee200",
"productKey": "p1gxxxxBd",
"deviceName": "xxxxxxxxxx",
"requestId": "1234",
"code": 200,
"message": "success",
"topic": "/sys/p1gxxxxeUBd/xxxxxxxxxx/thing/service/property/set",
"data": {}
}
- 物的状态变更消息
为了提高消息有效性,设备上下线过于频繁时,会对消息进行筛检。
topic: /${productKey}/${deviceName}/mqtt/status
消息字段说明如下。
参数 |
子参数 |
类型 |
含义 |
deviceType |
|
String |
设备所属品类 |
gmtCreate |
|
Long |
数据流转消息产生时间,自1970-1-1起流逝的毫秒值 |
iotId |
|
String |
设备的唯一ID |
action |
|
String |
设备状态变更动作:
|
productKey |
|
String |
设备所属产品的唯一标识符 |
deviceName |
|
String |
设备名称 |
status |
|
JSON |
状态信息,元素包括:value-状态值,time-发生变化的时间 |
|
time |
Long |
设备上下线状态发生变化的时间,自1970-1-1起流逝的毫秒值 |
|
value |
String |
设备上下线状态
value状态值定义:
|
消息示例如下。
{
"deviceType": "SmartDoor",
"iotId": "Xzf15dxxxxxxxxxxxxxxxx01046b400",
"action": "online",
"productKey": "a17xxxxxxTYNA",
"gmtCreate": 153xxxx1368,
"deviceName": "Xzf1xxxxxxxxxxxxgUo6WR",
"status": {
"time": 1534319611368,
"value": "1"
}
}
- 用户绑定变更消息
用户绑定/解绑设备产生的回流消息,用于同步用户与设备的绑定、解绑。
topic:/${productKey}/${deviceName}/thing/awss/enrollee/user
消息字段说明如下。
参数 |
子参数 |
类型 |
含义 |
bind |
|
bool |
true-绑定;false-解绑 |
productKey |
|
String |
设备所属产品的唯一标识符 |
deviceName |
|
String |
设备名称 |
iotId |
|
String |
设备的唯一ID |
messageCreateTime |
|
JSON |
消息创建时间 |
identityInfos |
|
list |
用户信息列表 |
|
identityId |
String |
用户身份ID |
|
scopeId |
String |
隔离ID |
|
tenantId |
String |
租户ID |
|
owned |
Integer |
拥有标记
|
params |
|
Map |
扩展参数(暂未使用) |
{
"bind":true,
"productKey": "123xxxx569",
"deviceName": "deviceNamexxxx34",
"iotId": "",
"messageCreateTime": 151xxxx9881,
"identityInfos":[
{
"identityId":"50xxxxxxxxxxxx62060259",
"scopeId":"",
"tenantId":"1D89B5xxxxxxxxxxxxxxxx861678FF",
"owned":1
}
],
"params":{
}
}