Spark实时同步

本文为您介绍如何通过Spark读取或写入数据至Hologres的操作方法。

背景信息

Spark是用于大规模数据处理的统一分析引擎,Hologres已经与Spark(社区版以及EMR Spark版)高效打通,快速助力企业搭建数据仓库。Hologres提供的Spark Connector,支持Spark以批处理的方式将数据写入Hologres,同时Spark支持读取多种数据源(例如文件、Hive、MySQL、PostgreSQL等)。

Hologres兼容PostgreSQL,因此Spark也可以用读取PostgreSQL的方式直接读取Hologres数据,进行ETL处理,再写入Hologres及其他数据源,完成大数据开发抽取、处理、加载的完整闭环。

前提条件

  • 实例版本需为V0.9及以上版本。请在Hologres管控台的实例详情页查看当前实例版本,如实例是V0.9以下版本,请您使用自助升级或加入Hologres钉钉交流群反馈,详情请参见如何获取更多的在线支持?

  • 需要安装对应版本的Spark环境,能够运行spark-shell命令。

连接数使用

Hologres Spark Connector在进行读写时,会使用一定的JDBC连接数。可能受到如下因素影响:

  • Spark的并发特性,在作业运行过程中,可以通过Spark UI观察到并行执行的Task数量。

  • Connector在操作时,对于固定复制(fixed copy)方式的写入,每个并发操作都只使用一个JDBC连接。而采用INSERT方式写入时,每个并发则会利用write_thread_size数量的JDBC连接。在进行读取操作时,每个并发同样使用一个JDBC连接。

  • 其他方面可能使用的连接数:在作业启动时,可能会有获取Schema信息的操作,这可能会短暂地建立1个连接。

因此作业使用的总连接数可以通过如下公式计算:

  • fixed copy模式:parallelism*1+1

  • 普通INSERT模式:parallelism*write_thread_size+1

说明

Spark Task并发可能受到用户设置的参数影响,也可能受到Hadoop对文件分块策略的影响。

通过Spark Connector写入(推荐使用)

Hologres当前支持使用内置的Spark Connector将Spark数据写入Hologres,相比其他写入方式,调用基于Holo Client实现Connector写入的方式性能更优。具体操作步骤如下,阿里云也为您提供了相关的使用示例,详情请参见通过Spark Connector写入使用示例

准备工作

  1. 获取JAR包。

    Spark2和Spark3上均已支持Connector写入,Spark写入Hologres时需要引用connector的JAR包,当前已经发布到Maven中央仓库,在项目中参照如下pom文件进行配置。

    说明

    相关Connector也已开源,详情请参见hologres-connectors

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.4.0</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>

    当前Hologres已自动生成JAR文件,下载链接如下。

  2. 使用JAR包。

    启动Spark时执行以下命令。

    spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar

    或者使用pyspark:

    pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar

通过Spark Connector写入使用示例

