SelectDB

本文介绍如何使用自定义SelectDB连接器写入数据至云数据库SelectDB版。

背景信息

云数据库 SelectDB 版是新一代实时数据仓库SelectDB在阿里云上的全托管服务,100%兼容Apache Doris。您可以在阿里云上便捷地购买SelectDB数仓服务,满足海量数据分析需求,具体的产品优势和应用场景请参见什么是云数据库SelectDB

自定义SelectDB连接器支持的信息如下:

类别

详情

支持类型

结果表,数据摄入目标端

运行模式

流模式和批模式

数据格式

JSONCSV

特有监控指标

API种类

DataStreamSQL

是否支持更新/删除

特色功能

  • 支持整库数据同步。

  • SelectDB连接器提供Exactly-Once语义,保证数据不重复也不丢失。

  • 兼容1.0及以上Apache Doris,可以使用Flink SelectDB自定义连接器同步数据至Apache Doris。

注意事项

  • 仅实时计算Flink版的引擎VVR 8.0.10及以上版本支持使用SelectDB自定义连接器。

  • SelectDB自定义连接器使用过程如有问题,请先提交工单给云数据库SelectDB版。

  • 同步数据至云数据库SelectDB版时,需要满足以下条件:

    • 已创建云数据库 SelectDB 版实例,如何购买实例请参见创建实例

    • 已配置IP白名单,配置白名单详情请参见设置白名单

SQL

SelectDB连接器可在SQL作业中用作结果表。

使用方法

说明

实时计算VVR 11.1及以上版本已内置SelectDB连接器,可跳过以下步骤。

  1. 单击JAR获取SelectDB自定义连接器(需要为1.15~1.17)。

  2. 实时计算开发控制台上,上传SelectDB自定义连接器,详情请参见管理自定义连接器

  3. SQL作业中使用SelectDB自定义连接器,作业开发详情请参见SQL作业开发

    connector为表类型,固定值为doris。SelectDB自定义连接器结果表参数配置详情请参见Sink配置项

语法结构

CREATE TABLE selectdb_sink (
  emp_no       INT ,
  birth_date   DATE,
  first_name   STRING,
  last_name    STRING,
  gender       STRING,
  hire_date    DATE
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'test.employees',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'true'
);

类型映射

详情请参见Doris & Flink Column Type Mapping

使用示例

