使用实时集成的Kafka数据源进行实时研发

Dataphin支持将不同的数据源实时集成至Kafka数据源,并且支持对集成的Kafka数据源进行实时研发。本文将为您介绍如何使用实时集成的Kafka数据源进行实时研发。

前提条件

已创建需要进行实时集成的来源数据源和Kafka数据源。来源数据源本文以MySQL数据源为例。

注意事项

  • 创建的MYSQL数据源需在数据源端开启日志,并需确保配置的账户有读取日志权限,否则系统无法实时集成该数据源。

  • 用于处理实时集成到Kafka的元表,消息体格式仅支持dataphin-canal-json,否则元表无法正常进行实时研发。

案例说明

  • 本案例中,MYSQL数据源来源表DDL语句如下:

    CREATE TABLE `mysql_test` (
      `id` bigint(20) DEFAULT NULL,
      `name` text,
      `age` bigint(20) DEFAULT NULL,
      `address` text
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
  • 本案例中,Kafka数据源的Topic为kafka_test,Group ID为testGroup

操作步骤

步骤一:实时集成MySQL数据源至Kafka数据源

  1. 在Dataphin首页,单击顶部菜单栏的研发->数据集成

  2. 按照以下操作指引,进入新建实时集成任务对话框。

    选择项目(Dev-Prod模式需要选择环境)->单击实时集成->单击image新建图标->单击实时集成任务

    image

  3. 新建实时集成任务对话框,配置任务参数。

    参数

    说明

    任务名称

    填写MySQL实时集成Kafka

    描述

    填写MySQL实时集成Kafka案例

    选择目录

    默认选择实时集成

  4. 在新建的实时集成任务中,配置来源数据目标数据

    参数

    描述

    来源数据

    数据源类型

    选择MySQL数据源类型。

    数据源

    选择创建的MySQL数据源。

    同步方案

    默认实时增量

    圈选方式

    选择圈选表并选择需要进行实时集成的表。例如,mysql_test

    目标数据

    数据源类型

    选择Kafka数据源类型。

    数据源

    选择创建的Kafka数据源。

    目标Topic

    选择需要集成的Topic。例如,kafka_test

    数据格式

    选择Canal Json数据格式。

  5. 配置完成后,单击保存

  6. 完成实时集成任务及属性配置后,单击image..png提交任务进行运维调度。

    如果您项目的开发模式是Dev-Prod模式,则需要发布该实时集成任务,具体操作,请参见管理发布任务

步骤二:创建Kafka实时元表用于实时研发

  1. 在Dataphin首页,单击顶部菜单栏研发->数据开发

  2. 按照以下操作指引,进入新建表对话框。

    选择项目(Dev-Prod模式需要选择环境)->单击表管理->单击image新建图标。

    image

  3. 新建表对话框,配置参数。

    参数

    说明

    表类型

    选择元表

    元表名称

    填写元表的名称。例如,Kafka_test

    数据源

    选择创建的Kafka数据源。

    来源topic

    选择上述集成的目标Topic,即kafka_test

    connector

    选择Kafka connector。

    消息体格式

    选择dataphin-canal-json

    重要

    用于处理实时集成到Kafka的消息体格式仅支持选择dataphin-canal-json

    选择目录

    默认选择表管理

    描述

    填写简单的描述。例如,用于处理实时集成到Kafka的元表

  4. 单击确定,完成元表的创建。

  5. 在元表配置页面,配置元表字段后并单击提交进行运维调度。

    如果您项目的开发模式是Dev-Prod模式,则需要发布该元表,具体操作,请参见管理发布任务

步骤三:使用Kafka实时元表进行实时研发

完成Kafka实时元表创建后,即可在Dataphin的Flink_SQL实时任务中使用已创建的Kafka实时元表进行实时研发。例如,将Kafka实时元表数据实时插入至mysql_bk_test元表中。案例代码如下:

set kafka_test.`properties.group.id`='testGroup'; 
set kafka_test.`scan.startup.mode` = 'earliest-offset';

insert into ${项目名称}.mysql_bk_test select * from ${项目名称}.kafka_test; -- ${项目名称}请替换为您实际的项目名称。