Hologres批量写入最佳实践

Hologres是兼容PostgreSQL协议的一站式实时数仓引擎,支持实时离线一体化场景下的海量数据写入、更新与查询。本文针对Hologres的批量写入场景,结合Spark批量写入Hologres的性能测试与结果,为您介绍不同的写入场景下应如何选择合适的写入模式。

针对数据的实时写入与实时更新,性能测试方案与结果请参见数据写入、更新、点查场景压测最佳实践

批量写入模式对比

针对数据批量写入场景,Hologres支持不同的写入模式,包括:传统的COPY模式、基于COPY协议开发的FIXED COPY流式导入模式,以及将批量导入转化成流式的INSERT INTO VALUES模式。

三种写入模式详细对比如下:

对比项

COPY

FIXED COPY

INSERT INTO VALUES

简介

COPY批量导入无主键表

COPY批量导入有主键表

基于COPY协议开发的流式导入模式

基础流式导入模式

典型场景

  • Spark批量导入

  • 开源Flink批量导入

  • Flink导入

  • DataWorks数据集成导入

  • Spark流式导入

  • Flink导入

  • DataWorks数据集成导入

  • Spark流式导入

锁粒度

行锁

表锁

行锁

行锁

数据可见性

COPY结束后可见

COPY结束后可见

实时可见

实时可见

性能

Hologres资源消耗

客户端资源消耗

主键冲突策略支持

不涉及

  • NONE

  • UPDATE:V3.0.4版本起支持整行更新,V3.1.1版本起支持局部更新。

  • IGNORE:V3.0.4版本起支持。

  • NONE(冲突则报错)

  • UPDATE

  • IGNORE

  • NONE(冲突则报错)

  • UPDATE

  • IGNORE

批量写入模式选择

针对数据批量写入场景,三种模式的特点如下:

  • COPY模式:传统的COPY模式,拥有最优的写入性能、最低的Hologres资源消耗。

  • FIXED COPY模式:基于COPY协议开发的流式导入模式。仅产生行锁、数据实时可见。

  • INSERT INTO VALUES模式:将批量导入转化为传统的流式写入模式,在批量写入场景下已不具备明显优势。

    说明

    INSERT模式仅在批量写入场景下不具备明显优势,但仅INSERT模式具备数据回撤(即删除)能力,在Binlog数据写入等场景下仍需使用该模式。

若您对数据实时可见性、锁粒度(产生表锁时,该表无法支持多个任务同时写入),以及数据源端负载没有特殊要求,可以优先选择COPY模式进行批量写入。

  • Spark批量写入为例:推荐将Hologres实例升级至V2.2.25或以上版本,并将Connector写入参数write.mode值设为auto(默认值),系统将会自动选择最优的写入模式。

  • Flink批量写入为例:需要先根据下述决策树确定选用COPY模式或FIXED COPY模式,然后分别配置如下参数。

    • jdbccopywritemode:设为TRUE,即不使用INSERT模式。

    • bulkload:TRUE(COPY模式)或FALSE(FIXED COPY模式),请根据需求进行配置。

针对具体的批量写入场景,可根据如下决策树选择合适的写入模式。

image

批量写入性能测试

测试过程使用Hologres研发的开源组件Hologres-Spark-Connector。

准备工作

基础环境准备

您需准备以下环境:

重要

Hologres实例和EMR-Spark集群需要位于同一地域,且使用相同的VPC。

  • 开通V2.2.25或以上版本的Hologres实例,并创建数据库

  • 创建EMR-Spark集群(Spark版本需大于或等于3.3.0)。详情请参见创建集群

  • 下载Spark-Connector包。

    您可通过Maven中央仓库下载Spark读写Hologres时需要引用的连接器JARhologres-connector-spark-3.x

本文中使用的环境信息如下:

服务

版本

规格

Hologres

V3.0.30

64 Core,256 GB(1CU=1 core 4GB)

EMR-Spark

EMR-5.18.1,Spark-3.5.3

8 Core,32 GB * 8(1master节点,7core节点)

重要

需开通OSS-HDFS服务。

Spark-Connector

1.5.2

不涉及

