全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
高性能时间序列数据库 HiTSDB

基准写性能测试

更新时间:2017-10-17 16:28:05

本文介绍如何使用以下提供的程序对 HiTSDB 的数据写入功能进行测试,用到的技术有多线程并发写,批量写,写入流控和错误重试。

测试文件

pom.xml

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>HiTSDBDemo</groupId>
  5. <artifactId>Perf</artifactId>
  6. <version>0.0.1-SNAPSHOT</version>
  7. <packaging>jar</packaging>
  8. <name>Perf</name>
  9. <url>http://maven.apache.org</url>
  10. <properties>
  11. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  12. </properties>
  13. <dependencies>
  14. <dependency>
  15. <groupId>com.alibaba</groupId>
  16. <artifactId>fastjson</artifactId>
  17. <version>1.2.13</version>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.apache.httpcomponents</groupId>
  21. <artifactId>httpclient</artifactId>
  22. <version>4.3.3</version>
  23. </dependency>
  24. <dependency>
  25. <groupId>junit</groupId>
  26. <artifactId>junit</artifactId>
  27. <version>3.8.1</version>
  28. <scope>test</scope>
  29. </dependency>
  30. </dependencies>
  31. <build>
  32. <plugins>
  33. <plugin>
  34. <artifactId>maven-compiler-plugin</artifactId>
  35. <version>2.3.2</version>
  36. <configuration>
  37. <source>1.6</source>
  38. <target>1.6</target>
  39. <encoding>UTF-8</encoding>
  40. </configuration>
  41. </plugin>
  42. <plugin>
  43. <artifactId>maven-assembly-plugin</artifactId>
  44. <version>2.6</version>
  45. <configuration>
  46. <finalName>PerfDemo</finalName>
  47. <appendAssemblyId>false</appendAssemblyId>
  48. <descriptorRefs>
  49. <descriptorRef>jar-with-dependencies</descriptorRef>
  50. </descriptorRefs>
  51. <archive>
  52. <manifest>
  53. <mainClass>demo.perf.PerfMain</mainClass>
  54. </manifest>
  55. </archive>
  56. </configuration>
  57. <executions>
  58. <execution>
  59. <id>make-assembly-put</id>
  60. <phase>package</phase>
  61. <goals>
  62. <goal>single</goal>
  63. </goals>
  64. </execution>
  65. </executions>
  66. </plugin>
  67. </plugins>
  68. </build>
  69. </project>

