配置Kafka输入组件后,可以将kafka数据源中的数据读取至大数据平台对接的存储系统内,并进行数据整合和二次加工。本文为您介绍如何配置Kafka输入组件。
前提条件
在开始执行操作前,请确认您已完成以下操作:
已创建Kafka数据源。具体操作,请参见创建Kafka数据源。
进行Kafka输入组件属性配置的账号,需具备该数据源的同步读权限。如果没有权限,则需要申请数据源权限。具体操作,请参见申请、续期和交还数据源权限。
操作步骤
在Dataphin首页顶部菜单栏,选择研发 > 数据集成。
在集成页面顶部菜单栏选择项目(Dev-Prod模式需要选择环境)。
在左侧导航栏中单击离线集成,在离线集成列表中单击需要开发的离线管道,打开该离线管道的配置页面。
单击页面右上角的组件库,打开组件库面板。
在组件库面板左侧导航栏中需选择输入,在右侧的输入组件列表中找到KAFKA组件,并拖动该组件至画布。
单击KAFKA输入组件卡片中的
图标,打开KAFKA输入配置对话框。
在KAFKA输入配置对话框,按照下表配置参数。
参数
描述
步骤名称
即Kafka输入组件的名称。Dataphin自动生成步骤名称,您也可以根据业务场景修改。命名规则如下:
只能包含中文、字母、下划线(_)、数字。
不能超过64个字符。
数据源
在数据源下拉列表中,展示当前Dataphin中所有Kafka类型的数据源,包括您已拥有同步读权限的数据源和没有同步读权限的数据源。 单击
图标,可复制当前数据源名称。
对于没有同步读权限的数据源,您可以单击数据源后的申请,申请数据源的同步读权限。具体操作,请参见申请数据源权限。
如果您还没有Kafka类型的数据源,单击新建数据源,创建数据源。具体操作,请参见创建Kafka数据源。
主题
Kafka的Topic。单击下拉列表,选择需要读取的Kafka主题名称。
键类型
Kafka的Key的类型,决定了初始化Kafka Consumer时的key.deserializer配置。可选值包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT、STRING和KAFKA AVRO(数据源配置了schema.registry时可选择)。
值类型
Kafka的Value的类型,决定了初始化Kafka Consumer时的value.deserializer配置。可选值包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT、STRING和KAFKA AVRO(数据源配置了schema.registry时可选择)。
消费群组ID
初始化Kafka Consumer时的group.id配置。
为了确保同步时消费位点的正确性,请避免该参数与其他消费进程重复。如果不指定该参数,则每次执行同步自动生成
datax_
开头的随机字符串作为group.id。起始时间
读取起始时间。仅支持以yyyyMMddHHmmss格式的时间字符串指定具体时间,是时间范围的左边界。需配合调度参数使用,例如调度参数配置为
beginDateTime=${20220101000000}
,则起始时间配置为${beginDateTime}。结束时间
读取结束时间。仅支持以yyyyMMddHHmmss格式的时间字符串指定具体时间,是时间范围的右边界。需配合调度参数使用,例如调度参数配置为
endDateTime=${20220101000000}
,则结束时间配置为${endDateTime}。同步结束策略
选择同步结束策略,有以下两种策略:
1分钟读取不到新数据时:如果消费者1分钟从Kafka拉取数据返回为空(一般是已经读完主题中的全部数据,也可能是网络或者Kafka集群可用性原因),则立即停止任务,否则持续重试直到再次读到数据。
到达指定结束位点:如果数据集成任务读取到的Kafka记录业务时间或者位点满足上面读取结束位点配置时,则任务结束,否则无限重试读取Kafka记录。
高级配置
可通过高级配置进行位点重置策略、单次读取大小、单次读取时间、读取超时等配置。若topic配置了schema registry,需在高级配置中配置keySchema和valueSchema参数。默认为空。样例格式如下:
{ "namespace": "example.avro", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }
输出字段
默认展示__key__ ,__value__,__partition__,__headers__,__offset__,__timestamp__6个字段。支持手动添加输出字段:
单击批量添加 ,以JSON格式批量配置,例如:
"column":[ "__key__", "__value__", "__partition__", "__headers__", "__offset__", "__timestamp__" ]
单击新建输出字段,根据页面提示填写来源序号、字段及选择类型。
源头表字段也可配置为上述6个字符串之外的字符串,此时将Kafka记录作为JSON字符串进行解析,将源头表字段配置的字符串作为JSON路径读取对应内容作为字段值写入对应的目标表字段。例如:
{ "data": { "name": "bob", "age": 35 } }
为Kafka记录的value值,当源头表字段配置为data.name时,将会读取bob作为这个字段的值并写入对应目标表,支持添加的字段类型为Java类型和datax的映射类型。同时可以对已添加的字段执行如下操作:
单击操作列下的
图标,编辑已有的字段。
单击操作列下的
图标,删除已有的字段。
单击确定,完成Kafka输入组件配置。