Kafka Writer通过Kafka服务的Java SDK向Kafka写入数据,本文为您介绍Kafka Writer的实现原理、参数和示例。

注意 目前Kafka Writer仅支持使用新增和使用独享数据集成资源组,不支持使用默认资源组和自定义资源组

Apache Kafka是一个快速、可扩展、高吞吐和可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理的场景中使用。

实现原理

Kafka Writer通过Java SDK向Kafka中写入数据,使用的日志服务Java SDK版本如下。
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.0.0</version>
</dependency>

参数说明

参数 描述 是否必选
datasource 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。
server Kafka的server地址,格式为ip:port
topic Kafka的topic,是Kafka处理资源的消息源(feeds of messages)的不同分类。

每条发布至Kafka集群的消息都有一个类别,该类别被称为topic,一个topic是对一组消息的归纳。

valueIndex Kafka Writer中作为Value的那一列。如果不填写,默认将所有列拼起来作为Value,分隔符为fieldDelimiter
writeMode 当未配置valueIndex时,该配置项决定将源端读取记录的所有列拼接作为写入kafka记录Value的格式,可选值为text和json,默认值为text。
  • 配置为text,将所有列按照fieldDelimiter指定分隔符拼接。
  • 配置为JSON,将所有列按照column参数指定字段名称拼接为JSON字符串。

例如源端记录有三列,值为a、b和c,writeMode配置为text、fieldDelimiter配置为#时,写入kafka的记录Value为字符串a#b#c;writeMode配置为JSON、column配置为["col1","col2","col3"]时,写入kafka的记录Value为字符串{"col1","a","col2":"b","col3":"c"}。

如果配置了valueIndex,该配置项无效。

column 目标表需要写入数据的字段,字段间用英文逗号分隔。例如:"column": ["id", "name", "age"]。如果要依次写入全部列,使用(*)表示。例如"column": ["*"]

当未配置valueIndex,并且writeMode配置为JSON时,该配置项决定源端读取记录的列值在JSON结构中的记录格式。

  • 当源端读取记录列的个数多于column配置的字段名个数时,写入时进行截断。例如:

    源端记录有三列,值为a、b和c,column配置为["col1","col2"]时,写入kafka的记录Value为字符串{"col1":"a","col2":"b"}。

  • 当源端读取记录列的个数少于column配置的字段名个数时,多余column配置字段名填充null或者nullValueFormat指定的字符串。例如:

    源端记录有两列,值为a和b,column配置为["col1","col2","col3"]时,写入kafka的记录Value为字符串{"col1":"a","col2":"b","col3":null}。

如果配置了valueIndex,或者writeMode配置为text,该配置项无效。

当未配置valueIndex,并且writeMode配置为JSON时必选
partition 指定写入Kafka topic指定分区的编号,是一个大于等于0的整数。
keyIndex Kafka Writer中作为Key的那一列。

keyIndex参数取值范围是大于等于0的整数,否则任务会出错。

keyIndexs 源端读取记录中作为写入kafka记录Key的列的序号数组。

列序号从0开始,例如[0,1,2],会将配置的所有列序号的值用逗号连接作为写入kafka记录的Key。如果不填写,写入kafka记录Key为null,数据轮流写入topic的各个分区中,与keyIndex参数只能二选一。

fieldDelimiter 当writeMode配置为text,并且未配置valueIndex时,将源端读取记录的所有列按照该配置项指定列分隔符拼接作为写入kafka记录的Value,支持配置单个或者多个字符作为分隔符,支持以\u0001格式配置unicode字符,支持\t\n等转义字符。默认值为\t

如果writeMode未配置为text或者配置了valueIndex,该配置项无效。

keyType Kafka的Key的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
valueType Kafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
nullKeyFormat keyIndex或者keyIndexes指定的源端列值为null时,替换为该配置项指定的字符串,如果不配置不做替换。
nullValueFormat 当源端列值为null时,组装写入kafka记录Value时替换为该配置项指定的字符串,如果不配置不做替换。
acks 初始化Kafka Producer时的acks配置,决定写入成功的确认方式。

向导模式开发

  1. 选择数据源。
    配置同步任务的数据去向选择数据源
    参数 描述
    数据源 即上述参数说明中的datasource,通常填写您配置的数据源名称。
    主题 即上述参数说明中的topic
    键取值列 即上述参数说明中的keyIndexes,指定写入Kafka记录的Key的取值方式。
    写入模式 即上述参数说明中的writeMode,指定写入Kafka记录的格式。
    分隔符 即上述参数说明中的fieldDelimiter,当写入模式为text时决定写入Kafka记录的Value的连接字符。
    null键替代字符串 即上述参数说明中的nullKeyFormat,当写入模式为text时决定写入Kafka记录的Value的连接字符。
    null值替代字符串 即上述参数说明中的nullValueFormat,当写入模式为text时决定写入Kafka记录的Value的连接字符。
    写入成功确认方式 即上述参数说明中的acks,当写入模式为text时决定写入Kafka记录的Value的连接字符。
    单次写入大小 即上述参数说明中的kafkaConfigs的JSON结构中的batch.size字段和linger.ms字段,决定初始化Kafka Producer时的batch.size和linger.ms参数,控制单次写入数据量,默认值为batch.size=16384、linger.ms=10。
    写入超时时间 即上述参数说明中的kafkaConfigs的JSON结构中的timeout.ms字段、request.timeout.ms字段和metadata.fetch.timeout.ms字段,决定初始化Kafka Producer时的timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms参数,控制单次写入超时时间,默认值为timeout.ms=30000,request.timeout.ms=30000,metadata.fetch.timeout.ms=60000。
  2. 字段映射,即上述参数说明中的column。左侧的源头表字段和右侧的目标表字段为一一对应的关系。
    • 当写入模式为text时,映射到目标表字段的源头表字段的值使用分隔符连接作为Kafka记录的value。
    • 当写入模式为JSON时,目标表字段名将决定源端读取记录的列值在JSON结构中的字段名,例如源头表字段有两列,值为a和b,目标表字段配置为col1和col2,那么写入kafka的记录Value为字符串{"col1":"a","col2":"b"}。
    说明 目标表字段名称必须使用字母、数字或者下划线,否则任务运行时会失败。
    字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本模式开发

使用脚本模式开发的详情请参见通过脚本模式配置任务

向Kafka写入数据的JSON配置,如下所示。
{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"Kafka",//插件名。
            "parameter":{
                   "server": "ip:9092", //Kafka的server地址。
                   "keyIndex": 0, //作为Key的列。需遵循驼峰命名规则,k小写
                   "valueIndex": 1, //作为Value的某列。目前只支持取来源端数据的一列或者该参数不填(不填表示取来源所有数据)
        //例如想取odps的第2、3、4列数据作为kafkaValue,请新建odps表将原odps表数据做清洗整合写新odps表后使用新表同步。
                   "keyType": "Integer", //Kafka的Key的类型。
                   "valueType": "Short", //Kafka的Value的类型。
                   "topic": "t08", //Kafka的topic。
                   "batchSize": 1024 //向kafka一次性写入的数据量。
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
                     "throttle":true,//当throttle值为flase时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                     "concurrent":1, //作业并发数。
                     "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

使用SASL鉴权

如果需要使用SASL鉴权或SSL鉴权,请在定义kafka数据源时进行相关配置,详情请参考:配置Kafka数据源