Spark读写Hologres

更新时间:2025-03-24 03:40:01

Spark是用于大规模数据处理的统一分析引擎,Hologres已与Spark社区版及EMR Spark版实现高效集成,迅速帮助企业构建数据仓库。Hologres提供的Spark Connector支持在Spark集群中创建Hologres Catalog,以外部表的形式实现高性能的数据批量读取和导入,相较于原生JDBC,具有更为卓越的性能。

使用限制

V1.3及以上版本的Hologres实例支持Spark Connector。您可以在Hologres管理控制台实例详情页查看当前实例版本。若您的实例是V1.3以下版本,请使用实例升级或通过搜索(钉钉群号:32314975)加入实时数仓Hologres交流群申请升级实例。

准备工作

  • 您需要安装对应版本的Spark环境,能够运行spark-sql、spark-shellpyspark命令,建议使用Spark 3.3.0及以上版本,以避免依赖问题,并获得更丰富的功能体验。

    • 您可以使用阿里云EMR Spark来快速构建Spark环境对接Hologres实例,详情请参见EMR Spark功能

    • 您也可以独立在所需环境中搭建Spark环境,详情请参见Apache Spark

  • Spark读写Hologres需要引用的连接器JARhologres-connector-spark-3.x。本文使用当前最新版本1.5.2。您可以通过Maven中央仓库进行下载。更多关于Connector的资源都已开源,详情请参见Hologres-Connectors

  • 若您需要使用Java语言开发Spark作业,并在如IntelliJ IDEA工具进行本地调试时,可直接引入下方Maven依赖进行pom.xml文件配置。

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.5.2</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>

Hologres Catalog

Spark Connector1.5.2版本开始支持Hologres Catalog,您可以通过外部表的方式方便地读写Hologres。

Spark中每个Hologres Catalog对应Hologres中的各个Database,每个Hologres Catalog中的Namespace对应Database中的各个Schema。以下部分将展示如何在Spark中使用Hologres Catalog。

说明

目前Hologres Catalog暂不支持创建表。

本文Hologres实例中对应数据库和表名如下:

test_db --数据库
  public.test_table1 --public模式下表
  public.test_table2
  test_schema.test_table3  -- test_schema模式下表 

Hologres Catalog初始化

Spark集群中启动spark-sql,加载Hologres Connector并指定Catalog参数。

spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db

Hologres Catalog常用命令

  • 加载Hologres Catalog

    Spark中的Hologres Catalog完全对应一个HologresDatabase,使用过程中无法更改。

    USE hologres_external_test_db;
  • 查询所有Namespace。

    Spark中的Namespace,对应Hologres中的Schema,默认为public,使用过程中可以使用USE指令调整默认的Schema。

    -- 查看Hologres Catalog中的所有Namespace, 即Hologres中所有的Schema。
    SHOW NAMESPACES;
  • 查询Namespace下的表。

    • 查询所有表。

      SHOW TABLES;
    • 查询指定Namespace下的表。

      USE test_schema;
      SHOW TABLES;
      
      -- 或者使用 
      SHOW TABLES IN test_schema;
  • 读取和写入表。

    使用SELECTINSERT语句对Catalog中的Hologres外部表进行读写。

    -- 读取表。
    SELECT * FROM public.test_table1;
    
    -- 写入表。
    INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;

导入Hologres

本节测试Hologres数据来源TPC-H数据集中的customer表。Spark可以通过CSV格式读取Hologres表数据。您可以直接下载customer数据。customer表结构创建SQL如下。

CREATE TABLE customer_holo_table
(
  c_custkey    BIGINT ,
  c_name       TEXT   ,
  c_address    TEXT   ,
  c_nationkey  INT    ,
  c_phone      TEXT   ,
  c_acctbal    DECIMAL(15,2) ,
  c_mktsegment TEXT   ,
  c_comment    TEXT
);

使用Spark-SQL导入

在使用Spark-SQL时,借助Catalog加载Hologres表的元数据更加便捷,同时也可以通过创建临时表的方式来声明一个Hologres表。

