同步 OceanBase 数据库的数据至 Kafka
Kafka 是目前广泛应用的高性能分布式流计算平台,数据传输支持 OceanBase 两种租户与自建 Kafka 数据源之间的数据实时同步,扩展消息处理能力,广泛应用于实时数据仓库搭建、数据查询和报表分流等业务场景。
前提条件
已购买传输实例。详情请参见 购买传输实例。
已为源端 OceanBase 数据库创建专用于数据同步项目的数据库用户,并为其赋予了相关权限。详情请参见 创建数据库用户。
已新建源端和目标端的数据源。详情请参见 新建 OceanBase 数据源 和 新建 Kafka 数据源。
使用限制
数据同步的对象仅支持物理表,不支持其它对象。
数据传输支持 Kafka 0.9、1.0、2.x 和 3.2.1 版本。
重要当 Kafka 版本为 0.9 时,不支持结构同步。
数据同步过程中,如果您在源端修改了同步范围内的表名称,且重命名后的名称不在同步对象中,则该部分数据将不被同步至目标 Kafka 实例中。
待同步的表名和其中的列名不能包含中文字符。
支持的源端和目标端实例类型
源端 | 目标端 |
OceanBase 数据库 (OceanBase 实例) 说明 OceanBase 数据库包括 MySQL 和 Oracle 两种租户类型。 | Kafka(阿里云 Kafka 实例) |
OceanBase 数据库(OceanBase 实例) | Kafka(VPC 内自建 Kafka 实例) |
OceanBase 数据库(OceanBase 实例) | Kafka(公网 Kafka 实例) |
同步 DDL 支持的范围
创建表
CREATE TABLE
重要创建的表需要在同步对象范围之内。目前仅支持对已经同步的表进行
DROP TABLE
操作后,再执行CREATE TABLE
。修改表
ALTER TABLE
删除表
DROP TABLE
清空表
TRUNCATE TABLE
说明延迟删除场景下,同一个事务中会有两条一样的
TRUNCATE TABLE
DDL。此时,下游消费需要按幂等方式处理。从指定的分区中删除数据
ALTER TABLE…TRUNCATE PARTITION
创建索引
CREATE INDEX
删除索引
DROP INDEX
添加表的备注
COMMENT ON TABLE
表重命名
RENAME TABLE
重要重命名后的表名需要在同步对象范围之内。
操作步骤
新建数据同步项目。
登录 OceanBase 管理控制台。
在左侧导航栏,单击 数据传输 > 数据同步。
在 数据同步 页面,单击右上角的 新建同步项目。
在 选择源和目标 页面,配置各项参数。
参数
描述
同步项目名称
建议使用中文、数字和字母的组合。名称中不能包含空格,长度不得超过 64 个字符。
源端
如果您已新建 OceanBase 数据源,请从下拉列表中进行选择。如果未新建,请单击下拉列表中的 新建数据源,在右侧对话框进行新建。参数详情请参见 新建 OceanBase 数据源。
重要源端不支持 OceanBase 数据库 4.0.0 版本。
目标端
如果您已新建 Kafka 数据源,请从下拉列表中进行选择。如果未新建,请单击下拉列表中的 新建数据源,在右侧对话框进行新建。参数详情请参见 新建 Kafka 数据源。
传输实例
从下拉列表中选择已经购买的 传输实例。如果您未购买,请单击右侧的 购买 进行操作。详情请参见 购买传输实例。
单击 下一步。在 选择同步类型 页面,选择当前数据同步项目的同步类型。
同步类型包括 结构同步、全量同步 和 增量同步。增量同步 支持 同步 DML(包括
Insert
、Delete
和Update
)和 同步 DDL。单击 下一步。在 选择同步对象 页面,选择需要同步的对象。
同步 OceanBase 数据库的数据至 Kafka 时,支持多表到多 Topic 的同步。
在选择区域左侧选中需要同步的对象。
单击 >。
在 将对象映射至 Topic 对话框中,选择需要的映射方式进行配置。
如果设置同步类型和配置时未勾选 结构同步,则仅支持选择 已有 Topic。如果设置同步类型和配置时已选择 结构同步,则仅支持选择一种映射方式进行 Topic 的创建或选择。
例如,已选择结构同步的情况下,您使用了新建 Topic 和选择已有 Topic 两种映射方式,或通过重命名的方式更改了 Topic 的名称,会因为选项冲突导致预检查报错。
参数
描述
新建 Topic
在文本框中输入新建 Topic 的名称。支持 3~64 位字符, 且只能包含英文、数字、短横线(-)、下划线(_)和英文句号(.)。
选择 Topic
数据传输提供查询 Kafka Topic 的能力,您可以单击 选择 Topic,在 已有 Topic 下拉列表中,搜索并选中需要同步的 Topic。
批量生成 Topic
批量生成 Topic 的规则为
Topic_${Database Name}_${Table Name}
。如果您选择 新建 Topic 或 批量生成 Topic,结构同步成功后,在 Kafka 侧能够查询到新建的 Topic。其分区数量默认为 3 个,分区副本数量默认为 1 个,且不支持修改。
单击 确定。
选择同步对象后,数据传输支持对目标端对象进行更改 Topic、设置行过滤、移除单个对象或全部对象等操作。目标端对象的结构为 Topic>DataBase>Table。
操作
步骤
导入对象
在选择区域的右侧列表中,单击右上角的 导入对象。
在对话框中,单击 确定。
重要导入会覆盖之前的操作选择,请谨慎操作。
在 导入同步对象 对话框中,导入需要同步的对象。 您可以通过导入 CSV 文件的方式进行设置行过滤条件、设置过滤列和设置分片列等操作。详情请参见 下载和导入同步对象配置。
单击 检验合法性。
通过合法性的检验后,单击 确定。
更改 Topic
在选择区域的右侧列表中,鼠标悬停至目标对象。
单击显示的 更改 Topic。
在 将对象映射至 Topic 的对话框中,更改需要同步的 Topic。
单击 确定。
重要确定后会将所选 Topic 及表合并至选中 Topic,请谨慎操作。
设置
数据传输支持
WHERE
条件实现行过滤,选择分片列和需要同步的列。在 设置 对话框中,您可以进行以下操作。
在 行过滤条件 区域的文本框中,输入标准的 SQL 语句中的
WHERE
子句,来配置行过滤。其作用范围为 全量同步 + 增量同步。只有满足
WHERE
条件的数据才会被同步至目标数据源,以实现数据的行过滤。如果语句中包含 SQL 保留关键字,请添加转义符(`)。在 分片列 下拉列表中,选择目标分片列。您可以选择多个字段作为分片列,该参数为选填。
选择分片列时,如果没有特殊情况,默认选择主键即可。如果存在主键负载不均衡的情况,请选择唯一性标识且负载相对均衡的字段作为分片列,避免潜在的性能问题。分片列的主要作用如下:
负载均衡:在目标端可以进行并发写入的情况下,通过分片列区分发送消息需要使用的特定线程。
有序性:由于存在并发写入可能导致的无序问题,数据传输确保在分片列的值相同的情况下,用户接收到的消息是有序的。此处的有序是指变更顺序(DML 对于一列的执行顺序)。
选择需要同步的列。
在 选择列 区域,选择需要同步的列。全选或不勾选任何列,数据传输会同步全部列。
移除/全部移除
数据传输支持在数据映射时,对暂时选中到目标端的单个或多个对象进行移除操作。
移除单个同步对象
在选择区域的右侧列表中,鼠标悬停至目标对象,单击显示的 移除,即可移除该同步对象。
移除全部同步对象
在选择区域的右侧列表中,单击右上角的 全部移除。在对话框中,单击 确定,即可移除全部同步对象。
单击 下一步。在 同步选项 页面,配置各项参数。
分类
参数
描述
同步设置
增量同步起始位点
如果选择同步类型时已选择 全量同步,此处默认为项目启动时间,不支持修改。
如果选择同步类型时未选择 全量同步,请在此处指定同步某个时间节点之后的数据,默认为当前系统时间。您可以选择时间节点,也可以直接输入时间戳。
重要仅支持选择当前时间,或当前时间之前的时间点。
该位点与当前归档日志的保留时间密切相关。如果无特殊要求,可以从当前位点开始启动。
高级选项
序列化方式
控制数据同步至 Kafka 的消息格式,目前支持 Default、Canal、Dataworks(支持 2.0 版本)、SharePlex、 DefaultExtendColumnType 和 Debezium。详情请参见 消息格式说明。
重要目前仅 OceanBase 数据库 MySQL 租户支持 Debezium。
当选择 Dataworks 时,同步 DDL 不支持
COMMENT ON TABLE
和ALTER TABLE…TRUNCATE PARTITION
。
开启事务内序号编排
根据需求,设置是否开启事务内保持排序。如果开启,数据传输可以为一个事务发送至下游进行顺序标识。
重要该参数仅对 SharePlex 格式生效,目的是为了确保您能够获取构成交易的 DML 语句的序号。
例如,在同一交易内包含 10 条 DML 语句(顺序为 1,2,3…10),则数据传输会按照 1,2,3 ...10 的顺序投递至目标端。
分区规则
同步 OceanBase 数据库的数据至 Kafka Topic 的规则,目前支持 Hash、Table 和 One。
Hash 表示数据传输使用一定的 Hash 算法,根据主键值或分片列值 Hash 选择 Kafka Topic 的分区。
Table 表示数据传输将一张表中的全部数据投递至同一个分区中,以表名作为 Hash 键。
One 表示 JSON 消息会投递至 Topic 下的某个分区,目的是为了保持排序。
业务系统标识(可选)
用于标识数据的业务系统来源,以便您后续进行自定义处理。该业务系统标识的长度限制为 1~20 个字符。
单击 预检查。
在 预检查 环节,数据传输会检测和目标端 Kafka 实例的连接情况。如果预检查报错:
您可以排查并处理问题后,重新执行预检查,直至预检查成功。
您也可以单击错误预检查项操作列中的 跳过,会弹出对话框提示您跳过本操作的具体影响,确认可以跳过后,请单击对话框中的 确定。
预检查成功后,单击 启动项目。如果您暂时无需启动项目,请单击 保存,后续在 同步项目列表 页面手动启动项目。
数据传输支持在数据同步项目运行过程中修改同步对象,详情请参见 查看和修改同步对象。数据同步项目启动后,会根据选择的同步类型依次执行,详情请参见 查看同步详情。
如果中途接入数据的过程出错(通常由于网络不通或进程启动过慢导致),您可以进入项目的详情页面,单击右上角的 恢复(此处为了延时,使用暂停再恢复)。