本文以MySQL数据写入SelectDB为例为您详细介绍如何使用SelectDB自定义连接器。

  1. 准备工作。

    1. 创建Flink工作空间、MySQLSelectDB实例,详情请参见开通实时计算Flink第一步:快捷创建RDS MySQL实例与配置数据库创建实例

    2. MySQL中创建名称为order_dw_mysql的数据库和名称为orders的表并导入测试数据。

      CREATE TABLE `orders` (
        order_id bigint not null primary key,
        user_id varchar(50) not null,
        shop_id bigint not null,
        product_id bigint not null,
        buy_fee decimal(20,2) not null,   
        create_time timestamp not null,
        update_time timestamp not null default now(),
        state int not null 
      );
      
      INSERT INTO orders VALUES
      (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
      (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
      (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
      (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
      (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
      (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
      (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    3. 通过DMS连接云数据库SelectDB版实例后创建名称为selectdb的数据库和名称为selecttable的表。

      CREATE DATABASE selectdb;
      
      CREATE TABLE `selecttable` (
        order_id bigint,
        user_id varchar(50),
        shop_id bigint,
        product_id bigint,
        buy_fee DECIMAL,   
        create_time DATETIME,
        update_time DATETIME,
        state int
       )DISTRIBUTED BY HASH(order_id) BUCKETS 10;
    4. 将实时计算Flink版的虚拟交换机的网段信息添加到SelectDB的白名单中,详情请参见如何设置白名单?

  2. 实时计算开发控制台上创建Flink SQL作业并启动。

    1. 创建名称为mysqlcatalogMySQL Catalog,详情请参见管理MySQL Catalog

    2. 单击JAR获取SelectDB自定义连接器(需要为1.15~1.17),注册SelectDB自定义连接器,详情请参见管理自定义连接器

    3. 数据开发 > ETL新建作业草稿,代码示例如下。

      CREATE TEMPORARY TABLE  selectdb_sink (
        order_id BIGINT,
        user_id STRING,
        shop_id BIGINT,
        product_id BIGINT,
        buy_fee DECIMAL,   
        create_time TIMESTAMP(6),
        update_time TIMESTAMP(6),
        state int
      ) 
        WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'selectdb.selecttable',
        'username' = 'admin',
        'password' = '${secret_values.selectdb}',
        'sink.enable-delete' = 'true'
      );
      
      INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;
    4. 单击部署后无状态启动作业,详情请参见部署作业作业启动

  3. 通过DMS连接云数据库SelectDB版实例后,查询名称为selecttable的表数据。

    SELECT * FROM `selecttable` ;

数据摄入

SelectDB连接器可以用于数据摄入YAML作业开发,作为目标端写入。

语法结构

source:
   type: xxx

sink:
   type: doris
   name: Doris Sink
   fenodes: 127.0.0.1:8030
   username: root
   password: ""
   table.create.properties.replication_num: 1

配置项

参数

说明

是否必填

默认值

数据类型

备注

type

目标端类型。

(none)

String

固定值为 doris

name

目标端名称。

(none)

String

无。

fenodes

云数据库 SelectDB 版实例的访问和HTTP协议址地端口。

(none)

String

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口

示例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

benodes

(none)

String

Http address of Doris cluster BE, such as 127.0.0.1:8040

jdbc-url

云数据库 SelectDB 版实例的JDBC连接信息。

(none)

String

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口

示例:jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030

username

云数据库 SelectDB 版实例的数据库用户名。

(none)

String

Username of Doris cluster

password

云数据库 SelectDB 版实例对应数据库用户名的密码。

(none)

String

Password for Doris cluster

auto-redirect

是否重定向Stream Load请求。开启后Stream Load将通过FE写入,不再显示获取BE信息。

false

String

Whether to write through FE redirection and directly connect to BE to write

charset-encoding

false

Boolean

Charset encoding for doris http client, default UTF-8

sink.enable.batch-mode

是否使用攒批模式写入SelectDB,开启后写入时机不依赖Checkpoint,通过sink.buffer-flush.max-rows、sink.buffer-flush.max-bytessink.buffer-flush.interval参数来控制写入时机。

同时开启后将不保证EOS语义,可借助Unique模型做到幂等。

true

Boolean

Whether to use the batch method to write to Doris

sink.flush.queue-size

批处理模式下,缓存的队列大小。

2

Integer

Queue size for batch writing

sink.buffer-flush.max-rows

批处理模式下,单个批次最多写入的数据行数。

50000

Integer

Maximum number of Flush records in a single batch

sink.buffer-flush.max-bytes

批处理模式下,单个批次最多写入的字节数。

10485760(10MB)

Integer

Maximum number of bytes flushed in a single batch

sink.buffer-flush.interval

批处理模式下,异步刷新缓存的间隔。最小1s。

10s

String

Flush interval duration. If this time is exceeded, the data will be flushed asynchronously

sink.properties.

Stream Load的导入参数,请填写属性配置。

  • CSV格式时请写入:

    sink.properties.format='csv' 
    sink.properties.column_separator=','
    sink.properties.line_delimiter='\n' 
  • JSON格式时请写入:

    sink.properties.format='json' 

更多参数,请参见:Stream Load

(none)

String

Parameters of StreamLoad. For example: sink.properties.strict_mode: true. See more about StreamLoad Properties

table.create.properties.*

创建表的Properties配置。

(none)

String

Create the Properties configuration of the table. For example: table.create.properties.replication_num: 1. See more about Doris Table Properties

类型映射

Flink CDC Type

SelectDB Type

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

DECIMAL

DECIMAL

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATE

DATE

TIMESTAMP [(p)]

DATETIME [(p)]

TIMESTAMP_LTZ [(p)]

DATETIME [(p)]

CHAR(n)

CHAR(n*3)

说明

Doris中,字符串以UTF-8编码存储,因此英文字符占用1字节,中文字符占用3字节。这里的长度乘以3。CHAR的最大长度为255。一旦超过,它将自动转换为VARCHAR类型。

VARCHAR(n)

VARCHAR(n*3)

说明

同上。这里的长度乘以3。VARCHAR的最大长度为65533。一旦超过,它将自动转换为STRING类型。

BINARY(n)

STRING

VARBINARY(N)

STRING

STRING

STRING