src/main/java/demo/perf/PerfMain.java

  1. package demo.perf;
  2. import java.util.ArrayList;
  3. import java.util.HashMap;
  4. import java.util.List;
  5. import java.util.Map;
  6. import java.util.Timer;
  7. import java.util.TimerTask;
  8. import java.util.concurrent.ArrayBlockingQueue;
  9. import java.util.concurrent.BlockingQueue;
  10. import java.util.concurrent.TimeUnit;
  11. import java.util.concurrent.atomic.AtomicLong;
  12. import org.apache.http.HttpResponse;
  13. import org.apache.http.client.methods.HttpPost;
  14. import org.apache.http.entity.StringEntity;
  15. import org.apache.http.impl.client.CloseableHttpClient;
  16. import org.apache.http.impl.client.HttpClients;
  17. import com.alibaba.fastjson.JSON;
  18. public class PerfMain {
  19. // 模拟数据Tag个数
  20. private static final int TAG;
  21. // 每秒生成的数据点数
  22. private static final int SOURCE_RATE;
  23. // 批量写入点数
  24. private static final int BATCH;
  25. // 并发数
  26. private static final int THREADS;
  27. // 实例地址
  28. private static final String HITSDB_ADDR;
  29. // 实例端口
  30. private static final int HITSDB_PORT;
  31. public static int getEnvInt(String envName, int value) {
  32. String env = System.getenv(envName);
  33. if (env != null && !env.equals("")) {
  34. try {
  35. value = Integer.parseInt(env);
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. return value;
  41. }
  42. // 超时时间
  43. private static final int SYNC_TIMEOUT_MS;
  44. // 重试次数
  45. private static final int MAX_TRY;
  46. static {
  47. TAG = getEnvInt("TAG", 6);
  48. SOURCE_RATE = getEnvInt("SOURCE_RATE", 50000);
  49. BATCH = getEnvInt("BATCH", 400);
  50. THREADS = getEnvInt("THREADS", 256);
  51. HITSDB_PORT = getEnvInt("HITSDB_PORT", 8242);
  52. HITSDB_ADDR = System.getenv("HITSDB_ADDR");
  53. MAX_TRY = getEnvInt("MAX_TRY", 4);
  54. SYNC_TIMEOUT_MS = getEnvInt("SYNC_TIMEOUT_MS", 2 * 60 * 1000);
  55. System.out.println("TAG:" + TAG);
  56. System.out.println("SOURCE_RATE:" + SOURCE_RATE);
  57. System.out.println("BATCH:" + BATCH);
  58. System.out.println("THREADS:" + THREADS);
  59. System.out.println("HITSDB_ADDR:" + HITSDB_ADDR);
  60. System.out.println("HITSDB_PORT:" + HITSDB_PORT);
  61. System.out.println("SYNC_TIMEOUT_MS:" + SYNC_TIMEOUT_MS);
  62. System.out.println("MAX_TRY:" + MAX_TRY);
  63. }
  64. // 写入路径
  65. private final String putUrl = "http://" + HITSDB_ADDR + ":" + HITSDB_PORT + "/api/put?sync_timeout="
  66. + SYNC_TIMEOUT_MS;
  67. private final AtomicLong successed = new AtomicLong();
  68. private final DataSource dataSource = new DataSource();
  69. private final ArrayList<Thread> THREAD_POOL = new ArrayList<Thread>();
  70. // 数据点类型
  71. static class DataPoint {
  72. public String metric;
  73. public Long timestamp;
  74. public Double value;
  75. public Map<String, String> tags;
  76. }
  77. class DataSource {
  78. private final BlockingQueue<String> flowQueue = new ArrayBlockingQueue<String>(10000);
  79. private final BlockingQueue<String> dataQueue = new ArrayBlockingQueue<String>(10000);
  80. private final BlockingQueue<String> retryQueue = new ArrayBlockingQueue<String>(10000);
  81. private final Timer timer = new Timer(true);
  82. private final List<Thread> threads = new ArrayList<Thread>();
  83. public void start() {
  84. // 模拟数据发生器
  85. for (int i = 0; i < 4; ++i) {
  86. Thread thread = new Thread(new Runnable() {
  87. @Override
  88. public void run() {
  89. Long loop = 0L;
  90. // 生成数据模型
  91. ArrayList<DataPoint> builder = new ArrayList<DataPoint>();
  92. for (int b = 0; b < BATCH; ++b) {
  93. DataPoint dataPoint = new DataPoint();
  94. dataPoint.timestamp = System.currentTimeMillis();
  95. dataPoint.value = 3.14;
  96. dataPoint.metric = "perf_metric_" + b;
  97. dataPoint.tags = new HashMap<String, String>();
  98. for (int t = 0; t < TAG; ++t) {
  99. dataPoint.tags.put("tagk" + t, "tagv" + t);
  100. }
  101. builder.add(dataPoint);
  102. }
  103. while (true) {
  104. try {
  105. String data = null;
  106. if (retryQueue.size() > 0) {
  107. data = retryQueue.poll(0, TimeUnit.MILLISECONDS);
  108. }
  109. if (data != null) {
  110. dataQueue.put(data);
  111. } else {
  112. // 生成每条时间线上的数据
  113. for (int i = 0; i < BATCH; ++i) {
  114. DataPoint dataPoint = builder.get(i);
  115. dataPoint.timestamp += loop * 1000;
  116. }
  117. dataQueue.put(JSON.toJSONString(builder));
  118. ++loop;
  119. }
  120. } catch (Exception e) {
  121. }
  122. }
  123. }
  124. });
  125. thread.setDaemon(true);
  126. thread.start();
  127. threads.add(thread);
  128. }
  129. try {
  130. Thread.sleep(1000);
  131. } catch (InterruptedException e1) {
  132. }
  133. // 流控
  134. timer.schedule(new TimerTask() {
  135. @Override
  136. public void run() {
  137. for (int i = 0; i < SOURCE_RATE; i += BATCH) {
  138. String elem = dataQueue.peek();
  139. if (elem == null) {
  140. System.out.println("pull null, data gen too low");
  141. break;
  142. } else {
  143. if (flowQueue.offer(elem)) {
  144. try {
  145. dataQueue.take();
  146. } catch (InterruptedException e) {
  147. break;
  148. }
  149. }
  150. }
  151. }
  152. }
  153. }, 0, 1000);
  154. }
  155. public String pull() throws InterruptedException {
  156. return flowQueue.poll(Long.MAX_VALUE, TimeUnit.SECONDS);
  157. }
  158. public void retryPush(String data) throws InterruptedException {
  159. retryQueue.put(data);
  160. if(retryQueue.size() > 1000) {
  161. System.out.println("Retry queued " + retryQueue.size());
  162. }
  163. }
  164. }
  165. public void start() {
  166. dataSource.start();
  167. // 并发写
  168. for (int i = 0; i < THREADS; ++i) {
  169. Thread thread = new Thread(new Runnable() {
  170. @Override
  171. public void run() {
  172. while (true) {
  173. try {
  174. String data = dataSource.pull();
  175. int try_count = 0;
  176. for (; try_count < MAX_TRY; ++try_count) {
  177. CloseableHttpClient httpClient = HttpClients.createDefault();
  178. HttpPost httpPost = new HttpPost(putUrl);
  179. StringEntity eStringEntity = new StringEntity(data, "utf-8");
  180. eStringEntity.setContentType("application/json");
  181. httpPost.setEntity(eStringEntity);
  182. HttpResponse response = httpClient.execute(httpPost);
  183. int statusCode = response.getStatusLine().getStatusCode();
  184. if (statusCode != 200 && statusCode != 204) {
  185. httpClient.close();
  186. Thread.sleep(100);
  187. } else {
  188. successed.addAndGet(BATCH);
  189. httpClient.close();
  190. break;
  191. }
  192. }
  193. if (try_count == MAX_TRY) {
  194. dataSource.retryPush(data);
  195. }
  196. } catch (Exception e) {
  197. System.err.println(e.getMessage());
  198. e.printStackTrace();
  199. }
  200. }
  201. }
  202. });
  203. thread.setDaemon(true);
  204. thread.start();
  205. THREAD_POOL.add(thread);
  206. }
  207. }
  208. public void test() throws InterruptedException {
  209. // 统计TPS的时间间隔
  210. final Long showTimeMs = 10 * 1000L;
  211. final AtomicLong lastSuccess = new AtomicLong();
  212. start();
  213. Timer tps = new Timer(true);
  214. tps.schedule(new TimerTask() {
  215. @Override
  216. public void run() {
  217. Long nowSuccess = successed.get();
  218. System.out.println("total success: " + nowSuccess.longValue() + " tps: "
  219. + (nowSuccess - lastSuccess.get()) * 1000 / showTimeMs);
  220. lastSuccess.set(nowSuccess);
  221. }
  222. }, showTimeMs, showTimeMs);
  223. }
  224. public static void main(String[] args) throws InterruptedException {
  225. PerfMain demo = new PerfMain();
  226. demo.test();
  227. // 测试执行时间
  228. final Long testRunSeconds = (long) getEnvInt("RUN_SECONDS", 5 * 60);
  229. System.out.println("RUN_SECONDS:" + testRunSeconds);
  230. Thread.sleep(testRunSeconds * 1000);
  231. System.exit(0);
  232. }
  233. }

build.sh

  1. #!/bin/sh
  2. mvn -DskipTests=true package
  3. cp target/PerfDemo.jar ./bin

bin/run.sh

  1. #!/bin/sh
  2. SHELL_DIR=$(cd `dirname $0`; pwd)
  3. cd ${SHELL_DIR}
  4. # TAG个数
  5. export TAG=8
  6. # 数据发生速率
  7. export SOURCE_RATE=50000
  8. # 批量写入点数
  9. export BATCH=400
  10. # 并发数
  11. export THREADS=256
  12. # 写入地址,域名或者IP
  13. export HITSDB_ADDR="100.81.155.3"
  14. # 写入端口
  15. export HITSDB_PORT=8242
  16. mem=`cat /proc/meminfo | grep MemTotal | awk '{print $2}'`
  17. max_mem_cost=`expr ${mem} / 1024 \* 80 / 100`
  18. java -jar -Xmx${max_mem_cost}m PerfDemo.jar

构建

依赖 JDK 6 和 Maven 3。

  1. sh build.sh

运行

  1. # 相关参数配置见脚本说明
  2. sh bin/run.sh
本文导读目录