测试数据准备

  1. 准备原始数据。

    1. 登录EMR-Spark集群的master节点,详情请参见连接ECS实例的方式

    2. 单击TPC-H_Tools_v3.0.0.zip下载TPCH工具,将其复制到集群master节点的ECS机器中进行解压,然后进入TPC-H_Tools_v3.0.0/TPC-H_Tools_v3.0.0/dbgen目录。

    3. 执行如下代码,在dbgen目录下生成1 TB测试集文件customer.tbl,原tbl格式文件大小为23 GB。

      ./dbgen -s 1000 -T c

      TPC-H 1 TB数据集中customer表的详细信息如下:

      表信息

      说明

      字段数据量

      8

      字段类型

      INT、BIGINT、TEXT、DECIMAL

      数据行数

      150,000,000

      Shard

      40

  2. 测试数据导入Spark。

    执行如下命令,将customer.tbl文件上传到Spark集群。

    hadoop fs -put customer.tbl <spark_resource>

    其中spark_resource为创建EMR-Spark集群时,配置的集群存储根路径

  3. Hologres中创建不同存储格式的表。SQL命令如下:

    行列共存

    CREATE TABLE test_table_mixed (
        C_CUSTKEY BIGINT PRIMARY KEY,
        C_NAME TEXT,
        C_ADDRESS TEXT,
        C_NATIONKEY INT,
        C_PHONE TEXT,
        C_ACCTBAL DECIMAL(15, 2),
        C_MKTSEGMENT TEXT,
        C_COMMENT TEXT
    )
    WITH (
        orientation = 'column,row'
    );

    列存(有主键)

    CREATE TABLE test_table_column (
        C_CUSTKEY BIGINT PRIMARY KEY,
        C_NAME TEXT,
        C_ADDRESS TEXT,
        C_NATIONKEY INT,
        C_PHONE TEXT,
        C_ACCTBAL DECIMAL(15, 2),
        C_MKTSEGMENT TEXT,
        C_COMMENT TEXT
    )
    WITH (
        orientation = 'column'
    );

    列存(无主键)

    CREATE TABLE test_table_column_no_pk (
        C_CUSTKEY BIGINT,
        C_NAME TEXT,
        C_ADDRESS TEXT,
        C_NATIONKEY INT,
        C_PHONE TEXT,
        C_ACCTBAL DECIMAL(15, 2),
        C_MKTSEGMENT TEXT,
        C_COMMENT TEXT
    )
    WITH (
        orientation = 'column'
    );

    行存(有主键)

    CREATE TABLE test_table_row (
        C_CUSTKEY BIGINT PRIMARY KEY,
        C_NAME TEXT,
        C_ADDRESS TEXT,
        C_NATIONKEY INT,
        C_PHONE TEXT,
        C_ACCTBAL DECIMAL(15, 2),
        C_MKTSEGMENT TEXT,
        C_COMMENT TEXT
    )
    WITH (
        orientation = 'row'
    );

    行存(无主键)

    CREATE TABLE test_table_row_no_pk (
        C_CUSTKEY BIGINT,
        C_NAME TEXT,
        C_ADDRESS TEXT,
        C_NATIONKEY INT,
        C_PHONE TEXT,
        C_ACCTBAL DECIMAL(15, 2),
        C_MKTSEGMENT TEXT,
        C_COMMENT TEXT
    )
    WITH (
        orientation = 'row'
    );

性能测试

测试配置

本文主要测试不同模式下的导入性能。

  1. 登录EMR-Spark集群的master节点(登录方式请参见连接ECS实例的方式),上传已下载的Spark-Connector包,然后执行如下命令进入spark-sql交互界面。

    说明

    通过调整spark.sql.files.maxPartitionBytes参数值, 可以控制Spark读取HDFS文件的并发。此处并发控制为40。

    # 进入spark-sql交互界面
    
    spark-sql --jars <path>/hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.executor.instances=40 \
    --conf spark.executor.cores=1 \
    --conf spark.executor.memory=4g \
    --conf spark.sql.files.maxPartitionBytes=644245094

    其中pathhologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar所在的根路径。

  2. spark-sql交互界面执行如下SQL,通过创建临时表的方式写入数据。

    由于测试过程中,需要多次调整参数以测试不同模式的写入性能,此处选择创建临时表的方式。

    说明

    实际使用中,您可直接使用Catalog加载Hologres表,更加方便。

    -- 创建csv格式的临时表
    CREATE TEMPORARY VIEW csvtable (
      c_custkey BIGINT,
      c_name STRING,
      c_address STRING,
      c_nationkey INT,
      c_phone STRING,
      c_acctbal DECIMAL(15, 2),
      c_mktsegment STRING,
      c_comment STRING)
    USING csv OPTIONS (
      path "<spark_resources>/customer.tbl", sep "|"
    );
    
    CREATE TEMPORARY VIEW hologresTable (
      c_custkey BIGINT,
      c_name STRING,
      c_address STRING,
      c_nationkey INT,
      c_phone STRING,
      c_acctbal DECIMAL(15, 2),
      c_mktsegment STRING,
      c_comment STRING)
    USING hologres OPTIONS (
      jdbcurl "jdbc:postgresql://<hologres_vpc_endpoint>/<database_name>",
      username "<accesskey_id>", 
      password "<accesskey_secret>", 
      table "<table_name>",
      direct_connect "false",
      write.mode "auto",
      write.insert.thread_size "3",
      write.insert.batch_size "2048"
    );
    
    INSERT INTO hologresTable SELECT * FROM csvTable;

    参数说明如下:

    参数名

    描述

    spark_resources

    创建EMR-Spark集群时,配置的集群存储根路径

    您可登录EMR on ECS控制台,单击目标集群ID,在基础信息页面的集群信息区域获取集群存储根路径。

    hologres_vpc_endpoint

    Hologres实例对应的VPC网络类型的域名

    您可登录Hologres管理控制台,单击目标实例ID,在实例详情页的网络信息区域,获取指定VPC的域名。例如:杭州地域的VPC Endpoint格式为<Instance ID>-cn-hangzhou-vpc-st.hologres.aliyuncs.com:80

    database_name

    Hologres实例的数据库名称。

    accesskey_id

    具有Hologres相应Database读取权限的AccessKey ID。

    accesskey_secret

    具有Hologres相应Database读取权限的AccessKey Secret。

    table_name

    待写入的Hologres目标表名称。

    write.mode

    写入模式,取值如下:

    • auto :默认值,Connector自行选择最佳的模式。

    • insert:使用INSERT INTO VALUES方式写入。

    • stream:使用FIXED COPY流式写入。

    • bulk_load :使用COPY模式批量导入无主键表。

    • bulk_load_on_conflict :使用COPY模式批量导入有主键表。

    write.insert.thread_size

    写入并发,仅使用INSERT写入时生效。

    write.insert.batch_size

    写入攒批,仅使用INSERT写入时生效。

    write.on_conflict_action

    INSERT_OR_REPLACE(默认):主键冲突更新。

    INSERT_OR_IGNORE: 主键冲突忽略。

    更多参数信息请参见参数说明

