本文以Java代码为示例,介绍基于MQTT通信协议的设备,使用物联网平台颁发的X.509证书进行认证并接入物联网平台。
使用限制
MQTT协议直连的设备仅在华东2(上海)地域的旧版公共实例下可使用X.509证书认证。
本文示例为旧版公共实例下MQTT协议设备使用X.509证书认证。
尊享型企业版实例下的设备可通过云网关协议使用X.509证书认证。具体内容,请参见使用云网关协议接入。
准备开发环境
示例使用的开发环境如下:
操作系统:Windows10
JDK版本:JDK8
集成开发环境:IntelliJ IDEA社区版
步骤一:创建产品和设备
步骤二:配置设备端依赖库
打开IntelliJ IDEA,创建一个Maven工程。例如X509-demo。
在工程中的pom.xml文件中,添加Maven依赖,然后单击Load Maven Changes图标,完成依赖包下载。
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.13</version> </dependency>
步骤三:导入设备证书文件
在Java maven工程X509-demo的resource目录下,导入以下证书文件。
将从设备详情下载的X.509证书包解压缩,获取证书文件****.cer和****.key,并将这两个文件放置到resource目录下。
说明物联网平台提供的证书私钥是PKCS#1格式,而Java原生代码只能使用PKCS#8格式。您可以使用 OpenSSL工具进行转换,命令如下:
openssl pkcs8 -topk8 -in devicex509.key -nocrypt -out devicex509_pkcs8.key
使用TLS方式(securemode=2)将设备接入物联网平台,需使用物联网平台根证书。
请下载Global Sign R1根证书,然后将根证书放置到resource目录下。
说明该CA根证书有效期到2028年01月28日,连接1883端口。证书失效后将无法再用于校验服务器。因此,请确保所有使用TLS加密的设备,均具备更新CA根证书的功能。
步骤四:编写代码
在Java maven工程X509-demo的路径/src/main/java下,创建Java类。例如X509Test.java。
在X509Test.java中,输入代码。
import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileReader; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.security.KeyFactory; import java.security.KeyStore; import java.security.PrivateKey; import java.security.cert.Certificate; import java.security.cert.CertificateFactory; import java.security.spec.PKCS8EncodedKeySpec; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManagerFactory; import org.apache.commons.codec.binary.Base64; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import com.alibaba.fastjson.JSONObject; public class X509Test { /** 地域ID:目前仅华东2(上海)支持X.509认证。 */ private static String regionId = "cn-shanghai"; /** 根证书*/ private static String rootPath = ""; /** 设备证书*/ private static String certPath = ""; /** 设备证书私钥*/ private static String privateKeyPath = ""; /** 密码固定为空*/ private static String privateKeyPassword = ""; /** X.509认证返回信息的Topic。无需创建,无需订阅,直接使用。*/ private static final String AUTH_TOPIC = "/ext/auth/identity/response"; /** 设备productKey,用于接收物联网平台下发的productKey,无需填写。*/ private static String productKey = ""; /** 设备deviceName,用于接收物联网平台下发的deviceName,无需填写。*/ private static String deviceName = ""; /** MQTT客户端*/ private MqttClient sampleClient = null; /** * 建立MQTT连接 */ public void connect() { /** 接入域名*/ String broker = "ssl://x509.itls." + regionId + ".aliyuncs.com:1883"; /** 表示客户端ID,建议使用设备的MAC地址或SN码,64字符内。*/ String clientId = "."; /** 只支持securemode=2,表示使用TLS。*/ String clientOpts = "|securemode=2|"; /** MQTT接入客户端ID */ String mqttClientId = clientId + clientOpts; /** 建立MQTT连接。使用X.509证书认证,所以不需要username和password。*/ connect(broker, mqttClientId, "", ""); } /** * 建立MQTT连接 * * @param serverURL 连接服务器地址 * @param clientId MQTT接入客户端ID * @param username MQTT接入用户名 * @param password MQTT接入密码 */ protected void connect(String serverURL, String clientId, String username, String password) { try { MemoryPersistence persistence = new MemoryPersistence(); sampleClient = new MqttClient(serverURL, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); // MQTT 3.1.1 connOpts.setMqttVersion(4); // 用户名 connOpts.setUserName(username); // 密码 connOpts.setPassword(password.toCharArray()); // 使用TLS,需要下载根证书root.crt,设置securemode=2。 connOpts.setSocketFactory(createSSLSocket()); // 不清理离线消息。qos=1的消息,在设备离线期间会保存在云端。 connOpts.setCleanSession(false); // 本demo关闭自动重连。强烈建议生产环境开启自动重连。 connOpts.setAutomaticReconnect(false); // 设置心跳,建议300秒。 connOpts.setKeepAliveInterval(300); // 先设置回调。如果是先connect,后设置回调,可能会导致消息到达时回调还没准备好,这样消息可能会丢失。 sampleClient.setCallback(new MqttCallback() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // 只处理X.509认证返回信息 if (AUTH_TOPIC.equals(topic)) { JSONObject json = JSONObject .parseObject(new String(message.getPayload(), StandardCharsets.UTF_8)); productKey = json.getString("productKey"); deviceName = json.getString("deviceName"); } else { // 处理其他下行消息,强烈建议另起线程处理,以免回调堵塞。 } } @Override public void deliveryComplete(IMqttDeliveryToken token) { } @Override public void connectionLost(Throwable cause) { } }); System.out.println("Connecting to broker: " + serverURL); sampleClient.connect(connOpts); System.out.print("Connected: clientId=" + clientId); System.out.println(",username=" + username + ",password=" + password); } catch (MqttException e) { System.out.print("connect failed: clientId=" + clientId); System.out.println(",username=" + username + ",password=" + password); System.out.println("reason " + e.getReasonCode()); System.out.println("msg " + e.getMessage()); System.out.println("loc " + e.getLocalizedMessage()); System.out.println("cause " + e.getCause()); System.out.println("except " + e); e.printStackTrace(); } catch (Exception e) { System.out.print("connect exception: clientId=" + clientId); System.out.println(",username=" + username + ",password=" + password); System.out.println("msg " + e.getMessage()); e.printStackTrace(); } } /** * 发布消息,默认qos=0 * * @param topic 发布消息的Topic * @param payload 发布的消息内容 */ public void publish(String topic, String payload) { byte[] content = payload.getBytes(StandardCharsets.UTF_8); publish(topic, 0, content); } /** * 发布消息 * * @param topic 发布消息的Topic * @param qos 消息等级,平台支持qos=0和qos=1,不支持qos=2。 * @param payload 发布的消息内容 */ public void publish(String topic, int qos, byte[] payload) { MqttMessage message = new MqttMessage(payload); message.setQos(qos); try { sampleClient.publish(topic, message); System.out.println("Message published: topic=" + topic + ",qos=" + qos); } catch (MqttException e) { System.out.println("publish failed: topic=" + topic + ",qos=" + qos); System.out.println("reason " + e.getReasonCode()); System.out.println("msg " + e.getMessage()); System.out.println("loc " + e.getLocalizedMessage()); System.out.println("cause " + e.getCause()); System.out.println("except " + e); e.printStackTrace(); } } protected SSLSocketFactory createSSLSocket() throws Exception { /** 物联网平台根证书,可以从官网文档中下载https://help.aliyun.com/document_detail/73742.html*/ /** 设备X.509证书,可以从控制台设备信息中下载。*/ /** CA certificate is used to authenticate server */ InputStream in = new FileInputStream(new File(rootPath)); CertificateFactory cf = CertificateFactory.getInstance("X.509"); Certificate ca = cf.generateCertificate(in); in.close(); KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); keyStore.load(null, null); keyStore.setCertificateEntry("ca", ca); TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); tmf.init(keyStore); /** client key and certificates are sent to server so it can authenticate us */ FileInputStream certIn = new FileInputStream(new File(certPath)); CertificateFactory certCf = CertificateFactory.getInstance("X.509"); Certificate certCa = certCf.generateCertificate(certIn); certIn.close(); KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); ks.load(null, null); ks.setCertificateEntry("certificate", certCa); PrivateKey privateKey = getPrivateKey(privateKeyPath); ks.setKeyEntry("private-key", privateKey, privateKeyPassword.toCharArray(), new Certificate[] {certCa}); KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, privateKeyPassword.toCharArray()); SSLContext context = SSLContext.getInstance("TLSV1.2"); context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); SSLSocketFactory socketFactory = context.getSocketFactory(); return socketFactory; } private PrivateKey getPrivateKey(String path) throws Exception { byte[] buffer = Base64.decodeBase64(getPem(path)); PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(buffer); KeyFactory keyFactory = KeyFactory.getInstance("RSA"); return keyFactory.generatePrivate(keySpec); } private String getPem(String path) throws Exception { InputStream in = new FileInputStream(new File(path)); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String readLine = null; StringBuilder sb = new StringBuilder(); while ((readLine = br.readLine()) != null) { if (readLine.charAt(0) == '-') { continue; } else { sb.append(readLine); sb.append('\r'); } } in.close(); return sb.toString(); } /** * 连接成功后,物联网平台会下发productKey和deviceName信息到/ext/auth/identity/response,用于组装Topic进行消息收发。 * * @param args */ public static void main(String[] args) { X509Test client = new X509Test(); /** 填写设备证书路径信息 */ client.connect(); /** 连接成功之后,休眠两秒,为保证接收云端下发的productKey和deviceName,不然消息收发Topic的productKey和deviceName字段可能为空。*/ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } /** 发送消息,验证连接是否成功。*/ String updateTopic = "/" + productKey + "/" + deviceName + "/user/update"; client.publish(updateTopic, "hello mqtt with X.509 auth"); } }
在以上代码中配置设备证书和证书私钥的路径。
本示例工程X509-demo的resource目录下证书文件为
root.crt
、x509device.cer
、devicex509_pkcs8.key
,填入对应证书文件路径。//根证书 private static String rootPath = "***/resource/root.crt"; // 设备证书 private static String certPath = "***/resource/x509device.cer"; // 设备证书私钥 private static String privateKeyPath = "***/resource/devicex509_pkcs8.key";
步骤五:运行程序
运行X590Test.java主程序。
运行成功日志如下:
Connecting to broker: ssl://x509.itls.cn-shanghai.aliyuncs.com:1883
Connected: clientId=.|securemode=2|,username=,password=
Message published: topic=/a1****m/X509device/user/update,qos=0
您可登录物联网平台控制台,在公共实例下查看设备状态和日志。
选择
,可看到该设备的状态显示为在线。选择
,可查看相关日志。