设备数据订阅
背景介绍:
该文档适用于多租户和单租户两种模式,发布和订阅已授权给应用的设备端的消息。物联网平台提供HTTPS/2 (Java)设备SDK进行建联,用于建立设备端与物联网平台的通信。参考SDK链接 :https://help.aliyun.com/document_detail/143601.html?spm=a2c4g.11186623.4.5.367d4f38ycUoZZ 此处提供了SDK Demo,您可以参考此Demo,开发SDK,进行订阅设备端的消息。
前提条件
这种订阅方式需要您使用托管平台的2个模块:分别是 [设备管理—设备—Topic列表] 和 [应用托管—应用集成—授权设备给应用] 配合使用。前者定义设备端Topic的自定义名称以及Topic的权限(发布和订阅),后者是在托管平台-设备集成把设备授权给应用。
操作步骤
下载HTTP/2 SDK(Java) Demo。下载地址:设备订阅Demo Code
使用IDEA或者Eclipse,将该Demo导入到工程里面。
从控制台获取已经授权的appkey和appsecret信息。
完成身份认证和对接进行设备的消息订阅。
Demo示例
a. 配置参数
     //System.getenv("iot.hosting.appKey")方法可以从环境变量中获取appkey和appsecret
     String appKey = System.getenv("iot.hosting.appKey");
     String appSecret=System.getenv("iot.hosting.appSecret");
     //一般在配置文件中固定为:iot.http2.host=https://ah.iot-as-http2.cn-shanghai.aliyuncs.com:443
     @Value("${iot.http2.host}")
     private String httpHost;b. 连接HTTP/2服务器,并接收数据
// 连接配置
        Profile profile = Profile.getAppKeyProfile(httpHost, appKey, appSecret);
// 如果是true 那么清理所有离线消息,即qos0 或者 1的所有未接收内容
        profile.setCleanSession(false);   
// 构造客户端
        MessageClient messageClient = MessageClientFactory.messageClient(profile);
        try {
// 数据接收
            messageClient.connect(messageToken -> {
                System.out.println(messageToken.getMessage());
                return MessageCallback.Action.CommitSuccess;
            });
        } catch (Throwable ex) {
            System.out.println(ex);
        }c. 订阅Topic
// topic订阅。订阅成功后,即可在建连时的回调接口中收到消息
MessageCallback messageCallback = new MessageCallback() {
            @Override
//消息消费(接收消息)
            public Action consume(MessageToken messageToken) {
//将消息实体化,进行消息体的操作               
                ThingMsgRecordDO recordDO = new ThingMsgRecordDO();
                recordDO.setTopic(messageToken.getMessage().getTopic());
                recordDO.setPayload(new String(messageToken.getMessage().getPayload()));
                System.out.println(recordDO);
                return Action.CommitSuccess;
            }
        };d.设置授权appkey的自定义topic
//消息订阅Topic的设置,需在设备端设置自定义的Topic。(这里以自定义Topic为例)
//也可也使用设备端的统一的Topic
String topic = String.format("/a13TLBPrC0d/wkzijitianjiashebei/user/wkget", appKey);
        messageClient.setMessageListener(topic, messageCallback);多租户和单租户消息处理
a.消息返回示例
Message{payload={"deviceType":"CustomCategory",
                      "iotId":"69JL4ECGiPd28vqfdBM8000100",
                      "requestId":"123",
                      "productKey":"a13TLBPrC0d",
                      "gmtCreate":1565147849691,
                      "deviceName":"wkzijitianjiashebei",
                      "items":{"electric_pfa":{"value":23,"time":1565147849705},
                               "COSa":{"value":1,"time":1565147849705},
                               "Uab":{"value":121212,"time":1565147849705}}}, 
             topic='/a13TLBPrC0d/wkzijitianjiashebei/thing/event/property/post',
             messageId='1158940197065362432', 
             qos=0, 
             generateTime=1565147849705}b.返回消息体参数简要说明
名称  | 描述  | 
|---|---|
deviceType  | 设备类型  | 
iotId  | 阿里云物联网平台为设备颁发全局唯一的设备ID  | 
requestId  | request请求id  | 
productKey  | 设备隶属的产品Key。  | 
gmtCreate  | 时间戳  | 
deviceName  | 设备名称  | 
items  | 消息内容  | 
topic  | 消息来源的Topic  | 
messageId  | 消息ID  | 
qos  | 清除离线消息  | 
generateTime  | 时间戳  | 
c.消息处理
多租户的模式:需要根据返回的消息体的字段进行消息数据的处理。单租户的模式:无需处理,获取的消息为已授权给应用的设备消息。
接口说明
身份认证
设备连接物联网平台时,需要使用Profile配置设备身份及相关参数。具体接口参数如下:
   Profile profile = Profile.getAppKeyProfile(httpHost, appKey, appSecret);
        MessageClient messageClient = MessageClientFactory.messageClient(profile);
        profile.setCleanSession(false);
        try {
            messageClient.connect(messageToken -> {
                return MessageCallback.Action.CommitSuccess;
            });
        } catch (Throwable ex) {
            logger.info("init h2 client exception", ex);
            System.out.println(ex);
        }Profile参数说明:
名称  | 类型  | 是否必须  | 描述  | 
|---|---|---|---|
appKey  | String  | 是  | 设备授权应用的appkey可以通过环境变量获取system.getev(“iot.hosting.appkey”)  | 
appSecret  | String  | 是  | 设备授权应用appSecret可以通过环境变量获取system.getev(“iot.hosting.appSecret”)  | 
httpHost  | String  | 是  | 请求路由,目前为上海区域:”https://ah.iot-as-http2.cn-shanghai.aliyuncs.com:443“  | 
cleanSession  | Boolean  | 否  | 是否清除缓存的离线消息。  | 
heartBeatInterval  | Long  | 否  | 心跳间隔,单位为毫秒。  | 
heartBeatTimeOut  | Long  | 否  | 心跳超时时间,单位为毫秒。  | 
multiConnection  | Boolean  | 否  | 是否使用多连接。若使用设备的productKey和deivceName接入时,请将此参数值设置为false。  | 
callbackThreadCorePoolSize  | Integer  | 否  | 回调线程的corePoolSize(核心池的大小)。  | 
callbackThreadMaximumPoolSize  | Integer  | 否  | 回调线程池的maximumPoolSize(最大线程数)。  | 
callbackThreadBlockingQueueSize  | Integer  | 否  | 回调线程池的BlockingQueueSize(阻塞队列大小)。  | 
authParams  | Map  | 否  | 自定义认证参数。  |