Last updated: 2018-10-09 07:17
The standard syntax of the INSERT statement is as follows:
INSERT [IGNORE]
    [INTO] tbl_name
    [(col_name,...)]
    {VALUES | VALUE} (value,...)[,(value,...)]...
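For example, a single statement can insert multiple rows at once (zs_test here is the three-column table used in the sample code below):

insert into zs_test values (7,'7',7), (20,'20',20), (48,'48',48);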
To take full advantage of AnalyticDB's high-performance write capability, we recommend the following adjustments when writing data:
AnalyticDB stores data by partition. When a single batch insert contains rows belonging to different partitions, a large amount of CPU is consumed computing partition numbers. We therefore recommend that the writing program compute each row's partition number in advance, group rows belonging to the same partition into one batch, and insert each batch in a single statement.
There are currently two main ways to implement such aggregated writes:
First, implement the aggregation yourself. The partition number is computed as
partition_num = CRC32(hash_partition_column_value) mod m
where hash_partition_column_value is the value of the partition column and m is the total number of partitions.
Sample code:
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

public class BatchInsertExample {
    // Number of first-level partitions
    private int partitionCount;
    // Index of the partition column, i.e. its position in the CREATE TABLE statement
    private int partitionColumnIndex;
    // Total number of columns
    private int columnCount;
    // Number of rows per batch insert
    private int batchSize;
    // One pending-row buffer per partition
    private List<List<Object[]>> partitionGroup;
    private String tableName;
    private boolean forTest = true;

    public BatchInsertExample(String tableName, int partitionCount, int columnCount,
            int partitionColumnIndex, int batchSize) {
        this.columnCount = columnCount;
        this.tableName = tableName;
        this.partitionColumnIndex = partitionColumnIndex;
        this.batchSize = batchSize;
        this.partitionCount = partitionCount;
        partitionGroup = new ArrayList<List<Object[]>>(partitionCount);
        for (int i = 0; i < partitionCount; i++) {
            List<Object[]> listi = new ArrayList<Object[]>(batchSize);
            partitionGroup.add(listi);
        }
    }

    /**
     * Batch-inserts recordCount records, grouped by partition. Values are
     * appended verbatim, so string values must already carry their quotes;
     * null maps to SQL NULL.
     * @param records the rows to insert; a length of partitionCount x batchSize is recommended.
     */
    public void insert(Object[][] records, int recordCount) {
        // Route each row to its partition buffer; flush a buffer once it is full.
        for (int i = 0; i < recordCount; i++) {
            int partitionNum = getPartitionNum(records[i]);
            List<Object[]> batchRecord = partitionGroup.get(partitionNum);
            batchRecord.add(records[i]);
            if (batchRecord.size() >= batchSize) {
                String sql = generateInsertSql(batchRecord);
                executeInsert(sql);
                batchRecord.clear();
            }
        }
        // Flush the remaining partially filled buffers.
        for (int pn = 0; pn < partitionCount; pn++) {
            List<Object[]> batchRecord = partitionGroup.get(pn);
            if (batchRecord.size() > 0) {
                String sql = generateInsertSql(batchRecord);
                executeInsert(sql);
                batchRecord.clear();
            }
        }
    }

    /**
     * @param batchRecord rows that all belong to the same partition
     * @return a multi-value INSERT statement, e.g.
     *         insert into zs_test values(NULL,NULL,NULL),(7,'7',7),(20,'20',20),(48,'48',48)
     */
    private String generateInsertSql(List<Object[]> batchRecord) {
        StringBuilder sb = new StringBuilder("insert into ").append(tableName).append(" values");
        for (int i = 0; i < batchRecord.size(); i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append("(");
            Object[] objs = batchRecord.get(i);
            assert (objs.length == columnCount);
            for (int j = 0; j < objs.length; j++) {
                if (j > 0) {
                    sb.append(",");
                }
                if (objs[j] == null) {
                    sb.append("NULL");
                } else {
                    sb.append(objs[j].toString());
                }
            }
            sb.append(")");
        }
        return sb.toString();
    }

    /**
     * @param record one row of values
     * @return the partition number computed from the partition column value
     */
    private int getPartitionNum(Object[] record) {
        Object value = record[partitionColumnIndex];
        long crc32 = (value == null ? getCRC32("-1") : getCRC32(value.toString()));
        return (int) (crc32 % this.partitionCount);
    }

    private long getCRC32(String value) {
        Checksum checksum = new CRC32();
        byte[] bytes = value.getBytes();
        checksum.update(bytes, 0, bytes.length);
        return checksum.getValue();
    }

    /**
     * Users should implement this method with the MySQL protocol
     * (see the JDBC sketch after this class).
     * @param insertSql the multi-value INSERT statement to execute
     */
    public void executeInsert(String insertSql) {
        if (forTest) {
            System.out.println(insertSql);
        } else {
            throw new RuntimeException("user should implement it!!!!!");
        }
    }

    public static void main(String[] args) {
        String tableName = "zs_test";
        int partitionCount = 12;
        int batchSize = 10;
        int columnCount = 3;
        int partitionColumnIndex = 0;
        BatchInsertExample insert = new BatchInsertExample(tableName, partitionCount,
                columnCount, partitionColumnIndex, batchSize);
        // bufferSize should always equal batchSize * partitionCount
        int bufferSize = partitionCount * batchSize;
        int recordCount = 0;
        Object[][] objs = new Object[bufferSize][];
        int totalRecord = 1000;
        for (int i = 0; i < totalRecord; i++) {
            // The first row is all NULLs; string values carry their own quotes.
            objs[recordCount] = i == 0 ? new Object[3] : new Object[]{i, "'" + i + "'", i};
            recordCount++;
            if (recordCount == bufferSize || (i == totalRecord - 1)) {
                insert.insert(objs, recordCount);
                recordCount = 0;
            }
        }
    }
}
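The executeInsert method above is only a stub. A minimal sketch of a real implementation over the MySQL protocol using plain JDBC is shown below; the connection URL, user name, and password are placeholders to replace with your own endpoint and account, and connection pooling, retries, and error handling are omitted for brevity:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class JdbcInsertExecutor {
    // Placeholder connection details; replace with your AnalyticDB endpoint and account.
    private static final String URL = "jdbc:mysql://<host>:<port>/<database>";
    private static final String USER = "<user>";
    private static final String PASSWORD = "<password>";

    // Sends a pre-built multi-value INSERT statement over the MySQL protocol.
    public static void executeInsert(String insertSql) throws SQLException {
        try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(insertSql);
        }
    }
}

In practice, reuse a long-lived connection (or a connection pool) across batches rather than opening a new connection per statement.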
Second, use the Alibaba Cloud Data Integration product to synchronize data in real time. (In Apsara Stack, the "Data Sync" task of the "Big Data Development Suite" is implemented with the Data Integration tool.)
In AnalyticDB, optimize table is the process of building indexes over data that was written in real time and generating a highly optimized file structure.
AnalyticDB builds only simple indexes for data written in real time; after optimize table, it builds more sophisticated indexes that are more capable and perform better. Running optimize table at appropriate times is therefore a good way to improve query performance.
There are currently two ways to trigger a baseline merge:
First, automatic optimize table. The system currently runs optimize table automatically once a day (generally starting at 20:00 each night).
Second, manual optimize table. AnalyticDB provides the optimize command; users can issue it manually to force an immediate baseline merge. The command format is:
optimize table <table_name>
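For illustration, the command can be issued over the same MySQL-protocol connection used for writes. A minimal JDBC sketch follows; the connection details are placeholders to replace with your own endpoint and account:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class OptimizeTableExample {
    // Forces an immediate baseline merge on the given table.
    public static void optimizeTable(String tableName) throws SQLException {
        // Placeholder connection details; replace with your AnalyticDB endpoint and account.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://<host>:<port>/<database>", "<user>", "<password>");
             Statement stmt = conn.createStatement()) {
            stmt.execute("optimize table " + tableName);
        }
    }
}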