Hologres是兼容PostgreSQL协议的一站式实时数仓引擎,支持实时离线一体化场景下的海量数据写入、更新与查询。本文针对Hologres的批量写入场景,结合Spark批量写入Hologres的性能测试与结果,为您介绍不同的写入场景下应如何选择合适的写入模式。
针对数据的实时写入与实时更新,性能测试方案与结果请参见数据写入、更新、点查场景压测最佳实践。
批量写入模式对比
针对数据批量写入场景,Hologres支持不同的写入模式,包括:传统的COPY模式、基于COPY协议开发的FIXED COPY流式导入模式,以及将批量导入转化成流式的INSERT INTO VALUES模式。
三种写入模式详细对比如下:
对比项 | COPY | FIXED COPY | INSERT INTO VALUES | |
简介 | COPY批量导入无主键表 | COPY批量导入有主键表 | 基于COPY协议开发的流式导入模式 | 基础流式导入模式 |
典型场景 |
|
|
| |
锁粒度 | 行锁 | 表锁 | 行锁 | 行锁 |
数据可见性 | COPY结束后可见 | COPY结束后可见 | 实时可见 | 实时可见 |
性能 | 高 | 高 | 中 | 中 |
Hologres资源消耗 | 低 | 低 | 高 | 高 |
客户端资源消耗 | 低 | 高 | 低 | 中 |
主键冲突策略支持 | 不涉及 |
|
|
|
批量写入模式选择
针对数据批量写入场景,三种模式的特点如下:
COPY模式:传统的COPY模式,拥有最优的写入性能、最低的Hologres资源消耗。
FIXED COPY模式:基于COPY协议开发的流式导入模式。仅产生行锁、数据实时可见。
INSERT INTO VALUES模式:将批量导入转化为传统的流式写入模式,在批量写入场景下已不具备明显优势。
说明INSERT模式仅在批量写入场景下不具备明显优势,但仅INSERT模式具备数据回撤(即删除)能力,在Binlog数据写入等场景下仍需使用该模式。
若您对数据实时可见性、锁粒度(产生表锁时,该表无法支持多个任务同时写入),以及数据源端负载没有特殊要求,可以优先选择COPY模式进行批量写入。
以Spark批量写入为例:推荐将Hologres实例升级至V2.2.25或以上版本,并将Connector的写入参数
write.mode
值设为auto
(默认值),系统将会自动选择最优的写入模式。以Flink批量写入为例:需要先根据下述决策树确定选用COPY模式或FIXED COPY模式,然后分别配置如下参数。
jdbccopywritemode:设为TRUE,即不使用INSERT模式。
bulkload:TRUE(COPY模式)或FALSE(FIXED COPY模式),请根据需求进行配置。
针对具体的批量写入场景,可根据如下决策树选择合适的写入模式。
批量写入性能测试
测试过程使用Hologres研发的开源组件Hologres-Spark-Connector。
准备工作
基础环境准备
您需准备以下环境:
Hologres实例和EMR-Spark集群需要位于同一地域,且使用相同的VPC。
开通V2.2.25或以上版本的Hologres实例,并创建数据库。
下载Spark-Connector包。
您可通过Maven中央仓库下载Spark读写Hologres时需要引用的连接器JAR包
hologres-connector-spark-3.x
。
本文中使用的环境信息如下:
服务 | 版本 | 规格 |
Hologres | V3.0.30 | 64 Core,256 GB(1CU=1 core 4GB) |
EMR-Spark | EMR-5.18.1,Spark-3.5.3 | 8 Core,32 GB * 8(1个master节点,7个core节点) 重要 需开通OSS-HDFS服务。 |
Spark-Connector | 1.5.2 | 不涉及 |
测试数据准备
准备原始数据。
登录EMR-Spark集群的master节点,详情请参见连接ECS实例的方式。
单击TPC-H_Tools_v3.0.0.zip下载TPCH工具,将其复制到集群master节点的ECS机器中进行解压,然后进入
TPC-H_Tools_v3.0.0/TPC-H_Tools_v3.0.0/dbgen
目录。执行如下代码,在
dbgen
目录下生成1 TB
测试集文件customer.tbl
,原tbl格式文件大小为23 GB。./dbgen -s 1000 -T c
TPC-H 1 TB数据集中customer表的详细信息如下:
表信息
说明
字段数据量
8
字段类型
INT、BIGINT、TEXT、DECIMAL
数据行数
150,000,000
表Shard数
40
测试数据导入Spark。
执行如下命令,将customer.tbl文件上传到Spark集群。
hadoop fs -put customer.tbl <spark_resource>
其中spark_resource为创建EMR-Spark集群时,配置的集群存储根路径。
在Hologres中创建不同存储格式的表。SQL命令如下:
行列共存
CREATE TABLE test_table_mixed ( C_CUSTKEY BIGINT PRIMARY KEY, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'column,row' );
列存(有主键)
CREATE TABLE test_table_column ( C_CUSTKEY BIGINT PRIMARY KEY, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'column' );
列存(无主键)
CREATE TABLE test_table_column_no_pk ( C_CUSTKEY BIGINT, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'column' );
行存(有主键)
CREATE TABLE test_table_row ( C_CUSTKEY BIGINT PRIMARY KEY, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'row' );
行存(无主键)
CREATE TABLE test_table_row_no_pk ( C_CUSTKEY BIGINT, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'row' );
性能测试
测试配置
本文主要测试不同模式下的导入性能。
登录EMR-Spark集群的master节点(登录方式请参见连接ECS实例的方式),上传已下载的Spark-Connector包,然后执行如下命令进入spark-sql交互界面。
说明通过调整
spark.sql.files.maxPartitionBytes
参数值, 可以控制Spark读取HDFS文件的并发。此处并发控制为40。# 进入spark-sql交互界面 spark-sql --jars <path>/hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.executor.instances=40 \ --conf spark.executor.cores=1 \ --conf spark.executor.memory=4g \ --conf spark.sql.files.maxPartitionBytes=644245094
其中path为hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar所在的根路径。
在spark-sql交互界面执行如下SQL,通过创建临时表的方式写入数据。
由于测试过程中,需要多次调整参数以测试不同模式的写入性能,此处选择创建临时表的方式。
说明实际使用中,您可直接使用Catalog加载Hologres表,更加方便。
-- 创建csv格式的临时表 CREATE TEMPORARY VIEW csvtable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "<spark_resources>/customer.tbl", sep "|" ); CREATE TEMPORARY VIEW hologresTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://<hologres_vpc_endpoint>/<database_name>", username "<accesskey_id>", password "<accesskey_secret>", table "<table_name>", direct_connect "false", write.mode "auto", write.insert.thread_size "3", write.insert.batch_size "2048" ); INSERT INTO hologresTable SELECT * FROM csvTable;
参数说明如下:
参数名
描述
spark_resources
创建EMR-Spark集群时,配置的集群存储根路径。
您可登录EMR on ECS控制台,单击目标集群ID,在基础信息页面的集群信息区域获取集群存储根路径。
hologres_vpc_endpoint
Hologres实例对应的VPC网络类型的域名。
您可登录Hologres管理控制台,单击目标实例ID,在实例详情页的网络信息区域,获取指定VPC的域名。例如:杭州地域的VPC Endpoint格式为
<Instance ID>-cn-hangzhou-vpc-st.hologres.aliyuncs.com:80
。database_name
Hologres实例的数据库名称。
accesskey_id
具有Hologres相应Database读取权限的AccessKey ID。
accesskey_secret
具有Hologres相应Database读取权限的AccessKey Secret。
table_name
待写入的Hologres目标表名称。
write.mode
写入模式,取值如下:
auto
:默认值,Connector自行选择最佳的模式。insert
:使用INSERT INTO VALUES方式写入。stream
:使用FIXED COPY流式写入。bulk_load
:使用COPY模式批量导入无主键表。bulk_load_on_conflict
:使用COPY模式批量导入有主键表。
write.insert.thread_size
写入并发,仅使用INSERT写入时生效。
write.insert.batch_size
写入攒批,仅使用INSERT写入时生效。
write.on_conflict_action
INSERT_OR_REPLACE(默认):主键冲突更新。
INSERT_OR_IGNORE: 主键冲突忽略。
更多参数信息请参见参数说明。
测试场景
测试场景 | 可选场景 |
表存储格式 |
|
数据更新方式 |
|
写入方式 |
|
测试结果
测试结果包括以下字段:
字段 | 描述 |
作业总耗时 | Spark作业总的运行时间。 即您在EMR-Spark集群节点的spark-sql交互界面,执行INSERT数据写入操作的耗时。获取方法如下图: |
数据写入平均耗时 | 剔除Spark集群调度以及读取数据或者数据重分布的时间,仅统计写入作业的平均耗时。 您可在Hologres管理控制台的HoloWeb页面,执行如下SQL命令,即可获取Shard并发数(count)和数据写入平均耗时(avg_duration_ms,单位为:毫秒)。
参数说明如下:
|
Hologres负载 | Hologres实例的CPU使用率。 您可在Hologres管理控制台的实例监控信息页面获取CPU使用率。 |
Spark负载 | EMR节点负载。 您可在EMR on ECS控制台单击目标集群ID,进入监控诊断 > 指标监控页签,选择如下参数:
然后查询CPU utilization指标,即为Spark负载。 |
详细测试结果如下:
存储格式 | 主键 | 写入模式 | 作业总耗时 | 数据写入平均耗时 | Hologres负载 | Spark负载 |
列存 | 无主键 | insert | 241.61s | 232.70s | 92% | 15% |
stream | 228.11s | 222.34s | 100% | 36% | ||
bulk_load | 88.72s | 57.16s | 97% | 47% | ||
有主键 ignore | insert | 190.96s | 172.60s | 90% | 14% | |
stream | 149.60s | 142.16s | 100% | 14% | ||
bulk_load_on_conflict | 115.96s | 42.92s | 60% | 75% | ||
有主键 replace | insert | 600.40s | 574.31s | 91% | 5% | |
stream | 550.29s | 540.32s | 100% | 5% | ||
bulk_load_on_conflict | 188.05s | 109.77s | 93% | 78% | ||
行存 | 无主键 | insert | 132.38s | 123.79s | 94% | 22% |
stream | 114.41s | 103.81s | 100% | 17% | ||
bulk_load | 68.20s | 41.22s | 98% | 32% | ||
有主键 ignore | insert | 190.48s | 170.49s | 89% | 15% | |
stream | 185.46s | 172.48s | 85% | 14% | ||
bulk_load_on_conflict | 117.81s | 47.69s | 58% | 75% | ||
有主键 replace | insert | 177.97s | 170.78s | 93% | 15% | |
stream | 142.44s | 130.16s | 100% | 20% | ||
bulk_load_on_conflict | 137.69s | 65.18s | 92% | 78% | ||
行列共存 | 有主键 ignore | insert | 172.19s | 158.74s | 86% | 16% |
stream | 150.63s | 149.76s | 100% | 12% | ||
bulk_load_on_conflict | 128.83s | 42.09s | 59% | 79% | ||
有主键 replace | insert | 690.37s | 662.00s | 92% | 5% | |
stream | 625.84s | 623.08s | 100% | 4% | ||
bulk_load_on_conflict | 202.07s | 121.58s | 93% | 80% |
写入模式的对应关系如下:
insert:INSERT INTO VALUES
stream:FIXED COPY
bulk_load:COPY导入无主键表
bulk_load_on_conflict :COPY导入有主键表