本文以Java代码示例,介绍基于MQTT通信协议的设备,如何使用物联网平台颁发的X.509证书进行认证并接入物联网平台。

使用限制

  • 仅MQTT协议直连的设备可使用X.509证书认证。
  • 目前仅华东2(上海)地域下旧版公共实例支持X.509证书认证。

准备开发环境

示例使用的开发环境如下:

步骤一:创建产品和设备

  1. 登录物联网平台控制台
  2. 在控制台左上方选择地域为华东2(上海),然后在实例概览页面,单击公共实例
  3. 在左侧导航栏,选择设备管理 > 产品,单击创建产品
  4. 新建产品页签,按照页面提示填写信息,然后单击确认
    本示例中创建产品X509产品认证方式选择X.509证书,参数配置如下图所示。参数详细说明,请参见创建产品创建产品
  5. 产品创建成功后,单击添加设备下的前往添加,创建设备。具体操作,请参见单个创建设备
    添加设备
    设备创建成功后,可在设备详情页面,查看该设备的X.509证书。设备详情
  6. 单击X.509对应的下载,下载证书。
    在开发设备端时,需导入证书文件。

步骤二:配置设备端依赖库

  1. 打开IntelliJ IDEA,创建一个Maven工程。例如X509-demo
  2. 在工程中的pom.xml文件中,添加Maven依赖,然后单击Load Maven Changes图标,完成依赖包下载。
    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.1</version>
    </dependency>
    
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.61</version>
    </dependency>
    
    <dependency>
      <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <version>1.13</version>
    </dependency>

步骤三:导入设备证书文件

在Java maven工程X509-demoresource目录下,导入以下证书文件。

  • 将从设备详情下载的X.509证书包解压缩,获取证书文件****.cer****.key,并将这两个文件放置到resource目录下。
    说明 物联网平台提供的证书私钥是PKCS#1格式,而Java原生代码只能使用PKCS#8格式。您可以使用 OpenSSL工具进行转换,命令如下:
    openssl pkcs8 -topk8 -in devicex509.key -nocrypt -out devicex509_pkcs8.key
  • 使用TLS方式(securemode=2)将设备接入物联网平台,需使用物联网平台根证书。

    下载根证书,然后将根证书放置到resource目录下。

