全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
阿里云物联网平台

杭州节点MQTT JavaSDK使用

更新时间:2017-07-27 13:54:48

杭州节点

使用示例

  • 第一步: 首先通过设备认证,获取到对应的配置信息.
  • 第二步: 通过获取到的公钥证书来配置TLS/SSL,与阿里云进行连接.
  • 第三步: 通过MQTT协议与阿里云进行设备通信. 注意需要使用TLS单向认证.

建议使用JDK7以上。

下文使用的demo工程可以下载参考 此demo为maven工程,请安装maven。

以eclipse为例的使用方法:解压后在目录下运行 mvn eclipse:eclipse 再打开eclipse导入工程即可。

第一步

引入MQTT SDK,本文使用MQTT客户端Maven坐标如下

  1. <dependency>
  2. <groupId>org.eclipse.paho</groupId>
  3. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  4. <version>1.1.0</version>
  5. </dependency>

jar 包下载地址

https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3/1.1.0

第二步

配置TLS/SSL信息. 证书文件可以从这里下载 并保存到您的classpath下,假定文件名为;src/pubkey.crt


  1. private static SSLSocketFactory createSSLSocket() throws Exception {
  2. //加载阿里云的CA根证书文件
  3. InputStream in = SimpleClient.class.getResourceAsStream("/pubkey.crt");
  4. CertificateFactory cf = CertificateFactory.getInstance("X.509");
  5. Certificate ca = null;
  6. try {
  7. ca = cf.generateCertificate(in);
  8. } catch (CertificateException e) {
  9. e.printStackTrace();
  10. } finally {
  11. in.close();
  12. }
  13. String keyStoreType = KeyStore.getDefaultType();
  14. KeyStore keyStore = KeyStore.getInstance(keyStoreType);
  15. keyStore.load(null, null);
  16. keyStore.setCertificateEntry("ca", ca);
  17. String tmfAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
  18. TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm);
  19. tmf.init(keyStore);
  20. SSLContext context = SSLContext.getInstance("TLSV1.2");
  21. //单向认证,这里设置null
  22. context.init(null, tmf.getTrustManagers(), null);
  23. SSLSocketFactory socketFactory = context.getSocketFactory();
  24. return socketFactory;
  25. }

第三步

与阿里云建立TLS/SSL连接.


注意在配置MQTT连接时, Connect协议当中 clientId 属性值为 productKey:deviceId:type username 属性值为 ToUpperCase(MD5_32(productKey+productSecret+deviceId+deviceSecret))

  1. String productKey = ""; //这个是设备模版productkey
  2. String productSecret = "";//这个是设备模版product secret
  3. final String deviceName = "";//这个是创建的设备名称
  4. String deviceSecret = "";//这个是设备秘钥
  5. String targetServer = "iot-as.aliyuncs.com:80";
  6. //用于测试的topic
  7. final String topic = "/" + productKey + "/" + deviceName + "/update";
  8. String broker = "ssl://" + targetServer;//使用TLS方式
  9. //客户端ID格式: productKey : deviceId:idtype ,如果idtype=1则 代表设备id是用户自定义的设备名称
  10. String clientId = productKey + ":" + deviceName + ":" + "1";
  11. MemoryPersistence persistence = new MemoryPersistence();
  12. SSLSocketFactory socketFactory = createSSLSocket();
  13. final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  14. MqttConnectOptions connOpts = new MqttConnectOptions();
  15. connOpts.setMqttVersion(4);// MQTT 3.1.1
  16. connOpts.setSocketFactory(socketFactory);
  17. //设置是否自动重连
  18. connOpts.setAutomaticReconnect(true);
  19. //用户名签名
  20. String sign = productKey + productSecret + deviceName + deviceSecret;
  21. String signUserName = Md5.getInstance().md5_32(sign).toUpperCase();
  22. connOpts.setUserName(signUserName);
  23. connOpts.setKeepAliveInterval(65);
  24. System.out.println(clientId + "进行连接, 目的地: " + broker+","+signUserName);
  25. sampleClient.connect(connOpts);
  26. sampleClient.setCallback(new MqttCallback() {
  27. @Override
  28. public void connectionLost(Throwable cause) {
  29. System.out.println("连接失败,原因:" + cause);
  30. cause.printStackTrace();
  31. }
  32. @Override
  33. public void messageArrived(String topic, MqttMessage message) throws Exception {
  34. System.out.println(System.currentTimeMillis() +"接收到消息,来至Topic [" + topic + "] , 内容是:["
  35. + new String(message.getPayload(), "UTF-8") + "], num ");
  36. }
  37. @Override
  38. public void deliveryComplete(IMqttDeliveryToken token) {
  39. //如果是qos1 返回 token ack id
  40. //如果是qos 0消息 token.resp是没有回复的
  41. }
  42. });

第四步

进行消息Publish.


  1. String content = "Message From Device:" + deviceName;
  2. MqttMessage message = new MqttMessage(content.getBytes());
  3. message.setQos(1);
  4. sampleClient.publish(topic, message);
  5. System.out.println("消息发布成功!");
本文导读目录