根据如下示例步骤为您介绍,如何通过Spark Connector将数据写入Hologres。

  1. 创建Hologres表。

    在Hologres中执行如下SQL命令创建目标表,用来接收数据。

    CREATE TABLE tb008 (
      id BIGINT primary key,
      counts INT,
      name TEXT,
      price NUMERIC(38, 18),
      out_of_stock BOOL,
      weight DOUBLE PRECISION,
      thick FLOAT,
      time TIMESTAMPTZ,
      dt DATE, 
      by bytea,
      inta int4[],
      longa int8[],
      floata float4[],
      doublea float8[],
      boola boolean[],
      stringa text[]
    );
  2. Spark准备数据并写入Hologres。

    1. 在命令行运行命令开启Spark。

      spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
    2. spark-shell里使用命令load spark-test.scala执行测试文件,加载测试示例。

      spark-test.scala文件示例如下。

      import java.sql.{Timestamp, Date}
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.Row
      
      val byteArray = Array(1.toByte, 2.toByte, 3.toByte, 'b'.toByte, 'a'.toByte)
      val intArray = Array(1, 2, 3)
      val longArray = Array(1L, 2L, 3L)
      val floatArray = Array(1.2F, 2.44F, 3.77F)
      val doubleArray = Array(1.222, 2.333, 3.444)
      val booleanArray = Array(true, false, false)
      val stringArray = Array("abcd", "bcde", "defg")
      
      val data = Seq(
        Row(-7L, 100, "phone1", BigDecimal(1234.567891234), false, 199.35, 6.7F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("2021-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(6L, -10, "phone2", BigDecimal(1234.56), true, 188.45, 7.8F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("1970-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(1L, 10, "phone3\"", BigDecimal(1234.56), true, 111.45, null, Timestamp.valueOf("2020-02-29 00:12:33"), Date.valueOf("2020-07-23"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray)
      )
      
      
      val schema = StructType(Array(
        StructField("id", LongType),
        StructField("counts", IntegerType),
        StructField("name", StringType, false), //false表示此Field不允许为null
        StructField("price", DecimalType(38, 12)),
        StructField("out_of_stock", BooleanType),
        StructField("weight", DoubleType),
        StructField("thick", FloatType),
        StructField("time", TimestampType),
        StructField("dt", DateType),
        StructField("by", BinaryType),
        StructField("inta", ArrayType(IntegerType)),
        StructField("longa", ArrayType(LongType)),
        StructField("floata", ArrayType(FloatType)),
        StructField("doublea", ArrayType(DoubleType)),
        StructField("boola", ArrayType(BooleanType)),
        StructField("stringa", ArrayType(StringType))
      ))
      
      
      val df = spark.createDataFrame(
        spark.sparkContext.parallelize(data),
        schema
      )
      df.show()
      
      //配置导入数据至Hologres的信息。
      df.write.format("hologres") //必须配置为hologres
        .option("username", "your_username") //阿里云账号的AccessKey ID。
        .option("password", "your_password") //阿里云账号的Accesskey SECRET。
        .option("endpoint", "Ip:Port") //Hologres的Ip和Port。
        .option("database", "test_database") //Hologres的数据库名称,示例为test_database。
        .option("table", "tb008") //Hologres用于接收数据的表名称,示例为tb008。
        .option("write_batch_size", 512) // 写入攒批大小,详见下方参数介绍
        .option("input_data_schema_ddl", df.schema.toDDL) // Dataframe对应的DDL,仅spark3.x需要
        .mode(SaveMode.Append) // spark DataFrameWriter接口的SaveMode, 必须为Append;注意与WRITE_MODE不是同一个参数, 自hologres-connector1.3.3版本开始,支持SaveMode.OverWrite,会清理原始表中的数据,请谨慎使用
        .save()
  3. 查询写入的数据。

    在Hologres侧查询目标表,即可确认写入的数据,示例如下图所示。测试示例数据

使用pyspark加载Connector写入示例

  1. 启动pyspark并加载Connector。

    pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar

  2. 与spark-shell类似,使用元数据创建DataFrame之后调用Connector进行写入。

    data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]]
    df = spark.createDataFrame(data, schema="id LONG, name STRING")
    df.show()
    
    df2.write.format("hologres").option(
      "username", "your_username").option(
      "password", "your_password").option(
      "endpoint", "hologres_endpoint").option(
      "database", "test_database").option(
      "table", "tb008").save()
    

使用Spark SQL加载Connector进行写入

说明

仅Spark3版本的Connector支持SQL方式。

  1. 启动Spark SQL并加载Connector。

    spark-sql --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
  2. 通过Spark SQL DDL,分别创建CSV和Hologres View,进行写入。

    CREATE TEMPORARY VIEW csvTable (
      c_custkey bigint,
      c_name string,
      c_address string,
      c_nationkey int,
      c_phone string,
      c_acctbal decimal(15, 2),
      c_mktsegment string,
      c_comment string)
    USING csv OPTIONS (
      path "resources/customer1.tbl", sep "|"
    );
    
    CREATE TEMPORARY VIEW hologresTable (
      c_custkey bigint,
      c_name string,
      c_address string,
      c_nationkey int,
      c_phone string,
      c_acctbal decimal(15, 2),
      c_mktsegment string,
      c_comment string)
    USING hologres OPTIONS (
      jdbcurl "jdbc:postgresql://hologres_endpoint/test_database",
      username "your_username", 
      password "your_password", 
      table "customer_holo_table", 
      copy_write_mode "true", 
      bulk_load "true", 
      copy_write_format "text"
    );
    
    -- 目前通过sql创建的hologres view不支持写入部分列(如insert into hologresTable(c_custkey) select c_custkey from csvTable),写入时需要写入DDL中声明的所有字段。如果希望写入部分列,可以建表时仅声明需要写入的字段。
    INSERT INTO hologresTable SELECT * FROM csvTable;

通过Spark读取数据源数据并写入Hologres

  1. Spark从数据源读取数据。

    Spark支持从不同数据源读取数据,具体数据源分类如下。

    • Spark支持以Hologres为数据源。

      Hologres兼容PostgreSQL,因为Spark可以用读取PostgreSQL的方式读取Hologres中的数据。读取代码如下。

      说明

      在使用JDBC方式进行读取前,请前往官网下载Postgresql JDBC Jar,本文以postgresql-42.2.18版本为例,在spark-shell启动时执行./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar加载该jar,可以与hologres-connector的jar包一同加载。

      // Read from some table, for example: tb008
      val readDf = spark.read
        .format("jdbc") //使用postgresql jdbc driver读取holo
        .option("driver","org.postgresql.Driver")
        .option("url", "jdbc:postgresql://Ip:Por/test_database")
        .option("dbtable", "tb008")
        .option("user", "your_username")
        .option("password", "your_password")
        .load()

      Spark Connector从V1.3.2版本开始支持读取Hologres,并提供了优化的并发读取功能。相较于使用PostgreSQL JDBC驱动的方法,还可以设置读取的并发参数,根据Hologres表的Shard进行分片,从而实现并发读取,显著提升了性能。示例如下。

      val spark = SparkSession
      .builder
      .appName("ReadFromHologres")
      .master("local[*]")
      .getOrCreate()
      
      spark.sparkContext.setLogLevel("WARN")
      
      import spark.implicits._
      
      val schema = StructType(Array(
        StructField("id", LongType),
        StructField("counts", IntegerType),
        StructField("name", StringType, false),
        StructField("price", DecimalType(38, 12)),
        StructField("out_of_stock", BooleanType)
      ))
      
      val readDf = spark.read
      .format("hologres")
      .schema(schema) // 可选,如果不指定schema,默认读取holo表全部字段
      .option("username", "your_username")
      .option("password", "your_password")
      .option("jdbcurl", "jdbc:postgresql://hologres_endpoint/test_db")
      .option("table", "tb008")
      .option("scan_parallelism", "10") //读取Hologres时的默认并发数,最大为holo表的shardcount
      .load()

    • Spark支持其他数据源(如Parquet格式的文件)。

      Spark支持从其他数据源中读取数据写入Hologres中,例如使用Spark从Hive中读取数据,代码如下。

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.hive.HiveContext
      
      val sparkConf = new SparkConf()
      val sc = new SparkContext(sparkConf)
      val hiveContext = new HiveContext(sc)
      
      // Read from some table, for example: phone
      val readDf = hiveContext.sql("select * from hive_database.phone")
  2. Spark将读到的数据写入Hologres。

    import com.alibaba.hologres.spark2.sink.SourceProvider
    
    -- Write to hologres table
    df.write
      .format("hologres")
      .option(SourceProvider.USERNAME, "your_username")
      .option(SourceProvider.PASSWORD, "your_password")
      .option(SourceProvider.ENDPOINT, "Ip:Port")
      .option(SourceProvider.DATABASE, "test_database")
      .option(SourceProvider.TABLE, table)
      .option(SourceProvider.WRITE_BATCH_SIZE, 512) -- 写入攒批大小
      .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) -- 仅spark3.x需要
      .mode(SaveMode.Append) // 仅spark3.x需要
      .save()

