设备数据订阅
背景介绍:
该文档适用于多租户和单租户两种模式,发布和订阅已授权给应用的设备端的消息。物联网平台提供HTTPS/2 (Java)设备SDK进行建联,用于建立设备端与物联网平台的通信。参考SDK链接 :https://help.aliyun.com/document_detail/143601.html?spm=a2c4g.11186623.4.5.367d4f38ycUoZZ 此处提供了SDK Demo,您可以参考此Demo,开发SDK,进行订阅设备端的消息。
前提条件
这种订阅方式需要您使用托管平台的2个模块:分别是 [设备管理—设备—Topic列表] 和 [应用托管—应用集成—授权设备给应用] 配合使用。前者定义设备端Topic的自定义名称以及Topic的权限(发布和订阅),后者是在托管平台-设备集成把设备授权给应用。
操作步骤
下载HTTP/2 SDK(Java) Demo。下载地址:设备订阅Demo Code
使用IDEA或者Eclipse,将该Demo导入到工程里面。
从控制台获取已经授权的appkey和appsecret信息。
完成身份认证和对接进行设备的消息订阅。
Demo示例
a. 配置参数
//System.getenv("iot.hosting.appKey")方法可以从环境变量中获取appkey和appsecret
String appKey = System.getenv("iot.hosting.appKey");
String appSecret=System.getenv("iot.hosting.appSecret");
//一般在配置文件中固定为:iot.http2.host=https://ah.iot-as-http2.cn-shanghai.aliyuncs.com:443
@Value("${iot.http2.host}")
private String httpHost;
b. 连接HTTP/2服务器,并接收数据
// 连接配置
Profile profile = Profile.getAppKeyProfile(httpHost, appKey, appSecret);
// 如果是true 那么清理所有离线消息,即qos0 或者 1的所有未接收内容
profile.setCleanSession(false);
// 构造客户端
MessageClient messageClient = MessageClientFactory.messageClient(profile);
try {
// 数据接收
messageClient.connect(messageToken -> {
System.out.println(messageToken.getMessage());
return MessageCallback.Action.CommitSuccess;
});
} catch (Throwable ex) {
System.out.println(ex);
}
c. 订阅Topic
// topic订阅。订阅成功后,即可在建连时的回调接口中收到消息
MessageCallback messageCallback = new MessageCallback() {
@Override
//消息消费(接收消息)
public Action consume(MessageToken messageToken) {
//将消息实体化,进行消息体的操作
ThingMsgRecordDO recordDO = new ThingMsgRecordDO();
recordDO.setTopic(messageToken.getMessage().getTopic());
recordDO.setPayload(new String(messageToken.getMessage().getPayload()));
System.out.println(recordDO);
return Action.CommitSuccess;
}
};
d.设置授权appkey的自定义topic
//消息订阅Topic的设置,需在设备端设置自定义的Topic。(这里以自定义Topic为例)
//也可也使用设备端的统一的Topic
String topic = String.format("/a13TLBPrC0d/wkzijitianjiashebei/user/wkget", appKey);
messageClient.setMessageListener(topic, messageCallback);
多租户和单租户消息处理
a.消息返回示例
Message{payload={"deviceType":"CustomCategory",
"iotId":"69JL4ECGiPd28vqfdBM8000100",
"requestId":"123",
"productKey":"a13TLBPrC0d",
"gmtCreate":1565147849691,
"deviceName":"wkzijitianjiashebei",
"items":{"electric_pfa":{"value":23,"time":1565147849705},
"COSa":{"value":1,"time":1565147849705},
"Uab":{"value":121212,"time":1565147849705}}},
topic='/a13TLBPrC0d/wkzijitianjiashebei/thing/event/property/post',
messageId='1158940197065362432',
qos=0,
generateTime=1565147849705}
b.返回消息体参数简要说明
名称 | 描述 |
---|---|
deviceType | 设备类型 |
iotId | 阿里云物联网平台为设备颁发全局唯一的设备ID |
requestId | request请求id |
productKey | 设备隶属的产品Key。 |
gmtCreate | 时间戳 |
deviceName | 设备名称 |
items | 消息内容 |
topic | 消息来源的Topic |
messageId | 消息ID |
qos | 清除离线消息 |
generateTime | 时间戳 |
c.消息处理
多租户的模式:需要根据返回的消息体的字段进行消息数据的处理。单租户的模式:无需处理,获取的消息为已授权给应用的设备消息。
接口说明
身份认证
设备连接物联网平台时,需要使用Profile配置设备身份及相关参数。具体接口参数如下:
Profile profile = Profile.getAppKeyProfile(httpHost, appKey, appSecret);
MessageClient messageClient = MessageClientFactory.messageClient(profile);
profile.setCleanSession(false);
try {
messageClient.connect(messageToken -> {
return MessageCallback.Action.CommitSuccess;
});
} catch (Throwable ex) {
logger.info("init h2 client exception", ex);
System.out.println(ex);
}
Profile参数说明:
名称 | 类型 | 是否必须 | 描述 |
---|---|---|---|
appKey | String | 是 | 设备授权应用的appkey可以通过环境变量获取system.getev(“iot.hosting.appkey”) |
appSecret | String | 是 | 设备授权应用appSecret可以通过环境变量获取system.getev(“iot.hosting.appSecret”) |
httpHost | String | 是 | 请求路由,目前为上海区域:”https://ah.iot-as-http2.cn-shanghai.aliyuncs.com:443“ |
cleanSession | Boolean | 否 | 是否清除缓存的离线消息。 |
heartBeatInterval | Long | 否 | 心跳间隔,单位为毫秒。 |
heartBeatTimeOut | Long | 否 | 心跳超时时间,单位为毫秒。 |
multiConnection | Boolean | 否 | 是否使用多连接。若使用设备的productKey和deivceName接入时,请将此参数值设置为false。 |
callbackThreadCorePoolSize | Integer | 否 | 回调线程的corePoolSize(核心池的大小)。 |
callbackThreadMaximumPoolSize | Integer | 否 | 回调线程池的maximumPoolSize(最大线程数)。 |
callbackThreadBlockingQueueSize | Integer | 否 | 回调线程池的BlockingQueueSize(阻塞队列大小)。 |
authParams | Map | 否 | 自定义认证参数。 |