说明
  • Hologres-Connector-Spark 1.5.2以下版本,不支持Catalog,只能通过创建临时表的方式声明Hologres表。

  • Hologres-Connector-Spark更多参数信息,详情请参见参数说明

  1. 初始化Hologres Catalog。

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. CSV源表导入数据至Hologres外表。

    说明

    SparkINSERT INTO语法不支持通过column_list指定部分列进行写入,例如使用INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable来表示只写入c_custkey这一个字段。

    若您希望写入部分所需字段,可以使用CREATE TEMPORARY VIEW 的方式声明仅所需字段的Hologres临时表。

    CATALOG写入
    TEMPORARY VIEW写入
    -- 加载Hologres Catalog。
    USE hologres_external_test_db;
    
    -- 创建csv数据源
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep "," -- 本地测试,直接用文件所在绝对路径。
    );
    
    -- 将csv表的数据写入到Hologres中
    INSERT INTO public.customer_holo_table SELECT * FROM csvTable;
    -- 创建csv数据源
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep ","
    );
    
    -- 创建hologres临时表
    CREATE TEMPORARY VIEW hologresTable (
        c_custkey BIGINT,
        c_name STRING,
        c_phone STRING)
    USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        table "customer_holo_table"
    );
    
    INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;

使用DataFrame导入

使用spark-shell、pyspark等开发Spark作业,您也可以调用DataFramewrite接口来进行写入。不同的开发语言会将读取到CSV文件数据转为DataFrame后,再写入Hologres实例,相关示例代码如下。Hologres-Connector-Spark更多参数信息,详情请参见参数说明

Scala
Java
Python
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode

// csv源的schema
val schema = StructType(Array(
  StructField("c_custkey", LongType),
  StructField("c_name", StringType),
  StructField("c_address", StringType),
  StructField("c_nationkey", IntegerType),
  StructField("c_phone", StringType),
  StructField("c_acctbal", DecimalType(15, 2)),
  StructField("c_mktsegment", StringType),
  StructField("c_comment", StringType)
))

// 从csv文件读取数据为DataFrame
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")

// 将读取到的DataFrame写入到Hologres中
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;

public class SparkTest {
    public static void main(String[] args) {
        // csv源的schema
        List<StructField> asList =
                Arrays.asList(
                        DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
                        DataTypes.createStructField("c_name", DataTypes.StringType, true),
                        DataTypes.createStructField("c_address", DataTypes.StringType, true),
                        DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
                        DataTypes.createStructField("c_phone", DataTypes.StringType, true),
                        DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
                        DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
                        DataTypes.createStructField("c_comment", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(asList);

        // 使用本地模式运行
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();

        // 从csv文件读取数据为DataFrame
        // 本地测试采用customer数据的绝对路径
        Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");

        // 将读取到的DataFrame写入到Hologres中
        csvDf.write.format("hologres").option(
           "username", "***").option(
           "password", "***").option(
           "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
           "table", "customer_holo_table").mode(
           "append").save()
    }
}

Maven文件所需的配置如下。

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>
from pyspark.sql.types import *

# csv源的schema
schema = StructType([
    StructField("c_custkey", LongType()),
    StructField("c_name", StringType()),
    StructField("c_address", StringType()),
    StructField("c_nationkey", IntegerType()),
    StructField("c_phone", StringType()),
    StructField("c_acctbal", DecimalType(15, 2)),
    StructField("c_mktsegment", StringType()),
    StructField("c_comment", StringType())
])

# 从csv文件读取数据为DataFrame
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')

# 将读取到的DataFrame写入到Hologres中
csvDf.write.format("hologres").option(
    "username", "***").option(
    "password", "***").option(
    "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
    "table", "customer_holo_table").mode(
    "append").save()

Spark执行不同语言的作业,具体操作如下:

  • Scala

    • 您可以用示例代码生成sparktest.scala文件,通过如下方式执行作业。

      -- 加载依赖 
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      -- 本地测试使用绝对路径加载文件
      scala> :load D:/sparktest.scala
    • 您也可以在加载依赖完成后,直接将示例代码粘贴进去执行。

  • Java

    您可以使用开发工具引入示例代码,通过Maven工具完成打包。例如包名spark_test.jar。通过下方代码执行作业。

    -- 作业jar包采用绝对路径
    spark-submit --class SparkTest  --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_test.jar
  • Python

    您可以在下方代码执行完成后,直接将示例代码粘贴进去执行。

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

读取Hologres

  • spark-connector1.3.2版本起,支持读取Hologres。相比Spark默认的jdbc-connectorspark-connector可以按照Hologres表的shard进行并发读取,性能更好。读取的并发与表的shard数有关,spark-connector可以通过参数read.max_task_count进行限制,最终作业会生成Min(shardCount, max_task_count)个读取Task。而且也支持Schema推断,不传入Schema时,会根据Hologres表的Schema推断出Spark侧的Schema。

  • spark-connector1.5.0版本起,读取Hologres表支持了谓词下推,LIMIT下推以及字段裁剪。同时,也支持传入HologresSELECT QUERY来读取数据。此版本开始支持了批量模式读取,相比之前版本,读取性能提升3-4倍。

使用Spark-SQL读取

使用Spark-SQL时,通过Catalog加载Hologres表的元数据更加方便,您也可以通过创建临时表的方式声明一个Hologres表。

说明
  • Hologres-Connector-Spark 1.5.2以下版本,不支持Catalog,只能通过创建临时表的方式声明Hologres表。

  • Hologres-Connector-Spark更多参数信息,详情请参见参数说明

  1. 初始化Hologres Catalog。

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. 读取Hologres数据。

    • 通过Catalog读取。

      -- 加载Hologres Catalog。
      USE hologres_external_test_db;
      
      -- 读取Hologres表,支持字段裁剪和谓词下推。
      SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;
    • 通过创建临时表读取。

      CREATE TEMPORARY VIEW(table)
      CREATE TEMPORARY VIEW(query)
      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.max_task_count "80", // Hologres表最多分为多少个task进行读取
        table "customer_holo_table"
      );
      
