Kafka Writer通过Kafka服务的Java SDK向Kafka写入数据,本文为您介绍Kafka Writer的实现原理、参数和示例。
Apache Kafka是一个快速、可扩展、高吞吐和可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理的场景中使用。
实现原理
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
参数说明
参数 | 描述 | 是否必选 |
---|---|---|
datasource | 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。 | 是 |
server | Kafka的server地址,格式为ip:port。 | 是 |
topic | Kafka的topic,是Kafka处理资源的消息源(feeds of messages)的不同分类。
每条发布至Kafka集群的消息都有一个类别,该类别被称为topic,一个topic是对一组消息的归纳。 |
是 |
valueIndex | Kafka Writer中作为Value的那一列。如果不填写,默认将所有列拼起来作为Value,分隔符为fieldDelimiter。 | 否 |
writeMode | 当未配置valueIndex时,该配置项决定将源端读取记录的所有列拼接作为写入kafka记录Value的格式,可选值为text和JSON,默认值为text。
例如源端记录有三列,值为a、b和c,writeMode配置为text、fieldDelimiter配置为#时,写入kafka的记录Value为字符串a#b#c;writeMode配置为JSON、column配置为[{"name":"col1"},{"name":"col2"},{"name":"col3"}]时,写入kafka的记录Value为字符串{"col1":"a","col2":"b","col3":"c"}。 如果配置了valueIndex,该配置项无效。 |
否 |
column | 目标表需要写入数据的字段,字段间用英文逗号分隔。例如:"column": ["id", "name", "age"]。
当未配置valueIndex,并且writeMode选择JSON时,该配置项定义源端读取记录的列值在JSON结构中的字段名称。例如,"column": [{"name":id"}, {"name":"name"}, {"name":"age"}]。
如果配置了valueIndex,或者writeMode配置为text,该配置项无效。 |
当未配置valueIndex,并且writeMode配置为JSON时必选 |
partition | 指定写入Kafka topic指定分区的编号,是一个大于等于0的整数。 | 否 |
keyIndex | Kafka Writer中作为Key的那一列。
keyIndex参数取值范围是大于等于0的整数,否则任务会出错。 |
否 |
keyIndexes | 源端读取记录中作为写入kafka记录Key的列的序号数组。
列序号从0开始,例如[0,1,2],会将配置的所有列序号的值用逗号连接作为写入kafka记录的Key。如果不填写,写入kafka记录Key为null,数据轮流写入topic的各个分区中,与keyIndex参数只能二选一。 |
否 |
fieldDelimiter | 当writeMode配置为text,并且未配置valueIndex时,将源端读取记录的所有列按照该配置项指定列分隔符拼接作为写入kafka记录的Value,支持配置单个或者多个字符作为分隔符,支持以\u0001格式配置unicode字符,支持\t、\n等转义字符。默认值为\t。
如果writeMode未配置为text或者配置了valueIndex,该配置项无效。 |
否 |
keyType | Kafka的Key的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 | 是 |
valueType | Kafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 | 是 |
nullKeyFormat | keyIndex或者keyIndexes指定的源端列值为null时,替换为该配置项指定的字符串,如果不配置不做替换。 | 否 |
nullValueFormat | 当源端列值为null时,组装写入kafka记录Value时替换为该配置项指定的字符串,如果不配置不做替换。 | 否 |
acks | 初始化Kafka Producer时的acks配置,决定写入成功的确认方式。默认acks参数为all。acks取值如下:
|
否 |
向导模式开发
- 选择数据源。
配置同步任务的数据来源和数据去向。
参数 描述 数据源 即上述参数说明中的datasource,通常填写您配置的数据源名称。 主题 即上述参数说明中的topic。 键取值列 即上述参数说明中的keyIndexes,指定写入Kafka记录的Key的取值方式。 写入模式 即上述参数说明中的writeMode,指定写入Kafka记录的格式。 分隔符 即上述参数说明中的fieldDelimiter,当写入模式为text时决定写入Kafka记录的Value的连接字符。 null键替代字符串 即上述参数说明中的nullKeyFormat,指定null的替代方案。 null值替代字符串 即上述参数说明中的nullValueFormat,指定null的替代方案。 写入成功确认方式 即上述参数说明中的acks,指定写入成功确认方式。 单次写入大小 决定初始化Kafka Producer时的batch.size和linger.ms参数,控制单次写入数据量,默认值为batch.size=16384、linger.ms=10。 写入超时时间 决定初始化Kafka Producer时的timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms参数,控制单次写入超时时间,默认值为timeout.ms=30000,request.timeout.ms=30000,metadata.fetch.timeout.ms=60000。 - 字段映射,即上述参数说明中的column。左侧的源头表字段和右侧的目标表字段为一一对应的关系。
- 当写入模式为text时,映射到目标表字段的源头表字段的值使用分隔符连接作为Kafka记录的value。
- 当写入模式为JSON时,目标表字段名将决定源端读取记录的列值在JSON结构中的字段名,例如源头表字段有两列,值为a和b,目标表字段配置为col1和col2,那么写入kafka的记录Value为字符串{"col1":"a","col2":"b"}。
说明 目标表字段名称必须使用字母、数字或者下划线,否则任务运行时会失败。参数 描述 同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。 同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。 取消映射 单击取消映射,可以取消建立的映射关系。 自动排版 可以根据相应的规律自动排版。 - 通道控制。
参数 描述 任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。 同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。 错误记录数 错误记录数,表示脏数据的最大容忍条数。 分布式处理能力 数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组概述和新增和使用独享数据集成资源组。
脚本模式开发
使用脚本模式开发的详情请参见通过脚本模式配置任务。
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"Kafka",//插件名。
"parameter":{
"server": "ip:9092", //Kafka的server地址。
"keyIndex": 0, //作为Key的列。需遵循驼峰命名规则,k小写
"valueIndex": 1, //作为Value的某列。目前只支持取来源端数据的一列或者该参数不填(不填表示取来源所有数据)
//例如想取odps的第2、3、4列数据作为kafkaValue,请新建odps表将原odps表数据做清洗整合写新odps表后使用新表同步。
"keyType": "Integer", //Kafka的Key的类型。
"valueType": "Short", //Kafka的Value的类型。
"topic": "t08", //Kafka的topic。
"batchSize": 1024 //向kafka一次性写入的数据量。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
使用SASL鉴权
如果需要使用SASL鉴权或SSL鉴权,请在定义kafka数据源时进行相关配置,详情请参考:配置Kafka数据源。