PolarDB-X CDC(公测中)

更新时间:
复制为 MD 格式

本文为您介绍如何使用PolarDB-X连接器。

背景信息

PolarDB 分布式版(PolarDB for Xscale,简称“PolarDB-X”)是阿里云自主设计研发的高性能云原生分布式数据库产品,为用户提供高吞吐、大存储、低延时、易扩展和超高可用的云时代数据库服务。

重要

仅支持VVR 11.5及以上版本,配合PolarDB-X 2.0及更高版本使用。

目前,PolarDB-X CDC连接器只支持作为源表使用。如您需要对PolarDB-X实例进行维表查询、或作为结果表写入,请使用MySQL连接器(公测中)。

类别

详情

支持类型

源表

运行模式

仅支持流模式

数据格式

暂不适用

特有监控指标

  • currentFetchEventTimeLag:数据产生到拉取到Source Operator的间隔。

    该指标仅在Binlog阶段有效,Snapshot阶段该值恒为0。

  • currentEmitEventTimeLag:数据产生到离开Source Operator的间隔。

    该指标仅在Binlog阶段有效,Snapshot阶段该值恒为0。

  • sourceIdleTime:源表至今有多久未产生新数据。

API种类

SQL

是否支持更新或删除结果表数据

特色功能

PolarDB-X CDC连接器针对Binlog解析阶段进行性能优化,支持PolarDB-X服务器侧对无关Binlog进行过滤和裁剪,从而提升吞吐量、节省网络带宽。

Binlog按需订阅示例

此版本支持在服务端对Binlog进行过滤、只发送所需的变更日志给客户端,从而起到降低网络流量压力、提升日志消费吞吐的优化作用。

例如,如果您只需订阅PolarDB-X服务器中db.table1db.table2表的变更数据,可以像这样配置Flink SQL作业:

CREATE TABLE polardbx_table_foo (
  ... -- 在这里定义表结构
) WITH (
  'connector' = 'polardbx-cdc',
  'database-name' = 'db',
  'table-name' = '.*',
  ..., -- 其他参数
  'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- 只订阅对应表的数据
);

相较于MySQL CDC连接器会将整个实例中全量的变更Binlog日志加载到本地、并在客户端进行过滤,PolarDB-X CDC连接器具备服务端过滤Binlog、客户端按需订阅Binlog的能力,能够大幅减少网络IO开销。

使用限制

如您需要使用服务端Binlog服务端过滤、按表订阅功能,需要保证PolarDB-X服务端版本为2.5.0及以上,且日志服务组件为5.4.20或更高版本。

SQL

语法结构

CREATE TABLE polardbx_customer_table(
  `id` STRING,
  [columnName dataType,]*
  PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
  'username' = 'pdx_user',
  'password' = 'pdx_password',
  'database' = 'full_db',
  'collection' = 'customers'
)

WITH参数

参数

说明

数据类型

是否必填

默认值

备注

connector

连接器名称。

STRING

固定值为polardbx-cdc。

hostname

PolarDB-X数据库的IP地址或者Hostname。

STRING

建议填写实例连接信息中的集群地址。

port

PolarDB-X数据库的服务端口号。

INTEGER

3306

无。

username

PolarDB-X数据库服务的用户名。

STRING

无。

password

PolarDB-X数据库服务的密码。

STRING

无。

database-name

PolarDB-X数据库名称。

STRING

支持使用正则表达式匹配读取多个数据库的数据。

说明

使用正则表达式时,不要使用^$符号匹配开头和结尾。

table-name

PolarDB-X表名。

STRING

支持使用正则表达式匹配读取多张表的数据。

说明

使用正则表达式时,不要使用^$符号匹配开头和结尾。

server-time-zone

数据库在使用的会话时区。

STRING

作业运行环境可用区时区。

指定 IANA 时区标识符,例如 Asia/Shanghai。该参数控制了源表中的TIMESTAMP类型如何转换为STRING类型。

scan.incremental.snapshot.chunk.size

增量快照分块读取时,每个chunk的大小(包含的行数)。

INTEGER

8096

PolarDB-X 将表切分为多个分片(Chunk)进行读取,并在内存中缓存分片数据。减少单分片行数会增加分片总量。这虽然细化了故障恢复粒度,但也增加了内存溢出(OOM)风险并降低吞吐量。请配置合理的分片大小以平衡性能。

