SQLServer CDC(公测中)

更新时间:
复制为 MD 格式

SQLServer CDC 连接器可以读取 SQLServer 数据库的快照数据和增量数据。本文为您介绍如何使用SQLServer CDC连接器。

背景信息

SQLServer CDC连接器支持的信息如下。

类别

详情

支持类型

源表

运行模式

仅支持流模式

数据格式

暂不适用

特有监控指标

特有的监控指标

  • currentFetchEventTimeLag:数据产生到拉取到Source Operator的间隔。

  • currentEmitEventTimeLag:数据产生到离开Source Operator的间隔。

  • sourceIdleTime:source至今有多久不产生新数据。

说明
  • currentFetchEventTimeLagcurrentEmitEventTimeLag指标仅在增量阶段有效,全量阶段该值恒为0。

  • 指标含义详情,请参见监控指标说明

API种类

SQL

是否支持更新或删除结果表数据

不涉及

前提条件

SQLServer CDC连接器使用前,需要捕获的数据库和数据表需要启用变更数据捕获功能(CDC)。

数据表开启CDC功能可以执行如下SQL:

RDS SQLServer

EXEC sp_rds_cdc_enable_db;       -- 对应数据库开启CDC
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',         -- 指定源表的 schema
@source_name = N'MyTable',       -- 指定要捕获的表名
@role_name = N'MyRole',          -- 指定角色 MyRole
@supports_net_changes = 1;

自建SQLServer

EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',        -- 指定源表的 schema
@source_name   = N'MyTable',    -- 指定要捕获的表名
@role_name     = N'MyRole',     -- 指定角色 MyRole,可将需要对源表捕获列
                                -- 拥有 SELECT 权限的用户添加到该角色。
                                -- sysadmin 或 db_owner 角色的用户也可以
                                -- 访问指定的变更表。设置为 NULL 则仅允许
                                -- sysadmin 或 db_owner 完全访问捕获信息。
@filegroup_name = N'MyDB_CT',   -- 指定文件组,SQL Server 会将变更表
                                -- 放置在该文件组中。该文件组必须已存在。
                                -- 建议不要将变更表与源表放在同一文件组中。
@supports_net_changes = 0;

验证CDC表的访问权限可使用如下SQL:

EXEC sys.sp_cdc_help_change_data_capture;

该查询返回数据库中已启用 CDC 的每个表的配置信息。如果对应表不存在,请检查表是否启用CDC,且用户是否拥有访问捕获实例和CDC表的权限。

使用限制

  • VVR 11.7及以上版本使用。

  • 使用前需确保目标数据库已启用 CDC。

由于连接器依赖 SQL Server 的 Change Data Capture 功能,存在以下限制:

  • Standard 版时需要 SQL Server 2016 SP1 及以上。

  • Enterprise 版需要 SQL Server 2012 及以上。

SQL

语法结构

CREATE TABLE sqlserver_cdc_source (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'sqlserver-cdc',
  'hostname'  = '<hostname>',
  'port'      = '<port>',
  'username'  = '<username>',
  'password'  = '<password>',
  'database-name' = '<database name>',
  'table-name'    = 'dbo.orders'
);

WITH参数

参数

说明

数据类型

是否必填

默认值

备注

connector

connector类型。

STRING

固定值为sqlserver-cdc

hostname

SQLServer数据库的IP地址或者Hostname。

STRING

无。

username

SQLServer数据库服务的用户名。

STRING

无。

password

SQLServer数据库服务的密码。

STRING

无。

database-name

数据库名称。

STRING

数据库名称。

table-name

需要捕获的表名。

STRING

多个表匹配可用,分隔多个<schemaName>.<tableName>,并支持使用正则表达式。例如:

配置值

含义

'table-name'='s1.t1,s2.t2'

包含s1.t1s2.t2两张表

'table-name'='s1\..*,s2\..*'

包含schema s1s2下的全部表

scan.startup.mode

消费数据时的启动模式。

STRING

initial

参数取值如下:

  • initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的日志数据。

  • latest-offset:在第一次启动时,不会扫描历史全量数据,直接从最新的变更日志开始读取,即只读取该连接器启动以后的最新变更。

重要

scan.startup.mode 选项依赖于 Debezium 的 snapshot.mode 配置,请勿同时使用两者。如果在表 DDL 中同时指定了两者,scan.startup.mode 可能不会生效。

port

SQLServer数据库服务的端口号。

INTEGER

1433

无。

server-time-zone

数据库服务器的会话时区,如 "Asia/Shanghai"

STRING

无。

scan.incremental.snapshot.enabled

是否开启增量快照。

