本文介绍如何使用自定义SelectDB连接器写入数据至云数据库SelectDB版。
背景信息
云数据库 SelectDB 版是新一代实时数据仓库SelectDB在阿里云上的全托管服务,100%兼容Apache Doris。您可以在阿里云上便捷地购买SelectDB数仓服务,满足海量数据分析需求,具体的产品优势和应用场景请参见什么是云数据库SelectDB版。
自定义SelectDB连接器支持的信息如下:
类别 | 详情 |
支持类型 | 结果表 |
运行模式 | 流模式和批模式 |
数据格式 | JSON和CSV |
特有监控指标 | 无 |
API种类 | DataStream和SQL |
是否支持更新/删除 | 是 |
特色功能
支持整库数据同步。
SelectDB连接器提供Exactly-Once语义,保证数据不重复也不丢失。
兼容1.0及以上Apache Doris,可以使用Flink SelectDB自定义连接器同步数据至Apache Doris。
注意事项
使用方法
单击JAR包获取SelectDB自定义连接器(需要为1.15~1.17)。
在实时计算开发控制台上,上传SelectDB自定义连接器,详情请参见管理自定义连接器。
在SQL作业中使用SelectDB自定义连接器,作业开发详情请参见SQL作业开发。
具体的语法结构如下。
CREATE TABLE selectdb_sink ( emp_no INT , birth_date DATE, first_name STRING, last_name STRING, gender STRING, hire_date DATE ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'test.employees', 'username' = 'admin', 'password' = '****', 'sink.enable-delete' = 'true' );
connector为表类型,固定值为
doris
。SelectDB自定义连接器结果表参数配置详情请参见Sink配置项。
类型映射
使用示例
本文以MySQL数据写入SelectDB为例为您详细介绍如何使用SelectDB自定义连接器。
准备工作。
创建Flink工作空间、MySQL和SelectDB实例,详情请参见开通实时计算Flink版、第一步:快捷创建RDS MySQL实例与配置数据库和创建实例。
在MySQL中创建名称为order_dw_mysql的数据库和名称为orders的表并导入测试数据。
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee decimal(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
通过DMS连接云数据库SelectDB版实例后创建名称为selectdb的数据库和名称为selecttable的表。
CREATE DATABASE selectdb; CREATE TABLE `selecttable` ( order_id bigint, user_id varchar(50), shop_id bigint, product_id bigint, buy_fee DECIMAL, create_time DATETIME, update_time DATETIME, state int )DISTRIBUTED BY HASH(order_id) BUCKETS 10;
在 实时计算开发控制台上创建Flink SQL作业并启动。
创建名称为mysqlcatalog的MySQL Catalog,详情请参见管理MySQL Catalog。
单击JAR包获取SelectDB自定义连接器(需要为1.15~1.17),注册SelectDB自定义连接器,详情请参见管理自定义连接器。
在
新建作业草稿,代码示例如下。CREATE TEMPORARY TABLE selectdb_sink ( order_id BIGINT, user_id STRING, shop_id BIGINT, product_id BIGINT, buy_fee DECIMAL, create_time TIMESTAMP(6), update_time TIMESTAMP(6), state int ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'selectdb.selecttable', 'username' = 'admin', 'password' = '${secret_values.selectdb}', 'sink.enable-delete' = 'true' ); INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;
通过DMS连接云数据库SelectDB版实例后,查询名称为selecttable的表数据。
SELECT * FROM `selecttable` ;