配置Kafka输入组件后,可以将kafka数据源中的数据读取至大数据平台对接的存储系统内,并进行数据整合和二次加工。本文为您介绍如何配置Kafka输入组件。
前提条件
在开始执行操作前,请确认您已完成以下操作:
已创建Kafka数据源。具体操作,请参见创建Kafka数据源。
进行Kafka输入组件属性配置的账号,需具备该数据源的同步读权限。如果没有权限,则需要申请数据源权限。具体操作,请参见申请、续期和交还数据源权限。
操作步骤
请参见离线管道组件开发入口,进入离线单条管道脚本的开发页面。
按照下图操作指引,进入KAFKA输入配置对话框。
在KAFKA输入配置对话框,按照下表配置参数。
参数
说明
步骤名称
根据当前组件的使用场景及定位,输入合适的名称。
数据源
选择Dataphin已配置的数据源。同时您可以单击数据源后的新建,进入规划模块新建数据源。后续操作,详情请参见创建Kafka数据源。
说明进行属性配置的账号需具备该数据源的同步读权限,如果没有权限,则需要申请数据源权限,详情请参见申请、续期和交还数据源权限。
主题
Kafka的Topic。单击下拉列表,选择需要读取的Kafka主题名称。
键类型
Kafka的Key的类型,决定了初始化Kafka Consumer时的key.deserializer配置。可选值包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT、STRING和KAFKA AVRO(数据源配置了schema.registry时可选择)。
值类型
Kafka的Value的类型,决定了初始化Kafka Consumer时的value.deserializer配置。可选值包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT、STRING和KAFKA AVRO(数据源配置了schema.registry时可选择)。
消费群组ID
初始化Kafka Consumer时的group.id配置。
为了确保同步时消费位点的正确性,请避免该参数与其他消费进程重复。如果不指定该参数,则每次执行同步自动生成
datax_
开头的随机字符串作为group.id。起始时间
读取起始时间。仅支持以yyyyMMddHHmmss格式的时间字符串指定具体时间,是时间范围的左边界。需配合调度参数使用,例如调度参数配置为
beginDateTime=${20220101000000}
,则起始时间配置为${beginDateTime}。结束时间
读取结束时间。仅支持以yyyyMMddHHmmss格式的时间字符串指定具体时间,是时间范围的右边界。需配合调度参数使用,例如调度参数配置为
endDateTime=${20220101000000}
,则结束时间配置为${endDateTime}。同步结束策略
选择同步结束策略,有以下两种策略:
1分钟读取不到新数据时:如果消费者1分钟从Kafka拉取数据返回为空(一般是已经读完主题中的全部数据,也可能是网络或者Kafka集群可用性原因),则立即停止任务,否则持续重试直到再次读到数据。
到达指定结束位点:如果数据集成任务读取到的Kafka记录业务时间或者位点满足上面读取结束位点配置时,则任务结束,否则无限重试读取Kafka记录。
高级配置
可通过高级配置进行位点重置策略、单次读取大小、单次读取时间、读取超时等配置。若topic配置了schema registry,需在高级配置中配置keySchema和valueSchema参数。默认为空。样例格式如下:
{ "namespace": "example.avro", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }
输出字段
默认展示__key__ ,__value__,__partition__,__headers__,__offset__,__timestamp__6个字段。支持手动添加输出字段:
单击批量添加 ,以JSON格式批量配置,例如:
"column":[ "__key__", "__value__", "__partition__", "__headers__", "__offset__", "__timestamp__" ]
单击新建输出字段,根据页面提示填写来源序号、字段及选择类型。
源头表字段也可配置为上述6个字符串之外的字符串,此时将Kafka记录作为JSON字符串进行解析,将源头表字段配置的字符串作为JSON路径读取对应内容作为字段值写入对应的目标表字段。例如:
{ "data": { "name": "bob", "age": 35 } }
为Kafka记录的value值,当源头表字段配置为data.name时,将会读取bob作为这个字段的值并写入对应目标表,支持添加的字段类型为Java类型和datax的映射类型。同时可以对已添加的字段执行如下操作:
单击操作列下的图标,编辑已有的字段。
单击操作列下的图标,删除已有的字段。
单击确定,完成Kafka输入组件配置。