本文介绍如何使用自定义SelectDB连接器写入数据至云数据库SelectDB版。
背景信息
云数据库 SelectDB 版是新一代实时数据仓库SelectDB在阿里云上的全托管服务,100%兼容Apache Doris。您可以在阿里云上便捷地购买SelectDB数仓服务,满足海量数据分析需求,具体的产品优势和应用场景请参见什么是云数据库SelectDB版。
自定义SelectDB连接器支持的信息如下:
类别 | 详情 |
支持类型 | 结果表,数据摄入目标端 |
运行模式 | 流模式和批模式 |
数据格式 | JSON和CSV |
特有监控指标 | 无 |
API种类 | DataStream和SQL |
是否支持更新/删除 | 是 |
特色功能
支持整库数据同步。
SelectDB连接器提供Exactly-Once语义,保证数据不重复也不丢失。
兼容1.0及以上Apache Doris,可以使用Flink SelectDB自定义连接器同步数据至Apache Doris。
注意事项
SQL
SelectDB连接器可在SQL作业中用作结果表。
使用方法
实时计算VVR 11.1及以上版本已内置SelectDB连接器,可跳过以下步骤。
语法结构
CREATE TABLE selectdb_sink (
emp_no INT ,
birth_date DATE,
first_name STRING,
last_name STRING,
gender STRING,
hire_date DATE
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'test.employees',
'username' = 'admin',
'password' = '****',
'sink.enable-delete' = 'true'
);
类型映射
使用示例
本文以MySQL数据写入SelectDB为例为您详细介绍如何使用SelectDB自定义连接器。
准备工作。
创建Flink工作空间、MySQL和SelectDB实例,详情请参见开通实时计算Flink版、第一步:快捷创建RDS MySQL实例与配置数据库和创建实例。
在MySQL中创建名称为order_dw_mysql的数据库和名称为orders的表并导入测试数据。
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee decimal(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
通过DMS连接云数据库SelectDB版实例后创建名称为selectdb的数据库和名称为selecttable的表。
CREATE DATABASE selectdb; CREATE TABLE `selecttable` ( order_id bigint, user_id varchar(50), shop_id bigint, product_id bigint, buy_fee DECIMAL, create_time DATETIME, update_time DATETIME, state int )DISTRIBUTED BY HASH(order_id) BUCKETS 10;
将实时计算Flink版的虚拟交换机的网段信息添加到SelectDB的白名单中,详情请参见如何设置白名单?。
在 实时计算开发控制台上创建Flink SQL作业并启动。
创建名称为mysqlcatalog的MySQL Catalog,详情请参见管理MySQL Catalog。
单击JAR包获取SelectDB自定义连接器(需要为1.15~1.17),注册SelectDB自定义连接器,详情请参见管理自定义连接器。
在
新建作业草稿,代码示例如下。CREATE TEMPORARY TABLE selectdb_sink ( order_id BIGINT, user_id STRING, shop_id BIGINT, product_id BIGINT, buy_fee DECIMAL, create_time TIMESTAMP(6), update_time TIMESTAMP(6), state int ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'selectdb.selecttable', 'username' = 'admin', 'password' = '${secret_values.selectdb}', 'sink.enable-delete' = 'true' ); INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;
通过DMS连接云数据库SelectDB版实例后,查询名称为selecttable的表数据。
SELECT * FROM `selecttable` ;
数据摄入
SelectDB连接器可以用于数据摄入YAML作业开发,作为目标端写入。
语法结构
source:
type: xxx
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.replication_num: 1
配置项
参数 | 说明 | 是否必填 | 默认值 | 数据类型 | 备注 |
type | 目标端类型。 | 是 | (none) | String | 固定值为 |
name | 目标端名称。 | 否 | (none) | String | 无。 |
fenodes | 云数据库 SelectDB 版实例的访问和HTTP协议址地端口。 | 是 | (none) | String | 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口。 示例: |
benodes | 否 | (none) | String | Http address of Doris cluster BE, such as 127.0.0.1:8040 | |
jdbc-url | 云数据库 SelectDB 版实例的JDBC连接信息。 | 否 | (none) | String | 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口。 示例: |
username | 云数据库 SelectDB 版实例的数据库用户名。 | 是 | (none) | String | Username of Doris cluster |
password | 云数据库 SelectDB 版实例对应数据库用户名的密码。 | 否 | (none) | String | Password for Doris cluster |
auto-redirect | 是否重定向Stream Load请求。开启后Stream Load将通过FE写入,不再显示获取BE信息。 | 否 | false | String | Whether to write through FE redirection and directly connect to BE to write |
charset-encoding | 否 | false | Boolean | Charset encoding for doris http client, default UTF-8 | |
sink.enable.batch-mode | 是否使用攒批模式写入SelectDB,开启后写入时机不依赖Checkpoint,通过sink.buffer-flush.max-rows、sink.buffer-flush.max-bytes和sink.buffer-flush.interval参数来控制写入时机。 同时开启后将不保证EOS语义,可借助Unique模型做到幂等。 | 否 | true | Boolean | Whether to use the batch method to write to Doris |
sink.flush.queue-size | 批处理模式下,缓存的队列大小。 | 否 | 2 | Integer | Queue size for batch writing |
sink.buffer-flush.max-rows | 批处理模式下,单个批次最多写入的数据行数。 | 否 | 50000 | Integer | Maximum number of Flush records in a single batch |
sink.buffer-flush.max-bytes | 批处理模式下,单个批次最多写入的字节数。 | 否 | 10485760(10MB) | Integer | Maximum number of bytes flushed in a single batch |
sink.buffer-flush.interval | 批处理模式下,异步刷新缓存的间隔。最小1s。 | 否 | 10s | String | Flush interval duration. If this time is exceeded, the data will be flushed asynchronously |
sink.properties. | Stream Load的导入参数,请填写属性配置。
更多参数,请参见:Stream Load。 | 否 | (none) | String | Parameters of StreamLoad. For example: |
table.create.properties.* | 创建表的Properties配置。 | 否 | (none) | String | Create the Properties configuration of the table. For example: |
类型映射
Flink CDC Type | SelectDB Type |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
DECIMAL | DECIMAL |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIMESTAMP [(p)] | DATETIME [(p)] |
TIMESTAMP_LTZ [(p)] | DATETIME [(p)] |
CHAR(n) | CHAR(n*3) 说明 在Doris中,字符串以UTF-8编码存储,因此英文字符占用1字节,中文字符占用3字节。这里的长度乘以3。CHAR的最大长度为255。一旦超过,它将自动转换为VARCHAR类型。 |
VARCHAR(n) | VARCHAR(n*3) 说明 同上。这里的长度乘以3。VARCHAR的最大长度为65533。一旦超过,它将自动转换为STRING类型。 |
BINARY(n) | STRING |
VARBINARY(N) | STRING |
STRING | STRING |