文档

通过Spark导入数据

更新时间:
一键部署

云数据库 SelectDB 版支持通过Spark SelectDB Connector,利用Spark的分布式计算能力导入大批量数据。本文介绍使用Spark SelectDB Connector同步数据至云数据库 SelectDB 版的基本原理和使用方式。

功能介绍

Spark SelectDB Connector是云数据库 SelectDB 版导入大批量数据的方式之一。基于Spark的分布式计算能力,您可以将上游数据源(MySQL、PostgreSQL、HDFS、S3等)中的大量数据读取到DataFrame中,然后通过Spark SelectDB Connector导入到SelectDB表中。同时,您也可以使用Spark的JDBC方式来读取SelectDB表中的数据。

基础架构

Spark数据导入集成能力由Spark SelectDB Connector提供。该架构下,Spark SelectDB Connector通常作为外部数据写入到云数据库 SelectDB 版的桥梁,替代传统的低性能JDBC连接写入方式,以其分布式、高效的特性加速了整个数据链路的数据流动。

image

工作原理

Spark SelectDB Connector底层实现依赖于SelectDB的Stage导入方式,当前支持两种使用方式:

  • 通过在用户对象存储上创建外部Stage(External Stage),进行批量数据拉取和导入,适用于大规模数据导入的场景。

    这种导入方式依赖用户侧的对象存储。首先,需要在SelectDB中基于用户对象存储创建外部Stage,并将外部Stage的访问权限授予SelectDB用户。然后,将需要导入的数据存储在外部Stage对应的对象存储中,并通过Spark调用SelectDB的COPY INTO接口(/copy/query),将对象存储的数据导入SelectDB表中。

  • 通过SelectDB内置的Stage的推送导入,适用于中小批量数据导入。

    这种导入方式依赖SelectDB提供的内置对象存储。Spark调用SelectDB的Upload接口(/copy/upload)将数据上传至内置对象存储中,再调用SelectDB的COPY INTO接口(/copy/query)将对象存储的数据导SelectDB表中。

依赖管理

下载方式

以下是三个预编译的Connector包,详细版本以及下载地址请参见下表:

Connector

Runtime JAR

2.3-2.11-1.0.1

spark-selectdb-connector-2.3_2.11-1.0.1

3.1-2.12-1.0.1

spark-selectdb-connector-3.1_2.12-1.0.1

3.2-2.12-1.0.1

spark-selectdb-connector-3.2_2.12-1.0.1

说明

上述JAR包使用Java 8进行编译,如果您有其他版本的需求,请联系云数据库 SelectDB 版技术支持。

本地开发方式

在本地开发中,通常会通过引入Maven依赖的方式将Spark SelectDB Connector的包引入到项目中。在Maven中,使用以下方式添加依赖:

<dependency>
  <groupId>com.selectdb.spark</groupId>
  <artifactId>spark-selectdb-connector-3.1_2.12</artifactId>
  <version>1.0.1</version>
</dependency>

Spark Standalone&Cluster方式

使用Spark Standalone(Spark单机)或者Spark Cluster(Spark集群)的方式运行Spark程序,只需要将下载的JAR包放置于Spark安装目录的jars目录下即可。

  • 如果是Spark Cluster,那么需要在每个Spark节点的jars目录下放一份Spark SelectDB Connector的JAR包。

  • 如果是以Yarn集群模式运行的Spark Cluster,则需要将JAR文件放入预部署包中。例如:

    1. 将spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar上传到HDFS。

      hdfs dfs -mkdir /spark-jars/ hdfs dfs -put /<your_local_path>/spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar /spark-jars/

    2. 在集群中添加spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar依赖。

      spark.yarn.jars=hdfs:///spark-jars/doris-spark-connector-3.1.2-2.12-1.0.0.jar

使用方式

通过Spark SQL方式写入

val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
  
CREATE TEMPORARY VIEW test_order
USING selectdb
OPTIONS(
 "table.identifier"="${selectdbTable}",
 "jdbc.url"="${selectdbJdbc}",
 "http.port"="${selectdbHttpPort}",
 "user"="${selectdbUser}",
 "password"="${selectdbPwd}",
 "sink.properties.file.type"="json"
);

INSERT INTO test_order SELECT order_id,order_amount,order_status FROM tmp_tb;

通过DataFrame方式写入

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
  ("1", 100, "待付款"),
  ("2", 200, null),
  ("3", 300, "已收货")
)).toDF("order_id", "order_amount", "order_status")

