通过Spark导入数据

云数据库 SelectDB 版兼容Apache Doris,支持通过Spark Doris Connector,利用Spark的分布式计算能力导入大批量数据。本文介绍使用Spark Doris Connector同步数据至云数据库 SelectDB 版的基本原理和使用方式。

功能介绍

Spark Doris Connector是云数据库 SelectDB 版导入大批量数据的方式之一。基于Spark的分布式计算能力,您可以将上游数据源(MySQL、PostgreSQL、HDFS、S3等)中的大量数据读取到DataFrame中,再通过Spark Doris Connector导入到云数据库 SelectDB 版表中。同时,您也可以使用Spark的JDBC方式来读取云数据库 SelectDB 版表中的数据。

工作原理

云数据库 SelectDB 版通过Spark Doris Connector导入数据的工作原理如下图所示。在这种架构下,Spark Doris Connector通常作为外部数据写入到云数据库 SelectDB 版的桥梁,利用其分布式计算集群对数据做预处理,加速了整个数据链路的数据流动,从而替代了传统的低性能JDBC连接写入方式。

image

前提条件

若使用Spark Doris Connector进行数据导入,必须确保使用的Connector包版本为 1.3.1 及之上。

引入Spark Doris Connector依赖

可以选择如下任一的方式获取Doris Connector依赖。

  • 采用Maven时,引入依赖的方式如下所示。更多依赖版本请参见Maven仓库

    <dependency>
        <groupId>org.apache.doris</groupId>
        <artifactId>spark-doris-connector-3.2_2.12</artifactId>
        <version>1.3.2</version>
    </dependency>
  • 通过JAR包的方式引入Connector。

    以下为您提供了三个常用的Connector,建议根据Spark版本选择对应的Connector包。更多依赖版本请参见Maven仓库

    说明
    • 下述JAR包使用Java 8进行编译,如果您有其他版本的需求,请联系云数据库 SelectDB 版技术支持。

    • 下述列表中Connector列从左到右版本依次含义为该jar包支持的Spark版本、使用的Scala版本以及Connector版本。

    Connector

    Runtime JAR

    2.4-2.12-1.3.2

    spark-doris-connector-2.4_2.12-1.3.2

    3.1-2.12-1.3.2

    spark-doris-connector-3.1_2.12-1.3.2

    3.2-2.12-1.3.2

    spark-doris-connector-3.2_2.12-1.3.2

    获取到Jar包后,可通过如下方式使用:

    • Local方式运行Spark,将下载的JAR包放置于Spark安装目录的jars目录下。

    • Yarn集群模式运行Spark,将JAR文件放入预部署包中。示例如下:

      1. 将spark-doris-connector-3.2_2.12-1.3.2.jar上传到HDFS。

        hdfs dfs -mkdir /spark-jars/ 
        hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar/spark-jars/
      2. 在集群中添加spark-doris-connector-3.2_2.12-1.3.2.jar依赖。

        spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar

使用方式

在Spark Client运行Spark后,或者引入Connector包进入Spark开发环境后,您可以通过Spark SQL方式或者Dataframe方式进行数据的同步操作。以下为如何将上游的Spark数据同步到云数据库 SelectDB 版的示例。

Spark SQL方式

val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
  
CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
 "table.identifier"="${selectdbTable}",
 "fenodes"="${selectdbHttpPort}",
 "user"="${selectdbUser}",
 "password"="${selectdbPwd}",
 "sink.properties.format"="json"
);

INSERT INTO test_order SELECT order_id,order_amount,order_status FROM tmp_tb;

参数配置项说明

参数

默认值

是否必填

描述

fenodes

云数据库 SelectDB 版实例的HTTP协议访问地址。

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口

示例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

table.identifier

云数据库 SelectDB 版实例的表名,格式为:库名.表名。例如:test_db.test_table

request.retries

3

向SelectDB发送请求的重试次数

request.connect.timeout.ms

30000

向SelectDB发送请求的连接超时时间

request.read.timeout.ms

30000

向SelectDB发送请求的读取超时时间

request.query.timeout.s

3600

查询SelectDB的超时时间,默认值为1小时,-1表示无超时限制

request.tablet.size

Integer.MAX_VALUE

一个RDD Partition对应的SelectDB Tablet个数。

此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对SelectDB造成更大的压力。

read.field

读取SelectDB表的列名列表,多列之间使用逗号分隔

batch.size

1024

一次从BE读取数据的最大行数。增大此数值可减少Spark与SelectDB之间建立连接的次数。从而减轻网络延迟所带来的额外时间开销。

exec.mem.limit

