全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
分析型数据库

7.5 实时写入优化

更新时间:2017-11-07 10:46:09

Insert 语句最优写法

Insert语句标准语法如下:

  1. INSERT [IGNORE]
  2. [INTO] tbl_name
  3. [(col_name,...)]
  4. {VALUES | VALUE}

为了利用分析型数据库的高性能写入能力,建议写入时进行如下改造:

  • 采用批量写入(batch insert)模式,即每次在VALUES部分添加多行数据,一般建议每次批量写入数据量大约为16KB,以提高网络和磁盘吞吐。
  • 如果对一行的所有列都进行插入,则去除col_name并保证values顺序与表结构中的col_name顺序一致,以降低网络带宽耗用。
  • 保持主键相对有序。AnalyticDB的insert语句要求必须提供主键,且主键可以为复合主键。当确定复合主键时,根据业务含义调整复合主键中各个列的次序,从业务层面保证插入时主键是严格递增或近似递增的,也可以提升实时写入速度。
  • 增加ignore关键字。执行不带ignore关键字的insert sql,当主键冲突时,后续数据会覆盖之前插入的数据;带上ignore关键字,则主键冲突时,会保留之前插入的数据而自动忽略新数据。如果业务层没有数据覆盖的语义要求,则建议所有insert sql都加上ignore关键字,以减小覆盖数据带来的性能开销。

按hash分区列聚合写入

分析型数据库需要对数据进行分区存储,当一次Batch insert中含有属于不同分区的多行数据时,将会耗费大量CPU资源进行分区号计算。因此建议在写入程序中提前计算好每行数据的分区号,并且将属于同一分区的多行数据组成一个批次,一次性插入。

实现聚合写入目前主要有两种途径:

其一,用户自行实现该聚合方法,对分区号的计算规则为

partition_num = CRC32(hash_partition_column_value) mod m

其中hash_partition_column_value是分区列的值,m是分区总数。

