全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
消息服务

并发测试示例代码

更新时间:2017-06-07 13:26:11

本文档介绍了基于Java SDK提供的队列消息发送以及消费的并发测试Case。

  1. 用户可指定并发度、运行时间;
  2. 使用发送总请求数除以运行时间得到QPS。

1. 初始化

在运行目录创建perf_test_config.properties文本文件,按照如下格式指定参数(其中队列请先行创建好;注意:测试程序启动时会先删除再重新创建QueueName指定的队列,请勿使用存有业务数据的队列):

  1. Endpoint=
  2. AccessId=
  3. AccessKey=
  4. QueueName=JavaSDKPerfTestQueue
  5. ThreadNum=200
  6. TotalSeconds=180

注:

  1. ThreadNum是发送/消费的线程数,MNS具备强大的高并发扩展性能;
  2. TotalSeconds是测试Case运行的时间;

2. 代码

  1. package com.aliyun.mns;
  2. import com.aliyun.mns.client.CloudAccount;
  3. import com.aliyun.mns.client.CloudQueue;
  4. import com.aliyun.mns.client.MNSClient;
  5. import com.aliyun.mns.common.http.ClientConfiguration;
  6. import com.aliyun.mns.model.Message;
  7. import com.aliyun.mns.model.QueueMeta;
  8. import java.io.BufferedInputStream;
  9. import java.io.FileInputStream;
  10. import java.io.FileNotFoundException;
  11. import java.io.IOException;
  12. import java.util.ArrayList;
  13. import java.util.Properties;
  14. import java.util.concurrent.atomic.AtomicLong;
  15. public class JavaSDKPerfTest {
  16. private static MNSClient client = null;
  17. private static AtomicLong totalCount = new AtomicLong(0);
  18. private static String endpoint = null;
  19. private static String accessId = null;
  20. private static String accessKey = null;
  21. private static String queueName = "JavaSDKPerfTestQueue";
  22. private static int threadNum = 100;
  23. private static int totalSeconds = 180;
  24. protected static boolean parseConf() {
  25. String confFilePath = System.getProperty("user.dir") + System.getProperty("file.separator") + "perf_test_config.properties";
  26. BufferedInputStream bis = null;
  27. try {
  28. bis = new BufferedInputStream(new FileInputStream(confFilePath));
  29. if (bis == null) {
  30. System.out.println("ConfFile not opened: " + confFilePath);
  31. return false;
  32. }
  33. } catch (FileNotFoundException e) {
  34. System.out.println("ConfFile not found: " + confFilePath);
  35. return false;
  36. }
  37. // load file
  38. Properties properties = new Properties();
  39. try {
  40. properties.load(bis);
  41. } catch(IOException e) {
  42. System.out.println("Load ConfFile Failed: " + e.getMessage());
  43. return false;
  44. } finally {
  45. try {
  46. bis.close();
  47. } catch (Exception e) {
  48. // do nothing
  49. }
  50. }
  51. // init the member parameters
  52. endpoint = properties.getProperty("Endpoint");
  53. System.out.println("Endpoint: " + endpoint);
  54. accessId = properties.getProperty("AccessId");
  55. System.out.println("AccessId: " + accessId);
  56. accessKey = properties.getProperty("AccessKey");
  57. queueName = properties.getProperty("QueueName", queueName);
  58. System.out.println("QueueName: " + queueName);
  59. threadNum = Integer.parseInt(properties.getProperty("ThreadNum", String.valueOf(threadNum)));
  60. System.out.println("ThreadNum: " + threadNum);
  61. totalSeconds = Integer.parseInt(properties.getProperty("TotalSeconds", String.valueOf(totalSeconds)));
  62. System.out.println("TotalSeconds: " + totalSeconds);
  63. return true;
  64. }
  65. public static void main(String[] args) {
  66. if (!parseConf()) {
  67. return;
  68. }
  69. ClientConfiguration clientConfiguration = new ClientConfiguration();
  70. clientConfiguration.setMaxConnections(threadNum);
  71. clientConfiguration.setMaxConnectionsPerRoute(threadNum);
  72. CloudAccount cloudAccount = new CloudAccount(accessId, accessKey, endpoint, clientConfiguration);
  73. client = cloudAccount.getMNSClient();
  74. CloudQueue queue = client.getQueueRef(queueName);
  75. queue.delete();
  76. QueueMeta meta = new QueueMeta();
  77. meta.setQueueName(queueName);
  78. client.createQueue(meta);
  79. // 1. Now check the SendMessage
  80. ArrayList<Thread> threads = new ArrayList<Thread>();
  81. for (int i = 0; i < threadNum; ++i){
  82. Thread thread = new Thread(new Runnable() {
  83. public void run() {
  84. try {
  85. CloudQueue queue = client.getQueueRef(queueName);
  86. Message message = new Message();
  87. message.setMessageBody("Test");
  88. long count = 0;
  89. long startTime = System.currentTimeMillis();
  90. System.out.println(startTime);
  91. long endTime = startTime + totalSeconds * 1000;
  92. while (true) {
  93. for (int i = 0; i < 50; ++i) {
  94. queue.putMessage(message);
  95. }
  96. count += 50;
  97. if (System.currentTimeMillis() >= endTime) {
  98. break;
  99. }
  100. }
  101. System.out.println(System.currentTimeMillis());
  102. System.out.println("Thread" + Thread.currentThread().getName() + ": " + String.valueOf(count));
  103. totalCount.addAndGet(count);
  104. } catch (Exception e) {
  105. e.printStackTrace();
  106. }
  107. }
  108. }, String.valueOf(i));
  109. thread.start();
  110. threads.add(thread);
  111. }
  112. for (int i = 0; i < threadNum; ++i) {
  113. try {
  114. threads.get(i).join();
  115. } catch (InterruptedException e) {
  116. e.printStackTrace();
  117. }
  118. }
  119. System.out.println("SendMessage QPS: ");
  120. System.out.println(totalCount.get() / totalSeconds);
  121. // 2. Now is the ReceiveMessage
  122. threads.clear();
  123. totalCount.set(0);
  124. totalSeconds = totalSeconds / 3; // To ensure that messages in queue are enough for receiving
  125. for (int i = 0; i < threadNum; ++i){
  126. Thread thread = new Thread(new Runnable() {
  127. public void run() {
  128. try {
  129. CloudQueue queue = client.getQueueRef(queueName);
  130. long count = 0;
  131. long endTime = System.currentTimeMillis() + totalSeconds * 1000;
  132. while (true) {
  133. for (int i = 0; i < 50; ++i) {
  134. queue.popMessage();
  135. }
  136. count += 50;
  137. if (System.currentTimeMillis() >= endTime) {
  138. break;
  139. }
  140. }
  141. System.out.println("Thread" + Thread.currentThread().getName() + ": " + String.valueOf(count));
  142. totalCount.addAndGet(count);
  143. } catch (Exception e) {
  144. e.printStackTrace();
  145. }
  146. }
  147. }, String.valueOf(i));
  148. thread.start();
  149. threads.add(thread);
  150. }
  151. for (int i = 0; i < threadNum; ++i) {
  152. try {
  153. threads.get(i).join();
  154. } catch (InterruptedException e) {
  155. e.printStackTrace();
  156. }
  157. }
  158. System.out.println("ReceiveMessage QPS: ");
  159. System.out.println(totalCount.get() / totalSeconds);
  160. return;
  161. }
  162. }
本文导读目录