2147483648

单个查询的内存限制。默认为 2GB,单位为字节。

deserialize.arrow.async

false

是否支持异步转换Arrow格式到spark-doris-connector迭代所需的RowBatch。

deserialize.queue.size

64

异步转换Arrow格式的内部处理队列,当deserialize.arrow.async为true时生效。

write.fields

指定写入SelectDB表的字段或者字段顺序,多列之间使用逗号分隔。默认写入时要按照SelectDB表字段顺序写入全部字段。

sink.batch.size

100000

单次写BE的最大行数。

sink.max-retries

0

写BE失败之后的重试次数。

sink.properties.format

csv

Stream Load的数据格式。共支持3种格式:csv,json,arrow。

sink.properties.*

--

Stream Load的导入参数。例如:指定列分隔符:'sink.properties.column_separator' = ','。参数更多信息,请参见Stream Load

sink.task.partition.size

SelectDB写入任务对应的Partition个数。Spark RDD经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个Partition对应的记录数比较少,导致写入频率增加和计算资源浪费。

此数值设置越小,可以降低SelectDB写入频率,减少SelectDB合并压力。该参数配合 sink.task.use.repartition使用。

sink.task.use.repartition

false

是否采用repartition方式控制SelectDB写入Partition数。默认值为 false,采用coalesce方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。

如果设置为true,则采用repartition方式(注意:可设置最后Partition数,但会额外增加shuffle开销)。

sink.batch.interval.ms

50

每个批次sink的间隔时间,单位 ms。

sink.enable-2pc

false

是否开启两阶段提交。开启后将会在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务会滚。

sink.auto-redirect

true

是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入,不再显式获取 BE 信息。

user

访问云数据库 SelectDB 版实例的用户名。

password

访问云数据库 SelectDB 版实例的密码。

filter.query.in.max.count

100

谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。

ignore-type

指在定临时视图中,读取 schema 时要忽略的字段类型。

例如:'ignore-type'='bitmap,hll'

DataFrame方式

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
  ("1", 100, "待付款"),
  ("2", 200, null),
  ("3", 300, "已收货")
)).toDF("order_id", "order_amount", "order_status")