测试场景

测试场景

可选场景

表存储格式

  • 行存

  • 列存

  • 行列共存

数据更新方式

  • 无主键表Append Only写入

  • 有主键空表首次写入

  • 有主键表更新整行

写入方式

  • insert(INSERT INTO VALUES)

  • stream(FIXED COPY)

  • bulk_load(COPY导入无主键表)

  • bulk_load_on_conflict (COPY导入有主键表)

测试结果

测试结果包括以下字段:

字段

描述

作业总耗时

Spark作业总的运行时间。

即您在EMR-Spark集群节点的spark-sql交互界面,执行INSERT数据写入操作的耗时。获取方法如下图:image

数据写入平均耗时

剔除Spark集群调度以及读取数据或者数据重分布的时间,仅统计写入作业的平均耗时。

您可在Hologres管理控制台的HoloWeb页面,执行如下SQL命令,即可获取Shard并发数(count)和数据写入平均耗时(avg_duration_ms,单位为:毫秒)。

SELECT
    COUNT(*), AVG(duration) AS avg_duration_ms
FROM
    hologres.hg_query_log
WHERE
    query_start >= '<start_time>' AND query_start <= '<end_time>'
    AND query LIKE '%<test_table_column>%' AND command_tag = 'COPY';

参数说明如下:

  • start_time:数据写入的开始时间,例如2025-04-11 15:00:00

  • end_time:数据写入的结束时间,例如2025-04-11 15:10:00

  • hologres_table_name:写入Hologres的目标表名称,例如test_table_column

Hologres负载

Hologres实例的CPU使用率。

您可在Hologres管理控制台的实例监控信息页面获取CPU使用率。

Spark负载

EMR节点负载。

您可在EMR on ECS控制台单击目标集群ID,进入监控诊断 > 指标监控页签,选择如下参数:

  • Dashboard:选择HOST。

  • nodeGroupId:选择目标集群的Core节点ID。

  • hostname:选择各个Core节点。

  • 选择时间:选择Spark中进行数据写入操作的时间段。

然后查询CPU utilization指标,即为Spark负载。

详细测试结果如下:

存储格式

主键

写入模式

作业总耗时

数据写入平均耗时

Hologres负载

Spark负载

列存

无主键

insert

241.61s

232.70s

92%

15%

stream

228.11s

222.34s

100%

36%

bulk_load

88.72s

57.16s

97%

47%

有主键

ignore

insert

190.96s

172.60s

90%

14%

stream

149.60s

142.16s

100%

14%

bulk_load_on_conflict

115.96s

42.92s

60%

75%

有主键

replace

insert

600.40s

574.31s

91%

5%

stream

550.29s

540.32s

100%

5%

bulk_load_on_conflict

188.05s

109.77s

93%

78%

行存

无主键

insert

132.38s

123.79s

94%

22%

stream

114.41s

103.81s

100%

17%

bulk_load

68.20s

41.22s

98%

32%

有主键

ignore

insert

190.48s

170.49s

89%

15%

stream

185.46s

172.48s

85%

14%

bulk_load_on_conflict

117.81s

47.69s

58%

75%

有主键

replace

insert

177.97s

170.78s

93%

15%

stream

142.44s

130.16s

100%

20%

bulk_load_on_conflict

137.69s

65.18s

92%

78%

行列共存

有主键

ignore

insert

172.19s

158.74s

86%

16%

stream

150.63s

149.76s

100%

12%

bulk_load_on_conflict

128.83s

42.09s

59%

79%

有主键

replace

insert

690.37s

662.00s

92%

5%

stream

625.84s

623.08s

100%

4%

bulk_load_on_conflict

202.07s

121.58s

93%

80%

说明

写入模式的对应关系如下:

  • insert:INSERT INTO VALUES

  • stream:FIXED COPY

  • bulk_load:COPY导入无主键表

  • bulk_load_on_conflict :COPY导入有主键表