首页 云数据库 OceanBase 数据传输 数据同步 同步 OceanBase 数据库的数据至 Kafka

同步 OceanBase 数据库的数据至 Kafka

Kafka 是目前广泛应用的高性能分布式流计算平台,数据传输支持 OceanBase 两种租户与自建 Kafka 数据源之间的数据实时同步,扩展消息处理能力,广泛应用于实时数据仓库搭建、数据查询和报表分流等业务场景。

前提条件

使用限制

  • 数据同步的对象仅支持物理表,不支持其它对象。

  • 支持的 Kafka 版本为 0.9、1.0 和 2.x。

  • 数据同步过程中,如果您在源端修改了同步范围内的表名称,且重命名后的名称不在同步对象中,则该部分数据将不被同步至目标 Kafka 实例中。

  • 待同步的表名和其中的列名不能包含中文字符。

支持的源端和目标端实例类型

源端

目标端

OceanBase 数据库 (OceanBase 实例)

说明

OceanBase 数据库包括 MySQL 和 Oracle 两种租户类型。

Kafka(阿里云 Kafka 实例)

OceanBase 数据库(OceanBase 实例)

Kafka(VPC 内自建 Kafka 实例)

OceanBase 数据库(OceanBase 实例)

Kafka(公网 Kafka 实例)

