配置Kafka输出组件

Kafka输出组件可以将外部数据库中读取数据写入到Kafka,或从大数据平台对接的存储系统中将数据复制推送至Kafka,进行数据整合和再加工。本文为您介绍如何配置Kafka输出组件。

前提条件

  • 已创建Kafka数据源。具体操作,请参见创建Kafka数据源

  • 进行Kafka输出组件属性配置的账号,需具备该数据源的同步读权限。如果没有权限,则需要申请数据源权限。具体操作,请参见申请数据源权限

操作步骤

  1. 请参见离线管道组件开发入口,进入离线单条管道脚本的开发页面。

  2. 按照以下操作指引,进入Kafka输出配置对话框。

    单击组件库->单击输出->拖动Kafka输出组件至画布->连接image图标->单击配置image图标。

    image

  3. KAFKA输出配置对话框,按照下表配置参数。

    参数

    说明

    步骤名称

    根据当前组件的使用场景及定位,输入合适的名称。

    数据源

    选择Dataphin已配置的数据源。同时您可以单击数据源后的新建,进入管理中心模块新建数据源。具体操作,请参见创建Kafka数据源

    进行属性配置的账号需具备该数据源的同步写权限,如果没有权限,则需要申请数据源权限,详情请参见申请数据源权限

    主题

    根据实际场景,选择需要的topic。

    键取值列

    填写键取值列。

    • 如果选择多列,会将配置的所有列序号的值用逗号连接作为写入Kafka记录的Key。

    • 如果不选择,写入Kafka记录Key为null,数据轮流写入topic的各个分区中。

    写入模式

    该配置项决定将数据源端读取记录的所有列拼接作为写入Kafka记录Value的格式,可选值为Text和JSON,默认值为Text。

    • Text:将所有列按照分隔符配置项指定分隔符拼接。

    • JSON:将所有列按照目标表字段名称拼接为JSON字符串。

    说明

    如果配置了valueIndex,该配置项无效。

    例如源端记录有三列,值为a、b和c:

    • 当写入模式配置为Text、分隔符配置为#时,写入Kafka的记录Value为字符串a#b#c。

    • 当写入模式配置为JSON、目标表字段配置为["col1","col2","col3"]时,写入Kafka的记录Value为字符串{"col1":"a","col2":"b","col3":"c"}。

    值分隔符

    Value分隔符配置。

    • 写入模式JSON时,不支持Value分隔符配置。

    • 写入模式Text时,支持配置单个或者多个字符作为分隔符,支持以\u0001格式配置unicode字符,支持\t、\n等转义字符。默认为\t(水平制表符),支持Value分隔符配置。

    键类型值类型

    选择Kafka的Key和Value的类型。

    • 键取值列不选择或选择多列时,Key类型Value类型可选项包含BYTEARRAYSTRINGKAFKA AVRO(数据源配置了schema.registry时可选择)。

    • 键取值列选择一列时,Key类型Value类型可选项包含BYTEARRAYDOUBLEFLOATINTEGERLONGSHORTSTRINGKAFKA AVRO(数据源配置了schema.registry时可选择)。

    高级配置

    按需进行配置,支持以下参数:

    • keyfieldDelimiter:键分隔符,Kafka的键取值列为多列时的连接字符,默认为空。

    • valueIndex:配置Kafka Writer中作为Value的列,例如valueIndex=[0,1,2,3] ,[ ]内的数字代表输入组件的字段的seqnumber

      • 写入模式为Text时,默认将所有列拼起来作为Value,使用分隔符配置的分隔符进行分割,值类型只能选择BYTEARRAY或STRING。

      • 写入模式为JSON时,以键值对写入JSON。

    • partition=0:指定写入Kafka topic指定分区的编号,是一个大于等于0的整数,默认为0。

    • nullKeyFormat=null:key指定的源端列值为null时,替换为该配置项指定的字符串,如果不配置不做替换。

    • nullValueFormat=null:当源端列值为null时,组装写入kafka记录Value时替换为该配置项指定的字符串,如果不配置不做替换。

    • acks=all:初始化Kafka Producer时的acks配置,决定写入成功的确认方式。值为0不进行写入成功确认,值为1确认主副本写入成功,值为all确认所有副本写入成功。默认acks=all。

    • keySchema 如topic配置了schema.regisrty, 请输入key schema。默认为空。

    • valueSchema 如topic配置了schema.regisrty, 请输入value schema。默认为空。

    输入字段

    为您展示上游组件的输出字段。

    输出字段

    为您展示输出字段。 Dataphin支持通过批量添加新建输出字段的方式配置输出字段:

    • 批量添加:单击批量添加,支持JSON、Text格式批量配置

      • 以JSON格式批量配置,例如:

        // 示例:
        [{"name": "user_id","type": "String"},
        {"name": "user_name","type": "String"}]
        说明

        name表示引入的字段名称,type表示引入后的字段类型。 例如:"name":"user_id","type":"String"表示把字段名为user_id的字段引入,设置字段类型为String。

      • 以Text格式批量配置,例如:

        // 示例:
        user_id,String
        user_name,String
        • 行分隔符用于分隔每个字段的信息,默认为换行符(\n),可支持换行符(\n)、分号(;)、点(.)。

        • 列分隔符用于分隔字段名与字段类型,默认英文逗号(,)。

    • 新建输出字段。

      单击+新建输出字段,根据页面提示填写字段及选择类型

    • 复制上游字段。

      单击复制上游字段,系统将根据上游的字段名自动生成输出字段。

    • 管理输出字段。

      同时您也可以对已添加的字段执行如下操作:

      • 单击操作列下的agag图标,编辑已有的字段。

      • 单击操作列下的agfag图标,删除已有的字段。

    映射关系

    映射关系用于将源表的输入字段和目标表的输出字段映射起来,便于后续进行数据同步。映射关系包括同名映射和同行映射。适用场景说明如下:

    • 同名映射:对字段名称相同的字段进行映射。

    • 同行映射:源表和目标表的字段名称不一致,但字段对应行的数据需要映射。只映射同行的字段。

  4. 单击确认,完成Kafka输出组件配置。