全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
消息队列 MQ

MQTT Token 客户端接口

更新时间:2018-01-23 21:00:41

本文介绍 MQTT 客户端上传 Token、监听 Token 失效信息、监听 Token 非法信息三个接口的使用,并给出相关示例代码以供参考。

客户端上传 Token 凭证

发送 Topic:$SYS/uploadToken

内容:JSONString

内容信息:

名称 类型 说明
Token String 如果客户端选用 Token 模式,则需要上传 Token 字符串。
type String Token 类型,分为 W,R,RW 共三种,对应三种权限类型的 Token。一个客户端最多拥有这3个 Token,设置错误的类型会导致权限校验错误。

返回值

普通的 PubAck 报文。客户端必须等到该响应才能进行下一步 Pub 或者 Sub 操作,否则服务端可能会鉴权失败导致连接断开。

客户端监听即将失效的 Token 信息

接收 Topic:$SYS/tokenExpireNotice

内容:JSONString

内容信息:

名称 类型 说明
expireTime int 该 Token 即将于什么时候失效,一般提前5分钟通知,只通知一次。
type String Token 类型,分为 W,R,RW 共三种,对应客户端上传的三种权限类型的 Token。

响应:

客户端收到 Token 即将失效的消息后,需要尽快处理重新申请 Token 的动作,以免造成收发消息失败。

客户端监听 Token 非法的通知

接收 Topic:$SYS/tokenInvalidNotice

内容:JSONString

内容信息:

名称 类型 说明
code int Token 校验失败的类型。
type String Token 类型,分为 W,R,RW 共三种,对应客户端上传的三种权限类型的 Token。

响应:

服务端校验 Token 失效,会导致鉴权失败,服务端会主动断开链接。断开链接之前,服务端会给客户端推送失败的 code,客户端根据 code 即可判断原因。

type code 错误类型
1 伪造 Token,不可解析
2 Token 已经过期
3 Token 已经被吊销
4 资源和 Token 不匹配
5 权限类型和 Token 不匹配
8 签名不合法
-1 帐号权限不合法

注意:

  • 服务端下推 Token 非法的通知后,会接着断开连接。正常情况下客户端应该能根据该通知判断是否需要申请 Token。
  • 异常情况下,可能尚未收到通知,即已经断开连接。此时客户端需要判断原先的 Token 是否到期,如果确实接近到期时间,则先申请 Token 再重连。

示例程序

public static void main(String[] args) throws MqttException, InterruptedException {
        String broker = "tcp://XXXX:1883";
        String clientId = "GID_XXX@@@XXXX";
        final String topic = "XXXX/XXXXX";
        MemoryPersistence persistence = new MemoryPersistence();
        final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        final MqttConnectOptions connOpts = new MqttConnectOptions();
        System.out.println("Connecting to broker: " + broker);
        connOpts.setServerURIs(new String[] {broker});
        connOpts.setCleanSession(true);
        connOpts.setKeepAliveInterval(90);
        sampleClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                //连接成功,需重新上传 Token
                JSONObject object = new JSONObject();
                object.put("token", "XXXX");//设置 Token 内容
                object.put("type", "RW");//设置 Token 类型,共有 RW,R,W 三种类型
                MqttMessage message = new MqttMessage(object.toJSONString().getBytes());
                message.setQos(1);
                try {
                    sampleClient.publish("$SYS/uploadToken", message);
                    sampleClient.subscribe(topic, 0);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("mqtt connection lost");
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                if (topic.equals("$SYS/tokenInvalidNotice")) {
                    //Token 非法的通知
                } else if (topic.equals("$SYS/tokenExpireNotice")) {
                    //Token 即将失效的通知
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
            }
        });
        sampleClient.connect(connOpts);
        JSONObject object = new JSONObject();
        object.put("token", "XXXX");//设置 Token 内容
        object.put("type", "RW");//设置 Token 类型,共有 RW,R,W 三种类型
        MqttMessage message = new MqttMessage(object.toJSONString().getBytes());
        message.setQos(1);
        MqttTopic pubTopic = sampleClient.getTopic("$SYS/uploadToken");
        MqttDeliveryToken token = pubTopic.publish(message);
        token.waitForCompletion();//同步等待上传结果
        //Token 上传成功,在开始正常的业务消息收发
        sampleClient.subscribe(topic, 0);
        while (true) {
            sampleClient.publish(topic, message);
            Thread.sleep(5000);
        }
    }
本文导读目录