配置Kafka输出组件,可以将外部数据库中读取数据写入到Kafka,或从大数据平台对接的存储系统中将数据复制推送至Kafka,进行数据整合和再加工。本文为您介绍如何配置Kafka输出组件。
操作步骤
请参见离线管道组件开发入口,进入离线单条管道脚本的开发页面。
按照下图操作指引,进入KAFKA输出配置对话框。
在KAFKA输出配置对话框,按照下表配置参数。
参数
说明
步骤名称
根据当前组件的使用场景及定位,输入合适的名称。
数据源
选择Dataphin已配置的数据源。同时您可以单击数据源后的新建,进入规划模块新建数据源。后续操作,详情请参见创建Kafka数据源。
说明进行属性配置的账号需具备该数据源的同步写权限,如果没有权限,则需要申请数据源权限,详情请参见申请数据源权限。
主题
根据实际场景,选择需要的topic。
键取值列
填写键取值列。
如果选择多列,会将配置的所有列序号的值用逗号连接作为写入Kafka记录的Key。
如果不选择,写入Kafka记录Key为null,数据轮流写入topic的各个分区中。
写入模式
该配置项决定将数据源端读取记录的所有列拼接作为写入Kafka记录Value的格式,可选值为Text和JSON,默认值为Text。
Text:将所有列按照分隔符配置项指定分隔符拼接。
JSON:将所有列按照目标表字段名称拼接为JSON字符串。
说明如果配置了valueIndex,该配置项无效。
例如源端记录有三列,值为a、b和c:
当写入模式配置为Text、分隔符配置为#时,写入Kafka的记录Value为字符串a#b#c。
当写入模式配置为JSON、目标表字段配置为["col1","col2","col3"]时,写入Kafka的记录Value为字符串{"col1":"a","col2":"b","col3":"c"}。
值分隔符
Value分隔符配置。
当写入模式为JSON时,不支持Value分隔符配置。
当写入模式为Text时,支持配置单个或者多个字符作为分隔符,支持以\u0001格式配置unicode字符,支持\t、\n等转义字符。默认为\t(水平制表符),支持Value分隔符配置。
键类型和值类型
选择Kafka的Key和Value的类型。
当键取值列不选择或选择多列时,Key类型和Value类型可选项包含BYTEARRAY、STRING和KAFKA AVRO(数据源配置了schema.registry时可选择)。
当键取值列选择一列时,Key类型和Value类型可选项包含BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG、SHORT、STRING和KAFKA AVRO(数据源配置了schema.registry时可选择)。
高级配置
按需进行配置,支持以下参数:
keyfieldDelimiter:键分隔符,Kafka的键取值列为多列时的连接字符,默认为空。
valueIndex:配置Kafka Writer中作为Value的列,例如valueIndex=[0,1,2,3] ,[ ]内的数字代表输入组件的字段的seqnumber
写入模式为Text时,默认将所有列拼起来作为Value,使用分隔符配置的分隔符进行分割,值类型只能选择BYTEARRAY或STRING。
写入模式为JSON时,以键值对写入JSON。
partition=0:指定写入Kafka topic指定分区的编号,是一个大于等于0的整数,默认为0。
nullKeyFormat=null:key指定的源端列值为null时,替换为该配置项指定的字符串,如果不配置不做替换。
nullValueFormat=null:当源端列值为null时,组装写入kafka记录Value时替换为该配置项指定的字符串,如果不配置不做替换。
acks=all:初始化Kafka Producer时的acks配置,决定写入成功的确认方式。值为0不进行写入成功确认,值为1确认主副本写入成功,值为all确认所有副本写入成功。默认acks=all。
keySchema: 如topic配置了schema.regisrty, 请输入key schema。默认为空。
valueSchema: 如topic配置了schema.regisrty, 请输入value schema。默认为空。
字段映射
输入字段:展示上游的输入字段。
输出字段:展示输出字段。支持手动添加输出字段。
单击批量添加 ,以JSON格式批量配置,示例如下。
"column":[ { "index":5, "name":"test1", "type":"STRING" }, { "index":2, "name":"partition", "type":"STRING" } ]
单击新建输出字段,根据页面提示填写字段及选择类型。
同时您也可以对已添加的字段执行如下操作:
单击操作列下的图标,编辑已有的字段。
单击操作列下的图标,删除已有的字段。
快速映射:根据上游的输入和目标表的字段,可以手动选择字段映射。快速映射包括同行映射和同名映射。
当写入模式为Text时,映射到目标表字段的源头表字段的值使用分隔符连接作为Kafka记录的value。
当写入模式为JSON时,目标表字段名将决定源端读取记录的列值在JSON结构中的字段名,例如源头表字段有两列,值为a和b,目标表字段配置为col1和col2,那么写入Kafka的记录Value为字符串{"col1":"a","col2":"b"}。
单击确认,完成Kafka输出组件配置。
- 本页导读 (0)