BOOLEAN

true

参数取值如下:

  • false:不开启增量快照。

  • true(默认值):开启增量快照。

说明
  • 关闭增量快照框架(scan.incremental.snapshot.enabled = false)时,全量阶段无法执行 checkpoint。若表数据量较大,checkpoint 等待超时会触发作业失败重启。建议对大表开启增量快照框架。

  • 增量快照的功能优势,前提条件和使用限制详情请参见特色功能前提条件注意事项

chunk-meta.group.size

chunk 元数据的分组大小,超过该大小时元数据会被分成多组。

INTEGER

1000

无。

chunk-key.even-distribution.factor.lower-bound

chunk key 分布因子下界。

DOUBLE

0.05

分布因子用于判断表数据是否均匀分布。均匀分布时使用均匀计算优化,不均匀时会触发拆分查询。计算公式:(MAX(id) - MIN(id) + 1) / rowCount

chunk-key.even-distribution.factor.upper-bound

chunk key 分布因子上界。

DOUBLE

1000.0

分布因子用于判断表数据是否均匀分布。均匀分布时使用均匀计算优化,不均匀时会触发拆分查询。计算公式:(MAX(id) - MIN(id) + 1) / rowCount

scan.incremental.close-idle-reader.enabled

是否在快照结束后关闭空闲的Reader。

BOOLEAN

false

该配置生效需要设置execution.checkpointing.checkpoints-after-tasks-finish.enabledtrue。

scan.incremental.snapshot.chunk.key-column

指定某一列作为快照阶段切分分片的切分列。

STRING

表快照的 chunk key 列。读取快照时按 chunk key 将表拆分为多个 chunk。默认使用主键的第一列,必须从主键列中选择。

scan.incremental.snapshot.unbounded-chunk-first.enabled

是否在快照阶段优先分配无界 chunk,有助于降低 TaskManager 在读取最大无界 chunk 时发生 OOM 的风险。

BOOLEAN

true

无。

scan.incremental.snapshot.backfill.skip

是否跳过全量阶段的日志读取。

Boolean

false

参数取值如下:

  • true:跳过。

    增量阶段从低水位线开始读取日志。

    如果下游算子或存储支持幂等性,建议跳过全量阶段日志的读取,缺点是仅能提供至少一次(At-Least Once)的语义保证

  • false(默认值):不跳过。

    全量阶段读取分片时,会读取低水位线和高水位线之间的日志来保证一致性。

    如果SQL要做聚合、关联等操作,不建议跳过全量阶段日志的读取。

debezium.*

Debezium属性参数。

STRING

更细粒度控制Debezium客户端的行为,详情请参见配置属性

无特别需求不建议自行配置Debezium参数,可能导致无法正常读取数据。

类型映射

SQLServerFlink字段类型对应关系如下。

SQLServer字段类型

Flink字段类型

bit

BOOLEAN

tinyint

SMALLINT

smallint

int

INT

bigint

BIGINT

float

DOUBLE

real

numeric

DECIMAL(p,s)

decimal(p,s)

money

smallmoney

date

DATE

time(n)

TIME(n)

datetime2

TIMESTAMP(n)

datetime

smalldatetime

datetimeoffset

TIMESTAMP_LTZ(3)

char(n)

CHAR(n)

varchar(n)

VARCHAR(n)

nvarchar(n)

nchar(n)

text

STRING

ntext

xml

附加说明

元数据

以下格式的元数据可作为只读(VIRTUAL)列在表定义中暴露:

元数据key

元数据类型

描述

database_name

STRING NOT NULL

包含该行记录的库名。

schema_name

STRING NOT NULL

包含该行记录的Schema名。

table_name

STRING NOT NULL

包含该行记录的表名。

op_ts

TIMESTAMP_LTZ(3) NOT NULL

该行记录在数据库中的变更时间,如果该记录来自表的存量历史数据而不是Binlog中获取,则该值总是0。

使用元数据的建表示例:

CREATE TABLE products (
    table_name    STRING METADATA FROM 'table_name' VIRTUAL,
    schema_name   STRING METADATA FROM 'schema_name' VIRTUAL,
    db_name       STRING METADATA FROM 'database_name' VIRTUAL,
    operation_ts  TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    id            INT NOT NULL,
    name          STRING,
    description   STRING,
    weight        DECIMAL(10, 3)
) WITH (
    'connector'     = 'sqlserver-cdc',
    'hostname'      = 'localhost',
    'port'          = '1433',
    'username'      = 'sa',
    'password'      = 'Password!',
    'database-name' = 'inventory',
    'table-name'    = 'dbo.products'
);