数据导入场景
本文以数据导入场景为例测试云原生数据仓库 AnalyticDB MySQL 版的性能,您可以按照本文介绍自行测试对比,快速了解产品性能。
测试产品及规格
产品 | 规格 |
AnalyticDB for MySQL 3.0 公有云版本 | 弹性模式集群版,1 Worker (24-Core)。 |
ElasticSearch 6.7.0 | 通用商业版,1节点(24-Core)。 |
AnalyticDB for MySQL 3.0 物理机部署 | 3台物理机,每台32 vCPU 128 GiB,两块SSD磁盘(3.84 TB和960 GB),12块8 TB的HDD磁盘。 |
测试环境配置
测试环境 | 配置 |
ECS | 2个ECS,32 vCPU 128 GiB,NVMe SSD本地盘存储:3576 GiB。 说明 ECS与AnalyticDB for MySQL、ElasticSearch同地域可用区,带宽充足。 |
测试方法
AnalyticDB for MySQL公有云版实时写入方法:在ECS上,使用Java程序,读取本地TPC-H的多个分片文件,基于JDBC多线程导入,导入批次2000条一批。其中导入SQL为
insert into lineitem values (...)
。AnalyticDB for MySQL物理机版实时写入(写入后1秒内可查)方法:使用Java程序,读取本地TPC-H的多个分片文件,基于JDBC多线程导入,导入批次2000条一批,单行数据长度350 b。其中导入SQL为
insert into lineitem values (...)
。ElasticSearch实时写入方法:在ECS上,使用Python程序,读取本地TPC-H的多个分片文件,基于Elasticsearch库多线程导入,导入批次2000条一批。
本测试中所用数据均来自于TPC-H。更多详情,请参见TPC-H官网。
测试结果
客户端并发线程数 | TPS(AnalyticDB for MySQL公有云版) | TPS(AnalyticDB for MySQL物理机版) | TPS(ElasticSearch) |
8 | 33033 | 120192 | 12211 |
16 | 56816 | 218472 | 7165 |
32 | 95083 | 398087 | 6267 |
64 | 153857 | 643618 | 5890 |
128 | 186732 | 787572 | 5516 |
建表语句
AnalyticDB for MySQL建表语句如下:
CREATE TABLE `lineitem` (
`l_orderkey` bigint NOT NULL COMMENT '',
`l_partkey` int NOT NULL COMMENT '',
`l_suppkey` int NOT NULL COMMENT '',
`l_linenumber` int NOT NULL COMMENT '',
`l_quantity` decimal(15, 2) NOT NULL COMMENT '',
`l_extendedprice` decimal(15, 2) NOT NULL COMMENT '',
`l_discount` decimal(15, 2) NOT NULL COMMENT '',
`l_tax` decimal(15, 2) NOT NULL COMMENT '',
`l_returnflag` varchar NOT NULL COMMENT '',
`l_linestatus` varchar NOT NULL COMMENT '',
`l_shipdate` date NOT NULL COMMENT '',
`l_commitdate` date NOT NULL COMMENT '',
`l_receiptdate` date NOT NULL COMMENT '',
`l_shipinstruct` varchar NOT NULL COMMENT '',
`l_shipmode` varchar NOT NULL COMMENT '',
`l_comment` varchar NOT NULL COMMENT ''
PRIMARY KEY(l_orderkey)
) DISTRIBUTED BY HASH(`l_orderkey`) INDEX_ALL='Y'
ElasticSearch建表语句如下:
curl -X PUT 'http://es_ip:9200/tpch' \
-H 'Content-Type: application/json' \
-d '{
"settings": {
"number_of_shards": 32,
"number_of_replicas" : 2
},
"mappings": {
"lineitem": {
"properties": {
"L_ORDERKEY": {
"type": "integer"
},
"L_PARTKEY": {
"type": "integer"
},
"L_SUPPKEY": {
"type": "integer"
},
"L_LINENUMBER": {
"type": "integer"
},
"L_QUANTITY": {
"type": "double"
},
"L_EXTENDEDPRICE": {
"type": "double"
},
"L_DISCOUNT": {
"type": "double"
},
"L_TAX": {
"type": "double"
},
"L_RETURNFLAG": {
"type": "keyword"
},
"L_LINESTATUS": {
"type": "keyword"
},
"L_SHIPDATE": {
"type": "date"
},
"L_COMMITDATE": {
"type": "date"
},
"L_RECEIPTDATE": {
"type": "date"
},
"L_SHIPINSTRUCT": {
"type": "keyword"
},
"L_SHIPMODE": {
"type": "keyword"
},
"L_COMMENT": {
"type": "keyword"
}
}
}
}
}'
数据导入脚本如下:
from threading import Thread
from elasticsearch import Elasticsearch
def func(i):
es = Elasticsearch(hosts=[
"es_ip:9200"
])
idx = 0
with open(r"lineitem.tbl.{}".format(i)) as f:
actions = []
while 1:
r = f.readlines(2000)
if not r:
break
for i in r:
data = i.split('|')
body = {
'L_ORDERKEY': int(data[0]),
'L_PARTKEY': int(data[1]),
'L_SUPPKEY': int(data[2]),
'L_LINENUMBER': int(data[3]),
'L_QUANTITY': float(data[4]),
'L_EXTENDEDPRICE': float(data[5]),
'L_DISCOUNT': float(data[6]),
'L_TAX': float(data[7]),
'L_RETURNFLAG': data[8],
'L_LINESTATUS': data[9],
'L_SHIPDATE': data[10],
'L_COMMITDATE': data[11],
'L_RECEIPTDATE': data[12],
'L_SHIPINSTRUCT': data[13],
'L_SHIPMODE': data[14],
'L_COMMENT': data[15]
}
actions.append({"index": {"_index": "tpch", "_type": "lineitem", "routing": int(data[0])}})
actions.append(body)
idx += 1
es.bulk(actions)
actions = []
print(idx)
if __name__ == '__main__':
for i in range(0, 16):
Thread(target=func, args=(i + 1,)).start()