df.write
  .format("doris")
  .option("fenodes", selectdbHttpPort)
  .option("table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 100000)
  .option("sink.max-retries", 3)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()

参数配置项说明

参数

默认值

是否必填

描述

fenodes

云数据库 SelectDB 版实例的HTTP协议访问地址。

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口

示例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

table.identifier

云数据库 SelectDB 版实例的表名,格式为:库名.表名。例如:test_db.test_table

request.retries

3

向SelectDB发送请求的重试次数

request.connect.timeout.ms

30000

向SelectDB发送请求的连接超时时间

request.read.timeout.ms

30000

向SelectDB发送请求的读取超时时间

request.query.timeout.s

3600

查询SelectDB的超时时间,默认值为1小时,-1表示无超时限制

request.tablet.size

Integer.MAX_VALUE

一个RDD Partition对应的SelectDB Tablet个数。

此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对SelectDB造成更大的压力。

read.field

读取SelectDB表的列名列表,多列之间使用逗号分隔

batch.size

1024

一次从BE读取数据的最大行数。增大此数值可减少Spark与SelectDB之间建立连接的次数。从而减轻网络延迟所带来的额外时间开销。

exec.mem.limit

2147483648

单个查询的内存限制。默认为 2GB,单位为字节。

deserialize.arrow.async

false

是否支持异步转换Arrow格式到spark-doris-connector迭代所需的RowBatch。

deserialize.queue.size

64

异步转换Arrow格式的内部处理队列,当deserialize.arrow.async为true时生效。

write.fields

指定写入SelectDB表的字段或者字段顺序,多列之间使用逗号分隔。默认写入时要按照SelectDB表字段顺序写入全部字段。

sink.batch.size

100000

单次写BE的最大行数。

sink.max-retries

0

写BE失败之后的重试次数。

sink.properties.format

csv

Stream Load的数据格式。共支持3种格式:csv,json,arrow。

sink.properties.*

--

Stream Load的导入参数。例如:指定列分隔符:'sink.properties.column_separator' = ','。更多参数请参考:Stream Load

sink.task.partition.size

SelectDB写入任务对应的Partition个数。Spark RDD经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个Partition对应的记录数比较少,导致写入频率增加和计算资源浪费。

此数值设置越小,可以降低SelectDB写入频率,减少SelectDB合并压力。该参数配合 sink.task.use.repartition使用。

sink.task.use.repartition

false

是否采用repartition方式控制SelectDB写入Partition数。默认值为 false,采用coalesce方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。

如果设置为true,则采用repartition方式(注意:设置最后Partition数,但会额外增加shuffle开销)。

sink.batch.interval.ms

50

每个批次sink的间隔时间,单位 ms。

sink.enable-2pc

false

是否开启两阶段提交。开启后将会在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务会滚。

sink.auto-redirect

true

是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入, 不再显式获取 BE 信息。

user

访问云数据库 SelectDB 版实例的用户名。

password

访问云数据库 SelectDB 版实例的密码。

filter.query.in.max.count

100

谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。

ignore-type

指在定临时视图中,读取 schema 时要忽略的字段类型。

例如:'ignore-type'='bitmap,hll'

sink.streaming.passthrough

false

将第一列的值不经过处理直接写入。

使用示例

示例环境中各个软件的版本如下:

软件

Java

Spark

Scala

SelectDB

版本

1.8

3.1.2

2.12

3.0.4

环境准备

  • 配置Spark环境。

    1. 下载并解压Spark安装包。本示例中使用Spark安装包:spark-3.1.2-bin-hadoop3.2.tgz。

      wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
      tar xvzf spark-3.1.2-bin-hadoop3.2.tgz
    2. 将spark-doris-connector-3.2_2.12-1.3.2.jar放到SPARK_HOME/jars目录下。

  • 构造需要导入的数据。本文以MySQL为例,构造少量样例数据来完成导入。

    1. 创建MySQL测试表。

      CREATE TABLE `employees` (
        `emp_no` int NOT NULL,
        `birth_date` date NOT NULL,
        `first_name` varchar(14) NOT NULL,
        `last_name` varchar(16) NOT NULL,
        `gender` enum('M','F') NOT NULL,
        `hire_date` date NOT NULL,
        PRIMARY KEY (`emp_no`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
    2. 使用DMS构建测试数据,详情请参见测试数据构建

  • 配置云数据库 SelectDB 版实例。

    1. 创建云数据库 SelectDB 版实例,详情请参见创建实例

    2. 通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例

    3. 创建测试数据库和测试表。

      1. 创建测试数据库。

        CREATE DATABASE test_db;
      2. 创建测试表。

        USE test_db;
        CREATE TABLE employees (
            emp_no       int NOT NULL,
            birth_date   date,
            first_name   varchar(20),
            last_name    varchar(20),
            gender       char(2),
            hire_date    date
        )
        UNIQUE KEY(`emp_no`)
        DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;
    4. 开通云数据库 SelectDB 版公网地址,详情请参见申请和释放公网地址

    5. 将Spark环境的公网IP添加到IP白名单中,详情请参见设置白名单

同步MySQL数据到SelectDB

Spark SQL方式

本示例为如何使用Spark SQL方式将上游的MySQL数据导入至云数据库 SelectDB 版

  1. 启动spark-sql服务。

    bin/spark-sql
  2. 在spark-sql上提交任务。

    CREATE TEMPORARY VIEW mysql_tbl
    USING jdbc
    OPTIONS(
     "url"="jdbc:mysql://host:port/test_db",
     "dbtable"="employees",
     "driver"="com.mysql.jdbc.Driver",
     "user"="admin",
     "password"="****"
    );
    
    CREATE TEMPORARY VIEW selectdb_tbl
    USING doris
    OPTIONS(
     "table.identifier"="test_db.employees",
     "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080",
     "user"="admin",
     "password"="****",
     "sink.properties.format"="json"
    );
    
    INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;
  3. Spark任务执行完成后,登录云数据库 SelectDB 版,查看通过Spark导入的数据。

DataFrame方式

本示例为如何使用DataFrame方式将上游的MySQL数据导入至云数据库 SelectDB 版

  1. 启动spark-shell服务。

    bin/spark-shell
  2. 在spark-shell上提交任务。

    val mysqlDF = spark.read.format("jdbc")
    	.option("url", "jdbc:mysql://host:port/test_db")
    	.option("dbtable", "employees")
    	.option("driver", "com.mysql.jdbc.Driver")
    	.option("user", "admin")
    	.option("password", "****")
    	.load()
    
    mysqlDF.write.format("doris")
    	.option("fenodes", "host:httpPort")
    	.option("table.identifier", "test_db.employees")
    	.option("user", "admin")
    	.option("password", "****")
    	.option("sink.batch.size", 100000)
    	.option("sink.max-retries", 3)
    	.option("sink.properties.format", "json")
    	.save()
  3. Spark任务执行完成后,登录云数据库 SelectDB 版,查看通过Spark导入的数据。