使用X.509证书认证接入示例

本文以Java代码为示例,介绍基于MQTT通信协议的设备,使用物联网平台颁发的X.509证书进行认证并接入物联网平台。

使用限制

  • MQTT协议直连的设备仅在华东2(上海)地域的旧版公共实例下可使用X.509证书认证。

    本文示例为旧版公共实例下MQTT协议设备使用X.509证书认证。

  • 尊享型企业版实例下的设备可通过云网关协议使用X.509证书认证。具体内容,请参见使用云网关协议接入

准备开发环境

示例使用的开发环境如下:

步骤一:创建产品和设备

  1. 登录物联网平台控制台

  2. 在控制台左上方选择地域为华东2(上海),然后在实例概览页面,单击公共实例

  3. 在左侧导航栏,选择设备管理 > 产品,单击创建产品

  4. 新建产品页签,按照页面提示填写信息,然后单击确认

    本示例中创建产品X509产品认证方式选择X.509证书,参数配置如下图所示。参数详细说明,请参见创建产品创建产品

  5. 产品创建成功后,单击添加设备下的前往添加,创建设备。具体操作,请参见单个创建设备

    添加设备

    设备创建成功后,可在设备详情页面,查看该设备的X.509证书。设备详情

  6. 单击X.509对应的下载,下载证书。

    在开发设备端时,需导入证书文件。

步骤二:配置设备端依赖库

  1. 打开IntelliJ IDEA,创建一个Maven工程。例如X509-demo

  2. 在工程中的pom.xml文件中,添加Maven依赖,然后单击Load Maven Changes图标,完成依赖包下载。

    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.1</version>
    </dependency>
    
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
    </dependency>
    
    <dependency>
      <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <version>1.13</version>
    </dependency>

步骤三:导入设备证书文件

Java maven工程X509-demoresource目录下,导入以下证书文件。

  • 将从设备详情下载的X.509证书包解压缩,获取证书文件****.cer****.key,并将这两个文件放置到resource目录下。

    说明

    物联网平台提供的证书私钥是PKCS#1格式,而Java原生代码只能使用PKCS#8格式。您可以使用 OpenSSL工具进行转换,命令如下:

    openssl pkcs8 -topk8 -in devicex509.key -nocrypt -out devicex509_pkcs8.key
  • 使用TLS方式(securemode=2)将设备接入物联网平台,需使用物联网平台根证书。

    下载Global Sign R1根证书,然后将根证书放置到resource目录下。

    说明

    CA根证书有效期到20280128日,连接1883端口。证书失效后将无法再用于校验服务器。因此,请确保所有使用TLS加密的设备,均具备更新CA根证书的功能。

