全部产品
云市场

设备数据订阅

更新时间:2019-09-19 10:12:13

背景介绍:

该文档适用于多租户和单租户两种模式,发布和订阅已授权给应用的设备端的消息。物联网平台提供HTTP/2 (Java)设备SDK进行建联,用于建立设备端与物联网平台的通信。
参考SDK链接 :https://help.aliyun.com/document_detail/30581.htmlspm=a2c4g.11174283.6.683.3a8b1668RkUA0i
此处提供了SDK Demo,您可以参考此Demo,开发SDK,进行订阅设备端的消息。

前提条件

这种订阅方式需要您使用托管平台的2个模块:分别是 [设备管理—设备—Topic列表] 和 [应用托管—应用集成—授权设备给应用] 配合使用。前者定义设备端Topic的自定义名称以及Topic的权限(发布和订阅),后者是在托管平台-设备集成把设备授权给应用。

操作步骤

  1. 下载HTTP/2 SDK(Java) Demo。下载地址为:
  2. 使用IDEA或者Eclipse,将该Demo导入到工程里面。
  3. 从控制台获取已经授权的appkey和appsecret信息。
  4. 完成身份认证和对接进行设备的消息订阅。

    Demo示例

    a. 配置参数

  1. //System.getenv("iot.hosting.appKey")方法可以从环境变量中获取appkey和appsecret
  2. String appKey = System.getenv("iot.hosting.appKey");
  3. String appSecret=System.getenv("iot.hosting.appSecret");
  4. //一般在配置文件中固定为:iot.http2.host=https://ah.iot-as-http2.cn-shanghai.aliyuncs.com:443
  5. @Value("${iot.http2.host}")
  6. private String httpHost;

b. 连接HTTP/2服务器,并接收数据

  1. // 连接配置
  2. Profile profile = Profile.getAppKeyProfile(httpHost, appKey, appSecret);
  3. // 如果是true 那么清理所有离线消息,即qos0 或者 1的所有未接收内容
  4. profile.setCleanSession(false);
  5. // 构造客户端
  6. MessageClient messageClient = MessageClientFactory.messageClient(profile);
  7. try {
  8. // 数据接收
  9. messageClient.connect(messageToken -> {
  10. System.out.println(messageToken.getMessage());
  11. return MessageCallback.Action.CommitSuccess;
  12. });
  13. } catch (Throwable ex) {
  14. System.out.println(ex);
  15. }

c. 订阅Topic

  1. // topic订阅。订阅成功后,即可在建连时的回调接口中收到消息
  2. MessageCallback messageCallback = new MessageCallback() {
  3. @Override
  4. //消息消费(接收消息)
  5. public Action consume(MessageToken messageToken) {
  6. //将消息实体化,进行消息体的操作
  7. ThingMsgRecordDO recordDO = new ThingMsgRecordDO();
  8. recordDO.setTopic(messageToken.getMessage().getTopic());
  9. recordDO.setPayload(new String(messageToken.getMessage().getPayload()));
  10. System.out.println(recordDO);
  11. return Action.CommitSuccess;
  12. }
  13. };

d.设置授权appkey的自定义topic

  1. //消息订阅Topic的设置,需在设备端设置自定义的Topic。(这里以自定义Topic为例)
  2. //也可也使用设备端的统一的Topic
  3. String topic = String.format("/a13TLBPrC0d/wkzijitianjiashebei/user/wkget", appKey);
  4. messageClient.setMessageListener(topic, messageCallback);

多租户和单租户消息处理

a.消息返回示例

  1. Message{payload={"deviceType":"CustomCategory",
  2. "iotId":"69JL4ECGiPd28vqfdBM8000100",
  3. "requestId":"123",
  4. "productKey":"a13TLBPrC0d",
  5. "gmtCreate":1565147849691,
  6. "deviceName":"wkzijitianjiashebei",
  7. "items":{"electric_pfa":{"value":23,"time":1565147849705},
  8. "COSa":{"value":1,"time":1565147849705},
  9. "Uab":{"value":121212,"time":1565147849705}}},
  10. topic='/a13TLBPrC0d/wkzijitianjiashebei/thing/event/property/post',
  11. messageId='1158940197065362432',
  12. qos=0,
  13. generateTime=1565147849705}

b.返回消息体参数简要说明

名称 描述
deviceType 设备类型
iotId 阿里云物联网平台为设备颁发全局唯一的设备ID
requestId request请求id
productKey 设备隶属的产品Key。
gmtCreate 时间戳
deviceName 设备名称
items 消息内容
topic 消息来源的Topic
messageId 消息ID
qos 清除离线消息
generateTime 时间戳

c.消息处理

多租户的模式:需要根据返回的消息体的字段进行消息数据的处理。
单租户的模式:无需处理,获取的消息为已授权给应用的设备消息。

接口说明

身份认证

设备连接物联网平台时,需要使用Profile配置设备身份及相关参数。具体接口参数如下:

  1. Profile profile = Profile.getAppKeyProfile(httpHost, appKey, appSecret);
  2. MessageClient messageClient = MessageClientFactory.messageClient(profile);
  3. profile.setCleanSession(false);
  4. try {
  5. messageClient.connect(messageToken -> {
  6. return MessageCallback.Action.CommitSuccess;
  7. });
  8. } catch (Throwable ex) {
  9. logger.info("init h2 client exception", ex);
  10. System.out.println(ex);
  11. }

Profile参数说明:

名称 类型 是否必须 描述
appKey String 设备授权应用的appkey
可以通过环境变量获取
system.getev(“iot.hosting.appkey”)
appSecret String 设备授权应用appSecret
可以通过环境变量获取
system.getev(“iot.hosting.appSecret”)
httpHost String 请求路由,目前为上海区域:”https://ah.iot-as-http2.cn-shanghai.aliyuncs.com:443
cleanSession Boolean 是否清除缓存的离线消息。
heartBeatInterval Long 心跳间隔,单位为毫秒。
heartBeatTimeOut Long 心跳超时时间,单位为毫秒。
multiConnection Boolean 是否使用多连接。若使用设备的productKey和deivceName接入时,请将此参数值设置为false。
callbackThreadCorePoolSize Integer 回调线程的corePoolSize(核心池的大小)。
callbackThreadMaximumPoolSize Integer 回调线程池的maximumPoolSize(最大线程数)。
callbackThreadBlockingQueueSize Integer 回调线程池的BlockingQueueSize(阻塞队列大小)。
authParams Map 自定义认证参数。