设备数据订阅

更新时间:

背景介绍:

该文档适用于多租户和单租户两种模式,发布和订阅已授权给应用的设备端的消息。物联网平台提供HTTPS/2 (Java)设备SDK进行建联,用于建立设备端与物联网平台的通信。参考SDK链接 :https://help.aliyun.com/document_detail/143601.html?spm=a2c4g.11186623.4.5.367d4f38ycUoZZ 此处提供了SDK Demo,您可以参考此Demo,开发SDK,进行订阅设备端的消息。

前提条件

这种订阅方式需要您使用托管平台的2个模块:分别是 [设备管理—设备—Topic列表] 和 [应用托管—应用集成—授权设备给应用] 配合使用。前者定义设备端Topic的自定义名称以及Topic的权限(发布和订阅),后者是在托管平台-设备集成把设备授权给应用。

操作步骤

  1. 下载HTTP/2 SDK(Java) Demo。下载地址:设备订阅Demo Code

  2. 使用IDEA或者Eclipse,将该Demo导入到工程里面。

  3. 从控制台获取已经授权的appkey和appsecret信息。

  4. 完成身份认证和对接进行设备的消息订阅。

    Demo示例

    a. 配置参数

     //System.getenv("iot.hosting.appKey")方法可以从环境变量中获取appkey和appsecret
     String appKey = System.getenv("iot.hosting.appKey");
     String appSecret=System.getenv("iot.hosting.appSecret");
     //一般在配置文件中固定为:iot.http2.host=https://ah.iot-as-http2.cn-shanghai.aliyuncs.com:443
     @Value("${iot.http2.host}")
     private String httpHost;

b. 连接HTTP/2服务器,并接收数据

// 连接配置
        Profile profile = Profile.getAppKeyProfile(httpHost, appKey, appSecret);
// 如果是true 那么清理所有离线消息,即qos0 或者 1的所有未接收内容
        profile.setCleanSession(false);   
// 构造客户端
        MessageClient messageClient = MessageClientFactory.messageClient(profile);
        try {
// 数据接收
            messageClient.connect(messageToken -> {
                System.out.println(messageToken.getMessage());
                return MessageCallback.Action.CommitSuccess;
            });
        } catch (Throwable ex) {
            System.out.println(ex);
        }

c. 订阅Topic

// topic订阅。订阅成功后,即可在建连时的回调接口中收到消息
MessageCallback messageCallback = new MessageCallback() {
            @Override
//消息消费(接收消息)
            public Action consume(MessageToken messageToken) {
//将消息实体化,进行消息体的操作               
                ThingMsgRecordDO recordDO = new ThingMsgRecordDO();
                recordDO.setTopic(messageToken.getMessage().getTopic());
                recordDO.setPayload(new String(messageToken.getMessage().getPayload()));
                System.out.println(recordDO);
                return Action.CommitSuccess;
            }
        };

d.设置授权appkey的自定义topic

//消息订阅Topic的设置,需在设备端设置自定义的Topic。(这里以自定义Topic为例)
//也可也使用设备端的统一的Topic
String topic = String.format("/a13TLBPrC0d/wkzijitianjiashebei/user/wkget", appKey);
        messageClient.setMessageListener(topic, messageCallback);

多租户和单租户消息处理

a.消息返回示例

Message{payload={"deviceType":"CustomCategory",
                      "iotId":"69JL4ECGiPd28vqfdBM8000100",
                      "requestId":"123",
                      "productKey":"a13TLBPrC0d",
                      "gmtCreate":1565147849691,
                      "deviceName":"wkzijitianjiashebei",
                      "items":{"electric_pfa":{"value":23,"time":1565147849705},
                               "COSa":{"value":1,"time":1565147849705},
                               "Uab":{"value":121212,"time":1565147849705}}}, 
             topic='/a13TLBPrC0d/wkzijitianjiashebei/thing/event/property/post',
             messageId='1158940197065362432', 
             qos=0, 
             generateTime=1565147849705}

b.返回消息体参数简要说明

名称

描述

deviceType

设备类型

iotId

阿里云物联网平台为设备颁发全局唯一的设备ID

requestId

request请求id

productKey

设备隶属的产品Key。

gmtCreate

时间戳

deviceName

设备名称

items

消息内容

topic

消息来源的Topic

messageId

消息ID

qos

清除离线消息

generateTime

时间戳

c.消息处理

多租户的模式:需要根据返回的消息体的字段进行消息数据的处理。单租户的模式:无需处理,获取的消息为已授权给应用的设备消息。

接口说明

身份认证

设备连接物联网平台时,需要使用Profile配置设备身份及相关参数。具体接口参数如下:

   Profile profile = Profile.getAppKeyProfile(httpHost, appKey, appSecret);
        MessageClient messageClient = MessageClientFactory.messageClient(profile);
        profile.setCleanSession(false);
        try {
            messageClient.connect(messageToken -> {
                return MessageCallback.Action.CommitSuccess;
            });
        } catch (Throwable ex) {
            logger.info("init h2 client exception", ex);
            System.out.println(ex);
        }

Profile参数说明:

名称

类型

是否必须

描述

appKey

String

设备授权应用的appkey可以通过环境变量获取system.getev(“iot.hosting.appkey”)

appSecret

String

设备授权应用appSecret可以通过环境变量获取system.getev(“iot.hosting.appSecret”)

httpHost

String

请求路由,目前为上海区域:”https://ah.iot-as-http2.cn-shanghai.aliyuncs.com:443

cleanSession

Boolean

是否清除缓存的离线消息。

heartBeatInterval

Long

心跳间隔,单位为毫秒。

heartBeatTimeOut

Long

心跳超时时间,单位为毫秒。

multiConnection

Boolean

是否使用多连接。若使用设备的productKey和deivceName接入时,请将此参数值设置为false。

callbackThreadCorePoolSize

Integer

回调线程的corePoolSize(核心池的大小)。

callbackThreadMaximumPoolSize

Integer

回调线程池的maximumPoolSize(最大线程数)。

callbackThreadBlockingQueueSize

Integer

回调线程池的BlockingQueueSize(阻塞队列大小)。

authParams

Map

自定义认证参数。