通过Spark实时写入数据至Hologres

  1. 在Hologres创建一张表,用于接收数据,创建代码如下。

    CREATE TABLE test_table_stream
    (
        value text,
        count bigint
    );
  2. 读取本地端口输入行,进行词频统计并实时写入Hologres中,相关示例代码如下。

    • 代码:

       val spark = SparkSession
            .builder
            .appName("StreamToHologres")
            .master("local[*]")
            .getOrCreate()
      
          spark.sparkContext.setLogLevel("WARN")
          import spark.implicits._
      
          val lines = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load()
      
          -- Split the lines into words
          val words = lines.as[String].flatMap(_.split(" "))
      
          -- Generate running word count
          val wordCounts = words.groupBy("value").count()
      
          wordCounts.writeStream
              .outputMode(OutputMode.Complete())
              .format("hologres")
              .option(SourceProvider.USERNAME, "your_username")
              .option(SourceProvider.PASSWORD, "your_password")
              .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname")
              .option(SourceProvider.TABLE, "test_table_stream")
              .option("batchsize", 1)
              .option("isolationLevel", "NONE")
              .option("checkpointLocation", checkpointLocation)
              .start()
              .awaitTermination()
    • 参数释义:

      参数名

      默认值

      是否必填

      参数描述

      username

      登录Hologres账号的AccessKey ID。您可以单击AccessKey 管理来获取。

      建议您使用环境变量的方式调用用户名和密码,降低密码泄露风险。

      password

      登录Hologres账号的AccessKey Secret。您可以单击AccessKey 管理来获取。

      建议您使用环境变量的方式调用用户名和密码,降低密码泄露风险。

      table

      Hologres用于接收数据的表名称。

      endpoint

      JDBCURL二选一

      Hologres实例的网络域名。

      您可以进入Hologres管理控制台实例详情页,从网络信息获取主机和端口号。

      database

      JDBCURL二选一

      Hologres接收数据的表所在数据库名称。

      jdbcurl

      ENDPOINT+DATABASE组合设置二选一

      Hologres的JDBCURL

      copy_write_mode

      true

      是否使用Fixed Copy方式写入,Fixed Copy是Hologres V1.3新增的能力,相比INSERT方法,Fixed Copy方式可以更高的吞吐(因为是流模式),更低的数据延时,更低的客户端内存消耗(因为不攒批)。

      说明

      需要Connector为V1.3.0及以上版本,Hologres引擎版本为V1.3.34及以上版本。

      copy_write_format

      false

      仅Copy模式生效,是否进行脏数据校验,打开之后如果有脏数据,可以定位到写入失败的具体行。

      说明

      RecordChecker会对写入性能造成一定影响,非排查环节不建议开启。

      bulk_load

      true

      是否采用批量Copy方式写入(与fixed copy不同,fixed copy是流式的)。

      说明
      • Hologre V2.1版本对无主键表的写入性能进行了优化。在Hologre V2.1版本中,无主键表的批量写入操作不再会导致表锁,而是采用了行锁机制。这可以使其能够与Fixed Plan并行执行,从而提高了数据处理的效率和并发性。

      • Connector为V1.4.0及以上版本,Hologres引擎需要V2.1.0及以上版本。

      max_cell_buffer_size

      20971520(20 MB)

      使用Copy模式写入时,单个字段的最大长度。

      copy_write_dirty_data_check

      false

      是否进行脏数据校验,打开之后如果有脏数据,可以定位到写入失败的具体行,RecordChecker会对写入性能造成一定影响,非排查环节不建议开启。

      说明

      仅Copy模式生效。

      copy_write_direct_connect

      对于可以直连的环境会默认使用直连。

      仅Copy模式生效,Copy的瓶颈往往是VPC Endpoint的网络吞吐,因此Hologres会测试当前环境能否直连holo fe,如果支持则默认使用直连。此参数设置为false表示不使用直连。

      input_data_schema_ddl

      spark3.x必填,值为<your_DataFrame>.schema.toDDL

      Spark中DataFrame的DDL。

      write_mode

      INSERT_OR_REPLACE

      当INSERT目标表为有主键的表时采用不同策略。

      • INSERT_OR_IGNORE:当主键冲突时,不写入。

      • INSERT_OR_UPDATE:当主键冲突时,更新相应列。

      • INSERT_OR_REPLACE:当主键冲突时,更新所有列。

      write_batch_size

      512

      每个写入线程的最大批次大小,在经过write_mode合并后的Put数量达到write_batch_size时进行一次批量提交。

      write_batch_byte_size

      2 MB

      每个写入线程的最大批次Byte大小,在经过WRITE_MODE合并后的Put数据字节数达到WRITE_BATCH_BYTE_SIZE时进行一次批量提交。

      write_max_interval_ms

      10000 ms

      距离上次提交超过write_max_interval_ms会触发一次批量提交。

      write_fail_strategy

      TYR_ONE_BY_ONE

      当某一批次提交失败时,会将批次内的记录逐条提交(保序),单条提交失败的记录将会跟随异常被抛出。

      write_thread_size

      1

      写入并发线程数(每个并发占用1个数据库连接)。

      在一个Spark作业中,占用的总连接数与Spark并发相关,关系为总连接数= spark.default.parallelism * WRITE_THREAD_SIZE

      dynamic_partition

      false

      若为true,写入分区表父表时,当分区不存在时自动创建分区。

      retry_count

      3

      当连接故障时,写入和查询的重试次数。

      retry_sleep_init_ms

      1000 ms

      每次重试的等待时间=retry_sleep_init_ms+retry_count*retry_sleep_step_ms

      retry_sleep_step_ms

      10*1000 ms

      每次重试的等待时间=retry_sleep_init_ms+retry_count*retry_sleep_step_ms

      connection_max_idle_ms

      60000 ms

      写入线程和点查线程数据库连接的最大IDLE时间,超过此时间的连接将被释放。

      fixed_connection_mode

      false

      非Copy模式(如INSERT默认)下,写入和点查场景不占用连接数。

      说明

      Beta功能,需要Connector版本为V1.2.0及以上版本,Hologres引擎版本为V1.3.0及以上版本。

      scan_batch_size

      256

      在从Hologres读取数据时,Scan操作一次获取的行数。

      scan_timeout_seconds

      60

      在从Hologres读取数据时,Scan操作的超时时间,单位:秒(s)。

      scan_parallelism

      10

      读取Hologres时的分片数量,最大为Hologres表的Shardcount。作业运行时,这些分片会被分配到Spark Task上进行读取。

数据类型映射

Spark与Hologres的数据类型映射如下表所示。

Spark类型

Hologres类型

ShortType

SMALLINT

IntegerType

INT

LongType

BIGINT

StringType

TEXT、JSONB、JSON

DecimalType

NUMERIC(38, 18)

BooleanType

BOOL

DoubleType

DOUBLE PRECISION

FloatType

FLOAT

TimestampType

TIMESTAMPTZ

DateType

DATE

BinaryType

BYTEA、ROARINGBITMAP

ArrayType(IntegerType)

int4[]

ArrayType(LongType)

int8[]

ArrayType(FloatType

float4[]

ArrayType(DoubleType)

float8[]

ArrayType(BooleanType)

boolean[]

ArrayType(StringType)

text[]