实时集成数据至Kafka

MySQL、Microsoft SQL Server、Oracle、PostgreSQL、IBM DB2的数据实时同步至Kafka,可以通过创建实时集成任务实现。本文为您介绍如何创建实时集成任务。

前提条件

需创建实时集成任务所需的数据源(MySQL、Microsoft SQL Server、Oracle、PostgreSQL、IBM DB2),以便在配置实时集成任务过程中,选择对应的来源数据和目标数据。具体操作,请参见实时集成支持的数据源

步骤一:新建实时集成任务

  1. Dataphin首页的顶部菜单栏中,选择研发 > 数据集成

  2. 在顶部菜单栏中选择项目(Dev-Prod模式还需要选择环境)。

  3. 在左侧导航栏中选择集成 > 实时集成,在右侧实时集成列表中单击image图标,选择实时集成任务

  4. 新建实时集成任务对话框中,填写任务名称描述选择目录

    参数

    描述

    任务名称

    填写实时任务名称。

    以字母开头,仅包含小写英文字母、数字、下划线(_),长度限制为4~63个字符。

    生产/开发环境队列资源

    可选择所有配置为实时任务的资源组。

    说明

    仅当项目所使用的计算源是部署模式为KubernetesFlink计算源时,支持此配置项。

    描述

    填写任务的简单描述信息,长度不超过1000个字符。

    选择目录

    选择实时任务所存放的目录。

    若未创建目录,您可以新建文件夹,操作方法如下:

    1. 在页面左侧实时任务列表上方单击image图标,打开新建文件夹对话框。

    2. 新建文件夹对话框中输入文件夹名称并根据需要选择目录位置。

    3. 单击确定

  5. 配置完成后,单击确定

  6. 在新建的实时集成任务中,配置来源数据目标数据

    1. 来源数据配置

      分区

      参数

      描述

      数据源配置

      数据源类型

      选择数据源类型。支持MySQLMicrosoft SQL ServerOraclePostgreSQLIBM DB2

      数据源

      选择数据源。

      系统提供新建数据源入口,您可以单击新建,在数据源页面新建数据源。具体操作,请参见实时集成支持的数据源

      重要

      需在数据源端开启日志,并需确保配置的账户有读取日志权限,否则系统无法实时同步该数据源。

      同步规则设置

      同步方案

      默认实时增量

      说明

      将源数据库的增量变更按照变更发生的顺序采集并写入到下游目标库中。

      圈选方式

      支持整库、圈选表、排除表三种方式。

      • 整库:对当前数据库进行整库数据同步。

        仅当来源数据源类型选择MySQL时,将同步所选数据源下所有数据库中的所有表。

      • 圈选表/排除表:选中当前数据库中部分表进行实时同步。

        • 批量选择/批量排除批量选择时,所选择当前数据库的多个表将进行实时同步;批量排除时,所选择当前数据库的多个表将不进行实时同步。

          来源数据源类型选择MySQL时,支持选择所选数据源下所有数据库中的所有表,列表中以DBname.Tablename格式展示各表。

          来源数据源类型选择PostgreSQL时,不支持排除表方式。

        • 正则匹配:可在正则表达式输入框内填写表名的正则表达式。适用Java正则表达式,如schemaA.*|schemaB.*

          仅当来源数据源类型选择MySQL时,支持批量匹配所选数据源下所有数据库中的所有表,可使用数据库名(DBname)和表名(Tablename)进行正则匹配。

          来源数据源类型选择PostgreSQLMicrosoft SQL ServerIBM DB2时,不支持正则匹配

    2. 目标端数据配置

      分区

      参数

      描述

      数据源配置

      数据源类型

      选择Kafka数据源类型。

      数据源

      选择目标数据源。

      系统提供新建数据源入口,您可以单击新建,在数据源页面新建数据源。具体操作,请参见创建Kafka数据源

      目标Topic

      目标数据的Topic,可选择TopicTopic。选择单Topic后,需选择目标Topic,可输入Topic名称关键字进行搜索;选择多Topic后,支持配置Topic名转换和Topic参数。

      • Topic:所有表消息写入同一个Topic。

      • Topic:每张表建立一个同名Topic。

      数据格式

      支持设置写入数据的存储格式包括DTS AvroCanal Json

      • DTS Avro:一种数据序列化格式,可以将数据结构或对象转化成便于存储或传输的格式。

      • Canal Json:对于Canal的兼容格式,数据存储格式为Canal Json。

      说明

      当目标Topic选择Topic时,数据格式仅支持选择Canal Json

      目标topic配置

      Topic名转换

      单击配置Topic名转换,可在配置Topic名转换规则对话框中,配置Topic名转换规则以及Topic名称的前后缀

      • Topic名转换规则:单击新建规则,新建一行规则,您需要分别输入来源表待替换字符串目标Topic替换字符串,二者均不可为空,并且目标Topic替换字符串仅支持32个字符以内的英文字母、数字及下划线(_)。

      • Topic名称的前后缀:支持输入英文字母、数字和下划线(-),不超过32个字符。

      说明
      • 替换字符Topic名称前后缀中的英文字符,系统将自动转换为小写

      • 仅当目标Topic选择Topic时,支持配置Topic名转换。

      Topic参数

      用于创建Topic时的附加参数,格式为key=value,多个参数间使用换行分隔。

      说明

      仅当目标Topic选择Topic时,支持配置此项。

    3. 映射配置

      说明

      仅当目标Topic选择Topic时,支持配置此区块。

      image

      区域

      描述

      搜索与筛选区

      支持按源表目标Topic进行搜索。如需快速筛选目标表,单击顶部的1图标,支持按映射状态目标Topic建立方式进行筛选。

      刷新映射关系

      如需刷新目标表配置列表,请单击刷新映射关系

      重要

      目标Topic配置中已有内容时,重新选择数据源类型及数据源会导致目标Topic列表及映射关系情况重置,请谨慎操作。

      列表

      列表包括序号源表映射状态目标Topic建立方式目标Topic,同时您可以对目标表进行删除操作。

      • 目标表建立方式:若目标Topic已存在,则建立方式为使用已有Topic;若目标Topic不存在,则建立方式为自动建Topic

        自动建Topic时,系统将根据已生成的目标Topic名称以及Topic参数进行建立。

      • 映射状态:仅检测目标Topic是否存在。

      • 删除:删除对应行,删除操作不可撤销。

      批量操作

      您可对目标表进行批量删除操作。

  7. 配置完成后,单击保存

步骤二:配置实时任务属性

  1. 单击当前实时集成任务页签顶部菜单栏中的资源配置,或单击右侧边栏的属性,打开属性面板。

  2. 分别配置当前实时集成任务的基本信息资源配置

    • 基本信息:选择当前实时集成任务的开发负责人运维负责人,并填入当前任务的相应描述,不超过1000个字符。

    • 资源配置:详情请参见实时集成资源配置

步骤三:提交实时集成任务

  1. 单击提交,提交当前实时集成任务。

  2. 提交对话框,输入提交备注信息并单击确定并提交

  3. 完成提交后,在提交对话框中,可查看提交详情。

    如果项目的模式为Dev-Prod,则您需要发布实时集成任务至生产环境。具体操作,请参见管理发布任务

后续步骤

提交或发布任务至生产环境后,您可在运维中心查看并运维实时集成任务,保证任务的正常运行。具体操作,请参见查看并管理实时任务