本文介绍如何使用SelectDB连接器。
背景信息
云数据库 SelectDB 版是新一代实时数据仓库SelectDB在阿里云上的全托管服务,100%兼容Apache Doris。您可以在阿里云上便捷地购买SelectDB数仓服务,满足海量数据分析需求,具体的产品优势和应用场景请参见什么是云数据库SelectDB版。
自定义SelectDB连接器支持的信息如下:
|
类别 |
详情 |
|
支持类型 |
源表,结果表,维表和数据摄入目标端 |
|
运行模式 |
流模式和批模式 |
|
数据格式 |
JSON和CSV |
|
特有监控指标 |
无 |
|
API种类 |
DataStream、SQL,数据摄入YAML作业 |
|
是否支持更新/删除 |
是 |
特色功能
-
支持整库数据同步。
-
SelectDB连接器提供Exactly-Once语义,保证数据不重复也不丢失。
-
兼容1.0及以上Apache Doris,可以使用Flink SelectDB自定义连接器同步数据至Apache Doris。
注意事项
SQL
使用方法
实时计算VVR 11.1及以上版本已内置SelectDB连接器,可跳过以下步骤。
语法结构
作为源表,需要开通集群直连,启用Arrow Flight功能。
从云数据库 SelectDB 版控制台的实例详情 > 网络信息中单击开通集群直连。
CREATE TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING COMMENT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);
WITH参数
-
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为
doris。fenodes
云数据库 SelectDB 版实例的访问和HTTP协议址地端口。
String
是
无
可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口。
示例:
selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080。jdbc-url
jdbc 连接信息,
String
否
无
可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口。
示例:
jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030。table.identifier
数据库表名。
String
是
无
示例:
db.tbl。username
用户名
String
是
无
如果遗忘密码,可以从云数据库 SelectDB 版控制台的实例详情右上角进行重置。
password
密码
String
是
无
doris.request.retries
发送请求的重试次数。
Integer
否
3
无。
doris.request.connect.timeout
发送请求的连接超时时间。
Duration
否
30s
无。
doris.request.read.timeout
发送请求的读取超时时间。
Duration
否
30s
无。
-
源表独有
参数
说明
数据类型
是否必填
默认值
备注
doris.request.query.timeout
查询超时时间,默认值为 6 小时
Duration
否
21600s
固定值为
doris。doris.request.tablet.size
一个 Partition 对应的 Tablet 个数。
Integer
否
1
此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对数据库造成更大的压力。
doris.batch.size
一次从 BE 读取数据的最大行数。
Integer
否
4064
增大此数值可减少 Flink 与数据库之间建立连接的次数。从而减轻网络延迟所带来的额外时间开销。
doris.exec.mem.limit
单个查询的内存限制。
Integer
否
8192mb
默认为 8GB,单位为字节。
source.use-flight-sql
是否使用 Arrow Flight SQL 读取。
Boolean
否
false
无需配置。请直接从云数据库 SelectDB 版控制台的实例详情 > 网络信息中单击开通集群直连。
source.flight-sql-port
使用 Arrow Flight SQL 读取时,FE 的 arrow_flight_sql_port。
Integer
否
-
无。
-
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
sink.label-prefix
Stream Load 导入使用的 label 前缀。
String
否
--
多作业场景下要求全局唯一,用来保证 Flink 的 EOS 语义。相同的Label只能导入一次,确保不重复写入。
sink.properties.*
Stream Load 的导入参数。
String
否
--
CSV 格式配置
'sink.properties.column_separator' = ',', -- 使用逗号分隔 -- 如果数据中可能包含逗号,建议使用不可见字符,如: -- 'sink.properties.column_separator' = '\x01'JSON 格式配置
'sink.properties.format' = 'json', 'sink.properties.read_json_by_line' = 'true' -- 或使用 strip_outer_arraysink.enable-delete
是否启用删除。此选项需要 Doris 表开启批量删除功能。
Boolean
否
true
只支持 Unique 模型。
sink.enable-2pc
是否开启两阶段提交 (2PC)。
Boolean
否
true
保证 Exactly-Once 语义。更多两阶段提交请参考显式事务操作。
sink.buffer-size
写数据缓存 buffer 大小。
Integer
否
1MB
单位字节。不建议修改,默认配置即可。
sink.buffer-count
写数据缓存 buffer 个数。
Integer
否
3
不建议修改,默认配置即可
sink.max-retries
Commit 失败后的最大重试次数。
Integer
否
3
无。
sink.enable.batch-mode
是否使用攒批模式写入。
Boolean
否
false
开启后写入时机不依赖 Checkpoint,通过
sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数来控制写入时机。同时开启后将不保证 Exactly-once 语义,但可借助 Uniq 模型做到幂等。
sink.flush.queue-size
攒批模式下,缓存的队列大小。
Integer
否
2
无。
sink.buffer-flush.max-rows
攒批模式下,单个批次最多写入的数据行数。
Integer
否
500000
无。
sink.buffer-flush.max-bytes
攒批模式下,单个批次最多写入的字节数。
Integer
否
100MB
单位字节。
sink.buffer-flush.interval
攒批模式下,异步刷新缓存的间隔。
String
否
10s
单位毫秒。
sink.ignore.update-before
是否忽略 update-before 事件。
Boolean
否
true
无。
-
维表独有
参数
说明
数据类型
是否必填
默认值
备注
lookup.cache.max-rows
lookup 缓存的最大行数。
Integer
否
-1
-1默认为不开启缓存。lookup.cache.ttl
lookup 缓存的最大时间。
String
否
10s
单位毫秒。
lookup.max-retries
lookup 查询失败后的重试次数
Integer
否
1
无。
lookup.jdbc.async
是否开启异步的 lookup。
Boolean
否
false
无。
lookup.jdbc.read.batch.size
异步 lookup 下,每次查询的最大批次大小。
Integer
否
128
无。
lookup.jdbc.read.batch.queue-size
异步 lookup 时,中间缓冲队列的大小。
Integer
否
256
无。
lookup.jdbc.read.thread-size
每个 task 中 lookup 的 jdbc 线程数。
Integer
否
3
无。
使用示例
源表
CREATE TEMPORARY TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING COMMENT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);
结果表
CREATE TEMPORARY TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING COMMENT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****',
-- 'sink.label-prefix' = 'flink_orders' --相同的Label只能导入一次,确保不重复写入。
);
维表
CREATE TEMPORARY TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
...
);
create TEMPORARY table dim_city(
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'jdbc-url' = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
'table.identifier' = 'dim.dim_city',
'username' = 'admin',
'password' = '****'
);
SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city
数据摄入(公测中)
SelectDB连接器可以用于数据摄入YAML作业开发,作为目标端写入。
语法结构
source:
type: xxx
sink:
type: doris
name: Doris Sink
fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
username: root
password: ""
配置项
|
参数 |
说明 |
是否必填 |
默认值 |
数据类型 |
备注 |
|
type |
目标端类型。 |
是 |
(none) |
String |
固定值为 |
|
name |
目标端名称。 |
否 |
(none) |
String |
无。 |
|
fenodes |
云数据库 SelectDB 版实例的访问和HTTP协议址地端口。 |
是 |
(none) |
String |
您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口。 示例: |
|
jdbc-url |
云数据库 SelectDB 版实例的JDBC连接信息。 |
否 |
(none) |
String |
您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口。 示例: |
|
username |
云数据库 SelectDB 版实例的数据库用户名。 |
是 |
(none) |
String |
如果遗忘密码,可以从云数据库 SelectDB 版控制台的实例详情右上角进行重置。 |
|
password |
云数据库 SelectDB 版实例对应数据库用户名的密码。 |
是 |
(none) |
String |
|
|
sink.enable.batch-mode |
是否使用攒批模式写入SelectDB。 |
否 |
true |
Boolean |
开启后写入时机不依赖 Checkpoint,通过 同时开启后将不保证 Exactly-once 语义,但可借助 Uniq 模型做到幂等。 |
|
sink.flush.queue-size |
批处理模式下,缓存的队列大小。 |
否 |
2 |
Integer |
Queue size for batch writing |
|
sink.buffer-flush.max-rows |
批处理模式下,单个批次最多写入的数据行数。 |
否 |
500000 |
Integer |
无。 |
|
sink.buffer-flush.max-bytes |
批处理模式下,单个批次最多写入的字节数。 |
否 |
100MB |
Integer |
无。 |
|
sink.buffer-flush.interval |
批处理模式下,异步刷新缓存的间隔。最小1s。 |
否 |
10s |
String |
无。 |
|
sink.properties.* |
Stream Load 的导入参数。 |
否 |
(none) |
String |
CSV 格式配置
JSON 格式配置
|
类型映射
|
Flink CDC Type |
SelectDB Type |
|
TINYINT |
TINYINT |
|
SMALLINT |
SMALLINT |
|
INT |
INT |
|
BIGINT |
BIGINT |
|
DECIMAL |
DECIMAL |
|
FLOAT |
FLOAT |
|
DOUBLE |
DOUBLE |
|
BOOLEAN |
BOOLEAN |
|
DATE |
DATE |
|
TIMESTAMP [(p)] |
DATETIME [(p)] |
|
TIMESTAMP_LTZ [(p)] |
DATETIME [(p)] |
|
CHAR(n) |
CHAR(n*3) 说明
在Doris中,字符串以UTF-8编码存储,因此英文字符占用1字节,中文字符占用3字节。这里的长度乘以3。CHAR的最大长度为255。一旦超过,它将自动转换为VARCHAR类型。 |
|
VARCHAR(n) |
VARCHAR(n*3) 说明
同上。这里的长度乘以3。VARCHAR的最大长度为65533。一旦超过,它将自动转换为STRING类型。 |
|
BINARY(n) |
STRING |
|
VARBINARY(N) |
STRING |
|
STRING |
STRING |