Dataphin支持将不同的数据源实时集成至Kafka数据源,并且支持对集成的Kafka数据源进行实时研发。本文将为您介绍如何使用实时集成的Kafka数据源进行实时研发。
前提条件
已创建需要进行实时集成的来源数据源和Kafka数据源。来源数据源本文以MySQL数据源为例。
注意事项
创建的MYSQL数据源需在数据源端开启日志,并需确保配置的账户有读取日志权限,否则系统无法实时集成该数据源。
用于处理实时集成到Kafka的元表,消息体格式仅支持
dataphin-canal-json
,否则元表无法正常进行实时研发。
案例说明
本案例中,MYSQL数据源来源表DDL语句如下:
CREATE TABLE `mysql_test` ( `id` bigint(20) DEFAULT NULL, `name` text, `age` bigint(20) DEFAULT NULL, `address` text ) ENGINE=InnoDB DEFAULT CHARSET=utf8
本案例中,Kafka数据源的Topic为
kafka_test
,Group ID为testGroup
。
操作步骤
步骤一:实时集成MySQL数据源至Kafka数据源
在Dataphin首页,单击顶部菜单栏的研发->数据集成。
按照以下操作指引,进入新建实时集成任务对话框。
选择项目(Dev-Prod模式需要选择环境)->单击实时集成->单击新建图标->单击实时集成任务。
在新建实时集成任务对话框,配置任务参数。
参数
说明
任务名称
填写MySQL实时集成Kafka。
描述
填写MySQL实时集成Kafka案例。
选择目录
默认选择实时集成。
在新建的实时集成任务中,配置来源数据与目标数据。
参数
描述
来源数据
数据源类型
选择MySQL数据源类型。
数据源
选择创建的MySQL数据源。
同步方案
默认实时增量。
圈选方式
选择圈选表并选择需要进行实时集成的表。例如,
mysql_test
。目标数据
数据源类型
选择Kafka数据源类型。
数据源
选择创建的Kafka数据源。
目标Topic
选择需要集成的Topic。例如,
kafka_test
。数据格式
选择Canal Json数据格式。
配置完成后,单击保存。
完成实时集成任务及属性配置后,单击提交任务进行运维调度。
如果您项目的开发模式是Dev-Prod模式,则需要发布该实时集成任务,具体操作,请参见管理发布任务。
步骤二:创建Kafka实时元表用于实时研发
在Dataphin首页,单击顶部菜单栏研发->数据开发。
按照以下操作指引,进入新建表对话框。
选择项目(Dev-Prod模式需要选择环境)->单击表管理->单击新建图标。
在新建表对话框,配置参数。
参数
说明
表类型
选择元表。
元表名称
填写元表的名称。例如,Kafka_test。
数据源
选择创建的Kafka数据源。
来源topic
选择上述集成的目标Topic,即
kafka_test
。connector
选择Kafka connector。
消息体格式
选择dataphin-canal-json。
重要用于处理实时集成到Kafka的消息体格式仅支持选择
dataphin-canal-json
。选择目录
默认选择表管理。
描述
填写简单的描述。例如,用于处理实时集成到Kafka的元表。
单击确定,完成元表的创建。
在元表配置页面,配置元表字段后并单击提交进行运维调度。
如果您项目的开发模式是Dev-Prod模式,则需要发布该元表,具体操作,请参见管理发布任务。
步骤三:使用Kafka实时元表进行实时研发
完成Kafka实时元表创建后,即可在Dataphin的Flink_SQL实时任务中使用已创建的Kafka实时元表进行实时研发。例如,将Kafka实时元表数据实时插入至mysql_bk_test
元表中。案例代码如下:
set kafka_test.`properties.group.id`='testGroup';
set kafka_test.`scan.startup.mode` = 'earliest-offset';
insert into ${项目名称}.mysql_bk_test select * from ${项目名称}.kafka_test; -- ${项目名称}请替换为您实际的项目名称。