      -- 支持字段裁剪和谓词下推
      SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;
      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10"
      );
      
      SELECT * FROM hologresTable LIMIT 5;

读取Hologres数据为DataFrame

使用spark-shell、pyspark等开发Spark作业,可以调用SparkRead接口将数据读取为DataFrame以进行后续的计算。不同语言读取Hologres表为DataFrame的示例如下。Hologres-Connector-Spark更多参数信息,详情请参见参数说明

Scala
Java
Python
val readDf = (
  spark.read
    .format("hologres")
    .option("username", "***")
    .option("password", "***")
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
    .option("table", "customer_holo_table")
    .option("read.max_task_count", "80") // Hologres表最多分为多少个task进行读取
    .load()
    .filter("c_custkey < 500")
)

readDf.select("c_custkey", "c_name", "c_phone").show(10)
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSelect {
    public static void main(String[] args) {
        
        // 使用本地模式运行
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();
                
        Dataset<Row> readDf = (
           spark.read
                .format("hologres")
                .option("username", "***")
                .option("password", "***")
                .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
                .option("table", "customer_holo_table")
                .option("read.max_task_count", "80") // Hologres表最多分为多少个task进行读取
                .load()
                .filter("c_custkey < 500")
        );
        readDf.select("c_custkey", "c_name", "c_phone").show(10);
    }
}

Maven文件所需的配置如下。

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>
readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Spark执行不同语言的作业,具体操作如下:

  • Scala

    • 您可以用示例代码生成sparkselect.scala文件,通过如下方式执行作业。

      -- 加载依赖 
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      -- 本地测试使用绝对路径加载文件
      scala> :load D:/sparkselect.scala
    • 您也可以在加载依赖完成后,直接将示例代码粘贴进去执行。

  • Java

    您可以使用开发工具引入示例代码,通过Maven工具完成打包。例如包名spark_select.jar。通过下方代码执行作业。

    -- 作业jar包采用绝对路径
    spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_select.jar
  • Python

    您可以在下方代码执行完成后,直接将示例代码粘贴进去执行。

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

参数说明

通用参数

参数

默认值

是否必填

说明

参数

默认值

是否必填

说明

username

password

  • 当前账号的AccessKey Secret。获取方式请参见创建AccessKey

  • 创建的账户名所对应的密码。

table

Hologres读写数据的表名称。

说明

读取数据时也可以选择使用read.query参数来替换。

jdbcurl

Hologres实时数据APIjdbcUrl,格式为jdbc:postgresql://<host>:<port>/<db_name>。您可以进入Hologres管理控制台,单击左侧导航栏实例列表,选择目标实例,在实例详情页下网络信息中获取主机和端口号。

enable_serverless_computing

false

是否使用Serverless资源。仅对读取和bulk_load模式写入有效,详情请参见Serverless Computing使用指南

serverless_computing_query_priority

3

Serverless Computing执行优先级。

statement_timeout_seconds

28800(8 小时)

单位为s,表示QUERY执行的超时时间。

retry_count

3

当连接故障时的重试次数。

direct_connect

对于可以直连的环境会默认使用直连。

批量数据读写的瓶颈通常是Endpoint的网络吞吐,因此我们会测试当前环境能否直连Hologres Frontend(接入节点),支持的情况下默认使用直连。此参数设置为false则不进行直连。

