SQLServer CDC 连接器可以读取 SQLServer 数据库的快照数据和增量数据。本文为您介绍如何使用SQLServer CDC连接器。
背景信息
SQLServer CDC连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表 |
运行模式 | 仅支持流模式 |
数据格式 | 暂不适用 |
特有监控指标 | |
API种类 | SQL |
是否支持更新或删除结果表数据 | 不涉及 |
前提条件
SQLServer CDC连接器使用前,需要捕获的数据库和数据表需要启用变更数据捕获功能(CDC)。
数据表开启CDC功能可以执行如下SQL:
RDS SQLServer
EXEC sp_rds_cdc_enable_db; -- 对应数据库开启CDC
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo', -- 指定源表的 schema
@source_name = N'MyTable', -- 指定要捕获的表名
@role_name = N'MyRole', -- 指定角色 MyRole
@supports_net_changes = 1;自建SQLServer
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo', -- 指定源表的 schema
@source_name = N'MyTable', -- 指定要捕获的表名
@role_name = N'MyRole', -- 指定角色 MyRole,可将需要对源表捕获列
-- 拥有 SELECT 权限的用户添加到该角色。
-- sysadmin 或 db_owner 角色的用户也可以
-- 访问指定的变更表。设置为 NULL 则仅允许
-- sysadmin 或 db_owner 完全访问捕获信息。
@filegroup_name = N'MyDB_CT', -- 指定文件组,SQL Server 会将变更表
-- 放置在该文件组中。该文件组必须已存在。
-- 建议不要将变更表与源表放在同一文件组中。
@supports_net_changes = 0;验证CDC表的访问权限可使用如下SQL:
EXEC sys.sp_cdc_help_change_data_capture;该查询返回数据库中已启用 CDC 的每个表的配置信息。如果对应表不存在,请检查表是否启用CDC,且用户是否拥有访问捕获实例和CDC表的权限。
使用限制
仅VVR 11.7及以上版本使用。
使用前需确保目标数据库已启用 CDC。
由于连接器依赖 SQL Server 的 Change Data Capture 功能,存在以下限制:
Standard 版时需要 SQL Server 2016 SP1 及以上。
Enterprise 版需要 SQL Server 2012 及以上。
SQL
语法结构
CREATE TABLE sqlserver_cdc_source (
id INT,
order_date DATE,
purchaser INT,
quantity INT,
product_id INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '<hostname>',
'port' = '<port>',
'username' = '<username>',
'password' = '<password>',
'database-name' = '<database name>',
'table-name' = 'dbo.orders'
);WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | ||||||
connector | connector类型。 | STRING | 是 | 无 | 固定值为 | ||||||
hostname | SQLServer数据库的IP地址或者Hostname。 | STRING | 是 | 无 | 无。 | ||||||
username | SQLServer数据库服务的用户名。 | STRING | 是 | 无 | 无。 | ||||||
password | SQLServer数据库服务的密码。 | STRING | 是 | 无 | 无。 | ||||||
database-name | 数据库名称。 | STRING | 是 | 无 | 数据库名称。 | ||||||
table-name | 需要捕获的表名。 | STRING | 是 | 无 | 多个表匹配可用
| ||||||
scan.startup.mode | 消费数据时的启动模式。 | STRING | 否 | initial | 参数取值如下:
重要
| ||||||
port | SQLServer数据库服务的端口号。 | INTEGER | 否 | 1433 | 无。 | ||||||
server-time-zone | 数据库服务器的会话时区,如 | STRING | 否 | 无 | 无。 | ||||||
scan.incremental.snapshot.enabled | 是否开启增量快照。 | BOOLEAN | 否 | true | 参数取值如下:
| ||||||
chunk-meta.group.size | chunk 元数据的分组大小,超过该大小时元数据会被分成多组。 | INTEGER | 否 | 1000 | 无。 | ||||||
chunk-key.even-distribution.factor.lower-bound | chunk key 分布因子下界。 | DOUBLE | 否 | 0.05 | 分布因子用于判断表数据是否均匀分布。均匀分布时使用均匀计算优化,不均匀时会触发拆分查询。计算公式: | ||||||
chunk-key.even-distribution.factor.upper-bound | chunk key 分布因子上界。 | DOUBLE | 否 | 1000.0 | 分布因子用于判断表数据是否均匀分布。均匀分布时使用均匀计算优化,不均匀时会触发拆分查询。计算公式: | ||||||
scan.incremental.close-idle-reader.enabled | 是否在快照结束后关闭空闲的Reader。 | BOOLEAN | 否 | false | 该配置生效需要设置 | ||||||
scan.incremental.snapshot.chunk.key-column | 指定某一列作为快照阶段切分分片的切分列。 | STRING | 否 | 无 | 表快照的 chunk key 列。读取快照时按 chunk key 将表拆分为多个 chunk。默认使用主键的第一列,必须从主键列中选择。 | ||||||
scan.incremental.snapshot.unbounded-chunk-first.enabled | 是否在快照阶段优先分配无界 chunk,有助于降低 TaskManager 在读取最大无界 chunk 时发生 OOM 的风险。 | BOOLEAN | 否 | true | 无。 | ||||||
scan.incremental.snapshot.backfill.skip | 是否跳过全量阶段的日志读取。 | Boolean | 否 | false | 参数取值如下:
| ||||||
debezium.* | Debezium属性参数。 | STRING | 否 | 无 | 更细粒度控制Debezium客户端的行为,详情请参见配置属性。 无特别需求不建议自行配置Debezium参数,可能导致无法正常读取数据。 |
类型映射
SQLServer和Flink字段类型对应关系如下。
SQLServer字段类型 | Flink字段类型 |
bit | BOOLEAN |
tinyint | SMALLINT |
smallint | |
int | INT |
bigint | BIGINT |
float | DOUBLE |
real | |
numeric | DECIMAL(p,s) |
decimal(p,s) | |
money | |
smallmoney | |
date | DATE |
time(n) | TIME(n) |
datetime2 | TIMESTAMP(n) |
datetime | |
smalldatetime | |
datetimeoffset | TIMESTAMP_LTZ(3) |
char(n) | CHAR(n) |
varchar(n) | VARCHAR(n) |
nvarchar(n) | |
nchar(n) | |
text | STRING |
ntext | |
xml |
附加说明
元数据
以下格式的元数据可作为只读(VIRTUAL)列在表定义中暴露:
元数据key | 元数据类型 | 描述 |
database_name | STRING NOT NULL | 包含该行记录的库名。 |
schema_name | STRING NOT NULL | 包含该行记录的Schema名。 |
table_name | STRING NOT NULL | 包含该行记录的表名。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 该行记录在数据库中的变更时间,如果该记录来自表的存量历史数据而不是Binlog中获取,则该值总是0。 |
使用元数据的建表示例:
CREATE TABLE products (
table_name STRING METADATA FROM 'table_name' VIRTUAL,
schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10, 3)
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = 'localhost',
'port' = '1433',
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'table-name' = 'dbo.products'
);