本文为您介绍如何使用PolarDB-X连接器。
背景信息
PolarDB 分布式版(PolarDB for Xscale,简称“PolarDB-X”)是阿里云自主设计研发的高性能云原生分布式数据库产品,为用户提供高吞吐、大存储、低延时、易扩展和超高可用的云时代数据库服务。
仅支持VVR 11.5及以上版本,配合PolarDB-X 2.0及更高版本使用。
目前,PolarDB-X CDC连接器只支持作为源表使用。如您需要对PolarDB-X实例进行维表查询、或作为结果表写入,请使用MySQL连接器(公测中)。
类别 | 详情 |
支持类型 | 源表 |
运行模式 | 仅支持流模式 |
数据格式 | 暂不适用 |
特有监控指标 |
|
API种类 | SQL |
是否支持更新或删除结果表数据 | 否 |
特色功能
PolarDB-X CDC连接器针对Binlog解析阶段进行性能优化,支持PolarDB-X服务器侧对无关Binlog进行过滤和裁剪,从而提升吞吐量、节省网络带宽。
Binlog按需订阅示例
此版本支持在服务端对Binlog进行过滤、只发送所需的变更日志给客户端,从而起到降低网络流量压力、提升日志消费吞吐的优化作用。
例如,如果您只需订阅PolarDB-X服务器中db.table1和db.table2表的变更数据,可以像这样配置Flink SQL作业:
CREATE TABLE polardbx_table_foo (
... -- 在这里定义表结构
) WITH (
'connector' = 'polardbx-cdc',
'database-name' = 'db',
'table-name' = '.*',
..., -- 其他参数
'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- 只订阅对应表的数据
);相较于MySQL CDC连接器会将整个实例中全量的变更Binlog日志加载到本地、并在客户端进行过滤,PolarDB-X CDC连接器具备服务端过滤Binlog、客户端按需订阅Binlog的能力,能够大幅减少网络IO开销。
使用限制
如您需要使用服务端Binlog服务端过滤、按表订阅功能,需要保证PolarDB-X服务端版本为2.5.0及以上,且日志服务组件为5.4.20或更高版本。
SQL
语法结构
CREATE TABLE polardbx_customer_table(
`id` STRING,
[columnName dataType,]*
PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
'connector' = 'polardbx-cdc',
'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
'username' = 'pdx_user',
'password' = 'pdx_password',
'database' = 'full_db',
'collection' = 'customers'
)WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 连接器名称。 | STRING | 是 | 无 | 固定值为polardbx-cdc。 |
hostname | PolarDB-X数据库的IP地址或者Hostname。 | STRING | 是 | 无 | 建议填写实例连接信息中的集群地址。 |
port | PolarDB-X数据库的服务端口号。 | INTEGER | 否 | 3306 | 无。 |
username | PolarDB-X数据库服务的用户名。 | STRING | 是 | 无 | 无。 |
password | PolarDB-X数据库服务的密码。 | STRING | 是 | 无 | 无。 |
database-name | PolarDB-X数据库名称。 | STRING | 是 | 无 | 支持使用正则表达式匹配读取多个数据库的数据。 说明 使用正则表达式时,不要使用^和$符号匹配开头和结尾。 |
table-name | PolarDB-X表名。 | STRING | 是 | 无 | 支持使用正则表达式匹配读取多张表的数据。 说明 使用正则表达式时,不要使用^和$符号匹配开头和结尾。 |
server-time-zone | 数据库在使用的会话时区。 | STRING | 否 | 作业运行环境可用区时区。 | 指定 IANA 时区标识符,例如 Asia/Shanghai。该参数控制了源表中的TIMESTAMP类型如何转换为STRING类型。 |
scan.incremental.snapshot.chunk.size | 增量快照分块读取时,每个chunk的大小(包含的行数)。 | INTEGER | 否 | 8096 | PolarDB-X 将表切分为多个分片(Chunk)进行读取,并在内存中缓存分片数据。减少单分片行数会增加分片总量。这虽然细化了故障恢复粒度,但也增加了内存溢出(OOM)风险并降低吞吐量。请配置合理的分片大小以平衡性能。 |
scan.snapshot.fetch.size | 当读取表的全量数据时,每次最多拉取的记录数。 | INTEGER | 否 | 1024 | 无。 |
connect.timeout | 连接PolarDB-X数据库服务器超时时,重试连接之前等待超时的最长时间。 | DURATION | 否 | 30s | 无。 |
connection.pool.size | 数据库连接池大小。 | INTEGER | 否 | 20 | 数据库连接池用于复用连接,可以降低数据库连接数量。 |
connect.max-retries | 连接PolarDB-X数据库时,连接失败后重试的最大次数。 | INTEGER | 否 | 3 | 无。 |
scan.startup.mode | 消费数据时的启动模式。 | STRING | 否 | initial | 参数取值如下:
重要 对于earliest-offset,specific-offset和timestamp启动模式,启动时的表结构必须与指定位点一致。结构不匹配将导致作业报错。请确保在指定 Binlog 位点至作业启动期间,表结构未发生变更。 |
scan.startup.specific-offset.file | 使用指定位点模式启动时,启动位点的Binlog文件名。 | STRING | 否 | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。文件名格式例如 |
scan.startup.specific-offset.pos | 使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。 | INTEGER | 否 | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。 |
scan.startup.specific-offset.gtid-set | 使用指定位点模式启动时,启动位点的GTID集合。 | STRING | 否 | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。GTID集合格式例如 |
scan.startup.timestamp-millis | 使用指定时间模式启动时,启动位点的毫秒时间戳。 | LONG | 否 | 无 | 使用该配置时,scan.startup.mode必须配置为timestamp。时间戳单位为毫秒。 |
scan.startup.specific-offset.skip-events | 从指定的位点读取时,跳过多少Binlog事件。 | INTEGER | 否 | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。 |
scan.startup.specific-offset.skip-rows | 从指定的位点读取时,跳过多少行变更(一个Binlog事件可能对应多行变更)。 | INTEGER | 否 | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。 |
heartbeat.interval | Source通过心跳事件推动Binlog位点前进的时间间隔。 | DURATION | 否 | 无 | 心跳事件强制推进 Source 端的 Binlog 位点。此机制防止低频更新导致 Binlog 过期。Binlog 过期将引发作业失败,且仅能通过无状态重启恢复。 |
chunk-meta.group.size | chunk元信息的大小。 | INTEGER | 否 | 1000 | 如果元信息大于该值,元信息会分为多份传递。 |
chunk-key.even-distribution.factor.upper-bound | 是否可以均匀分片的chunk分布因子的上限。 | DOUBLE | 否 | 1000.0 | 分布因子大于该值会使用非均匀分片。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。 |
chunk-key.even-distribution.factor.lower-bound | 是否可以均匀分片的chunk分布因子的下限。 | DOUBLE | 否 | 0.05 | 分布因子小于该值会使用非均匀分片。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。 |
scan.newly-added-table.enabled | 从Checkpoint重启时,是否扫描新增的捕获表。 | BOOLEAN | 否 | false | 启用后,系统将同步此前未匹配的新增表,并从状态中移除不再匹配的表。从Checkpoint或Savepoint重启时生效。 |
scan.incremental.snapshot.chunk.key-column | 指定快照阶段用于数据分片的列。 | STRING | 见备注 | 无 |
|
scan.incremental.close-idle-reader.enabled | 是否在快照结束后关闭空闲的 Reader。 | BOOLEAN | 否 | false | 该配置生效需要同时设置execution.checkpointing.checkpoints-after-tasks-finish.enabled为true。 |
scan.incremental.snapshot.backfill.skip | 是否在快照读取阶段跳过backfill。 | BOOLEAN | 否 | false | 参数取值如下:
如果跳过backfill,快照阶段表的更改将在稍后的增量阶段读取,而不是合并到快照中。 重要 跳过backfill可能导致数据不一致,因为快照阶段发生的变更可能会被重放,仅保证at-least-once语义。 |
scan.parse.online.schema.changes.enabled | 在增量阶段,是否尝试解析 RDS 无锁变更 DDL 事件。 | BOOLEAN | 否 | false | 参数取值如下:
实验性功能。建议在执行线上无锁变更前,先对Flink作业创建一个Savepoint以便恢复。 |
scan.only.deserialize.captured.tables.changelog.enabled | 在增量阶段,是否仅对指定表的变更事件进行反序列化。 | BOOLEAN | 否 | true | 参数取值如下:
|
scan.read-changelog-as-append-only.enabled | 是否将changelog数据流转换为append-only数据流。 | BOOLEAN | 否 | false | 参数取值如下:
|
scan.parallel-deserialize-changelog.enabled | 在增量阶段,是否使用多线程对变更事件进行解析。 | BOOLEAN | 否 | false | 参数取值如下:
|
scan.parallel-deserialize-changelog.handler.size | 多线程对变更事件进行解析时,事件处理器的数量。 | INTEGER | 否 | 2 | 无。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | 快照读取阶段是否先分发无界的分片。 | BOOLEAN | 否 | false | 参数取值如下:
实验性功能。开启后能够降低TaskManager在快照阶段同步最后一个分片时遇到内存溢出 (OOM) 的风险,建议在作业第一次启动前添加。 |
polardbx.binlog.ignore.archive-events.enabled | 是否忽略 PolarDB-X Binlog 中的归档事件(主要是 `DELETE` 事件)。 | BOOLEAN | 否 | false | |
polardbx.binlog.ignore.query-events.enabled | 是否忽略 PolarDB-X Binlog 中的ROWS_QUERY_LOG_EVENT事件。 | BOOLEAN | 否 | false | |
polardbx.binlog.include.tables | 仅订阅这些表的Binlog日志。多个表名之间用逗号(`,`)分隔。 | STRING | 否 | 无 | |
polardbx.binlog.exclude.tables | 不订阅这些表的Binlog日志。多个表名之间用逗号(`,`)分隔。 | STRING | 否 | 无 |
类型映射
PolarDB-X字段类型 | Flink字段类型 |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] WITH LOCAL TIME ZONE | |
CHAR(n) | STRING |
VARCHAR(n) | |
TEXT | |
BINARY | BYTES |
VARBINARY | |
BLOB |
数据摄入
自实时计算引擎11.6版本开始,PolarDB-X连接器作为数据源可以在数据摄入YAML作业中使用。
语法结构
source:
type: polardbx
name: PolarDB-X Source
hostname: localhost
port: 3306
username: <username>
password: <password>
tables: pdb.order_table
# 忽略 binlog 中的归档事件
polardbx.binlog.ignore.archive-events.enabled: true
# 忽略 binlog 中的查询事件
polardbx.binlog.ignore.query-events.enabled: true
# 只订阅 pdb.order_table 的 binlog 以减小带宽压力
polardbx.binlog.include.tables: pdb.order_table
sink:
type: values配置项
参数 | 说明 | 是否必填 | 数据类型 | 默认值 | 备注 |
type | 数据源类型。 | 是 | STRING | 无 | 固定值为polardbx。 |
name | 数据源名称。 | 否 | STRING | 无 | 无。 |
hostname | PolarDB-X数据库的IP地址或者Hostname。 | 是 | STRING | 无 | 建议填写专有网络VPC地址。 说明 如果PolarDB-X与实时Flink版不在同一VPC,需要先打通跨VPC的网络或者使用公网的形式访问,详情请参见空间管理与操作和Flink全托管集群如何访问公网?。 |
username | PolarDB-X数据库服务的用户名。 | 是 | STRING | 无 | 无。 |
password | PolarDB-X数据库服务的密码。 | 是 | STRING | 无 | 无。 |
tables | 需要同步的PolarDB-X数据表。 | 是 | STRING | 无 |
说明
|
tables.exclude | 需要在同步的表中排除的表。 | 否 | STRING | 无 |
说明 点号用于分割数据库名和表名,如果需要用点号匹配任意字符,需要对点号使用反斜杠进行转译。如:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。 |
port | PolarDB-X数据库服务的端口号。 | 否 | INTEGER | 3306 | 无。 |
schema-change.enabled | 是否发送Schame变更事件。 | 否 | BOOLEAN | true | 无。 |
jdbc.properties.* | JDBC URL中的自定义连接参数。 | 否 | STRING | 无 | 您可以传递自定义的连接参数,例如不使用SSL协议,则可配置为'jdbc.properties.useSSL' = 'false'。 |
debezium.* | Debezium读取Binlog的自定义参数。 | 否 | STRING | 无 | 您可以传递自定义的Debezium参数,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'来指定解析错误时的处理逻辑。 |
scan.incremental.snapshot.chunk.size | 每个chunk的大小(包含的行数)。 | 否 | INTEGER | 8096 | PolarDB-X表会被切分成多个chunk读取。在读完chunk的数据之前,chunk的数据会先缓存在内存中。 每个chunk包含的行数越少,则表中的chunk的总数量越大,尽管这会降低故障恢复的粒度,但可能导致内存OOM和整体的吞吐量降低。因此,您需要进行权衡,并设置合理的chunk大小。 |
scan.snapshot.fetch.size | 当读取表的全量数据时,每次最多拉取的记录数。 | 否 | INTEGER | 1024 | 无。 |
scan.startup.mode | 消费数据时的启动模式。 | 否 | STRING | initial | 参数取值如下:
重要 对于earliest-offset,specific-offset和timestamp启动模式,如果启动时刻和指定的启动位点时刻的表结构不同,作业会因为表结构不同而报错。换一句话说,使用这三种启动模式,需要保证在指定的Binlog消费位置到作业启动的时间之间,对应表不能发生表结构变更。 |
scan.startup.specific-offset.file | 使用指定位点模式启动时,启动位点的Binlog文件名。 | 否 | STRING | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。文件名格式例如 |
scan.startup.specific-offset.pos | 使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。 | 否 | INTEGER | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。 |
scan.startup.specific-offset.gtid-set | 使用指定位点模式启动时,启动位点的GTID集合。 | 否 | STRING | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。GTID集合格式例如 |
scan.startup.timestamp-millis | 使用指定时间模式启动时,启动位点的毫秒时间戳。 | 否 | LONG | 无 | 使用该配置时,scan.startup.mode必须配置为timestamp。时间戳单位为毫秒。 重要 在使用指定时间时,PolarDB-X CDC会尝试读取每个Binlog文件的初始事件以确定其时间戳,最终定位至指定时间对应的Binlog文件。请保证指定的时间戳对应的Binlog文件在数据库中没有被清理且可以被读取到。 |
server-time-zone | 数据库在使用的会话时区。 | 否 | STRING | 如果您没有指定该参数,则系统默认使用Flink作业运行时的环境时区作为数据库服务器时区,即您选择的可用区所在的时区。 | 例如Asia/Shanghai,该参数控制了PolarDB-X中的TIMESTAMP类型如何转成STRING类型。更多信息请参见Debezium时间类型。 |
scan.startup.specific-offset.skip-events | 从指定的位点读取时,跳过多少Binlog事件。 | 否 | INTEGER | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。 |
scan.startup.specific-offset.skip-rows | 从指定的位点读取时,跳过多少行变更(一个Binlog事件可能对应多行变更)。 | 否 | INTEGER | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。 |
connect.timeout | 连接PolarDB-X数据库服务器超时时,重试连接之前等待超时的最长时间。 | 否 | DURATION | 30s | 无。 |
connect.max-retries | 连接PolarDB-X数据库服务时,连接失败后重试的最大次数。 | 否 | INTEGER | 3 | 无。 |
connection.pool.size | 数据库连接池大小。 | 否 | INTEGER | 20 | 数据库连接池用于复用连接,可以降低数据库连接数量。 |
heartbeat.interval | Source通过心跳事件推动Binlog位点前进的时间间隔。 | 否 | DURATION | 30s | 心跳事件用于推动Source中的Binlog位点前进,这对PolarDB-X中更新缓慢的表非常有用。对于更新缓慢的表,Binlog位点无法自动前进,通过够心跳事件可以推到Binlog位点前进,可以避免Binlog位点不前进引起Binlog位点过期问题,Binlog位点过期会导致作业失败无法恢复,只能无状态重启。 |
scan.incremental.snapshot.chunk.key-column | 可以指定某一列作为快照阶段切分分片的切分列。 | 否。 | STRING | 无 | 仅支持从主键中选择一列。 |
chunk-meta.group.size | chunk元信息的大小。 | 否 | INTEGER | 1000 | 如果元信息大于该值,元信息会分为多份传递。 |
chunk-key.even-distribution.factor.lower-bound | 是否可以均匀分片的chunk分布因子的下限。 | 否 | DOUBLE | 0.05 | 分布因子小于该值会使用非均匀分片。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。 |
chunk-key.even-distribution.factor.upper-bound | 是否可以均匀分片的chunk分布因子的上限。 | 否 | DOUBLE | 1000.0 | 分布因子大于该值会使用非均匀分片。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。 |
scan.incremental.close-idle-reader.enabled | 是否在快照结束后关闭空闲的Reader。 | 否 | BOOLEAN | false | 该配置生效,需要设置 |
scan.only.deserialize.captured.tables.changelog.enabled | 在增量阶段,是否仅对指定表的变更事件进行反序列化。 | 否 | BOOLEAN | true | 参数取值如下:
|
scan.parallel-deserialize-changelog.enabled | 在增量阶段,是否使用多线程对变更事件进行解析。 | 否 | BOOLEAN | false | 参数取值如下:
|
scan.parallel-deserialize-changelog.handler.size | 多线程对变更事件进行解析时,事件处理器的数量。 | 否 | INTEGER | 2 | 无。 |
metadata-column.include-list | 需要传给下游的元数据列。 | 否 | STRING | 无 | 可用的元数据包括 说明 PolarDB-X CDC YAML连接器无需也不支持添加库名表名和 重要
|
scan.newly-added-table.enabled | 从Checkpoint重启时,是否同步上一次启动时未匹配到的新增表或者移除状态中保存的当前不匹配的表。 | 否 | BOOLEAN | false | 从Checkpoint或Savepoint重启时生效。 |
scan.binlog.newly-added-table.enabled | 在增量阶段,是否发送匹配到的新增表的数据。 | 否 | BOOLEAN | false | 不能与 |
scan.incremental.snapshot.chunk.key-column | 为某些表指定一列作为快照阶段切分分片的切分列。 | 否 | STRING | 无 |
|
scan.parse.online.schema.changes.enabled | 在增量阶段,是否尝试解析 RDS 无锁变更 DDL 事件。 | 否 | BOOLEAN | false | 参数取值如下:
实验性功能。建议在执行线上无锁变更前,先对Flink作业执行一次快照以便恢复。 |
scan.incremental.snapshot.backfill.skip | 是否在快照读取阶段跳过backfill。 | 否 | BOOLEAN | false | 参数取值如下:
如果跳过backfill,快照阶段表的更改将在稍后的增量阶段读取,而不是合并到快照中。 重要 跳过backfill可能导致数据不一致,因为快照阶段发生的变更可能会被重放,仅保证at-least-once语义。 |
treat-tinyint1-as-boolean.enabled | 是否将TINYINT(1)类型当做Boolean类型处理。 | 否 | BOOLEAN | true | 参数取值如下:
|
treat-timestamp-as-datetime-enabled | 是否将TIMESTAMP类型当作DATETIME类型处理。 | 否 | BOOLEAN | false | 参数取值如下:
PolarDB-X TIMESTAMP类型存储的是UTC时间,受时区影响,PolarDB-X DATETIME类型存储的是字面时间,不受时区影响。 开启后会根据server-time-zone将PolarDB-X TIMESTAMP类型数据转换成DATETIME类型。 |
include-comments.enabled | 是否同步表注释和字段注释。 | 否 | BOOELEAN | false | 参数取值如下:
开启后会增加作业内存使用量。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | 快照读取阶段是否先分发无界的分片。 | 否 | BOOELEAN | false | 参数取值如下:
实验性功能。开启后能够降低TaskManager在快照阶段同步最后一个分片时遇到内存溢出 (OOM) 的风险,建议在作业第一次启动前添加。 |
binlog.session.network.timeout | Binlog连接的网络超时时间。 | 否 | DURATION | 10m | 设值为0s时,将会使用服务端的默认超时时间。 |
scan.rate-limit.records-per-second | 限制Source每秒下发的最大记录数。 | 否 | LONG | 无 | 适用于需要限制数据读取场景,此限制在全量和增量阶段都会生效。 Source的 在全量读取阶段,通常需要降低每个批次读取数据的条数进行配合,可以减少 |
include-binlog-meta.enable | 是否在消息中携带PolarDB-X Binlog的原始信息,如GTID,Binlog位点等 | 否 | Boolean | false | 适用于原始binlog同步场景,比如替换原有canal同步链路。 |
polardbx.binlog.ignore.archive-events.enabled | 是否忽略 PolarDB-X Binlog 中的归档事件(主要是 `DELETE` 事件)。 | BOOLEAN | 否 | false | |
polardbx.binlog.ignore.query-events.enabled | 是否忽略 PolarDB-X Binlog 中的ROWS_QUERY_LOG_EVENT事件。 | BOOLEAN | 否 | false | |
polardbx.binlog.include.tables | 仅订阅这些表的Binlog日志。多个表名之间用逗号(`,`)分隔。 | STRING | 否 | 无 | 说明 此参数只在消费PolarDB-X Binlog增量阶段有效,不影响快照阶段的全量读取。 |
polardbx.binlog.exclude.tables | 不订阅这些表的Binlog日志。多个表名之间用逗号(`,`)分隔。 | STRING | 否 | 无 | 说明 此参数只在消费PolarDB-X Binlog增量阶段有效,不影响快照阶段的全量读取。 |