df.write
  .format("selectdb")
  .option("selectdb.http.port", selectdbHttpPort)
  .option("selectdb.table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 100000)
  .option("sink.max-retries", 3)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()

DataFrame参数配置

参数

默认值

是否必填

描述

selectdb.http.port

云数据库 SelectDB 版实例的HTTP协议访问地址。

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口

示例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

selectdb.jdbc.url

云数据库 SelectDB 版实例的JDBC连接串jdbc:mysql://<ip>:<port>/<dbname>,此配置用于Spark SQL。

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口

示例:jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030

selectdb.table.identifier

云数据库 SelectDB 版实例的表名,格式为:库名.表名。例如:test_db.test_table

user

访问云数据库 SelectDB 版实例的用户名。

password

访问云数据库 SelectDB 版实例的密码。

sink.batch.size

100000

单次写入云数据库 SelectDB 版实例的最大行数。

sink.max-retries

3

写入云数据库 SelectDB 版失败之后的重试次数。

sink.properties.*

COPY INTO接口的导入参数。例如:"sink.properties.file.type"="json"。更多COPY INTO的参数说明,请参见COPY INTO

使用示例

示例环境中各个软件的版本如下:

软件

Java

Spark

Scala

SelectDB

版本

1.8

3.1.2

2.12

3.0.4

环境准备

  • 配置Spark环境。

    1. 下载并解压Spark安装包。本示例中使用Spark安装包:spark-3.1.2-bin-hadoop3.2.tgz。

      wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
      tar xvzf spark-3.1.2-bin-hadoop3.2.tgz
    2. 将spark-selectdb-connector-3.2_2.12-1.0.1.jar放到SPARK_HOME/jars目录下。

  • 构造需要导入的数据。本文以MySQL为例,构造少量样例数据来完成导入。

    1. 创建MySQL测试表。

      CREATE TABLE `employees` (
        `emp_no` int NOT NULL,
        `birth_date` date NOT NULL,
        `first_name` varchar(14) NOT NULL,
        `last_name` varchar(16) NOT NULL,
        `gender` enum('M','F') NOT NULL,
        `hire_date` date NOT NULL,
        PRIMARY KEY (`emp_no`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
    2. 使用DMS构建测试数据,详情请参见测试数据构建

  • 配置云数据库 SelectDB 版实例。

    1. 创建云数据库 SelectDB 版实例,详情请参见创建实例

    2. 通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例

    3. 创建测试数据库和测试表。

      1. 创建测试数据库。

        CREATE DATABASE test_db;
      2. 创建测试表。

        USE test_db;
        CREATE TABLE employees (
            emp_no       int NOT NULL,
            birth_date   date,
            first_name   varchar(20),
            last_name    varchar(20),
            gender       char(2),
            hire_date    date
        )
        UNIQUE KEY(`emp_no`)
        DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;
    4. 开通云数据库 SelectDB 版公网地址,详情请参见申请和释放公网地址

    5. 将Spark环境的公网IP添加到IP白名单中,详情请参见设置白名单

通过DataFrame方式同步MySQL数据到SelectDB

本文以MySQL为例,介绍如何通过DataFrame方式将上游的MySQL数据导入至云数据库 SelectDB 版

  1. 启动spark-shell服务。

    bin/spark-shell
  2. 在spark-shell上提交任务。

    val mysqlDF = spark.read.format("jdbc")
    	.option("url", "jdbc:mysql://host:port/test_db")
    	.option("dbtable", "employees")
    	.option("driver", "com.mysql.jdbc.Driver")
    	.option("user", "admin")
    	.option("password", "****")
    	.load()
    
    mysqlDF.write.format("selectdb")
    	.option("selectdb.http.port", "host:port")
    	.option("selectdb.table.identifier", "test_db.employees")
    	.option("user", "admin")
    	.option("password", "****")
    	.option("sink.batch.size", 100000)
    	.option("sink.max-retries", 3)
    	.option("sink.properties.file.type", "json")
    	.save()
  3. Spark任务执行完成后,登录云数据库 SelectDB 版,查看通过Spark导入的数据。

通过Spark SQL方式同步MySQL数据到SelectDB

本文以MySQL为例,介绍如何通过Spark SQL方式将上游的MySQL数据导入至云数据库 SelectDB 版

  1. 启动spark-sql服务。

    bin/spark-sql
  2. 在spark-sql上提交任务。

    CREATE TEMPORARY VIEW mysql_tbl
    USING jdbc
    OPTIONS(
     "url"="jdbc:mysql://host:port/test_db",
     "dbtable"="employees",
     "driver"="com.mysql.jdbc.Driver",
     "user"="admin",
     "password"="****"
    );
    
    CREATE TEMPORARY VIEW selectdb_tbl
    USING selectdb
    OPTIONS(
     "table.identifier"="test_db.employees",
     "jdbc.url"="jdbc:mysql://selectdb-cn-****public.selectdbfe.rds.aliyuncs.com:9030",
     "http.port"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080",
     "user"="admin",
     "password"="****",
     "sink.properties.file.type"="json"
    );
    
    INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;
  3. Spark任务执行完成后,登录云数据库 SelectDB 版,查看通过Spark导入的数据。

  • 本页导读 (1)