首页 云数据库 OceanBase 数据传输 数据同步 同步 OceanBase 数据库的数据至 RocketMQ

同步 OceanBase 数据库的数据至 RocketMQ

消息队列 RocketMQ 是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可靠的分布式消息中间件。数据传输的数据同步功能可以帮助您实现 OceanBase 数据库的物理表和 RocketMQ 数据源之间的数据实时同步,扩展消息处理能力。

同步 OceanBase 数据库的数据至 RocketMQ 时,两种租户对应的数据格式请参见 数据库传输到文本协议的格式说明

前提条件

使用限制

  • 目前仅华东 1(杭州)地域支持同步 OceanBase 数据库的数据至 RocketMQ。

  • 数据传输支持 4.x 和 5.x 版本的 RocketMQ 实例,包含商业版和社区版。

  • 数据同步的对象仅支持物理表,不支持其它对象。

  • 数据同步过程中,数据传输支持删除表之后再新建表,即支持对已经同步的表进行 DROP TABLE 操作后,再执行 CREATE TABLE。数据传输不支持通过重命名的方式新建表,即不支持执行 RENAME TABLE a TO a_tmp 操作。

  • 当项目意外中断进行断点续传时,RocketMQ 实例中可能会存在部分重复数据(最近一分钟内),因此下游系统需要具备排重能力。

  • 待同步的表名和其中的列名不能包含中文字符。

支持的源端和目标端实例类型

源端

目标端

OceanBase 数据库 (OceanBase 实例)

说明

OceanBase 数据库包括 MySQL 和 Oracle 两种租户类型。

阿里云 RocketMQ 实例

OceanBase 数据库(OceanBase 实例)

VPC 内自建 RocketMQ 实例

OceanBase 数据库(OceanBase 实例)

公网 RocketMQ 实例