操作步骤

  1. 新建数据同步项目。

    1. 登录 OceanBase 管理控制台

    2. 在左侧导航栏,单击 数据传输 > 数据同步

    3. 数据同步 页面,单击右上角的 新建同步项目

  2. 选择源和目标 页面,配置各项参数。

    参数

    描述

    同步项目名称

    建议使用中文、数字和字母的组合。名称中不能包含空格,长度不得超过 64 个字符。

    源端

    如果您已新建 OceanBase 数据源,请从下拉列表中进行选择。如果未新建,请单击下拉列表中的 新建数据源,在右侧对话框进行新建。参数详情请参见 新建 OceanBase 数据源

    重要

    源端不支持 OceanBase 数据库 4.0 版本。

    目标端

    如果您已新建 Kafka 数据源,请从下拉列表中进行选择。如果未新建,请单击下拉列表中的 新建数据源,在右侧对话框进行新建。参数详情请参见 新建 Kafka 数据源

    传输实例

    从下拉列表中选择已经购买的 传输实例。如果您未购买,请单击右侧的 购买 进行操作。详情请参见 购买传输实例

  3. 单击 下一步

  4. 选择同步类型 页面,勾选当前数据同步项目的 同步类型和配置

    同步类型和配置 包括 结构同步全量同步增量同步增量同步 仅支持 同步 DML(包括 InsertDeleteUpdate)。

  5. 选择同步对象 页面,选择同步对象。

    同步 OceanBase 数据库的数据至 Kafka 时,支持多表到多 Topic 的同步。

    1. 在选择区域左侧选中需要同步的对象。

    2. 单击 >

    3. 将对象映射至 Topic 对话框中,选择需要的映射方式进行配置。

      如果设置同步类型和配置时未勾选 结构同步,则仅支持选择 已有 Topic。如果设置同步类型和配置时已选择 结构同步,则仅支持选择一种映射方式进行 Topic 的创建或选择。

      例如,已选择结构同步的情况下,您使用了新建 Topic 和选择已有 Topic 两种映射方式,或通过重命名的方式更改了 Topic 的名称,会因为选项冲突导致预检查报错。

      参数

      描述

      新建 Topic

      在文本框中输入新建 Topic 的名称。支持 3~64 位字符, 且只能包含英文、数字、短横线(-)、下划线(_)和英文句号(.)。

      选择 Topic

      数据传输提供查询 Kafka Topic 的能力,您可以单击 选择 Topic,在 已有 Topic 下拉列表中,搜索并选中需要同步的 Topic。

      批量生成 Topic

      批量生成 Topic 的规则为 Topic_${Database Name}_${Table Name}

      如果您选择 新建 Topic批量生成 Topic,结构同步成功后,在 Kafka 侧能够查询到新建的 Topic。其分区数量默认为 3 个,分区副本数量默认为 1 个,且不支持修改。

    4. 单击 确定

      选择同步对象后,数据传输支持对目标端对象进行更改 Topic、设置行过滤、移除单个对象或全部对象等操作。目标端对象的结构为 Topic>DataBase>Table。

      操作

      步骤

      导入对象

      1. 在选择区域的右侧列表中,单击右上角的 导入对象

      2. 在对话框中,单击 确定

        重要

        导入会覆盖之前的操作选择,请谨慎操作。

      3. 导入同步对象 对话框中,导入需要同步的对象。 您可以通过导入 CSV 文件的方式进行设置行过滤条件、设置过滤列和设置分片列等操作。详情请参见 下载和导入同步对象配置

      4. 单击 检验合法性

      5. 通过合法性的检验后,单击 确定

      更改 Topic

      1. 在选择区域的右侧列表中,鼠标悬停至目标对象。

      2. 单击显示的 更改 Topic

      3. 将对象映射至 Topic 的对话框中,更改需要同步的 Topic。

      4. 单击 确定

        重要

        确定后会将所选 Topic 及表合并至选中 Topic,请谨慎操作。

      设置

      数据传输支持 WHERE 条件实现行过滤,选择分片列和需要同步的列。

      设置 对话框中,您可以进行以下操作。

      • 行过滤条件 区域的文本框中,输入标准的 SQL 语句中的 WHERE 子句,来配置行过滤。其作用范围为 全量同步 + 增量同步

        只有满足 WHERE 条件的数据才会被同步至目标数据源,以实现数据的行过滤。如果语句中包含 SQL 保留关键字,请添加转义符(`)。

      • 分片列 下拉列表中,选择目标分片列。您可以选择多个字段作为分片列,该参数为选填。

        选择分片列时,如果没有特殊情况,默认选择主键即可。如果存在主键负载不均衡的情况,请选择唯一性标识且负载相对均衡的字段作为分片列。

        请确保分片列的正确性。分片列填写错误,会导致数据同步项目失败。分片列的主要作用如下:

        • 负载均衡:在目标端可以进行并发写入的情况下,通过分片列区分发送消息需要使用的特定线程。

        • 有序性:由于存在并发写入可能导致的无序问题,数据传输确保在分片列的值相同的情况下,用户接收到的消息是有序的。此处的有序是指变更顺序(DML 对于一列的执行顺序)。

      • 选择需要同步的列。

        选择列 区域,选择需要同步的列。全选或不勾选任何列,数据传输会同步全部列。

      移除/全部移除

      数据传输支持在数据映射时,对暂时选中到目标端的单个或多个对象进行移除操作。

      • 移除单个同步对象

        在选择区域的右侧列表中,鼠标悬停至目标对象,单击显示的 移除,即可移除该同步对象。

      • 移除全部同步对象

        在选择区域的右侧列表中,单击右上角的 全部移除。在对话框中,单击 确定,即可移除全部同步对象。

  6. 单击 下一步

  7. 同步选项 页面,配置各项参数。

    分类

    参数

    描述

    同步设置

    增量同步起始位点

    • 如果设置同步类型和配置时已勾选 全量同步,此处默认为项目启动时间,不支持修改。

    • 如果设置同步类型和配置时未勾选 全量同步,请在此处指定同步某个时间节点之后的数据,默认为当前系统时间。您可以选择时间节点,也可以直接输入时间戳。

      重要

      仅支持选择当前时间,或当前时间之前的时间点。

      该位点与当前归档日志的保留时间密切相关。如果无特殊要求,可以从当前位点开始启动。

    高级选项

    序列化方式

    控制数据同步至 Kafka 的消息格式,目前支持 DefaultCanalDataworks(支持 2.0 版本)、SharePlexDefaultExtendColumnType。详情请参见 消息格式说明

    开启事务内序号编排

    根据需求,设置是否开启事务内保持排序。如果开启,数据传输可以为一个事务发送至下游进行顺序标识。

    重要

    该参数仅对 SharePlex 格式生效,目的是为了确保您能够获取构成交易的 DML 语句的序号。

    例如,在同一交易内包含 10 条 DML 语句(顺序为 1,2,3…10),则数据传输会按照 1,2,3 ...10 的顺序投递至目标端。

    该选项会有性能损耗,请根据业务性质选择性开启。

    分区规则

    同步 OceanBase 数据库的数据至 Kafka Topic 的规则,目前支持 HashOne

    • Hash 表示数据传输使用一定的 Hash 算法,根据主键值或分片列值 Hash 选择 Kafka Topic 的分区。

    • One 表示 JSON 消息会投递至 Topic 下的某个分区,目的是为了保持排序。

      该选项会有性能损耗,请根据业务性质选择性开启。

    业务系统标识(可选)

    用于标识数据的业务系统来源,以便您后续进行自定义处理。该业务系统标识的长度限制为 1~20 个字符。

  8. 单击 预检查

    预检查 环节,数据传输会检测和目标端 Kafka 实例的连接情况。如果预检查报错:

    • 您可以排查并处理问题后,重新执行预检查,直至预检查成功。

    • 您也可以单击错误预检查项操作列中的 跳过,会弹出对话框提示您跳过本操作的具体影响,确认可以跳过后,请单击对话框中的 确定

  9. 单击 启动项目。如果您暂时无需启动项目,请单击 保存,跳转至数据同步项目的详情页面,您需要根据需要进行手动启动。

    数据传输支持在数据同步项目运行过程中修改同步对象,详情请参见 查看和修改同步对象。数据同步项目启动后,会根据选择的同步类型依次执行,详情请参见 查看同步详情

如果中途接入数据的过程出错(通常由于网络不通或进程启动过慢导致),您可以进入项目的详情页面,单击右上角的 恢复(此处为了延时,使用暂停再恢复)。

阿里云首页 云数据库 OceanBase 相关技术圈