SelectDB

更新时间:
复制为 MD 格式

本文介绍如何使用SelectDB连接器。

背景信息

云数据库 SelectDB 版是新一代实时数据仓库SelectDB在阿里云上的全托管服务,100%兼容Apache Doris。您可以在阿里云上便捷地购买SelectDB数仓服务,满足海量数据分析需求,具体的产品优势和应用场景请参见什么是云数据库SelectDB

自定义SelectDB连接器支持的信息如下:

类别

详情

支持类型

源表,结果表,维表和数据摄入目标端

运行模式

流模式和批模式

数据格式

JSONCSV

特有监控指标

API种类

DataStreamSQL

是否支持更新/删除

特色功能

  • 支持整库数据同步。

  • SelectDB连接器提供Exactly-Once语义,保证数据不重复也不丢失。

  • 兼容1.0及以上Apache Doris,可以使用Flink SelectDB自定义连接器同步数据至Apache Doris。

注意事项

  • 仅实时计算Flink版的引擎VVR 8.0.10及以上版本支持使用SelectDB自定义连接器。

  • SelectDB自定义连接器使用过程如有问题,请先提交工单给云数据库SelectDB版。

  • 同步数据至云数据库SelectDB版时,需要满足以下条件:

    • 已创建云数据库 SelectDB 版实例,如何购买实例请参见创建实例

    • 已配置IP白名单,配置白名单详情请参见设置白名单

SQL

使用方法

说明

实时计算VVR 11.1及以上版本已内置SelectDB连接器,可跳过以下步骤。

  1. 单击JAR获取SelectDB自定义连接器(需要为1.15~1.17)。

  2. 实时计算开发控制台上,上传SelectDB自定义连接器,详情请参见管理自定义连接器

  3. SQL作业中使用SelectDB自定义连接器,connector固定值为doris

语法结构

说明

作为源表,需要开通集群直连,启用Arrow Flight功能。

云数据库 SelectDB 版控制台的实例详情 > 网络信息中单击开通集群直连。

CREATE TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING COMMENT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username' = 'admin',
  'password' = '****'
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为doris

    fenodes

    云数据库 SelectDB 版实例的访问和HTTP协议址地端口。

    String

    可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口

    示例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

    jdbc-url

    jdbc 连接信息,

    String

    可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口

    示例:jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030

    table.identifier

    数据库表名。

    String

    示例:db.tbl

    username

    用户名

    String

    如果遗忘密码,可以从云数据库 SelectDB 版控制台的实例详情右上角进行重置。

    password

    密码

    String

    doris.request.retries

    发送请求的重试次数。

    Integer

    3

    无。

    doris.request.connect.timeout

    发送请求的连接超时时间。

    Duration

    30s

    无。

    doris.request.read.timeout

    发送请求的读取超时时间。

    Duration

    30s

    无。

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    doris.request.query.timeout

    查询超时时间,默认值为 6 小时

    Duration

    21600s

    固定值为doris

    doris.request.tablet.size

    一个 Partition 对应的 Tablet 个数。

    Integer

    1

    此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对数据库造成更大的压力。

    doris.batch.size

    一次从 BE 读取数据的最大行数。

    Integer

    4064

    增大此数值可减少 Flink 与数据库之间建立连接的次数。从而减轻网络延迟所带来的额外时间开销。

    doris.exec.mem.limit

    单个查询的内存限制。

    Integer

    8192mb

    默认为 8GB,单位为字节。

    source.use-flight-sql

    是否使用 Arrow Flight SQL 读取。

    Boolean

    false

    无需配置。请直接从云数据库 SelectDB 版控制台的实例详情 > 网络信息中单击开通集群直连。

    source.flight-sql-port

    使用 Arrow Flight SQL 读取时,FE 的 arrow_flight_sql_port。

    Integer

    -

    无。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    sink.label-prefix

    Stream Load 导入使用的 label 前缀。

    String

    --

    多作业场景下要求全局唯一,用来保证 Flink 的 EOS 语义。相同的Label只能导入一次,确保不重复写入。

    sink.properties.*

    Stream Load 的导入参数。

    String

    --

    CSV 格式配置

    'sink.properties.column_separator' = ',', -- 使用逗号分隔
    -- 如果数据中可能包含逗号,建议使用不可见字符,如:
    -- 'sink.properties.column_separator' = '\x01'

    JSON 格式配置

    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true' -- 或使用 strip_outer_array

    sink.enable-delete

    是否启用删除。此选项需要 Doris 表开启批量删除功能。

    Boolean

    true

    只支持 Unique 模型。

    sink.enable-2pc

    是否开启两阶段提交 (2PC)。

    Boolean

    true

    保证 Exactly-Once 语义。更多两阶段提交请参考显式事务操作

    sink.buffer-size

    写数据缓存 buffer 大小。

    Integer

    1MB

    单位字节。不建议修改,默认配置即可。

    sink.buffer-count

    写数据缓存 buffer 个数。

    Integer

    3

    不建议修改,默认配置即可

    sink.max-retries

    Commit 失败后的最大重试次数。

    Integer

    3

    无。

    sink.enable.batch-mode

    是否使用攒批模式写入。

    Boolean

    false

    开启后写入时机不依赖 Checkpoint,通过sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数来控制写入时机。

    同时开启后将不保证 Exactly-once 语义,但可借助 Uniq 模型做到幂等。

    sink.flush.queue-size

    攒批模式下,缓存的队列大小。

    Integer

    2

    无。

    sink.buffer-flush.max-rows

    攒批模式下,单个批次最多写入的数据行数。

    Integer

    500000

    无。

    sink.buffer-flush.max-bytes

    攒批模式下,单个批次最多写入的字节数。

    Integer

    100MB

    单位字节。

    sink.buffer-flush.interval

    攒批模式下,异步刷新缓存的间隔。

    String

    10s

    单位毫秒。

    sink.ignore.update-before

    是否忽略 update-before 事件。

    Boolean

    true

    无。

  • 维表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    lookup.cache.max-rows

    lookup 缓存的最大行数。

    Integer

    -1

    -1默认为不开启缓存。

    lookup.cache.ttl

    lookup 缓存的最大时间。

    String

    10s

    单位毫秒。

    lookup.max-retries

    lookup 查询失败后的重试次数

    Integer

    1

    无。

    lookup.jdbc.async

    是否开启异步的 lookup。

    Boolean

    false

    无。

    lookup.jdbc.read.batch.size

    异步 lookup 下,每次查询的最大批次大小。

    Integer

    128

    无。

    lookup.jdbc.read.batch.queue-size

    异步 lookup 时,中间缓冲队列的大小。

    Integer

    256

    无。

    lookup.jdbc.read.thread-size

    每个 task 中 lookup 的 jdbc 线程数。

    Integer

    3

    无。