操作步骤

  1. 新建同步项目。

    1. 登录 OceanBase 管理控制台

    2. 在左侧导航栏,单击 数据同步

    3. 数据同步 页面,单击右上角的 新建同步项目

  2. 选择源和目标 页面,配置各项参数。

    参数

    描述

    同步项目名称

    建议使用中文、数字和字母的组合。名称中不能包含空格,长度不得超过 64 个字符。

    源端

    如果您已新建 OceanBase 数据源,请从下拉列表中进行选择。如果未新建,请单击下拉列表中的 新建数据源,在右侧对话框进行新建。参数详情请参见 新建 OceanBase 数据源

    重要

    源端不支持 OceanBase 数据库 4.0 版本。

    目标端

    如果您已新建 RocketMQ 数据源,请从下拉列表中进行选择。如果未新建,请单击下拉列表中的 新建数据源,在右侧对话框进行新建。参数详情请参见 新建 RocketMQ 数据源

    传输实例

    从下拉列表中选择已经购买的 传输实例。如果您未购买,请单击右侧的 购买 进行操作。详情请参见 购买传输实例

  3. 单击 下一步,在 选择同步类型 页面,选择当前数据同步项目的同步类型。

    同步类型支持 全量同步增量同步增量同步 仅支持 同步 DML(包括 InsertDeleteUpdate)。

  4. 单击 下一步,在 选择同步对象 页面,选择同步范围。

    同步 OceanBase 数据库的数据至 RocketMQ 时,支持多表到多 Topic 的同步。

    1. 在选择区域左侧选中需要同步的对象。

    2. 单击 >

    3. 将对象映射至 Topic 对话框的 已有 Topic 下拉列表中,搜索并选中需要同步的 Topic。

      您也可以输入已有 Topic 后选中显示的 Topic 名称。

    4. 单击 确定

      数据传输支持通过文本导入对象,并支持对目标端对象进行更改 Topic、设置行过滤、移除单个对象或全部对象等操作。目标端对象的结构为 Topic>DataBase>Table。

      操作

      步骤

      导入对象

      1. 在选择区域的右侧列表中,单击右上角的 导入对象

      2. 在对话框中,单击 确定

        重要

        导入会覆盖之前的操作选择,请谨慎操作。

      3. 导入同步对象 对话框中,导入需要同步的对象。 您可以通过导入 CSV 文件的方式进行设置行过滤条件、设置过滤列和设置分片列等操作。详情请参见 下载和导入同步对象配置

      4. 单击 检验合法性

      5. 通过合法性的检验后,单击 确定

      更改 Topic

      1. 在选择区域的右侧列表中,鼠标悬停至目标对象。

      2. 单击显示的 更改 Topic

      3. 将对象映射至 Topic 的对话框中,更改需要同步的 Topic。

      4. 单击 确定

        重要

        确定后会将所选 Topic 及表合并至选中 Topic,请谨慎操作。

      设置

      数据传输支持 WHERE 条件实现行过滤,选择分片列和需要同步的列。

      设置 对话框中,您可以进行以下操作。

      • 行过滤条件 区域的文本框中,输入标准的 SQL 语句中的 WHERE 子句,来配置行过滤。其作用范围为 全量同步 + 增量同步

        只有满足 WHERE 条件的数据才会被同步至目标数据源,以实现数据的行过滤。如果语句中包含 SQL 保留关键字,请添加转义符(`)。

      • 分片列 下拉列表中,选择目标分片列。您可以选择多个字段作为分片列,该参数为选填。

        选择分片列时,如果没有特殊情况,默认选择主键即可。如果存在主键负载不均衡的情况,请选择唯一性标识且负载相对均衡的字段作为分片列。

        请确保分片列的正确性。分片列填写错误,会导致数据同步项目失败。分片列的主要作用如下:

        • 负载均衡:在目标端可以进行并发写入的情况下,通过分片列区分发送消息需要使用的特定线程。

        • 有序性:由于存在并发写入可能导致的无序问题,数据传输确保在分片列的值相同的情况下,用户接收到的消息是有序的。此处的有序是指变更顺序(DML 对于一列的执行顺序)。

      • 选择需要同步的列。

        选择列 区域,选择需要同步的列。全选或不勾选任何列,数据传输会同步全部列。

      移除/全部移除

      数据传输支持在数据映射时,对暂时选中到目标端的单个或多个对象进行移除操作。

      • 移除单个同步对象

        在选择区域的右侧列表中,鼠标悬停至目标对象,单击显示的 移除,即可移除该同步对象。

      • 移除全部同步对象

        在选择区域的右侧列表中,单击右上角的 全部移除。在对话框中,单击 确定,即可移除全部同步对象。

  5. 单击 下一步,在 同步选项 页面,配置各项参数。

    参数

    描述

    增量同步起始位点

    • 如果设置同步类型和配置时已勾选 全量同步,此处默认为项目启动时间,不支持修改。

    • 如果设置同步类型和配置时未勾选 全量同步,请在此处指定同步某个时间节点之后的数据,默认为当前系统时间。您可以选择时间节点,也可以直接输入时间戳。

      重要

      仅支持选择当前时间,或当前时间之前的时间点。

      该位点与当前归档日志的保留时间密切相关。如果无特殊要求,可以从当前位点开始启动。

    序列化方式

    控制数据同步至 Kafka 的消息格式,目前支持 DefaultCanalDataworks(支持 2.0 版本)、SharePlexDefaultExtendColumnType。详情请参见 序列化方式的格式说明

    开启事务内序号编排

    根据需求,设置是否开启事务内保持排序。如果开启,数据传输可以为一个事务发送至下游进行顺序标识。

    重要

    该参数仅对 SharePlex 格式生效,目的是为了确保您能够获取构成交易的 DML 语句的序号。

    例如,在同一交易内包含 10 条 DML 语句(顺序为 1,2,3…10),则数据传输会按照 1,2,3 ...10 的顺序投递至目标端。

    分区规则

    同步源端数据至 RocketMQ 的规则,目前仅支持 HashHash 表示数据传输使用一定的 Hash 算法,根据主键值或分片列值 Hash 选择 RocketMQ 的队列(MessageQueue)。

    业务系统标识(可选)

    用于标识数据的业务系统来源,以便您后续进行自定义处理。该业务系统标识的长度限制为 1~20 个字符。

    请输入生产者群组

    生产者群组用于标识一组生产者,能够向多个 Topic 中写入数据。

    是否允许消息追踪

    如果允许消息追踪,则可以追踪到一条消息从生产者发送到消息队列 RocketMQ 版服务端,再到消费者消费处理,整个过程中的各个相关节点的时间、状态等数据汇聚而成的完整项目信息。该消息轨迹可以作为生产环境中排查问题强有力的数据支持。

  6. 单击 预检查

    预检查 环节,数据传输会检测和目标端的连接情况。如果预检查报错:

    • 您可以排查并处理问题后,重新执行预检查,直至预检查成功。

    • 您也可以单击错误预检查项操作列中的 跳过,会弹出对话框提示您跳过本操作的具体影响,确认可以跳过后,请单击对话框中的 确定

  7. 单击 启动项目。如果您暂时无需启动项目,请单击 保存,跳转至数据同步项目的详情页面,您需要根据需要进行手动启动。

    数据传输支持在数据同步项目运行过程中修改同步对象,详情请参见 查看和修改同步对象。数据同步项目启动后,会根据选择的同步类型依次执行,详情请参见 查看同步详情

如果中途接入数据的过程出错(通常由于网络不通或进程启动过慢导致),您可以进入项目的详情页面,单击右上角的 恢复(此处为了延时,使用暂停再恢复)。

阿里云首页 云数据库 OceanBase 版 相关技术圈