Test instance specifications

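Product              Specification
AnalyticDB MySQL     3.0, elastic mode, Cluster Edition, 1 worker (8 cores)
ElasticSearch        6.7.0, general-purpose commercial edition, 1 node (8 cores)
ECS                  2 ECS instances, 32 vCPUs, 128 GiB memory, local NVMe SSD storage: 3576 GiB

Note: The ECS instances are in the same region and zone as the AnalyticDB MySQL and ElasticSearch instances, and bandwidth is sufficient.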

Test method

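  • AnalyticDB MySQL real-time write method: on the ECS instances, a Java program reads multiple local TPC-H shard files and imports them over JDBC with multiple threads, in batches of 2000 rows. The import SQL statement is insert into lineitem values (...).
  • ElasticSearch real-time write method: on the ECS instances, a Python program reads multiple local TPC-H shard files and imports them with the elasticsearch library using multiple threads, in batches of 2000 documents.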
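The batching step of the AnalyticDB MySQL import can be sketched as follows. The test itself used a Java program over JDBC, which is not reproduced in this document, so this is only a Python illustration of how 2000 rows might be packed into one insert into lineitem values (...) statement. The helper names (parse_line, build_batch_insert, batches) are hypothetical, and the %s placeholders assume a DB-API driver with the "format" parameter style.

```python
from itertools import islice

COLUMNS = 16        # lineitem has 16 columns
BATCH_SIZE = 2000   # rows per INSERT statement, as in the test


def parse_line(line):
    """Split one TPC-H .tbl line ('a|b|...|p|') into its 16 field values."""
    return line.rstrip("\n").split("|")[:COLUMNS]


def build_batch_insert(rows):
    """Return (sql, params) for one multi-row INSERT covering all rows."""
    row_placeholder = "(" + ", ".join(["%s"] * COLUMNS) + ")"
    sql = "insert into lineitem values " + ", ".join([row_placeholder] * len(rows))
    # Flatten the rows so the parameters line up with the placeholders.
    params = [value for row in rows for value in row]
    return sql, params


def batches(lines, size=BATCH_SIZE):
    """Yield lists of parsed rows, at most `size` rows per list."""
    it = iter(lines)
    while True:
        chunk = [parse_line(line) for line in islice(it, size)]
        if not chunk:
            return
        yield chunk
```

Each worker thread would open its own connection, iterate over batches(open(shard_file)), and pass the result of build_batch_insert to its cursor's execute method.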

Test results

Client threads    TPS (AnalyticDB MySQL)    TPS (ElasticSearch)
8                 33033                     12211
16                56816                     7165
32                95083                     6267
64                153857                    5890
128               186732                    5516
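To make the trend in these figures easier to read, the short script below derives two ratios from the table: the AnalyticDB MySQL to ElasticSearch TPS ratio at each concurrency level, and each system's TPS relative to its own 8-thread result. The numbers are copied from the table; the function name summarize is only illustrative.

```python
# (client threads, AnalyticDB MySQL TPS, ElasticSearch TPS), copied from the results table
RESULTS = [
    (8, 33033, 12211),
    (16, 56816, 7165),
    (32, 95083, 6267),
    (64, 153857, 5890),
    (128, 186732, 5516),
]


def summarize(results):
    """Compute per-row ratios relative to the other system and to the first row."""
    _, base_adb, base_es = results[0]
    rows = []
    for threads, adb, es in results:
        rows.append({
            "threads": threads,
            "adb_over_es": adb / es,
            "adb_vs_8_threads": adb / base_adb,
            "es_vs_8_threads": es / base_es,
        })
    return rows


for row in summarize(RESULTS):
    print("{threads:>4}  ADB/ES = {adb_over_es:6.2f}  "
          "ADB scaling = {adb_vs_8_threads:5.2f}x  "
          "ES scaling = {es_vs_8_threads:5.2f}x".format(**row))
```

From 8 to 128 threads, AnalyticDB MySQL throughput grows by a factor of roughly 5.7, while ElasticSearch throughput falls to less than half of its 8-thread value.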

Table creation statements

The AnalyticDB MySQL table creation statement is as follows:

create table `lineitem` (
 `l_orderkey` bigint NOT NULL COMMENT '',
 `l_partkey` int NOT NULL COMMENT '',
 `l_suppkey` int NOT NULL COMMENT '',
 `l_linenumber` int NOT NULL COMMENT '',
 `l_quantity` decimal(15, 2) NOT NULL COMMENT '',
 `l_extendedprice` decimal(15, 2) NOT NULL COMMENT '',
 `l_discount` decimal(15, 2) NOT NULL COMMENT '',
 `l_tax` decimal(15, 2) NOT NULL COMMENT '',
 `l_returnflag` varchar NOT NULL COMMENT '',
 `l_linestatus` varchar NOT NULL COMMENT '',
 `l_shipdate` date NOT NULL COMMENT '',
 `l_commitdate` date NOT NULL COMMENT '',
 `l_receiptdate` date NOT NULL COMMENT '',
 `l_shipinstruct` varchar NOT NULL COMMENT '',
 `l_shipmode` varchar NOT NULL COMMENT '',
 `l_comment` varchar NOT NULL COMMENT ''
) DISTRIBUTE BY HASH(`l_orderkey`) INDEX_ALL='Y'

The ElasticSearch index creation request is as follows:

curl -X PUT 'http://es_ip:9200/tpch' \
-H 'Content-Type: application/json' \
-d '{
    "settings": {
        "number_of_shards": 32,
        "number_of_replicas" : 2
    },
    "mappings": {
         "lineitem": { 
              "properties": {
               "L_ORDERKEY": {
                  "type": "integer"
               },
               "L_PARTKEY": {
                  "type": "integer"
               },
               "L_SUPPKEY": {
                  "type": "integer"
               },
               "L_LINENUMBER": {
                  "type": "integer"
               },
               "L_QUANTITY": {
                  "type": "double"
               },
               "L_EXTENDEDPRICE": {
                  "type": "double"
               },
               "L_DISCOUNT": {
                  "type": "double"
               },
               "L_TAX": {
                  "type": "double"
               },
               "L_RETURNFLAG": {
                  "type": "keyword"
               },
               "L_LINESTATUS": {
                  "type": "keyword"
               },
               "L_SHIPDATE": {
                  "type": "date"
               },
               "L_COMMITDATE": {
                  "type": "date"
               },
               "L_RECEIPTDATE": {
                  "type": "date"
               },
               "L_SHIPINSTRUCT": {
                  "type": "keyword"
               },
               "L_SHIPMODE": {
                  "type": "keyword"
               },
               "L_COMMENT": {
                  "type": "keyword"
               }
            }
          }
     }
}'

The ElasticSearch data import script is as follows:

from itertools import islice
from threading import Thread
from elasticsearch import Elasticsearch

BATCH_SIZE = 2000  # documents per bulk request


def func(shard):
    # Each thread creates its own client and reads its own shard file.
    es = Elasticsearch(hosts=[
        "es_ip:9200"
    ])
    idx = 0
    with open(r"lineitem.tbl.{}".format(shard)) as f:
        while True:
            # islice takes up to BATCH_SIZE lines per batch
            # (readlines(n) would treat n as a byte hint, not a line count).
            rows = list(islice(f, BATCH_SIZE))
            if not rows:
                break
            actions = []
            for row in rows:
                data = row.split('|')
                body = {
                    'L_ORDERKEY': int(data[0]),
                    'L_PARTKEY': int(data[1]),
                    'L_SUPPKEY': int(data[2]),
                    'L_LINENUMBER': int(data[3]),
                    'L_QUANTITY': float(data[4]),
                    'L_EXTENDEDPRICE': float(data[5]),
                    'L_DISCOUNT': float(data[6]),
                    'L_TAX': float(data[7]),
                    'L_RETURNFLAG': data[8],
                    'L_LINESTATUS': data[9],
                    'L_SHIPDATE': data[10],
                    'L_COMMITDATE': data[11],
                    'L_RECEIPTDATE': data[12],
                    'L_SHIPINSTRUCT': data[13],
                    'L_SHIPMODE': data[14],
                    'L_COMMENT': data[15]
                }
                # Bulk format: one action line followed by one document line.
                actions.append({"index": {"_index": "tpch", "_type": "lineitem", "routing": int(data[0])}})
                actions.append(body)
                idx += 1
            es.bulk(body=actions)
            print(idx)  # running count of documents sent by this thread


if __name__ == '__main__':
    # Start 16 import threads, one per shard file (lineitem.tbl.1 to lineitem.tbl.16).
    for i in range(0, 16):
        Thread(target=func, args=(i + 1,)).start()
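The script above starts a fixed 16 threads and only prints running counts. The source does not say how the TPS figures were computed, so the sketch below is an assumption: it treats TPS as total rows written divided by wall-clock seconds, and it varies the thread count to match the concurrency levels in the results table. The names run_with_concurrency and fake_worker are hypothetical. A real worker would be an import function like the one above, changed to return the number of rows it wrote.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_concurrency(worker, shard_ids, max_workers):
    """Run worker(shard_id) for every shard on a pool of max_workers threads.

    worker must return the number of rows it wrote.
    Returns (total_rows, rows_per_second).
    """
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        total_rows = sum(pool.map(worker, shard_ids))
    elapsed = time.perf_counter() - start
    return total_rows, total_rows / elapsed


def fake_worker(shard_id):
    # Stand-in for a real import function: pretends to write 2000 rows.
    time.sleep(0.01)
    return 2000


if __name__ == "__main__":
    # Assumed setup: one shard file per thread at each concurrency level.
    for threads in (8, 16, 32, 64, 128):
        rows, tps = run_with_concurrency(fake_worker, range(1, threads + 1), threads)
        print(threads, rows, round(tps))
```

Measuring from before the pool starts until all workers finish includes connection setup in the elapsed time, which slightly understates steady-state throughput on short runs.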