写入参数

Hologres Connector支持SparkSaveMode参数。对SQL来说,即INSERT INTO或者 INSERT OVERWRITE。对DataFrame来说,即WRITE时设置SaveModeAppend或者Overwrite。其中Overwrite会创建临时表进行写入,并在写入成功之后替换原始表,请在必要时使用。

参数名

参数曾用名

默认值

是否必填

说明

参数名

参数曾用名

默认值

是否必填

说明

write.mode

copy_write_mode

auto

写入的模式,取值如下:

  • auto(默认值)。Connector会根据版本和目标表的元信息自动选择最佳的模式,选择逻辑如下:

    1. Hologres实例版本大于V2.2.25,表有主键,选择bulk_load_on_conflict模式。

    2. Hologres实例版本大于V2.1.0,表无主键,选择bulk_load模式。

    3. Hologres实例版本大于V1.3,选择stream模式。

    4. 其他情况,选择insert模式。

  • stream,即Fixed Plan加速SQL执行。在Fixed Plan中,COPYHologres V1.3新增的功能。与INSERT方法相比,COPY方式具有更高的吞吐量(因其采用流模式)、更低的数据延迟以及更低的客户端内存消耗(因其不攒批)。

    说明

    需要Hologres Connector 1.3.0及以上版本,Hologres v1.3.34及以上版本。

  • bulk_load,即批量COPY。批量COPY相比流式的Fixed PlanCOPY,在RPS条件下,Hologres实例的负载更低,但仅支持写入无主键表。

    说明

    需要Hologres Connector 1.4.2及以上版本,Hologres v2.1.0及以上版本。

  • bulk_load_on_conflict,批量COPY写入有主键表时,支持处理主键重复的情况。Hologres主键表的批量数据导入默认会触发表锁,限制了多个连接同时进行并发写入的能力。当前Connector支持根据目标Hologres表的Distribution Key对数据进行重分布,使每个SparkTask只负责写一个Shard的数据,将原本的表锁降低至Shard级别,实现并发写入,提升写入性能。由于每个连接只需要维护很少Shard的数据,此优化也可以显著降低小文件的数量,降低Hologres的内存使用。测试表明,对数据进行 Repartition之后再并发写入,相比Stream模式写入,可以减少约67%的系统负载。

    说明

    需要Hologres Connector 1.4.2及以上版本,Hologres v2.2.25及以上版本。

  • insert,使用INSERT方式写入。

write.copy.max_buffer_size

max_cell_buffer_size

52428800(50MB)

使用COPY模式写入时,本地buffer的最大长度通常无需调整,但在写入字段较大(如超长字符串)导致buffer溢出时,可以调大。

write.copy.dirty_data_check

copy_write_dirty_data_check

false

是否进行脏数据校验。如开启此功能,一旦出现脏数据,能够精确定位到写入失败的具体行。然而,这会对写入性能产生一定影响,因此在非排查环节,建议不予开启。

write.on_conflict_action

write_mode

NSERT_OR_REPLACE

INSERT目标表为有主键的表时,采用不同策略:

  • INSERT_OR_IGNORE当主键冲突时,不写入。

  • INSERT_OR_UPDATE当主键冲突时,更新相应列。

  • INSERT_OR_REPLACE当主键冲突时,更新所有列。

以下参数仅write.modeinsert时生效。

参数名

参数曾用名

默认值

是否必填

说明

参数名

参数曾用名

默认值

是否必填

说明

write.insert.dynamic_partition

dynamic_partition

false

copy_write_modeinsert时生效。true表示写入分区表父表时,自动创建不存在的分区。

write.insert.batch_size

write_batch_size

512

每个写入线程的最大批次大小。在经过WriteMode合并后的Put数量达到WriteBatchSize时进行一次批量提交。

write.insert.batch_byte_size

write_batch_byte_size

2097152(2 * 1024 * 1024)

每个写入线程的最大批次大小,单位为Byte,默认2MB。在经过WriteMode合并后的Put数据字节数达到WriteBatchByteSize时进行一次批量提交

write.insert.max_interval_ms

write_max_interval_ms

10000

距离上次提交超过WriteMaxIntervalMs的时间会触发一次批量提交。

write.insert.thread_size

write_thread_size

1

写入并发线程数(每个并发占用1个数据库连接)。

读取参数

参数名

参数曾用名

(1.5.0及之前版本)

默认值

是否必填

说明

参数名

参数曾用名

