本文介绍如何使用自定义SelectDB连接器写入数据至云数据库SelectDB版。
背景信息
云数据库 SelectDB 版是新一代实时数据仓库SelectDB在阿里云上的全托管服务,100%兼容Apache Doris。您可以在阿里云上便捷地购买SelectDB数仓服务,满足海量数据分析需求,具体的产品优势和应用场景请参见什么是云数据库SelectDB版。
自定义SelectDB连接器支持的信息如下:
类别 | 详情 |
支持类型 | 源表,结果表,维表和数据摄入目标端 |
运行模式 | 流模式和批模式 |
数据格式 | JSON和CSV |
特有监控指标 | 无 |
API种类 | DataStream和SQL |
是否支持更新/删除 | 是 |
特色功能
支持整库数据同步。
SelectDB连接器提供Exactly-Once语义,保证数据不重复也不丢失。
兼容1.0及以上Apache Doris,可以使用Flink SelectDB自定义连接器同步数据至Apache Doris。
注意事项
SQL
SelectDB连接器可在SQL作业中用作结果表。
使用方法
实时计算VVR 11.1及以上版本已内置SelectDB连接器,可跳过以下步骤。
语法结构
作为源表,需要开通集群直连,启用Arrow Flight功能。
从云数据库 SelectDB 版控制台的实例详情 > 网络信息中单击开通集群直连。
CREATE TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING COMMENT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为
doris。fenodes
云数据库 SelectDB 版实例的访问和HTTP协议址地端口。
String
是
无
可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口。
示例:
selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080。jdbc-url
jdbc 连接信息,
String
否
无
可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口。
示例:
jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030。table.identifier
数据库表名。
String
是
无
示例:
db.tbl。username
用户名
String
是
无
如果遗忘密码,可以从云数据库 SelectDB 版控制台的实例详情右上角进行重置。
password
密码
String
是
无
doris.request.retries
发送请求的重试次数。
Integer
否
3
无。
doris.request.connect.timeout
发送请求的连接超时时间。
Duration
否
30s
无。
doris.request.read.timeout
发送请求的读取超时时间。
Duration
否
30s
无。
源表独有
参数
说明
数据类型
是否必填
默认值
备注
doris.request.query.timeout
查询超时时间,默认值为 6 小时
Duration
否
21600s
固定值为
doris。doris.request.tablet.size
一个 Partition 对应的 Tablet 个数。
Integer
否
1
此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对数据库造成更大的压力。
doris.batch.size
一次从 BE 读取数据的最大行数。
Integer
否
4064
增大此数值可减少 Flink 与数据库之间建立连接的次数。从而减轻网络延迟所带来的额外时间开销。
doris.exec.mem.limit
单个查询的内存限制。
Integer
否
8192mb
默认为 8GB,单位为字节。
source.use-flight-sql
是否使用 Arrow Flight SQL 读取。
Boolean
否
false
无需配置。请直接从云数据库 SelectDB 版控制台的实例详情 > 网络信息中单击开通集群直连。
source.flight-sql-port
使用 Arrow Flight SQL 读取时,FE 的 arrow_flight_sql_port。
Integer
否
-
无。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
sink.label-prefix
Stream Load 导入使用的 label 前缀。
String
否
--
多作业场景下要求全局唯一,用来保证 Flink 的 EOS 语义。相同的Label只能导入一次,确保不重复写入。
sink.properties.*
Stream Load 的导入参数。
String
否
--
CSV 格式配置
设置列分隔符:配置
'sink.properties.column_separator' = ','。转义特殊字符:配置
'sink.properties.escape_delimiters' = 'true'。这会将\x01等字符转换为二进制字节。
JSON 格式配置
指定格式:配置
'sink.properties.format' = 'json'。按行读取:配置
'sink.properties.read_json_by_line' = 'true'。
sink.enable-delete
是否启用删除。此选项需要 Doris 表开启批量删除功能。
Boolean
否
true
只支持 Unique 模型。
sink.enable-2pc
是否开启两阶段提交 (2PC)。
Boolean
否
true
保证 Exactly-Once 语义。更多两阶段提交请参考显式事务操作。
sink.buffer-size
写数据缓存 buffer 大小。
Integer
否
1MB
单位字节。不建议修改,默认配置即可。
sink.buffer-count
写数据缓存 buffer 个数。
Integer
否
3
不建议修改,默认配置即可
sink.max-retries
Commit 失败后的最大重试次数。
Integer
否
3
无。
sink.enable.batch-mode
是否使用攒批模式写入。
Boolean
否
false
开启后写入时机不依赖 Checkpoint,通过
sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数来控制写入时机。同时开启后将不保证 Exactly-once 语义,但可借助 Uniq 模型做到幂等。
sink.flush.queue-size
攒批模式下,缓存的队列大小。
Integer
否
2
无。
sink.buffer-flush.max-rows
攒批模式下,单个批次最多写入的数据行数。
Integer
否
500000
无。
sink.buffer-flush.max-bytes
攒批模式下,单个批次最多写入的字节数。
Integer
否
100MB
单位字节。
sink.buffer-flush.interval
攒批模式下,异步刷新缓存的间隔。
String
否
10s
单位毫秒。
sink.ignore.update-before
是否忽略 update-before 事件。
Boolean
否
true
无。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
lookup.cache.max-rows
lookup 缓存的最大行数。
Integer
否
-1
-1默认为不开启缓存。lookup.cache.ttl
lookup 缓存的最大时间。
String
否
10s
单位毫秒。
lookup.max-retries
lookup 查询失败后的重试次数
Integer
否
1
无。
lookup.jdbc.async
是否开启异步的 lookup。
Boolean
否
false
无。
lookup.jdbc.read.batch.size
异步 lookup 下,每次查询的最大批次大小。
Integer
否
128
无。
lookup.jdbc.read.batch.queue-size
异步 lookup 时,中间缓冲队列的大小。
Integer
否
256
无。
lookup.jdbc.read.thread-size
每个 task 中 lookup 的 jdbc 线程数。
Integer
否
3
无。
类型映射
Doris Type | Flink Type |
NULL_TYPE | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
CHAR | STRING |
LARGEINT | STRING |
VARCHAR | STRING |
STRING | STRING |
ARRAY | ARRAY |
MAP | STRING |
JSON | STRING |
VARIANT | STRING |
IPV4 | STRING |
IPV6 | STRING |
使用示例
源表
CREATE TEMPORARY TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING COMMENT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);结果表
CREATE TEMPORARY TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING COMMENT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****',
-- 'sink.label-prefix' = 'flink_orders' --相同的Label只能导入一次,确保不重复写入。
);维表
CREATE TEMPORARY TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
...
);
create TEMPORARY table dim_city(
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'jdbc-url' = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
'table.identifier' = 'dim.dim_city',
'username' = 'admin',
'password' = '****'
);
SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city数据摄入
SelectDB连接器可以用于数据摄入YAML作业开发,作为目标端写入。
语法结构
source:
type: xxx
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: ""
配置项
参数 | 说明 | 是否必填 | 默认值 | 数据类型 | 备注 |
type | 目标端类型。 | 是 | (none) | String | 固定值为 |
name | 目标端名称。 | 否 | (none) | String | 无。 |
fenodes | 云数据库 SelectDB 版实例的访问和HTTP协议址地端口。 | 是 | (none) | String | 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口。 示例: |
jdbc-url | 云数据库 SelectDB 版实例的JDBC连接信息。 | 否 | (none) | String | 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口。 示例: |
username | 云数据库 SelectDB 版实例的数据库用户名。 | 是 | (none) | String | 如果遗忘密码,可以从云数据库 SelectDB 版控制台的实例详情右上角进行重置。 |
password | 云数据库 SelectDB 版实例对应数据库用户名的密码。 | 是 | (none) | String | |
sink.enable.batch-mode | 是否使用攒批模式写入SelectDB。 | 否 | true | Boolean | 开启后写入时机不依赖 Checkpoint,通过 同时开启后将不保证 Exactly-once 语义,但可借助 Uniq 模型做到幂等。 |
sink.flush.queue-size | 批处理模式下,缓存的队列大小。 | 否 | 2 | Integer | Queue size for batch writing |
sink.buffer-flush.max-rows | 批处理模式下,单个批次最多写入的数据行数。 | 否 | 500000 | Integer | 无。 |
sink.buffer-flush.max-bytes | 批处理模式下,单个批次最多写入的字节数。 | 否 | 100MB | Integer | 无。 |
sink.buffer-flush.interval | 批处理模式下,异步刷新缓存的间隔。最小1s。 | 否 | 10s | String | 无。 |
sink.properties.* | Stream Load 的导入参数。 | 否 | (none) | String | CSV 格式配置
JSON 格式配置
|
类型映射
Flink CDC Type | SelectDB Type |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
DECIMAL | DECIMAL |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIMESTAMP [(p)] | DATETIME [(p)] |
TIMESTAMP_LTZ [(p)] | DATETIME [(p)] |
CHAR(n) | CHAR(n*3) 说明 在Doris中,字符串以UTF-8编码存储,因此英文字符占用1字节,中文字符占用3字节。这里的长度乘以3。CHAR的最大长度为255。一旦超过,它将自动转换为VARCHAR类型。 |
VARCHAR(n) | VARCHAR(n*3) 说明 同上。这里的长度乘以3。VARCHAR的最大长度为65533。一旦超过,它将自动转换为STRING类型。 |
BINARY(n) | STRING |
VARBINARY(N) | STRING |
STRING | STRING |