步骤四:编写代码

  1. 在Java maven工程X509-demo的路径/src/main/java下,创建Java类。例如IotMqttClientWithAuthByX509.java
  2. IotMqttClientWithAuthByX509.java中,输入代码。
    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.security.KeyFactory;
    import java.security.KeyStore;
    import java.security.PrivateKey;
    import java.security.cert.Certificate;
    import java.security.cert.CertificateFactory;
    import java.security.spec.PKCS8EncodedKeySpec;
    
    import javax.net.ssl.KeyManagerFactory;
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLSocketFactory;
    import javax.net.ssl.TrustManagerFactory;
    
    import org.apache.commons.codec.binary.Base64;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    import com.alibaba.fastjson.JSONObject;
    
    /**
     * MQTT客户端直连阿里云物联网平台,基于Eclipse Paho开发。
     * 基于X.509认证接入文档:https://help.aliyun.com/document_detail/140588.html
     */
    public class IotMqttClientWithAuthByX509 {
    
        // 地域ID
        private static String regionId = "cn-shanghai"; // 目前仅华东2(上海)支持X.509认证。
    
        // 设备证书
        private String certPath = "";
        // 设备证书私钥
        private String privateKeyPath = "";
        // 密码固定为空
        private String privateKeyPassword = "";
        // X.509认证返回信息的Topic。无需创建,无需订阅,直接使用。
        private static final String AUTH_TOPIC = "/ext/auth/identity/response";
        // 设备productKey,用于接收物联网平台下发的productKey,无需填写。
        private static String productKey = "";
        // 设备deviceName,用于接收物联网平台下发的deviceName,无需填写。
        private static String deviceName = "";
    
        // MQTT客户端
        private MqttClient sampleClient = null;
    
        /**
         * 建立MQTT连接
         * 
         * @param certPath 证书路径
         * @param privateKeyPath 私钥路径
         * @param privateKeyPassword 私钥密码,目前固定为空
         */
        public void connect(String certPath, String privateKeyPath, String privateKeyPassword) {
    
            this.certPath = certPath;
            this.privateKeyPath = privateKeyPath;
            this.privateKeyPassword = privateKeyPassword;
    
            // 接入域名
            String broker = "ssl://x509.itls." + regionId + ".aliyuncs.com:1883";
    
            // 表示客户端ID,建议使用设备的MAC地址或SN码,64字符内。
            String clientId = ".";
    
            // 只支持securemode=2,表示使用TLS。
            String clientOpts = "|securemode=2|";
    
            // MQTT接入客户端ID
            String mqttClientId = clientId + clientOpts;
    
            // 建立MQTT连接。使用X.509证书认证,所以不需要username和password。
            connect(broker, mqttClientId, "", "");
        }
    
        /**
         * 建立MQTT连接
         * 
         * @param serverURL 连接服务器地址
         * @param clientId MQTT接入客户端ID
         * @param username MQTT接入用户名
         * @param password MQTT接入密码
         */
        protected void connect(String serverURL, String clientId, String username, String password) {
            try {
                MemoryPersistence persistence = new MemoryPersistence();
                sampleClient = new MqttClient(serverURL, clientId, persistence);
                MqttConnectOptions connOpts = new MqttConnectOptions();
                connOpts.setMqttVersion(4);// MQTT 3.1.1
                connOpts.setUserName(username);// 用户名
                connOpts.setPassword(password.toCharArray());// 密码
                connOpts.setSocketFactory(createSSLSocket()); // 使用TLS,需要下载根证书root.crt,设置securemode=2。
                connOpts.setCleanSession(false); // 不清理离线消息。qos=1的消息,在设备离线期间会保存在云端。
                connOpts.setAutomaticReconnect(false); // 本demo关闭自动重连。强烈建议生产环境开启自动重连。
                connOpts.setKeepAliveInterval(300); // 设置心跳,建议300秒。
                // 先设置回调。如果是先connect,后设置回调,可能会导致消息到达时回调还没准备好,这样消息可能会丢失。
                sampleClient.setCallback(new MqttCallback() {
    
                    @Override
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                        // 只处理X.509认证返回信息
                        if (AUTH_TOPIC.equals(topic)) {
                            JSONObject json = JSONObject
                                    .parseObject(new String(message.getPayload(), StandardCharsets.UTF_8));
                            productKey = json.getString("productKey");
                            deviceName = json.getString("deviceName");
                        } else {
                            // 处理其他下行消息,强烈建议另起线程处理,以免回调堵塞。
                        }
                    }
    
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken token) {
                    }
    
                    @Override
                    public void connectionLost(Throwable cause) {
                    }
                });
                System.out.println("Connecting to broker: " + serverURL);
                sampleClient.connect(connOpts);
                System.out.print("Connected: clientId=" + clientId);
                System.out.println(",username=" + username + ",password=" + password);
            } catch (MqttException e) {
                System.out.print("connect failed: clientId=" + clientId);
                System.out.println(",username=" + username + ",password=" + password);
                System.out.println("reason " + e.getReasonCode());
                System.out.println("msg " + e.getMessage());
                System.out.println("loc " + e.getLocalizedMessage());
                System.out.println("cause " + e.getCause());
                System.out.println("excep " + e);
                e.printStackTrace();
            } catch (Exception e) {
                System.out.print("connect exception: clientId=" + clientId);
                System.out.println(",username=" + username + ",password=" + password);
                System.out.println("msg " + e.getMessage());
                e.printStackTrace();
            }
        }
    
        /**
         * 发布消息,默认qos=0
         * 
         * @param topic 发布消息的Topic
         * @param payload 发布的消息内容
         */
        public void publish(String topic, String payload) {
            byte[] content = payload.getBytes(StandardCharsets.UTF_8);
            publish(topic, 0, content);
        }
    
        /**
         * 发布消息
         * 
         * @param topic 发布消息的Topic
         * @param qos 消息等级,平台支持qos=0和qos=1,不支持qos=2。
         * @param payload 发布的消息内容
         */
        public void publish(String topic, int qos, byte[] payload) {
            MqttMessage message = new MqttMessage(payload);
            message.setQos(qos);
            try {
                sampleClient.publish(topic, message);
                System.out.println("Message published: topic=" + topic + ",qos=" + qos);
            } catch (MqttException e) {
                System.out.println("publish failed: topic=" + topic + ",qos=" + qos);
                System.out.println("reason " + e.getReasonCode());
                System.out.println("msg " + e.getMessage());
                System.out.println("loc " + e.getLocalizedMessage());
                System.out.println("cause " + e.getCause());
                System.out.println("excep " + e);
                e.printStackTrace();
            }
        }
    
        protected SSLSocketFactory createSSLSocket() throws Exception {
    
            // 物联网平台根证书,可以从官网文档中下载https://help.aliyun.com/document_detail/73742.html
            // 设备X.509证书,可以从控制台设备信息中下载。
    
            // CA certificate is used to authenticate server
            InputStream in = IotMqttClientWithAuthByX509.class.getResourceAsStream("/root.crt");
            CertificateFactory cf = CertificateFactory.getInstance("X.509");
            Certificate ca = cf.generateCertificate(in);
            in.close();
            KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
            keyStore.load(null, null);
            keyStore.setCertificateEntry("ca", ca);
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(keyStore);
    
            // client key and certificates are sent to server so it can authenticate us
            InputStream certIn = IotMqttClientWithAuthByX509.class.getResourceAsStream(certPath);
            CertificateFactory certCf = CertificateFactory.getInstance("X.509");
            Certificate certCa = certCf.generateCertificate(certIn);
            certIn.close();
            KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
            ks.load(null, null);
            ks.setCertificateEntry("certificate", certCa);
            PrivateKey privateKey = getPrivateKey(privateKeyPath);
            ks.setKeyEntry("private-key", privateKey, privateKeyPassword.toCharArray(), new Certificate[] { certCa });
            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            kmf.init(ks, privateKeyPassword.toCharArray());
    
            SSLContext context = SSLContext.getInstance("TLSV1.2");
            context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
            SSLSocketFactory socketFactory = context.getSocketFactory();
            return socketFactory;
        }
    
        private PrivateKey getPrivateKey(String path) throws Exception {
            byte[] buffer = Base64.decodeBase64(getPem(path));
            PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(buffer);
            KeyFactory keyFactory = KeyFactory.getInstance("RSA");
            return keyFactory.generatePrivate(keySpec);
        }
    
        private String getPem(String path) throws Exception {
            InputStream in = IotMqttClientWithAuthByX509.class.getResourceAsStream(path);
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            String readLine = null;
            StringBuilder sb = new StringBuilder();
            while ((readLine = br.readLine()) != null) {
                if (readLine.charAt(0) == '-') {
                    continue;
                } else {
                    sb.append(readLine);
                    sb.append('\r');
                }
            }
            in.close();
            return sb.toString();
        }
    
        /**  
         * 连接成功后,物联网平台会下发productKey和deviceName信息到/ext/auth/identity/response,用于组装Topic进行消息收发。 
         * 
         * @param args
         */
        public static void main(String[] args) {
    
            IotMqttClientWithAuthByX509 client = new IotMqttClientWithAuthByX509();
    
            // 填写设备证书路径信息
            client.connect("您的设备证书路径", "您的证书私钥路径", "");
    
            // 连接成功之后,休眠两秒,为保证接收云端下发的productKey和deviceName,不然消息收发Topic的productKey和deviceName字段可能为空。
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            // 发送消息,验证连接是否成功。
            String updateTopic = "/" + productKey + "/" + deviceName + "/user/update";
            client.publish(updateTopic, "hello mqtt with X.509 auth");
        }
    }
  3. 在以上代码中配置设备证书和证书私钥的路径。
    本示例工程X509-demoresource目录下证书文件为x509device.cerdevicex509_pkcs8.key
    ......
    // 设备证书
    private String certPath = "/x509device.cer";
    // 设备证书私钥
    private String privateKeyPath = "/devicex509_pkcs8.key";
    
    ......
    ......
    
    public static void main(String[] args) {
        ......
        // 填写设备证书路径信息
        client.connect("/x509device.cer", "/devicex509_pkcs8.key", "");
        ......
    }

步骤五:运行程序

运行IotMqttClientWithAuthByX509.java主程序。

运行成功日志如下:

Connecting to broker: ssl://x509.itls.cn-shanghai.aliyuncs.com:1883
Connected: clientId=.|securemode=2|,username=,password=
Message published: topic=/a1****m/X509device/user/update,qos=0
您可登录物联网平台控制台,在公共实例下查看设备状态和日志。
  • 选择设备管理 > 设备,可看到该设备的状态显示为在线设备在线
  • 选择监控运维 > 日志服务,可查看相关日志。日志