文档

配置Kafka输出组件

更新时间:

配置Kafka输出组件,可以将外部数据库中读取数据写入到Kafka,或从大数据平台对接的存储系统中将数据复制推送至Kafka,进行数据整合和再加工。本文为您介绍如何配置Kafka输出组件。

操作步骤

  1. 请参见离线管道组件开发入口,进入离线单条管道脚本的开发页面。

  2. 按照下图操作指引,进入KAFKA输出配置对话框。

    image

  3. KAFKA输出配置对话框,按照下表配置参数。

    参数

    说明

    步骤名称

    根据当前组件的使用场景及定位,输入合适的名称。

    数据源

    选择Dataphin已配置的数据源。同时您可以单击数据源后的新建,进入规划模块新建数据源。后续操作,详情请参见创建Kafka数据源

    说明

    进行属性配置的账号需具备该数据源的同步写权限,如果没有权限,则需要申请数据源权限,详情请参见申请数据源权限

    主题

    根据实际场景,选择需要的topic。

    键取值列

    填写键取值列。

    • 如果选择多列,会将配置的所有列序号的值用逗号连接作为写入Kafka记录的Key。

    • 如果不选择,写入Kafka记录Key为null,数据轮流写入topic的各个分区中。

    写入模式

    该配置项决定将数据源端读取记录的所有列拼接作为写入Kafka记录Value的格式,可选值为Text和JSON,默认值为Text。

    • Text:将所有列按照分隔符配置项指定分隔符拼接。

    • JSON:将所有列按照目标表字段名称拼接为JSON字符串。

    说明

    如果配置了valueIndex,该配置项无效。

    例如源端记录有三列,值为a、b和c:

    • 当写入模式配置为Text、分隔符配置为#时,写入Kafka的记录Value为字符串a#b#c。

    • 当写入模式配置为JSON、目标表字段配置为["col1","col2","col3"]时,写入Kafka的记录Value为字符串{"col1":"a","col2":"b","col3":"c"}。

    值分隔符

    Value分隔符配置。

    • 写入模式JSON时,不支持Value分隔符配置。

    • 写入模式Text时,支持配置单个或者多个字符作为分隔符,支持以\u0001格式配置unicode字符,支持\t、\n等转义字符。默认为\t(水平制表符),支持Value分隔符配置。

    键类型值类型

    选择Kafka的Key和Value的类型。

    • 键取值列不选择或选择多列时,Key类型Value类型可选项包含BYTEARRAYSTRINGKAFKA AVRO(数据源配置了schema.registry时可选择)。

    • 键取值列选择一列时,Key类型Value类型可选项包含BYTEARRAYDOUBLEFLOATINTEGERLONGSHORTSTRINGKAFKA AVRO(数据源配置了schema.registry时可选择)。

    高级配置

    按需进行配置,支持以下参数:

    • keyfieldDelimiter:键分隔符,Kafka的键取值列为多列时的连接字符,默认为空。

    • valueIndex:配置Kafka Writer中作为Value的列,例如valueIndex=[0,1,2,3] ,[ ]内的数字代表输入组件的字段的seqnumber

      • 写入模式为Text时,默认将所有列拼起来作为Value,使用分隔符配置的分隔符进行分割,值类型只能选择BYTEARRAY或STRING。

      • 写入模式为JSON时,以键值对写入JSON。

    • partition=0:指定写入Kafka topic指定分区的编号,是一个大于等于0的整数,默认为0。

    • nullKeyFormat=null:key指定的源端列值为null时,替换为该配置项指定的字符串,如果不配置不做替换。

    • nullValueFormat=null:当源端列值为null时,组装写入kafka记录Value时替换为该配置项指定的字符串,如果不配置不做替换。

    • acks=all:初始化Kafka Producer时的acks配置,决定写入成功的确认方式。值为0不进行写入成功确认,值为1确认主副本写入成功,值为all确认所有副本写入成功。默认acks=all。

    • keySchema 如topic配置了schema.regisrty, 请输入key schema。默认为空。

    • valueSchema 如topic配置了schema.regisrty, 请输入value schema。默认为空。

    字段映射

    • 输入字段:展示上游的输入字段。

    • 输出字段:展示输出字段。支持手动添加输出字段。

      • 单击批量添加 ,以JSON格式批量配置,示例如下。

        "column":[
                    {
                      "index":5,
                      "name":"test1",
                      "type":"STRING"
                    },
                    {
                      "index":2,
                      "name":"partition",
                      "type":"STRING"
                    }
                  ]
      • 单击新建输出字段,根据页面提示填写字段及选择类型

      同时您也可以对已添加的字段执行如下操作:

      • 单击操作列下的agag图标,编辑已有的字段。

      • 单击操作列下的agfag图标,删除已有的字段。

    • 快速映射:根据上游的输入和目标表的字段,可以手动选择字段映射。快速映射包括同行映射同名映射

      • 当写入模式为Text时,映射到目标表字段的源头表字段的值使用分隔符连接作为Kafka记录的value。

      • 当写入模式为JSON时,目标表字段名将决定源端读取记录的列值在JSON结构中的字段名,例如源头表字段有两列,值为a和b,目标表字段配置为col1和col2,那么写入Kafka的记录Value为字符串{"col1":"a","col2":"b"}。

  4. 单击确认,完成Kafka输出组件配置。

  • 本页导读 (0)