SelectDB

本文介绍如何使用自定义SelectDB连接器写入数据至云数据库SelectDB版。

背景信息

云数据库 SelectDB 版是新一代实时数据仓库SelectDB在阿里云上的全托管服务,100%兼容Apache Doris。您可以在阿里云上便捷地购买SelectDB数仓服务,满足海量数据分析需求,具体的产品优势和应用场景请参见什么是云数据库SelectDB

自定义SelectDB连接器支持的信息如下:

类别

详情

支持类型

源表,结果表,维表和数据摄入目标端

运行模式

流模式和批模式

数据格式

JSONCSV

特有监控指标

API种类

DataStreamSQL

是否支持更新/删除

特色功能

  • 支持整库数据同步。

  • SelectDB连接器提供Exactly-Once语义,保证数据不重复也不丢失。

  • 兼容1.0及以上Apache Doris,可以使用Flink SelectDB自定义连接器同步数据至Apache Doris。

注意事项

  • 仅实时计算Flink版的引擎VVR 8.0.10及以上版本支持使用SelectDB自定义连接器。

  • SelectDB自定义连接器使用过程如有问题,请先提交工单给云数据库SelectDB版。

  • 同步数据至云数据库SelectDB版时,需要满足以下条件:

    • 已创建云数据库 SelectDB 版实例,如何购买实例请参见创建实例

    • 已配置IP白名单,配置白名单详情请参见设置白名单

SQL

SelectDB连接器可在SQL作业中用作结果表。

使用方法

说明

实时计算VVR 11.1及以上版本已内置SelectDB连接器,可跳过以下步骤。

  1. 单击JAR获取SelectDB自定义连接器(需要为1.15~1.17)。

  2. 实时计算开发控制台上,上传SelectDB自定义连接器,详情请参见管理自定义连接器

  3. SQL作业中使用SelectDB自定义连接器,作业开发详情请参见作业开发地图

    connector为表类型,固定值为doris。SelectDB自定义连接器结果表参数配置详情请参见Sink配置项

语法结构

说明

作为源表,需要开通集群直连,启用Arrow Flight功能。

云数据库 SelectDB 版控制台的实例详情 > 网络信息中单击开通集群直连。

CREATE TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING COMMENT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username' = 'admin',
  'password' = '****'
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为doris

    fenodes

    云数据库 SelectDB 版实例的访问和HTTP协议址地端口。

    String

    可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口

    示例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

    jdbc-url

    jdbc 连接信息,

    String

    可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口

    示例:jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030

    table.identifier

    数据库表名。

    String

    示例:db.tbl

    username

    用户名

    String

    如果遗忘密码,可以从云数据库 SelectDB 版控制台的实例详情右上角进行重置。

    password

    密码

    String

    doris.request.retries

    发送请求的重试次数。

    Integer

    3

    无。

    doris.request.connect.timeout

    发送请求的连接超时时间。

    Duration

    30s

    无。

    doris.request.read.timeout

    发送请求的读取超时时间。

    Duration

    30s

    无。

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    doris.request.query.timeout

    查询超时时间,默认值为 6 小时

    Duration

    21600s

    固定值为doris

    doris.request.tablet.size

    一个 Partition 对应的 Tablet 个数。

    Integer

    1

    此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对数据库造成更大的压力。

    doris.batch.size

    一次从 BE 读取数据的最大行数。

    Integer

    4064

    增大此数值可减少 Flink 与数据库之间建立连接的次数。从而减轻网络延迟所带来的额外时间开销。

    doris.exec.mem.limit

    单个查询的内存限制。

    Integer

    8192mb

    默认为 8GB,单位为字节。

    source.use-flight-sql

    是否使用 Arrow Flight SQL 读取。

    Boolean

    false

    无需配置。请直接从云数据库 SelectDB 版控制台的实例详情 > 网络信息中单击开通集群直连。

    source.flight-sql-port

    使用 Arrow Flight SQL 读取时,FE 的 arrow_flight_sql_port。

    Integer

    -

    无。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    sink.label-prefix

    Stream Load 导入使用的 label 前缀。

    String

    --

    多作业场景下要求全局唯一,用来保证 Flink 的 EOS 语义。相同的Label只能导入一次,确保不重复写入。

    sink.properties.*

    Stream Load 的导入参数。

    String

    --

    CSV 格式配置

    • 设置列分隔符:配置'sink.properties.column_separator' = ','

    • 转义特殊字符:配置'sink.properties.escape_delimiters' = 'true'。这会将\x01等字符转换为二进制字节。

    JSON 格式配置

    • 指定格式:配置'sink.properties.format' = 'json'

    • 按行读取:配置'sink.properties.read_json_by_line' = 'true'

    sink.enable-delete

    是否启用删除。此选项需要 Doris 表开启批量删除功能。

    Boolean

    true

    只支持 Unique 模型。

    sink.enable-2pc

    是否开启两阶段提交 (2PC)。

    Boolean

    true

    保证 Exactly-Once 语义。更多两阶段提交请参考显式事务操作

    sink.buffer-size

    写数据缓存 buffer 大小。

    Integer

    1MB

    单位字节。不建议修改,默认配置即可。

    sink.buffer-count

    写数据缓存 buffer 个数。

    Integer

    3

    不建议修改,默认配置即可

    sink.max-retries

    Commit 失败后的最大重试次数。

    Integer

    3

    无。

    sink.enable.batch-mode

    是否使用攒批模式写入。

    Boolean

    false

    开启后写入时机不依赖 Checkpoint,通过sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数来控制写入时机。

    同时开启后将不保证 Exactly-once 语义,但可借助 Uniq 模型做到幂等。

    sink.flush.queue-size

    攒批模式下,缓存的队列大小。

    Integer

    2

    无。

    sink.buffer-flush.max-rows

    攒批模式下,单个批次最多写入的数据行数。

    Integer

    500000

    无。

    sink.buffer-flush.max-bytes

    攒批模式下,单个批次最多写入的字节数。

    Integer

    100MB

    单位字节。

    sink.buffer-flush.interval

    攒批模式下,异步刷新缓存的间隔。

    String

    10s

    单位毫秒。

    sink.ignore.update-before

    是否忽略 update-before 事件。

    Boolean

    true

    无。

  • 维表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    lookup.cache.max-rows

    lookup 缓存的最大行数。

    Integer

    -1

    -1默认为不开启缓存。

    lookup.cache.ttl

    lookup 缓存的最大时间。

    String

    10s

    单位毫秒。

    lookup.max-retries

    lookup 查询失败后的重试次数

    Integer

    1

    无。

    lookup.jdbc.async

    是否开启异步的 lookup。

    Boolean

    false

    无。

    lookup.jdbc.read.batch.size

    异步 lookup 下,每次查询的最大批次大小。

    Integer

    128

    无。

    lookup.jdbc.read.batch.queue-size

    异步 lookup 时,中间缓冲队列的大小。

    Integer

    256

    无。

    lookup.jdbc.read.thread-size

    每个 task 中 lookup 的 jdbc 线程数。

    Integer

    3

    无。

