您的服务端可以使用SDK,通过HTTP/2通道,接收物联网平台推送的设备消息。本文档为服务端订阅功能的开发指南,介绍了Java版本的开发方法。

背景信息

服务端HTTP/2 SDK Demo下载:HTTP/2 SDK (Java) server side demo

配置服务端订阅

在物联网平台控制台配置要订阅的消息类型。

  1. 登录物联网平台控制台
  2. 实例概览页面,找到对应的实例,单击实例进入实例详情页面。
    实例概览
  3. 左侧导航栏选择设备管理 > 产品
  4. 在产品列表中,搜索到要配置服务端订阅的产品,并单击该产品对应的查看按钮,进入产品详情页。
  5. 单击服务端订阅页签,然后单击创建订阅
    若您已添加订阅,可单击前往编辑,在服务端订阅页面的订阅列表下,单击对应产品的编辑,修改订阅的消息类型。
  6. 选择订阅的消息类型。
    控制台截图
    • 设备上报消息:指产品下所有设备 Topic 列表中,具有发布权限的 Topic 中的消息。勾选后,可以通过 HTTP/2 SDK 接收这些消息。

      设备上报消息,包括设备上报的自定义数据和物模型数据(属性上报、事件上报、属性设置响应和服务调用响应)。

      例如,一个产品有3个Topic类,分别是:
      • /${YourProductKey}/${YourDeviceName}/user/get,具有订阅权限。
      • /${YourProductKey}/${YourDeviceName}/user/update,具有发布权限。
      • /sys/${YourProductKey}/${YourDeviceName}/thing/event/property/post,具有发布权限。

      那么,服务端订阅会推送具有发布权限的Topic类中的消息,即/${YourProductKey}/${YourDeviceName}/user/update/sys/${YourProductKey}/${YourDeviceName}/thing/event/property/post中的消息。其中,/sys/${YourProductKey}/${YourDeviceName}/thing/event/property/post中的数据已经过系统处理。

    • 设备状态变化通知:指一旦该产品下的设备状态变化时通知的消息,例如设备上线、下线的消息。设备状态消息的发送Topic为 /as/mqtt/status/${YourProductKey}/${YourDeviceName}。勾选后,可以通过HTTP/2 SDK接收设备状态变化的通知消息。
    • 网关发现子设备上报:网关可以将发现的子设备信息上报给物联网平台。需要网关上的应用程序支持。网关产品特有消息类型。
    • 设备拓扑关系变更:指子设备和网关之间的拓扑关系建立和解除消息。网关产品特有消息类型。
    • 设备生命周期变更:包括设备创建、删除、禁用、启用等消息。
    说明 设备上报物模型消息、设备状态变化通知、网关发现子设备上报消息、设备拓扑关系变更消息和设备生命周期变更消息均默认为QoS=0;设备上报消息(除物模型相关消息外),您可以在设备端上设置为QoS=0或QoS=1。

接入 SDK

在Maven工程项目中添加以下依赖,安装阿里云IoT SDK。

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>iot-client-message</artifactId>
    <version>1.1.3</version>
</dependency>

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>3.7.1</version>
</dependency>

身份认证

使用服务端订阅功能,需要基于您的阿里云 AccessKey 进行身份认证并建立连接。

建立连接示例如下:

// 阿里云accessKey
String accessKey = "xxxxxxxxxxxxxxx";
// 阿里云accessSecret
String accessSecret = "xxxxxxxxxxxxxxx";
// regionId
String regionId = "cn-shanghai";
// 阿里云uid
String uid = "xxxxxxxxxxxx";
// endPoint:  https://${uid}.iot-as-http2.${region}.aliyuncs.com
String endPoint = "https://" + uid + ".iot-as-http2." + regionId + ".aliyuncs.com";

// 连接配置
Profile profile = Profile.getAccessKeyProfile(endPoint, regionId, accessKey, accessSecret);

// 构造客户端
MessageClient client = MessageClientFactory.messageClient(profile);
// 数据接收
client.connect(messageToken -> {
    Message m = messageToken.getMessage();
    System.out.println("receive message from " + m);
    return MessageCallback.Action.CommitSuccess;
});

以上示例中账号相关信息的获取方法见下表。

参数 获取途径
accessKey 您的账号AccessKey ID。

登录阿里云控制台,将光标移至账号头像上,然后单击accesskeys,跳转至用户信息管理页,即可获取。

accessSecret 您的账号AccessKey Secret。获取方式同accessKey。
uid 您的账号ID。

用主账号登录阿里云控制台,将鼠标指针移动到账号头像,即可获取账号ID。