使用示例

源表

CREATE TEMPORARY TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING COMMENT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username' = 'admin',
  'password' = '****'
);

结果表

CREATE TEMPORARY TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING COMMENT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username' = 'admin',
  'password' = '****',
--  'sink.label-prefix' = 'flink_orders' --相同的Label只能导入一次,确保不重复写入。
);

维表

CREATE TEMPORARY TABLE fact_table (
  `id` BIGINT,
  `name` STRING,
  `city` STRING,
  `process_time` as proctime()
) WITH (
  'connector' = 'kafka',
  ...
);

create TEMPORARY table dim_city(
  `city` STRING,
  `level` INT ,
  `province` STRING,
  `country` STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'jdbc-url' = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
  'table.identifier' = 'dim.dim_city',
  'username' = 'admin',
  'password' = '****'
);

SELECT a.id, a.name, a.city, c.province, c.country,c.level 
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city

数据摄入(公测中)

SelectDB连接器可以用于数据摄入YAML作业开发,作为目标端写入。

语法结构

source:
   type: xxx

sink:
   type: doris
   name: Doris Sink
   fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
   username: root
   password: ""

配置项

参数

说明

是否必填

默认值

数据类型

备注

type

目标端类型。

(none)

String

固定值为 doris

name

目标端名称。

(none)

String

无。

fenodes

云数据库 SelectDB 版实例的访问和HTTP协议址地端口。

(none)

String

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口

示例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

jdbc-url

云数据库 SelectDB 版实例的JDBC连接信息。

(none)

String

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口

示例:jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030

username

云数据库 SelectDB 版实例的数据库用户名。

(none)

String

如果遗忘密码,可以从云数据库 SelectDB 版控制台的实例详情右上角进行重置。

password

云数据库 SelectDB 版实例对应数据库用户名的密码。

(none)

String

sink.enable.batch-mode

是否使用攒批模式写入SelectDB

true

Boolean

开启后写入时机不依赖 Checkpoint,通过sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数来控制写入时机。

同时开启后将不保证 Exactly-once 语义,但可借助 Uniq 模型做到幂等。

sink.flush.queue-size

批处理模式下,缓存的队列大小。

2

Integer

Queue size for batch writing

sink.buffer-flush.max-rows

批处理模式下,单个批次最多写入的数据行数。

500000

Integer

无。

sink.buffer-flush.max-bytes

批处理模式下,单个批次最多写入的字节数。

100MB

Integer

无。

sink.buffer-flush.interval

批处理模式下,异步刷新缓存的间隔。最小1s。

10s

String

无。

sink.properties.*

Stream Load 的导入参数。

(none)

String

CSV 格式配置

sink.properties.column_separator: ',', -- 使用逗号分隔
-- 如果数据中可能包含逗号,建议使用不可见字符,如:
-- sink.properties.column_separator: '\x01'

JSON 格式配置

'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true' -- 或使用 strip_outer_array

类型映射

Flink CDC Type

SelectDB Type

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

DECIMAL

DECIMAL

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATE

DATE

TIMESTAMP [(p)]

DATETIME [(p)]

TIMESTAMP_LTZ [(p)]

DATETIME [(p)]

CHAR(n)

CHAR(n*3)

说明

Doris中,字符串以UTF-8编码存储,因此英文字符占用1字节,中文字符占用3字节。这里的长度乘以3。CHAR的最大长度为255。一旦超过,它将自动转换为VARCHAR类型。

VARCHAR(n)

VARCHAR(n*3)

说明

同上。这里的长度乘以3。VARCHAR的最大长度为65533。一旦超过,它将自动转换为STRING类型。

BINARY(n)

STRING

VARBINARY(N)

STRING

STRING

STRING