云数据库 SelectDB 版兼容Apache Doris,支持通过Spark Doris Connector,利用Spark的分布式计算能力导入大批量数据。本文介绍使用Spark Doris Connector同步数据至云数据库 SelectDB 版的基本原理和使用方式。
功能介绍
Spark Doris Connector是云数据库 SelectDB 版导入大批量数据的方式之一。基于Spark的分布式计算能力,您可以将上游数据源(MySQL、PostgreSQL、HDFS、S3等)中的大量数据读取到DataFrame中,再通过Spark Doris Connector导入到云数据库 SelectDB 版表中。同时,您也可以使用Spark的JDBC方式来读取云数据库 SelectDB 版表中的数据。
工作原理
云数据库 SelectDB 版通过Spark Doris Connector导入数据的工作原理如下图所示。在这种架构下,Spark Doris Connector通常作为外部数据写入到云数据库 SelectDB 版的桥梁,利用其分布式计算集群对数据做预处理,加速了整个数据链路的数据流动,从而替代了传统的低性能JDBC连接写入方式。
前提条件
若使用Spark Doris Connector进行数据导入,必须确保使用的Connector包版本为 1.3.1 及之上。
引入Spark Doris Connector依赖
可以选择如下任一的方式获取Doris Connector依赖。
采用Maven时,引入依赖的方式如下所示。更多依赖版本请参见Maven仓库。
<dependency> <groupId>org.apache.doris</groupId> <artifactId>spark-doris-connector-3.2_2.12</artifactId> <version>1.3.2</version> </dependency>
通过JAR包的方式引入Connector。
以下为您提供了三个常用的Connector,建议根据Spark版本选择对应的Connector包。更多依赖版本请参见Maven仓库。
说明下述JAR包使用Java 8进行编译,如果您有其他版本的需求,请联系云数据库 SelectDB 版技术支持。
下述列表中Connector列从左到右版本依次含义为该jar包支持的Spark版本、使用的Scala版本以及Connector版本。
Connector
Runtime JAR
2.4-2.12-1.3.2
3.1-2.12-1.3.2
3.2-2.12-1.3.2
获取到Jar包后,可通过如下方式使用:
Local方式运行Spark,将下载的JAR包放置于Spark安装目录的jars目录下。
Yarn集群模式运行Spark,将JAR文件放入预部署包中。示例如下:
将spark-doris-connector-3.2_2.12-1.3.2.jar上传到HDFS。
hdfs dfs -mkdir /spark-jars/ hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar/spark-jars/
在集群中添加spark-doris-connector-3.2_2.12-1.3.2.jar依赖。
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar
使用方式
在Spark Client运行Spark后,或者引入Connector包进入Spark开发环境后,您可以通过Spark SQL方式或者Dataframe方式进行数据的同步操作。以下为如何将上游的Spark数据同步到云数据库 SelectDB 版的示例。
Spark SQL方式
val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
"table.identifier"="${selectdbTable}",
"fenodes"="${selectdbHttpPort}",
"user"="${selectdbUser}",
"password"="${selectdbPwd}",
"sink.properties.format"="json"
);
INSERT INTO test_order SELECT order_id,order_amount,order_status FROM tmp_tb;
参数配置项说明
参数 | 默认值 | 是否必填 | 描述 |
fenodes | 无 | 是 | 云数据库 SelectDB 版实例的HTTP协议访问地址。 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口。 示例: |
table.identifier | 无 | 是 | 云数据库 SelectDB 版实例的表名,格式为: |
request.retries | 3 | 否 | 向SelectDB发送请求的重试次数 |
request.connect.timeout.ms | 30000 | 否 | 向SelectDB发送请求的连接超时时间 |
request.read.timeout.ms | 30000 | 否 | 向SelectDB发送请求的读取超时时间 |
request.query.timeout.s | 3600 | 否 | 查询SelectDB的超时时间,默认值为1小时,-1表示无超时限制 |
request.tablet.size | Integer.MAX_VALUE | 否 | 一个RDD Partition对应的SelectDB Tablet个数。 此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对SelectDB造成更大的压力。 |
read.field | 无 | 否 | 读取SelectDB表的列名列表,多列之间使用逗号分隔 |
batch.size | 1024 | 否 | 一次从BE读取数据的最大行数。增大此数值可减少Spark与SelectDB之间建立连接的次数。从而减轻网络延迟所带来的额外时间开销。 |
exec.mem.limit | 2147483648 | 否 | 单个查询的内存限制。默认为 2GB,单位为字节。 |
deserialize.arrow.async | false | 否 | 是否支持异步转换Arrow格式到spark-doris-connector迭代所需的RowBatch。 |
deserialize.queue.size | 64 | 否 | 异步转换Arrow格式的内部处理队列,当 |
write.fields | 无 | 否 | 指定写入SelectDB表的字段或者字段顺序,多列之间使用逗号分隔。默认写入时要按照SelectDB表字段顺序写入全部字段。 |
sink.batch.size | 100000 | 否 | 单次写BE的最大行数。 |
sink.max-retries | 0 | 否 | 写BE失败之后的重试次数。 |
sink.properties.format | csv | 否 | Stream Load的数据格式。共支持3种格式:csv,json,arrow。 |
sink.properties.* | -- | 否 | Stream Load的导入参数。例如:指定列分隔符: |
sink.task.partition.size | 无 | 否 | SelectDB写入任务对应的Partition个数。Spark RDD经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个Partition对应的记录数比较少,导致写入频率增加和计算资源浪费。 此数值设置越小,可以降低SelectDB写入频率,减少SelectDB合并压力。该参数配合 |
sink.task.use.repartition | false | 否 | 是否采用repartition方式控制SelectDB写入Partition数。默认值为 false,采用coalesce方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。 如果设置为true,则采用repartition方式(注意:可设置最后Partition数,但会额外增加shuffle开销)。 |
sink.batch.interval.ms | 50 | 否 | 每个批次sink的间隔时间,单位 ms。 |
sink.enable-2pc | false | 否 | 是否开启两阶段提交。开启后将会在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务会滚。 |
sink.auto-redirect | true | 否 | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入,不再显式获取 BE 信息。 |
user | 无 | 是 | 访问云数据库 SelectDB 版实例的用户名。 |
password | 无 | 是 | 访问云数据库 SelectDB 版实例的密码。 |
filter.query.in.max.count | 100 | 否 | 谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。 |
ignore-type | 无 | 否 | 指在定临时视图中,读取 schema 时要忽略的字段类型。 例如: |
DataFrame方式
val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
("1", 100, "待付款"),
("2", 200, null),
("3", 300, "已收货")
)).toDF("order_id", "order_amount", "order_status")
df.write
.format("doris")
.option("fenodes", selectdbHttpPort)
.option("table.identifier", selectdbTable)
.option("user", selectdbUser)
.option("password", selectdbPwd)
.option("sink.batch.size", 100000)
.option("sink.max-retries", 3)
.option("sink.properties.file.column_separator", "\t")
.option("sink.properties.file.line_delimiter", "\n")
.save()
参数配置项说明
参数 | 默认值 | 是否必填 | 描述 |
fenodes | 无 | 是 | 云数据库 SelectDB 版实例的HTTP协议访问地址。 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口。 示例: |
table.identifier | 无 | 是 | 云数据库 SelectDB 版实例的表名,格式为: |
request.retries | 3 | 否 | 向SelectDB发送请求的重试次数 |
request.connect.timeout.ms | 30000 | 否 | 向SelectDB发送请求的连接超时时间 |
request.read.timeout.ms | 30000 | 否 | 向SelectDB发送请求的读取超时时间 |
request.query.timeout.s | 3600 | 否 | 查询SelectDB的超时时间,默认值为1小时,-1表示无超时限制 |
request.tablet.size | Integer.MAX_VALUE | 否 | 一个RDD Partition对应的SelectDB Tablet个数。 此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对SelectDB造成更大的压力。 |
read.field | 无 | 否 | 读取SelectDB表的列名列表,多列之间使用逗号分隔 |
batch.size | 1024 | 否 | 一次从BE读取数据的最大行数。增大此数值可减少Spark与SelectDB之间建立连接的次数。从而减轻网络延迟所带来的额外时间开销。 |
exec.mem.limit | 2147483648 | 否 | 单个查询的内存限制。默认为 2GB,单位为字节。 |
deserialize.arrow.async | false | 否 | 是否支持异步转换Arrow格式到spark-doris-connector迭代所需的RowBatch。 |
deserialize.queue.size | 64 | 否 | 异步转换Arrow格式的内部处理队列,当 |
write.fields | 无 | 否 | 指定写入SelectDB表的字段或者字段顺序,多列之间使用逗号分隔。默认写入时要按照SelectDB表字段顺序写入全部字段。 |
sink.batch.size | 100000 | 否 | 单次写BE的最大行数。 |
sink.max-retries | 0 | 否 | 写BE失败之后的重试次数。 |
sink.properties.format | csv | 否 | Stream Load的数据格式。共支持3种格式:csv,json,arrow。 |
sink.properties.* | -- | 否 | Stream Load的导入参数。例如:指定列分隔符: |
sink.task.partition.size | 无 | 否 | SelectDB写入任务对应的Partition个数。Spark RDD经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个Partition对应的记录数比较少,导致写入频率增加和计算资源浪费。 此数值设置越小,可以降低SelectDB写入频率,减少SelectDB合并压力。该参数配合 |
sink.task.use.repartition | false | 否 | 是否采用repartition方式控制SelectDB写入Partition数。默认值为 false,采用coalesce方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。 如果设置为true,则采用repartition方式(注意:设置最后Partition数,但会额外增加shuffle开销)。 |
sink.batch.interval.ms | 50 | 否 | 每个批次sink的间隔时间,单位 ms。 |
sink.enable-2pc | false | 否 | 是否开启两阶段提交。开启后将会在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务会滚。 |
sink.auto-redirect | true | 否 | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入, 不再显式获取 BE 信息。 |
user | 无 | 是 | 访问云数据库 SelectDB 版实例的用户名。 |
password | 无 | 是 | 访问云数据库 SelectDB 版实例的密码。 |
filter.query.in.max.count | 100 | 否 | 谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。 |
ignore-type | 无 | 否 | 指在定临时视图中,读取 schema 时要忽略的字段类型。 例如: |
sink.streaming.passthrough | false | 否 | 将第一列的值不经过处理直接写入。 |
使用示例
示例环境中各个软件的版本如下:
软件 | Java | Spark | Scala | SelectDB |
版本 | 1.8 | 3.1.2 | 2.12 | 3.0.4 |
环境准备
配置Spark环境。
下载并解压Spark安装包。本示例中使用Spark安装包:spark-3.1.2-bin-hadoop3.2.tgz。
wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz tar xvzf spark-3.1.2-bin-hadoop3.2.tgz
将spark-doris-connector-3.2_2.12-1.3.2.jar放到SPARK_HOME/jars目录下。
构造需要导入的数据。本文以MySQL为例,构造少量样例数据来完成导入。
创建MySQL测试表。
CREATE TABLE `employees` ( `emp_no` int NOT NULL, `birth_date` date NOT NULL, `first_name` varchar(14) NOT NULL, `last_name` varchar(16) NOT NULL, `gender` enum('M','F') NOT NULL, `hire_date` date NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
使用DMS构建测试数据,详情请参见测试数据构建。
配置云数据库 SelectDB 版实例。
创建云数据库 SelectDB 版实例,详情请参见创建实例。
通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例。
创建测试数据库和测试表。
创建测试数据库。
CREATE DATABASE test_db;
创建测试表。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;
开通云数据库 SelectDB 版公网地址,详情请参见申请和释放公网地址。
将Spark环境的公网IP添加到IP白名单中,详情请参见设置白名单。
同步MySQL数据到SelectDB
Spark SQL方式
本示例为如何使用Spark SQL方式将上游的MySQL数据导入至云数据库 SelectDB 版。
启动spark-sql服务。
bin/spark-sql
在spark-sql上提交任务。
CREATE TEMPORARY VIEW mysql_tbl USING jdbc OPTIONS( "url"="jdbc:mysql://host:port/test_db", "dbtable"="employees", "driver"="com.mysql.jdbc.Driver", "user"="admin", "password"="****" ); CREATE TEMPORARY VIEW selectdb_tbl USING doris OPTIONS( "table.identifier"="test_db.employees", "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080", "user"="admin", "password"="****", "sink.properties.format"="json" ); INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;
Spark任务执行完成后,登录云数据库 SelectDB 版,查看通过Spark导入的数据。
DataFrame方式
本示例为如何使用DataFrame方式将上游的MySQL数据导入至云数据库 SelectDB 版。
启动spark-shell服务。
bin/spark-shell
在spark-shell上提交任务。
val mysqlDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://host:port/test_db") .option("dbtable", "employees") .option("driver", "com.mysql.jdbc.Driver") .option("user", "admin") .option("password", "****") .load() mysqlDF.write.format("doris") .option("fenodes", "host:httpPort") .option("table.identifier", "test_db.employees") .option("user", "admin") .option("password", "****") .option("sink.batch.size", 100000) .option("sink.max-retries", 3) .option("sink.properties.format", "json") .save()
Spark任务执行完成后,登录云数据库 SelectDB 版,查看通过Spark导入的数据。