scan.snapshot.fetch.size

当读取表的全量数据时,每次最多拉取的记录数。

INTEGER

1024

无。

connect.timeout

连接PolarDB-X数据库服务器超时时,重试连接之前等待超时的最长时间。

DURATION

30s

无。

connection.pool.size

数据库连接池大小。

INTEGER

20

数据库连接池用于复用连接,可以降低数据库连接数量。

connect.max-retries

连接PolarDB-X数据库时,连接失败后重试的最大次数。

INTEGER

3

无。

scan.startup.mode

消费数据时的启动模式。

STRING

initial

参数取值如下:

  • initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。

  • latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该连接器启动以后的最新变更。

  • earliest-offset:不扫描历史全量数据,直接从可读取的最早Binlog开始读取。

  • specific-offset:不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过同时配置scan.startup.specific-offset.filescan.startup.specific-offset.pos参数来指定从特定Binlog文件名和偏移量启动,也可以只配置scan.startup.specific-offset.gtid-set来指定从某个GTID集合启动。

  • timestamp:不扫描历史全量数据,从指定的时间戳开始读取Binlog。时间戳通过scan.startup.timestamp-millis指定,单位为毫秒。

重要

对于earliest-offsetspecific-offsettimestamp启动模式,启动时的表结构必须与指定位点一致。结构不匹配将导致作业报错。请确保在指定 Binlog 位点至作业启动期间,表结构未发生变更。

scan.startup.specific-offset.file

使用指定位点模式启动时,启动位点的Binlog文件名。

STRING

使用该配置时,scan.startup.mode必须配置为specific-offset。文件名格式例如mysql-bin.000003

scan.startup.specific-offset.pos

使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。

INTEGER

使用该配置时,scan.startup.mode必须配置为specific-offset

scan.startup.specific-offset.gtid-set

使用指定位点模式启动时,启动位点的GTID集合。

STRING

使用该配置时,scan.startup.mode必须配置为specific-offset。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

使用指定时间模式启动时,启动位点的毫秒时间戳。

LONG

使用该配置时,scan.startup.mode必须配置为timestamp。时间戳单位为毫秒。

scan.startup.specific-offset.skip-events

从指定的位点读取时,跳过多少Binlog事件。

INTEGER

使用该配置时,scan.startup.mode必须配置为specific-offset

scan.startup.specific-offset.skip-rows

从指定的位点读取时,跳过多少行变更(一个Binlog事件可能对应多行变更)。

INTEGER

使用该配置时,scan.startup.mode必须配置为specific-offset

heartbeat.interval

Source通过心跳事件推动Binlog位点前进的时间间隔。

DURATION

心跳事件强制推进 Source 端的 Binlog 位点。此机制防止低频更新导致 Binlog 过期。Binlog 过期将引发作业失败,且仅能通过无状态重启恢复。

chunk-meta.group.size

chunk元信息的大小。

INTEGER

1000

如果元信息大于该值,元信息会分为多份传递。

chunk-key.even-distribution.factor.upper-bound

是否可以均匀分片的chunk分布因子的上限。

DOUBLE

1000.0

分布因子大于该值会使用非均匀分片。

chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。

chunk-key.even-distribution.factor.lower-bound

是否可以均匀分片的chunk分布因子的下限。

DOUBLE

0.05

分布因子小于该值会使用非均匀分片。

chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。

scan.newly-added-table.enabled

Checkpoint重启时,是否扫描新增的捕获表。

BOOLEAN

false

启用后,系统将同步此前未匹配的新增表,并从状态中移除不再匹配的表。从CheckpointSavepoint重启时生效。

scan.incremental.snapshot.chunk.key-column

指定快照阶段用于数据分片的列。

STRING

见备注

  • 无主键表必填,选择的列必须是非空类型(NOT NULL)。

  • 有主键的表为选填,仅支持从主键中选择一列。

scan.incremental.close-idle-reader.enabled

是否在快照结束后关闭空闲的 Reader。

BOOLEAN

false

该配置生效需要同时设置execution.checkpointing.checkpoints-after-tasks-finish.enabledtrue。

scan.incremental.snapshot.backfill.skip

是否在快照读取阶段跳过backfill。

BOOLEAN

false

参数取值如下:

  • true:快照读取阶段跳过backfill。

  • false(默认):快照读取阶段不跳过backfill。

