数据导入场景

更新时间:

本文以数据导入场景为例测试云原生数据仓库 AnalyticDB MySQL 版的性能,您可以按照本文介绍自行测试对比,快速了解产品性能。

测试产品及规格

产品

规格

AnalyticDB for MySQL 3.0 公有云版本

弹性模式集群版,1 Worker (24-Core)。

ElasticSearch 6.7.0

通用商业版,1节点(24-Core)。

AnalyticDB for MySQL 3.0 物理机部署

3台物理机,每台32 vCPU 128 GiB,两块SSD磁盘(3.84 TB960 GB),128 TBHDD磁盘。

测试环境配置

测试环境

配置

ECS

2ECS,32 vCPU 128 GiB,NVMe SSD本地盘存储:3576 GiB。

说明

ECSAnalyticDB for MySQL、ElasticSearch同地域可用区,带宽充足。

测试方法

  • AnalyticDB for MySQL公有云版实时写入方法:在ECS上,使用Java程序,读取本地TPC-H的多个分片文件,基于JDBC多线程导入,导入批次2000条一批。其中导入SQLinsert into lineitem values (...)

  • AnalyticDB for MySQL物理机版实时写入(写入后1秒内可查)方法:使用Java程序,读取本地TPC-H的多个分片文件,基于JDBC多线程导入,导入批次2000条一批,单行数据长度350 b。其中导入SQLinsert into lineitem values (...)

  • ElasticSearch实时写入方法:在ECS上,使用Python程序,读取本地TPC-H的多个分片文件,基于Elasticsearch库多线程导入,导入批次2000条一批。

说明

本测试中所用数据均来自于TPC-H。更多详情,请参见TPC-H官网

测试结果

客户端并发线程数

TPS(AnalyticDB for MySQL公有云版)

TPS(AnalyticDB for MySQL物理机版)

TPS(ElasticSearch)

8

33033

120192

12211

16

56816

218472

7165

32

95083

398087

6267

64

153857

643618

5890

128

186732

787572

5516

建表语句

AnalyticDB for MySQL建表语句如下:

CREATE TABLE `lineitem` (
 `l_orderkey` bigint NOT NULL COMMENT '',
 `l_partkey` int NOT NULL COMMENT '',
 `l_suppkey` int NOT NULL COMMENT '',
 `l_linenumber` int NOT NULL COMMENT '',
 `l_quantity` decimal(15, 2) NOT NULL COMMENT '',
 `l_extendedprice` decimal(15, 2) NOT NULL COMMENT '',
 `l_discount` decimal(15, 2) NOT NULL COMMENT '',
 `l_tax` decimal(15, 2) NOT NULL COMMENT '',
 `l_returnflag` varchar NOT NULL COMMENT '',
 `l_linestatus` varchar NOT NULL COMMENT '',
 `l_shipdate` date NOT NULL COMMENT '',
 `l_commitdate` date NOT NULL COMMENT '',
 `l_receiptdate` date NOT NULL COMMENT '',
 `l_shipinstruct` varchar NOT NULL COMMENT '',
 `l_shipmode` varchar NOT NULL COMMENT '',
 `l_comment` varchar NOT NULL COMMENT ''
PRIMARY KEY(l_orderkey)
) DISTRIBUTED BY HASH(`l_orderkey`) INDEX_ALL='Y'

ElasticSearch建表语句如下:

curl -X PUT 'http://es_ip:9200/tpch' \
-H 'Content-Type: application/json' \
-d '{
    "settings": {
        "number_of_shards": 32,
        "number_of_replicas" : 2
    },
    "mappings": {
         "lineitem": { 
              "properties": {
               "L_ORDERKEY": {
                  "type": "integer"
               },
               "L_PARTKEY": {
                  "type": "integer"
               },
               "L_SUPPKEY": {
                  "type": "integer"
               },
               "L_LINENUMBER": {
                  "type": "integer"
               },
               "L_QUANTITY": {
                  "type": "double"
               },
               "L_EXTENDEDPRICE": {
                  "type": "double"
               },
               "L_DISCOUNT": {
                  "type": "double"
               },
               "L_TAX": {
                  "type": "double"
               },
               "L_RETURNFLAG": {
                  "type": "keyword"
               },
               "L_LINESTATUS": {
                  "type": "keyword"
               },
               "L_SHIPDATE": {
                  "type": "date"
               },
               "L_COMMITDATE": {
                  "type": "date"
               },
               "L_RECEIPTDATE": {
                  "type": "date"
               },
               "L_SHIPINSTRUCT": {
                  "type": "keyword"
               },
               "L_SHIPMODE": {
                  "type": "keyword"
               },
               "L_COMMENT": {
                  "type": "keyword"
               }
            }
          }
     }
}'

数据导入脚本如下:

from threading import Thread
from elasticsearch import Elasticsearch


def func(i):
    es = Elasticsearch(hosts=[
        "es_ip:9200"
    ])
    idx = 0
    with open(r"lineitem.tbl.{}".format(i)) as f:
        actions = []
        while 1:
            r = f.readlines(2000)
            if not r:
                break
            for i in r:
                data = i.split('|')
                body = {
                    'L_ORDERKEY': int(data[0]),
                    'L_PARTKEY': int(data[1]),
                    'L_SUPPKEY': int(data[2]),
                    'L_LINENUMBER': int(data[3]),
                    'L_QUANTITY': float(data[4]),
                    'L_EXTENDEDPRICE': float(data[5]),
                    'L_DISCOUNT': float(data[6]),
                    'L_TAX': float(data[7]),
                    'L_RETURNFLAG': data[8],
                    'L_LINESTATUS': data[9],
                    'L_SHIPDATE': data[10],
                    'L_COMMITDATE': data[11],
                    'L_RECEIPTDATE': data[12],
                    'L_SHIPINSTRUCT': data[13],
                    'L_SHIPMODE': data[14],
                    'L_COMMENT': data[15]
                }
                actions.append({"index": {"_index": "tpch", "_type": "lineitem", "routing": int(data[0])}})
                actions.append(body)
                idx += 1
            es.bulk(actions)
            actions = []
            print(idx)


if __name__ == '__main__':
    for i in range(0, 16):
        Thread(target=func, args=(i + 1,)).start()