示例代码如下:

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.zip.CRC32;
  4. import java.util.zip.Checksum;
  5. public class BatchInsertExample {
  6. // 一级分区个数
  7. private int partitionCount;
  8. // 分区列id,分区列在建表语句中序号
  9. private int partitionColumnIndex;
  10. // 总列数
  11. private int columnCount;
  12. // batch insert 条数
  13. private int batchSize;
  14. private List<List<Object[]>> partitionGroup;
  15. private String tableName;
  16. private boolean forTest = true;
  17. public BatchInsertExample(String tableName, int partitionCount, int columnCount, int partitionColumnIndex, int batchSize) {
  18. this.columnCount = columnCount;
  19. this.tableName = tableName;
  20. this.partitionColumnIndex = partitionColumnIndex;
  21. this.batchSize = batchSize;
  22. this.partitionCount = partitionCount;
  23. partitionGroup = new ArrayList<List<Object[]>>(partitionCount);
  24. for(int i = 0; i < partitionCount; i++) {
  25. List<Object[]> listi = new ArrayList<Object[]>(batchSize);
  26. partitionGroup.add(listi);
  27. }
  28. }
  29. /**
  30. * Function role, batch insert recordCount records, all records are strings except null.
  31. * @param records 的length 建议为 columnCount X batchSize。
  32. */
  33. public void insert(Object[][] records, int recordCount) {
  34. for(int i = 0; i < recordCount; i++) {
  35. int partitionNum = getPartitionNum(records[i]);
  36. List<Object[]> batchRecord = partitionGroup.get(partitionNum);
  37. batchRecord.add(records[i]);
  38. if (batchRecord.size() >= batchSize) {
  39. String sql = generateInsertSql(batchRecord);
  40. executeInsert(sql);
  41. batchRecord.clear();
  42. }
  43. }
  44. for(int pn = 0; pn < partitionCount; pn++) {
  45. List<Object[]> batchRecord = partitionGroup.get(pn);
  46. if (batchRecord.size() > 0) {
  47. String sql = generateInsertSql(batchRecord);
  48. executeInsert(sql);
  49. batchRecord.clear();
  50. }
  51. }
  52. }
  53. /**
  54. *
  55. * @param batchRecord
  56. * @return insert sql with multiple values,
  57. * like insert into zs_test values(NULL,NULL,NULL),(7,'7',7),(20,'20',20),(48,'48',48),(57,'57',57),(60,'60',60),(95,'95',95),(97,'97',97),(100,'100',100),(102,'102',102)
  58. */
  59. private String generateInsertSql(List<Object[]> batchRecord) {
  60. StringBuffer sb = new StringBuffer("insert into ").append(tableName).append(" values");
  61. for(int i = 0; i < batchRecord.size(); i++) {
  62. if (i > 0) {
  63. sb.append(",");
  64. }
  65. sb.append("(");
  66. Object[] objs = batchRecord.get(i);
  67. assert(objs.length == columnCount);
  68. for(int j = 0; j < objs.length; j++) {
  69. if (j > 0) {
  70. sb.append(",");
  71. }
  72. if (objs[j] == null) {
  73. sb.append("NULL");
  74. } else {
  75. sb.append(objs[j].toString());
  76. }
  77. }
  78. sb.append(")");
  79. }
  80. return sb.toString();
  81. }
  82. /**
  83. *
  84. * @param record
  85. * @return calculte partition num basing on partition column value
  86. */
  87. private int getPartitionNum(Object[] record) {
  88. Object value = record[partitionColumnIndex];
  89. long crc32 = (value == null ? getCRC32("-1") : getCRC32(value.toString()));
  90. return (int) (crc32 % this.partitionCount);
  91. }
  92. private final long getCRC32(String value) {
  93. Checksum checksum = new CRC32();
  94. byte[] bytes = value.getBytes();
  95. checksum.update(bytes, 0, bytes.length);
  96. return checksum.getValue();
  97. }
  98. /**
  99. * user should implement it with mysql protocol.
  100. * @param insertSql
  101. */
  102. public void executeInsert(String insertSql) {
  103. if (forTest) {
  104. System.out.println(insertSql);
  105. } else {
  106. throw new RuntimeException("user should implement it!!!!!");
  107. }
  108. }
  109. public static void main(String args[]) {
  110. String tableName="zs_test";
  111. int partitionCount = 12;
  112. int batchSize = 10;
  113. int columnCount = 3;
  114. int partitionColunIndex = 0;
  115. BatchInsertExample insert = new BatchInsertExample(tableName, partitionCount, columnCount, partitionColunIndex, batchSize);
  116. // buffersize should always be same with batchsize * partition-count
  117. int bufferSize = partitionCount * batchSize;
  118. int recordCount = 0;
  119. Object objs[][] = new Object[bufferSize][];
  120. int totalRecord = 1000;
  121. for(int i = 0; i < totalRecord; i++) {
  122. objs[recordCount] = i == 0 ? new Object[3]: new Object[]{i, "'" + i + "'", i};
  123. recordCount++;
  124. if (recordCount == bufferSize || (i == totalRecord - 1)) {
  125. insert.insert(objs, recordCount);
  126. recordCount = 0;
  127. }
  128. }
  129. }
  130. }

其二,采用阿里云数据集成产品进行实时数据实时同步。(专有云中“大数据开发套件”的“数据同步”任务即为采用“数据集成”工具实现)

提前进行optimize table

分析型数据库的optimize table是指对实时写入的数据进行索引构建并生成高度优化的文件结构的过程。

分析型数据库为实时写入的数据只建立了简单的索引,在进行optimize table之后则会建立相对复杂但是功能更强、性能更佳的索引;在适当时候进行optimize table是提升查询性能的好方法。

目前有两种方法进行基线合并:

其一,自动optimize table。目前系统每天会自动进行一次optimize table(一般在每晚20:00开始)。

其二,手动进行optimize table。AnalyticDB提供了optimize命令,用户可以手动发送该命令,强制系统立刻进行基线合并。命令格式如下:

  1. optimize table <table_name>
本文导读目录