regionId 您的物联网平台服务所在地域代码。

在物联网平台控制台页,右上方即可查看地域(Region)。RegionId 的表达方法,请参见地域和可用区

调用连接相关的方法

方法 说明
connect方法 调用该方法建立服务端HTTP/2 SDK和物联网平台的连接。连接建立后,服务端便可以收到物联网平台推送的消息。
void connect(MessageCallback callback);
disconnect方法 调用该方法断开服务端HTTP/2 SDK与物联网平台连接,停止接收消息。
void disconnect();
isConnected方法 调用该方法判断HTTP/2 SDK是否已连接物联网平台。
boolean isConnected();
setConnectionCallback方法 调用该方法设置连接状态监听回调。
void setConnectionCallback(ConnectionCallback var1);

示例:

client.setConnectionCallback(new ConnectionCallback() {
    @Override
    public void onConnectionLost() {
        System.out.println("连接断开");
    }

    @Override
    public void onConnected(boolean isReconnected) {
        System.out.println("连接成功,是否为重连: " + isReconnected);

    }
});

设置消息接收接口

连接建立后,服务端会立即向SDK推送已订阅的消息。因此,建立连接时,需要提供消息接收接口,用于处理未设置回调的消息。建议在连接之前,调用 setMessageListener 设置消息回调。

您需要通过 MessageCallback 接口的consume方法,和调用messageClientsetMessageListener()方法来设置消息接收接口。

consume方法的返回值决定SDK是否发送 ACK(acknowledgement,即回复确认消息)。

设置消息接收接口的方法如下:

MessageCallback messageCallback = new MessageCallback() {
    @Override
    public Action consume(MessageToken messageToken) {
        Message m = messageToken.getMessage();
        log.info("receive : " + new String(messageToken.getMessage().getPayload()));
        return MessageCallback.Action.CommitSuccess;
    }
};
messageClient.setMessageListener("/${YourProductKey}/#",messageCallback);

其中:

  • 参数MessageToken指消息回执的消息体。通过MessageToken.getMessage()可获取消息体。MessageToken可以用于手动回复ACK。

    消息体包含的内容如下:

    public class Message {
        // 消息体
        private byte[] payload;
        // Topic 
        private String topic;
        // 消息ID
        private String messageId;
        // QoS
        private int qos;
    }

    各类型消息的具体格式,请参见数据格式

    说明 关于设备上下线状态,为避免消息时序紊乱造成影响,建议您根据消息中的lastTime字段来判断最终设备状态。
  • 示例中,messageClient.setMessageListener("/${YourProductKey}/#",messageCallback);用于设置指定Topic回调。

    您可以设置为指定Topic回调,也可以设置为通用回调。

    • 指定Topic回调

      指定Topic回调的优先级高于通用回调。一条消息匹配到多个Topic时,按字典顺序优先调用,并且仅回调一次。

      设置回调时,可以指定带通配符的Topic,如/${YourProductKey}/${YourDeviceName}/# 。

      示例:
      messageClient.setMessageListener("/alEddfaXXXX/device1/#",messageCallback);
      //当收到消息的Topic,如"/alEddfaXXXX/device1/update",匹配指定Topic时,会优先调用该回调
    • 通用回调

      未指定Topic回调的消息,则调用通用回调。

      设置通用回调方法:
      messageClient.setMessageListener(messageCallback);
      //当收到消息topic未匹配到已定指的Topic 回调时,调用该回调
  • 设置回复ACK。

    QOS=1的消息消费后,需要回复ACK。支持设置为自动回复ACK和手动回复ACK。默认为自动回复 ACK。本示例中未设置回复 ACK,则默认为自动回复。

    • 自动回复ACK:设置为自动回复ACK后,若MessageCallback.consume的返回值为true, SDK会发送ACK;返回false或抛出异常,则不会返回ACK。对于QOS=1且未回复ACK的消息,服务器会重新发送。
    • 手动回复ACK方法:
      CompletableFuture<Boolean> ack(MessageToken var1);

      调用该方法的请求参数为MessageToken,表示需要回复ACK的消息。该参数值从MessageCallback回调信息中获取。

      设置回复ACK示例:

      client.connect(messageToken -> {
          System.out.println(messageToken.getMessage());
          asyncHandleMessage(messageToken);
          return MessageCallback.Action.CommitAckManually;
      });
      public void asyncHandleMessage(MessageToken messageToken) {
        client.ack(messageToken)
      }

相关文档

  • 服务端订阅使用限制说明,请参见使用限制
  • 查看各类型消息的具体格式,请参见数据格式