配置Kafka输入组件

配置Kafka输入组件后,可以将kafka数据源中的数据读取至大数据平台对接的存储系统内,并进行数据整合和二次加工。本文为您介绍如何配置Kafka输入组件。

前提条件

在开始执行操作前,请确认您已完成以下操作:

操作步骤

  1. 请参见离线管道组件开发入口,进入离线单条管道脚本的开发页面。

  2. 按照下图操作指引,进入KAFKA输入配置对话框。

    image

  3. KAFKA输入配置对话框,按照下表配置参数。

    参数

    说明

    步骤名称

    根据当前组件的使用场景及定位,输入合适的名称。

    数据源

    选择Dataphin已配置的数据源。同时您可以单击数据源后的新建,进入规划模块新建数据源。后续操作,详情请参见创建Kafka数据源

    说明

    进行属性配置的账号需具备该数据源的同步读权限,如果没有权限,则需要申请数据源权限,详情请参见申请、续期和交还数据源权限

    主题

    KafkaTopic。单击下拉列表,选择需要读取的Kafka主题名称。

    键类型

    KafkaKey的类型,决定了初始化Kafka Consumer时的key.deserializer配置。可选值包括BYTEARRAYDOUBLEFLOATINTEGERLONGSHORTSTRINGKAFKA AVRO(数据源配置了schema.registry时可选择

    值类型

    KafkaValue的类型,决定了初始化Kafka Consumer时的value.deserializer配置。可选值包括BYTEARRAYDOUBLEFLOATINTEGERLONGSHORTSTRINGKAFKA AVRO(数据源配置了schema.registry时可选择

    消费群组ID

    初始化Kafka Consumer时的group.id配置。

    为了确保同步时消费位点的正确性,请避免该参数与其他消费进程重复。如果不指定该参数,则每次执行同步自动生成datax_开头的随机字符串作为group.id。

    起始时间

    读取起始时间。仅支持以yyyyMMddHHmmss格式的时间字符串指定具体时间,是时间范围的左边界。需配合调度参数使用,例如调度参数配置为beginDateTime=${20220101000000},则起始时间配置为${beginDateTime}。

    结束时间

    读取结束时间。仅支持以yyyyMMddHHmmss格式的时间字符串指定具体时间,是时间范围的右边界。需配合调度参数使用,例如调度参数配置为endDateTime=${20220101000000},则结束时间配置为${endDateTime}。

    同步结束策略

    选择同步结束策略,有以下两种策略:

    • 1分钟读取不到新数据时:如果消费者1分钟从Kafka拉取数据返回为空(一般是已经读完主题中的全部数据,也可能是网络或者Kafka集群可用性原因),则立即停止任务,否则持续重试直到再次读到数据。

    • 到达指定结束位点:如果数据集成任务读取到的Kafka记录业务时间或者位点满足上面读取结束位点配置时,则任务结束,否则无限重试读取Kafka记录。

    高级配置

    可通过高级配置进行位点重置策略、单次读取大小、单次读取时间、读取超时等配置。若topic配置了schema registry,需在高级配置中配置keySchemavalueSchema参数。默认为空。样例格式如下:

    {
     "namespace": "example.avro",
     "type": "record",
     "name": "User",
     "fields": [
         {"name": "name", "type": "string"},
         {"name": "favorite_number",  "type": ["int", "null"]},
         {"name": "favorite_color", "type": ["string", "null"]}
     ]
    }

    输出字段

    默认展示__key__ __value____partition____headers____offset____timestamp__6个字段。支持手动添加输出字段:

    • 单击批量添加 ,以JSON格式批量配置,例如:

      "column":[
                  "__key__",
                  "__value__",
                  "__partition__",
                  "__headers__",
                  "__offset__",
                  "__timestamp__"
                ]
    • 单击新建输出字段,根据页面提示填写来源序号字段及选择类型

    源头表字段也可配置为上述6个字符串之外的字符串,此时将Kafka记录作为JSON字符串进行解析,将源头表字段配置的字符串作为JSON路径读取对应内容作为字段值写入对应的目标表字段。例如:

    { "data": { "name": "bob", "age": 35 } }Kafka记录的value值,当源头表字段配置为data.name时,将会读取bob作为这个字段的值并写入对应目标表,支持添加的字段类型为Java类型和datax的映射类型。

    同时可以对已添加的字段执行如下操作:

    • 单击操作列下的agag图标,编辑已有的字段。

    • 单击操作列下的agfag图标,删除已有的字段。

  4. 单击确定,完成Kafka输入组件配置。