类型映射

Doris Type

Flink Type

NULL_TYPE

NULL

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DATE

DATE

DATETIME

TIMESTAMP

DECIMAL

DECIMAL

CHAR

STRING

LARGEINT

STRING

VARCHAR

STRING

STRING

STRING

ARRAY

ARRAY

MAP

STRING

JSON

STRING

VARIANT

STRING

IPV4

STRING

IPV6

STRING

使用示例

源表

CREATE TEMPORARY TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING COMMENT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username' = 'admin',
  'password' = '****'
);

结果表

CREATE TEMPORARY TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING COMMENT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username' = 'admin',
  'password' = '****',
--  'sink.label-prefix' = 'flink_orders' --相同的Label只能导入一次,确保不重复写入。
);

维表

CREATE TEMPORARY TABLE fact_table (
  `id` BIGINT,
  `name` STRING,
  `city` STRING,
  `process_time` as proctime()
) WITH (
  'connector' = 'kafka',
  ...
);

create TEMPORARY table dim_city(
  `city` STRING,
  `level` INT ,
  `province` STRING,
  `country` STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'jdbc-url' = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
  'table.identifier' = 'dim.dim_city',
  'username' = 'admin',
  'password' = '****'
);

SELECT a.id, a.name, a.city, c.province, c.country,c.level 
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city

数据摄入

SelectDB连接器可以用于数据摄入YAML作业开发,作为目标端写入。

语法结构

source:
   type: xxx

sink:
   type: doris
   name: Doris Sink
   fenodes: 127.0.0.1:8030
   username: root
   password: ""

配置项

参数

说明

是否必填

默认值

数据类型

备注

type

目标端类型。

(none)

String

固定值为 doris

name

目标端名称。

(none)

String

无。

fenodes

云数据库 SelectDB 版实例的访问和HTTP协议址地端口。

(none)

String

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口

示例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

jdbc-url

云数据库 SelectDB 版实例的JDBC连接信息。

(none)

String

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口

示例:jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030

username

云数据库 SelectDB 版实例的数据库用户名。

(none)

String

如果遗忘密码,可以从云数据库 SelectDB 版控制台的实例详情右上角进行重置。

password

云数据库 SelectDB 版实例对应数据库用户名的密码。

(none)

String

sink.enable.batch-mode

是否使用攒批模式写入SelectDB

true

Boolean

开启后写入时机不依赖 Checkpoint,通过sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数来控制写入时机。

同时开启后将不保证 Exactly-once 语义,但可借助 Uniq 模型做到幂等。

sink.flush.queue-size

批处理模式下,缓存的队列大小。

2

Integer

Queue size for batch writing

sink.buffer-flush.max-rows

批处理模式下,单个批次最多写入的数据行数。

500000

Integer

无。

sink.buffer-flush.max-bytes

批处理模式下,单个批次最多写入的字节数。

100MB

Integer

无。

sink.buffer-flush.interval

批处理模式下,异步刷新缓存的间隔。最小1s。

10s

String

无。

sink.properties.*

Stream Load 的导入参数。

(none)

String

CSV 格式配置

  • 设置列分隔符:配置'sink.properties.column_separator' = ','

  • 转义特殊字符:配置'sink.properties.escape_delimiters' = 'true'。这会将\x01等字符转换为二进制字节。

JSON 格式配置

  • 指定格式:配置'sink.properties.format' = 'json'

  • 按行读取:配置'sink.properties.read_json_by_line' = 'true'

类型映射

Flink CDC Type

SelectDB Type

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

DECIMAL

DECIMAL

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATE

DATE

TIMESTAMP [(p)]

DATETIME [(p)]

TIMESTAMP_LTZ [(p)]

DATETIME [(p)]

CHAR(n)

CHAR(n*3)

说明

Doris中,字符串以UTF-8编码存储,因此英文字符占用1字节,中文字符占用3字节。这里的长度乘以3。CHAR的最大长度为255。一旦超过,它将自动转换为VARCHAR类型。

VARCHAR(n)

VARCHAR(n*3)

说明

同上。这里的长度乘以3。VARCHAR的最大长度为65533。一旦超过,它将自动转换为STRING类型。

BINARY(n)

STRING

VARBINARY(N)

STRING

STRING

STRING