如果跳过backfill,快照阶段表的更改将在稍后的增量阶段读取,而不是合并到快照中。

重要

跳过backfill可能导致数据不一致,因为快照阶段发生的变更可能会被重放,仅保证at-least-once语义。

scan.parse.online.schema.changes.enabled

在增量阶段,是否尝试解析 RDS 无锁变更 DDL 事件。

BOOLEAN

false

参数取值如下:

  • true:解析 RDS 无锁变更 DDL 事件。

  • false(默认):不解析 RDS 无锁变更 DDL 事件。

实验性功能。建议在执行线上无锁变更前,先对Flink作业创建一个Savepoint以便恢复。

scan.only.deserialize.captured.tables.changelog.enabled

在增量阶段,是否仅对指定表的变更事件进行反序列化。

BOOLEAN

true

参数取值如下:

  • true:仅对目标表的变更数据进行反序列化,加快Binlog读取速度。

  • false(默认):对所有表的变更数据进行反序列化。

scan.read-changelog-as-append-only.enabled

是否将changelog数据流转换为append-only数据流。

BOOLEAN

false

参数取值如下:

  • true:所有类型的消息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都会转换成INSERT类型的消息。仅在需要保存上游表删除消息等特殊场景下开启使用。

  • false(默认):所有类型的消息都保持原样下发。

scan.parallel-deserialize-changelog.enabled

在增量阶段,是否使用多线程对变更事件进行解析。

BOOLEAN

false

参数取值如下:

  • true:在变更事件的反序列化阶段采用多线程处理,同时保证Binlog事件顺序不变,从而加快读取速度。

  • false(默认):在事件的反序列化阶段使用单线程处理。

scan.parallel-deserialize-changelog.handler.size

多线程对变更事件进行解析时,事件处理器的数量。

INTEGER

2

无。

scan.incremental.snapshot.unbounded-chunk-first.enabled

快照读取阶段是否先分发无界的分片。

BOOLEAN

false

参数取值如下:

  • true:快照读取阶段优先分发无界的分片。

  • false(默认):快照读取阶段不优先分发无界的分片。

实验性功能。开启后能够降低TaskManager在快照阶段同步最后一个分片时遇到内存溢出 (OOM) 的风险,建议在作业第一次启动前添加。

polardbx.binlog.ignore.archive-events.enabled

是否忽略 PolarDB-X Binlog 中的归档事件(主要是 `DELETE` 事件)。

BOOLEAN

false

polardbx.binlog.ignore.query-events.enabled

是否忽略 PolarDB-X Binlog 中的ROWS_QUERY_LOG_EVENT事件。

BOOLEAN

false

polardbx.binlog.include.tables

仅订阅这些表的Binlog日志。多个表名之间用逗号(`,`)分隔。

STRING

polardbx.binlog.exclude.tables

不订阅这些表的Binlog日志。多个表名之间用逗号(`,`)分隔。

STRING

类型映射

PolarDB-X字段类型

Flink字段类型

TINYINT

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

INT

INT

MEDIUMINT

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

BOOLEAN

BOOLEAN

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)] [WITHOUT TIME ZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIME ZONE]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)] WITH LOCAL TIME ZONE

CHAR(n)

STRING

VARCHAR(n)

TEXT

BINARY

BYTES

VARBINARY

BLOB

数据摄入

自实时计算引擎11.6版本开始,PolarDB-X连接器作为数据源可以在数据摄入YAML作业中使用。

语法结构

source:
   type: polardbx
   name: PolarDB-X Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: pdb.order_table
   # 忽略 binlog 中的归档事件
   polardbx.binlog.ignore.archive-events.enabled: true
   # 忽略 binlog 中的查询事件
   polardbx.binlog.ignore.query-events.enabled: true
   # 只订阅 pdb.order_table 的 binlog 以减小带宽压力
   polardbx.binlog.include.tables: pdb.order_table

sink:
  type: values

配置项

参数

说明

是否必填

数据类型

默认值

备注

type

数据源类型。

STRING

固定值为polardbx。

name

数据源名称。

STRING

无。

hostname

PolarDB-X数据库的IP地址或者Hostname。

STRING

建议填写专有网络VPC地址。

说明

如果PolarDB-X与实时Flink版不在同一VPC,需要先打通跨VPC的网络或者使用公网的形式访问,详情请参见空间管理与操作Flink全托管集群如何访问公网?