步骤四:编写代码

  1. Java maven工程X509-demo的路径/src/main/java下,创建Java类。例如X509Test.java

  2. X509Test.java中,输入代码。

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.security.KeyFactory;
    import java.security.KeyStore;
    import java.security.PrivateKey;
    import java.security.cert.Certificate;
    import java.security.cert.CertificateFactory;
    import java.security.spec.PKCS8EncodedKeySpec;
    
    import javax.net.ssl.KeyManagerFactory;
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLSocketFactory;
    import javax.net.ssl.TrustManagerFactory;
    
    import org.apache.commons.codec.binary.Base64;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    import com.alibaba.fastjson.JSONObject;
    
    public class X509Test {
        /** 地域ID:目前仅华东2(上海)支持X.509认证。 */
        private static String regionId = "cn-shanghai"; 
    
        /** 根证书*/
        private static String rootPath = "";
        /** 设备证书*/
        private static String certPath = "";
        /** 设备证书私钥*/
        private static String privateKeyPath = "";
        /** 密码固定为空*/
        private static String privateKeyPassword = "";
        /** X.509认证返回信息的Topic。无需创建,无需订阅,直接使用。*/
        private static final String AUTH_TOPIC = "/ext/auth/identity/response";
        /** 设备productKey,用于接收物联网平台下发的productKey,无需填写。*/
        private static String productKey = "";
        /** 设备deviceName,用于接收物联网平台下发的deviceName,无需填写。*/
        private static String deviceName = "";
    
        /** MQTT客户端*/
        private MqttClient sampleClient = null;
    
        /**
         * 建立MQTT连接
         */
        public void connect() {
    
            /** 接入域名*/
            String broker = "ssl://x509.itls." + regionId + ".aliyuncs.com:1883";
    
            /** 表示客户端ID,建议使用设备的MAC地址或SN码,64字符内。*/
            String clientId = ".";
    
            /** 只支持securemode=2,表示使用TLS。*/
            String clientOpts = "|securemode=2|";
    
            /** MQTT接入客户端ID */
            String mqttClientId = clientId + clientOpts;
    
            /** 建立MQTT连接。使用X.509证书认证,所以不需要usernamepassword。*/
            connect(broker, mqttClientId, "", "");
        }
    
        /**
         * 建立MQTT连接
         *
         * @param serverURL 连接服务器地址
         * @param clientId  MQTT接入客户端ID
         * @param username  MQTT接入用户名
         * @param password  MQTT接入密码
         */
        protected void connect(String serverURL, String clientId, String username, String password) {
            try {
                MemoryPersistence persistence = new MemoryPersistence();
                sampleClient = new MqttClient(serverURL, clientId, persistence);
                MqttConnectOptions connOpts = new MqttConnectOptions();
                // MQTT 3.1.1
                connOpts.setMqttVersion(4); 
                // 用户名
                connOpts.setUserName(username);
                // 密码
                connOpts.setPassword(password.toCharArray());
                 // 使用TLS,需要下载根证书root.crt,设置securemode=2。
                connOpts.setSocketFactory(createSSLSocket());
                // 不清理离线消息。qos=1的消息,在设备离线期间会保存在云端。
                connOpts.setCleanSession(false); 
                // 本demo关闭自动重连。强烈建议生产环境开启自动重连。
                connOpts.setAutomaticReconnect(false);
                // 设置心跳,建议300秒。
                connOpts.setKeepAliveInterval(300); 
                // 先设置回调。如果是先connect,后设置回调,可能会导致消息到达时回调还没准备好,这样消息可能会丢失。
                sampleClient.setCallback(new MqttCallback() {
    
                    @Override
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                        // 只处理X.509认证返回信息
                        if (AUTH_TOPIC.equals(topic)) {
                            JSONObject json = JSONObject
                                    .parseObject(new String(message.getPayload(), StandardCharsets.UTF_8));
                            productKey = json.getString("productKey");
                            deviceName = json.getString("deviceName");
                        } else {
                            // 处理其他下行消息,强烈建议另起线程处理,以免回调堵塞。
                        }
                    }
    
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken token) {
                    }
    
                    @Override
                    public void connectionLost(Throwable cause) {
                    }
                });
                System.out.println("Connecting to broker: " + serverURL);
                sampleClient.connect(connOpts);
                System.out.print("Connected: clientId=" + clientId);
                System.out.println(",username=" + username + ",password=" + password);
            } catch (MqttException e) {
                System.out.print("connect failed: clientId=" + clientId);
                System.out.println(",username=" + username + ",password=" + password);
                System.out.println("reason " + e.getReasonCode());
                System.out.println("msg " + e.getMessage());
                System.out.println("loc " + e.getLocalizedMessage());
                System.out.println("cause " + e.getCause());
                System.out.println("except " + e);
                e.printStackTrace();
            } catch (Exception e) {
                System.out.print("connect exception: clientId=" + clientId);
                System.out.println(",username=" + username + ",password=" + password);
                System.out.println("msg " + e.getMessage());
                e.printStackTrace();
            }
        }
    
        /**
         * 发布消息,默认qos=0
         *
         * @param topic   发布消息的Topic
         * @param payload 发布的消息内容
         */
        public void publish(String topic, String payload) {
            byte[] content = payload.getBytes(StandardCharsets.UTF_8);
            publish(topic, 0, content);
        }
    
        /**
         * 发布消息
         *
         * @param topic   发布消息的Topic
         * @param qos     消息等级,平台支持qos=0qos=1,不支持qos=2。
         * @param payload 发布的消息内容
         */
        public void publish(String topic, int qos, byte[] payload) {
            MqttMessage message = new MqttMessage(payload);
            message.setQos(qos);
            try {
                sampleClient.publish(topic, message);
                System.out.println("Message published: topic=" + topic + ",qos=" + qos);
            } catch (MqttException e) {
                System.out.println("publish failed: topic=" + topic + ",qos=" + qos);
                System.out.println("reason " + e.getReasonCode());
                System.out.println("msg " + e.getMessage());
                System.out.println("loc " + e.getLocalizedMessage());
                System.out.println("cause " + e.getCause());
                System.out.println("except " + e);
                e.printStackTrace();
            }
        }
    
        protected SSLSocketFactory createSSLSocket() throws Exception {
    
            /** 物联网平台根证书,可以从官网文档中下载https://help.aliyun.com/document_detail/73742.html*/
            /** 设备X.509证书,可以从控制台设备信息中下载。*/
    
            /** CA certificate is used to authenticate server */
            InputStream in = new FileInputStream(new File(rootPath));
            CertificateFactory cf = CertificateFactory.getInstance("X.509");
            Certificate ca = cf.generateCertificate(in);
            in.close();
            KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
            keyStore.load(null, null);
            keyStore.setCertificateEntry("ca", ca);
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(keyStore);
    
            /** client key and certificates are sent to server so it can authenticate us */
            FileInputStream certIn = new FileInputStream(new File(certPath));
            CertificateFactory certCf = CertificateFactory.getInstance("X.509");
            Certificate certCa = certCf.generateCertificate(certIn);
            certIn.close();
            KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
            ks.load(null, null);
            ks.setCertificateEntry("certificate", certCa);
            PrivateKey privateKey = getPrivateKey(privateKeyPath);
            ks.setKeyEntry("private-key", privateKey, privateKeyPassword.toCharArray(), new Certificate[] {certCa});
            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            kmf.init(ks, privateKeyPassword.toCharArray());
    
            SSLContext context = SSLContext.getInstance("TLSV1.2");
            context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
            SSLSocketFactory socketFactory = context.getSocketFactory();
            return socketFactory;
        }
    
        private PrivateKey getPrivateKey(String path) throws Exception {
            byte[] buffer = Base64.decodeBase64(getPem(path));
            PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(buffer);
            KeyFactory keyFactory = KeyFactory.getInstance("RSA");
            return keyFactory.generatePrivate(keySpec);
        }
    
        private String getPem(String path) throws Exception {
            InputStream in = new FileInputStream(new File(path));
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            String readLine = null;
            StringBuilder sb = new StringBuilder();
            while ((readLine = br.readLine()) != null) {
                if (readLine.charAt(0) == '-') {
                    continue;
                } else {
                    sb.append(readLine);
                    sb.append('\r');
                }
            }
            in.close();
            return sb.toString();
        }
    
        /**
         * 连接成功后,物联网平台会下发productKeydeviceName信息到/ext/auth/identity/response,用于组装Topic进行消息收发。
         *
         * @param args
         */
        public static void main(String[] args) {
    
            X509Test client = new X509Test();
    
            /** 填写设备证书路径信息 */
            client.connect();
    
            /** 连接成功之后,休眠两秒,为保证接收云端下发的productKeydeviceName,不然消息收发TopicproductKeydeviceName字段可能为空。*/
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            /**  发送消息,验证连接是否成功。*/
            String updateTopic = "/" + productKey + "/" + deviceName + "/user/update";
            client.publish(updateTopic, "hello mqtt with X.509 auth");
        }
    }
    

  3. 在以上代码中配置设备证书和证书私钥的路径。

    本示例工程X509-demoresource目录下证书文件为root.crtx509device.cerdevicex509_pkcs8.key,填入对应证书文件路径。

        //根证书
        private static String rootPath = "***/resource/root.crt";
        // 设备证书
        private static String certPath = "***/resource/x509device.cer";
        // 设备证书私钥
        private static String privateKeyPath = "***/resource/devicex509_pkcs8.key";

步骤五:运行程序

运行X590Test.java主程序。

运行成功日志如下:

Connecting to broker: ssl://x509.itls.cn-shanghai.aliyuncs.com:1883
Connected: clientId=.|securemode=2|,username=,password=
Message published: topic=/a1****m/X509device/user/update,qos=0

您可登录物联网平台控制台,在公共实例下查看设备状态和日志。

  • 选择设备管理 > 设备,可看到该设备的状态显示为在线设备在线

  • 选择监控运维 > 日志服务,可查看相关日志。日志