(1.5.0及之前版本)

默认值

是否必填

说明

read.mode

bulk_read

auto

读取的模式,取值如下:

  • auto(默认值)。Hologres Connector会根据版本和目标表的元信息自动选择最佳的模式,选择逻辑如下:

    1. 如果读取的字段中包含JSONB类型,选择select模式。

    2. 如果实例版本高于3.0.24,选择bulk_read_compressed模式。

    3. 其他情况,选择bulk_read模式。

  • bulk_read,使用COPY OUT的方式以arrow格式读取数据,性能是select模式的数倍以上。暂不支持读取Hologres中的JSONB类型。

  • bulk_read_compressed,使用COPY OUT的方式读取压缩过的arrow格式数据,相比压缩前可以节省约45%的带宽。

  • select,使用普通的SELECT方式读取。

read.max_task_count

max_partition_count

80

将读取的Hologres表分为多个,每个分区对应一个Spark Task。如果Hologres表的ShardCount小于此参数,分区数量最多为ShardCount。

read.copy.max_buffer_size

/

52428800(50MB)

使用COPY模式读取时,本地Buffer的最大长度,在字段较大时出现异常应调大长度。

read.push_down_predicate

push_down_predicate

true

是否进行谓词下推,例如在查询时应用的一些过滤条件。目前支持常见Filter过滤条件的下推,以及列裁剪。

read.push_down_limit

push_down_limit

true

是否进行Limit下推。

read.select.batch_size

scan_batch_size

256

read_mode设置为select时生效。读取Hologres时,Scan操作一次Fetch的行数.

read.select.timeout_seconds

scan_timeout_seconds

60

read_mode设置为select时生效。读取Hologres时,Scan操作的超时时间。

read.query

query

使用传入的query去读取Hologres,此参数与table参数二者只能设置一个。

说明
  • 使用query方式读取时,只能单Task读取。且不支持谓词下推。

  • 使用table方式读取时,会根据Hologres表的ShardCount分为多个Task并发读取。

数据类型映射

Spark类型

Hologres类型

Spark类型

Hologres类型

ShortType

SMALLINT

IntegerType

INT

LongType

BIGINT

StringType

TEXT

StringType

JSON

StringType

JSONB

DecimalType

NUMERIC(38, 18)

BooleanType

BOOL

DoubleType

DOUBLE PRECISION

FloatType

FLOAT

TimestampType

TIMESTAMPTZ

DateType

DATE

BinaryType

BYTEA

BinaryType

ROARINGBITMAP

ArrayType(IntegerType)

INT4[]

ArrayType(LongType)

INT8[]

ArrayType(FloatType)

FLOAT4[]

ArrayType(DoubleType)

FLOAT8[]

ArrayType(BooleanType)

BOOLEAN[]

ArrayType(StringType)

TEXT[]

连接数计算

Hologres-Connector-Spark在进行读写时,会使用一定的JDBC连接数。可能受到如下因素影响:

  • Spark的并发,在作业运行时在Spark UI处可以看到的同时运行的Task数量。

  • Connector每个并发使用的连接数:

    • COPY方式写入,每个并发仅使用一个JDBC连接。

    • INSERT方式写入,每个并发会使用write_thread_sizeJDBC连接。

    • 读取时,每个并发使用一个JDBC连接。

  • 其他方面可能使用的连接数:作业启动时,将执行Schema获取等操作,可能会短暂建立一个连接。

因此作业使用的总的连接数可以通过如下公式计算:

工作项

使用连接数

工作项

使用连接数

Catalog查询元数据

1

读取数据

parallelism * 1 + 1

写入COPY模式

parallelism * 1 + 1

写入INSERT模式

parallelism * write_thread_size + 1

以上连接数计算假设Spark可同时运行的Task数大于任务生成Task数。

Spark同时可以运行的Task并发可能受到用户设置的参数影响,如spark.executor.instances,也可能受到Hadoop对文件分块策略的影响,详情请参见Apache Hadoop

  • 本页导读 (1)
  • 使用限制
  • 准备工作
  • Hologres Catalog
  • Hologres Catalog初始化
  • Hologres Catalog常用命令
  • 导入Hologres
  • 使用Spark-SQL导入
  • 使用DataFrame导入
  • 读取Hologres
  • 使用Spark-SQL读取
  • 读取Hologres数据为DataFrame
  • 参数说明
  • 通用参数
  • 写入参数
  • 读取参数
  • 数据类型映射
  • 连接数计算