username

PolarDB-X数据库服务的用户名。

STRING

无。

password

PolarDB-X数据库服务的密码。

STRING

无。

tables

需要同步的PolarDB-X数据表。

STRING

  • 表名支持正则表达式以读取多个表的数据。

  • 可以用逗号分隔多个正则表达式。

说明
  • 正则表达式中请不要使用首尾匹配字符^$

  • 点号用于分割数据库名和表名,如果需要用点号匹配任意字符,需要对点号使用反斜杠进行转义。如:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。

tables.exclude

需要在同步的表中排除的表。

STRING

  • 表名支持正则表达式以排除多个表的数据。

  • 可以用逗号分隔多个正则表达式。

说明

点号用于分割数据库名和表名,如果需要用点号匹配任意字符,需要对点号使用反斜杠进行转译。如:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。

port

PolarDB-X数据库服务的端口号。

INTEGER

3306

无。

schema-change.enabled

是否发送Schame变更事件。

BOOLEAN

true

无。

jdbc.properties.*

JDBC URL中的自定义连接参数。

STRING

您可以传递自定义的连接参数,例如不使用SSL协议,则可配置为'jdbc.properties.useSSL' = 'false'

debezium.*

Debezium读取Binlog的自定义参数。

STRING

您可以传递自定义的Debezium参数,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'来指定解析错误时的处理逻辑。

scan.incremental.snapshot.chunk.size

每个chunk的大小(包含的行数)。

INTEGER

8096

PolarDB-X表会被切分成多个chunk读取。在读完chunk的数据之前,chunk的数据会先缓存在内存中。

每个chunk包含的行数越少,则表中的chunk的总数量越大,尽管这会降低故障恢复的粒度,但可能导致内存OOM和整体的吞吐量降低。因此,您需要进行权衡,并设置合理的chunk大小。

scan.snapshot.fetch.size

当读取表的全量数据时,每次最多拉取的记录数。

INTEGER

1024

无。

scan.startup.mode

消费数据时的启动模式。

STRING

initial

参数取值如下:

  • initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。

  • latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该连接器启动以后的最新变更。

  • earliest-offset:不扫描历史全量数据,直接从可读取的最早Binlog开始读取。

  • specific-offset:不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过同时配置scan.startup.specific-offset.filescan.startup.specific-offset.pos参数来指定从特定Binlog文件名和偏移量启动,也可以只配置scan.startup.specific-offset.gtid-set来指定从某个GTID集合启动。

  • timestamp:不扫描历史全量数据,从指定的时间戳开始读取Binlog。时间戳通过scan.startup.timestamp-millis指定,单位为毫秒。

重要

对于earliest-offsetspecific-offsettimestamp启动模式,如果启动时刻和指定的启动位点时刻的表结构不同,作业会因为表结构不同而报错。换一句话说,使用这三种启动模式,需要保证在指定的Binlog消费位置到作业启动的时间之间,对应表不能发生表结构变更。

scan.startup.specific-offset.file

使用指定位点模式启动时,启动位点的Binlog文件名。

STRING

使用该配置时,scan.startup.mode必须配置为specific-offset。文件名格式例如mysql-bin.000003

scan.startup.specific-offset.pos

使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。

INTEGER

使用该配置时,scan.startup.mode必须配置为specific-offset

scan.startup.specific-offset.gtid-set

使用指定位点模式启动时,启动位点的GTID集合。

STRING

使用该配置时,scan.startup.mode必须配置为specific-offset。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

使用指定时间模式启动时,启动位点的毫秒时间戳。

LONG

使用该配置时,scan.startup.mode必须配置为timestamp。时间戳单位为毫秒。

重要

在使用指定时间时,PolarDB-X CDC会尝试读取每个Binlog文件的初始事件以确定其时间戳,最终定位至指定时间对应的Binlog文件。请保证指定的时间戳对应的Binlog文件在数据库中没有被清理且可以被读取到。

server-time-zone

数据库在使用的会话时区。

STRING

如果您没有指定该参数,则系统默认使用Flink作业运行时的环境时区作为数据库服务器时区,即您选择的可用区所在的时区。

例如Asia/Shanghai,该参数控制了PolarDB-X中的TIMESTAMP类型如何转成STRING类型。更多信息请参见Debezium时间类型

scan.startup.specific-offset.skip-events

从指定的位点读取时,跳过多少Binlog事件。

INTEGER

使用该配置时,scan.startup.mode必须配置为specific-offset

scan.startup.specific-offset.skip-rows

从指定的位点读取时,跳过多少行变更(一个Binlog事件可能对应多行变更)。

INTEGER

使用该配置时,scan.startup.mode必须配置为specific-offset

connect.timeout

连接PolarDB-X数据库服务器超时时,重试连接之前等待超时的最长时间。

DURATION

30s

无。

connect.max-retries

连接PolarDB-X数据库服务时,连接失败后重试的最大次数。

INTEGER

3

无。

connection.pool.size

数据库连接池大小。

INTEGER

20

数据库连接池用于复用连接,可以降低数据库连接数量。

heartbeat.interval

Source通过心跳事件推动Binlog位点前进的时间间隔。

DURATION

30s

心跳事件用于推动Source中的Binlog位点前进,这对PolarDB-X中更新缓慢的表非常有用。对于更新缓慢的表,Binlog位点无法自动前进,通过够心跳事件可以推到Binlog位点前进,可以避免Binlog位点不前进引起Binlog位点过期问题,Binlog位点过期会导致作业失败无法恢复,只能无状态重启。

scan.incremental.snapshot.chunk.key-column

可以指定某一列作为快照阶段切分分片的切分列。

否。

STRING

仅支持从主键中选择一列。

chunk-meta.group.size

chunk元信息的大小。

INTEGER

1000

如果元信息大于该值,元信息会分为多份传递。

chunk-key.even-distribution.factor.lower-bound

是否可以均匀分片的chunk分布因子的下限。

DOUBLE

0.05

分布因子小于该值会使用非均匀分片。

chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。

chunk-key.even-distribution.factor.upper-bound

是否可以均匀分片的chunk分布因子的上限。

DOUBLE

1000.0

分布因子大于该值会使用非均匀分片。

chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。

scan.incremental.close-idle-reader.enabled

是否在快照结束后关闭空闲的Reader。

BOOLEAN

false

该配置生效,需要设置execution.checkpointing.checkpoints-after-tasks-finish.enabledtrue。

scan.only.deserialize.captured.tables.changelog.enabled

在增量阶段,是否仅对指定表的变更事件进行反序列化。

BOOLEAN

true

参数取值如下:

  • true:仅对目标表的变更数据进行反序列化,加快Binlog读取速度。

  • false(默认):对所有表的变更数据进行反序列化。

scan.parallel-deserialize-changelog.enabled

在增量阶段,是否使用多线程对变更事件进行解析。

BOOLEAN

false

参数取值如下:

  • true:在变更事件的反序列化阶段采用多线程处理,同时保证Binlog事件顺序不变,从而加快读取速度。

  • false(默认):在事件的反序列化阶段使用单线程处理。

scan.parallel-deserialize-changelog.handler.size

多线程对变更事件进行解析时,事件处理器的数量。

INTEGER

2

无。

metadata-column.include-list

需要传给下游的元数据列。

STRING

可用的元数据包括op_tses_tsquery_logfilepos,您可以使用英文逗号分隔多个元数据列。

说明

PolarDB-X CDC YAML连接器无需也不支持添加库名表名和op_type元数据列。您可以直接在Transform表达式中使用__data_event_type__来获取变化数据类型,或在Transform表达式中使用__schema_name____table_name__来获取数据库名和表名。

重要

file元数据列代表该数据所在的binlog文件,全量阶段为"", 增量阶段为binlog文件名;pos元数据列代表数据所在的binlog文件中的偏移量,全量阶段为"0", 增量阶段为数据在binlog文件中的偏移量。

scan.newly-added-table.enabled

Checkpoint重启时,是否同步上一次启动时未匹配到的新增表或者移除状态中保存的当前不匹配的表。

BOOLEAN

false

CheckpointSavepoint重启时生效。

scan.binlog.newly-added-table.enabled

在增量阶段,是否发送匹配到的新增表的数据。

BOOLEAN

false

不能与scan.newly-added-table.enabled同时开启。

scan.incremental.snapshot.chunk.key-column

为某些表指定一列作为快照阶段切分分片的切分列。

STRING

  • 通过英文冒号:连接表名和字段名,表示一个指定规则,表名可以使用正则表达式。支持定义多个指定规则,不同指定规则通过英文分号;分割。例如:db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2

  • 对于无主键表必填,选择的列必须是非空类型(NOT NULL)。有主键的表为选填,仅支持从主键中选择一列。

scan.parse.online.schema.changes.enabled

在增量阶段,是否尝试解析 RDS 无锁变更 DDL 事件。

BOOLEAN

false

参数取值如下:

  • true:解析 RDS 无锁变更 DDL 事件。

  • false(默认):不解析 RDS 无锁变更 DDL 事件。

实验性功能。建议在执行线上无锁变更前,先对Flink作业执行一次快照以便恢复。

scan.incremental.snapshot.backfill.skip

是否在快照读取阶段跳过backfill。

BOOLEAN

false

参数取值如下:

  • true:快照读取阶段跳过backfill。

  • false(默认):快照读取阶段不跳过backfill。

如果跳过backfill,快照阶段表的更改将在稍后的增量阶段读取,而不是合并到快照中。

重要

跳过backfill可能导致数据不一致,因为快照阶段发生的变更可能会被重放,仅保证at-least-once语义。

treat-tinyint1-as-boolean.enabled

是否将TINYINT(1)类型当做Boolean类型处理。

BOOLEAN

true

参数取值如下:

  • true(默认):将TINYINT(1)类型当作Boolean类型处理。

  • false:不将TINYINT(1)类型当作Boolean类型处理。

treat-timestamp-as-datetime-enabled

是否将TIMESTAMP类型当作DATETIME类型处理。

BOOLEAN

false

参数取值如下:

  • true:将PolarDB-X TIMESTAMP类型当作DATETIME类型处理,映射到CDC TIMESTAMP类型。

  • false(默认):将PolarDB-X TIMESTAMP类型映射到CDC TIMESTAMP_LTZ类型。

PolarDB-X TIMESTAMP类型存储的是UTC时间,受时区影响,PolarDB-X DATETIME类型存储的是字面时间,不受时区影响。

开启后会根据server-time-zonePolarDB-X TIMESTAMP类型数据转换成DATETIME类型。

include-comments.enabled

是否同步表注释和字段注释。

BOOELEAN

false

参数取值如下:

  • true:同步表注释和字段注释。

  • false(默认):不同步表注释和字段注释。

开启后会增加作业内存使用量。

scan.incremental.snapshot.unbounded-chunk-first.enabled

快照读取阶段是否先分发无界的分片。

BOOELEAN

false

参数取值如下:

  • true:快照读取阶段优先分发无界的分片。

  • false(默认):快照读取阶段不优先分发无界的分片。

实验性功能。开启后能够降低TaskManager在快照阶段同步最后一个分片时遇到内存溢出 (OOM) 的风险,建议在作业第一次启动前添加。

binlog.session.network.timeout

Binlog连接的网络超时时间。

DURATION

10m

设值为0s时,将会使用服务端的默认超时时间。

scan.rate-limit.records-per-second

限制Source每秒下发的最大记录数。

LONG

适用于需要限制数据读取场景,此限制在全量和增量阶段都会生效。

SourcenumRecordsOutPerSecond指标反映整个数据流每秒钟输出的记录数,可以根据这个指标对此参数进行调整。

在全量读取阶段,通常需要降低每个批次读取数据的条数进行配合,可以减少scan.incremental.snapshot.chunk.size参数值。

include-binlog-meta.enable

是否在消息中携带PolarDB-X Binlog的原始信息,如GTID,Binlog位点等

Boolean

false

适用于原始binlog同步场景,比如替换原有canal同步链路。

polardbx.binlog.ignore.archive-events.enabled

是否忽略 PolarDB-X Binlog 中的归档事件(主要是 `DELETE` 事件)。

BOOLEAN

false

polardbx.binlog.ignore.query-events.enabled

是否忽略 PolarDB-X Binlog 中的ROWS_QUERY_LOG_EVENT事件。

BOOLEAN

false

polardbx.binlog.include.tables

仅订阅这些表的Binlog日志。多个表名之间用逗号(`,`)分隔。

STRING

说明

此参数只在消费PolarDB-X Binlog增量阶段有效,不影响快照阶段的全量读取。

polardbx.binlog.exclude.tables

不订阅这些表的Binlog日志。多个表名之间用逗号(`,`)分隔。

STRING

说明

此参数只在消费PolarDB-X Binlog增量阶段有效,不影响快照阶段的全量读取。