Kafka数据源为您提供读取和写入Kafka的双向通道,本文为您介绍DataWorks的Kafka数据同步的能力支持情况。
支持的版本
支持阿里云Kafka,以及版本为0.10.2且小于等于2.2.x的自建Kafka版本。
对于<0.10.2版本Kafka,由于Kafka不支持检索分区数据offset,并且Kafka数据结构可能不支持时间戳,进而无法支持数据同步。
实时读取
使用包年包月Serverless资源组时,请提前预估Serverless资源组规格,避免资源组规格不足影响任务运行:
一个topic预估需要1 CU,除此之外,还需根据流量进行评估:
Kafka数据不压缩,按10 MB/s预估需要1 CU。
Kafka数据压缩,按10 MB/s预估需要2 CU。
Kafka数据压缩并且进行JSON解析,按10MB/s预估需要3 CU。
使用包年包月Serverless资源组和旧版独享数据集成资源组时:
对Failover容忍度高,集群槽位的水位建议不超过80%。
对Failover容忍度低,集群槽位的水位建议不超过70%。
实际占用和数据内容格式等相关,评估后您可以根据实际运行情况进行调整。
使用限制
Kafka数据源目前支持使用Serverless资源组(推荐)和旧版独享数据集成资源组。
单表离线读
同时配置parameter.groupId
和parameter.kafkaConfig.group.id
时,parameter.groupId
优先级高于kafkaConfig
配置信息中的group.id
。
单表实时写
写入数据不支持去重,即如果任务重置位点或者Failover后再启动,会导致出现重复数据写入。
整库实时写
实时数据同步任务支持使用Serverless资源组(推荐)和旧版独享数据集成资源组。
对于源端同步表有主键的场景,同步时会使用主键值作为Kafka记录的key,确保同主键的变更有序写入Kafka的同一分区。
对于源端同步表无主键的场景,如果选择了支持无主键表同步选项,则同步时Kafka记录的key为空。如果要确保表的变更有序写入Kafka,则选择写入的Kafka topic必须是单分区。如果选择了自定义同步主键,则同步时使用其他非主键的一个或几个字段的联合,代替主键作为Kafka记录的key。
如果在kafka集群发生响应异常的情况下,仍要确保有主键表同主键的变更有序写入kafka的同一分区,则需要在配置kafka数据源时,在扩展参数表单中加入如下配置。
{"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}
添加配置后同步性能会大幅下降,需要在性能和严格保序可靠性之间做好权衡。
实时同步写入Kafka的消息总体格式、同步任务心跳消息格式及源端更改数据对应的消息格式,详情请参见:附录:消息格式。
支持的字段类型
Kafka的数据存储为非结构化的存储,通常Kafka记录的数据模块有key、value、offset、timestamp、headers、partition。DataWorks在对Kafka数据进行读写时,会按照以下的策略进行数据处理。
离线读数据
DataWorks读取Kafka数据时,支持对Kafka数据进行JSON格式的解析,各数据模块的处理方式如下。
Kafka记录数据模块 | 处理后的数据类型 |
Kafka记录数据模块 | 处理后的数据类型 |
key | 取决于数据同步任务配置的keyType配置项,keyType参数介绍请参见下文的全量参数说明章节。 |
value | 取决于数据同步任务配置的valueType配置项,valueType参数介绍请参见下文的全量参数说明章节。 |
offset | Long |
timestamp | Long |
headers | String |
partition | Long |
离线写数据
DataWorks将数据写入Kafka时,支持写入JSON格式或text格式的数据,不同的数据同步方案往Kafka数据源中写入数据时,对数据的处理策略不一致,详情如下。
写入text格式的数据时,不会写入字段名数据,而是使用分隔符来分隔字段取值。
实时同步写入数据到Kafka时,写入的格式为内置的JSON格式,写入数据为包含数据库变更消息的数据、业务时间和DDL信息的所有数据,数据格式详情请参见附录:消息格式。
同步任务类型 | 写入Kafka value的格式 | 源端字段类型 | 写入时的处理方式 |
同步任务类型 | 写入Kafka value的格式 | 源端字段类型 | 写入时的处理方式 |
离线同步 DataStudio的离线同步节点 | json | 字符串 | UTF8编码字符串 |
布尔 | 转换为UTF8编码字符串"true"或者"false" | ||
时间/日期 | yyyy-MM-dd HH:mm:ss格式UTF8编码字符串 | ||
数值 | UTF8编码数值字符串 | ||
字节流 | 字节流会被视为UTF8编码的字符串,被转换成字符串 | ||
text | 字符串 | UTF8编码字符串 | |
布尔 | 转换为UTF8编码字符串"true"或者"false" | ||
时间/日期 | yyyy-MM-dd HH:mm:ss格式UTF8编码字符串 | ||
数值 | UTF8编码数值字符串 | ||
字节流 | 字节流会被视为UTF8编码的字符串,被转换成字符串 | ||
实时同步:ETL实时同步至Kafka DataStudio的实时同步节点 | json | 字符串 | UTF8编码字符串 |
布尔 | json布尔类型 | ||
时间/日期 |
| ||
数值 | json数值类型 | ||
字节流 | 字节流会进行Base64编码后转换成UTF8编码的字符串 | ||
text | 字符串 | UTF8编码字符串 | |
布尔 | 转换为UTF8编码字符串"true"或者"false" | ||
时间/日期 | yyyy-MM-dd HH:mm:ss格式UTF8编码字符串 | ||
数值 | UTF8编码数值字符串 | ||
字节流 | 字节流会进行Base64编码后转换成UTF8编码字符串 | ||
实时同步:整库实时同步至Kafka 纯实时同步增量数据 | 内置JSON格式 | 字符串 | UTF8编码字符串 |
布尔 | json布尔类型 | ||
时间/日期 | 13位毫秒时间戳 | ||
数值 | json数值 | ||
字节流 | 字节流会进行Base64编码后转换成UTF8编码字符串 | ||
同步解决方案:一键实时同步至Kafka 离线全量+实时增量 | 内置JSON格式 | 字符串 | UTF8编码字符串 |
布尔 | json布尔类型 | ||
时间/日期 | 13位毫秒时间戳 | ||
数值 | json数值 | ||
字节流 | 字节流会进行Base64编码后转换成UTF8编码字符串 |
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源,详细的配置参数解释可在配置界面查看对应参数的文案提示。
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
单表离线同步任务配置指导
操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
单表、整库实时同步任务配置指导
操作流程请参见DataStudio侧实时同步任务配置。
单表、整库全增量实时同步任务配置指导
操作流程请参见数据集成侧同步任务配置。
启用认证配置说明
SSL
配置Kafka数据源时,特殊认证方式选择SSL或者SASL_SSL时,表明Kafka集群开启了SSL认证,您需要上传客户端truststore证书文件并填写truststore证书密码。
如果Kafka集群为alikafka实例,可以参考SSL证书算法升级说明下载正确的truststore证书文件,truststore证书密码为KafkaOnsClient。
如果Kafka集群为EMR实例,可以参考使用SSL加密Kafka链接下载正确的truststore证书文件并获取truststore证书密码。
如果是自建集群,请自行上传正确的truststore证书,填写正确的truststore证书密码。
keystore证书文件、keystore证书密码和SSL密钥密码只有在Kafka集群开启双向SSL认证时才需要进行配置,用于Kafka集群服务端认证客户端身份。当Kafka集群server.properties中ssl.client.auth=required时开启双向SSL认证,详情请参见使用SSL加密Kafka链接。
GSSAPI
配置Kafka数据源时,当Sasl机制选择GSSAPI时,需要上传三个认证文件,分别是JAAS配置文件、Kerberos配置文件以及Keytab文件,并在独享资源组进行DNS/HOST设置,下面分别介绍这三种文件以及独享资源组DNS、HOST的配置方式。
Serverless资源组需要通过内网DNS解析配置Host地址信息,更多信息,请参见内网DNS解析(PrivateZone)。
JAAS配置文件
JAAS文件必须以KafkaClient开头,之后使用一个大括号包含所有配置项:
大括号内第一行定义使用的登录组件类,对于各类Sasl认证机制,登录组件类是固定的,后续的每个配置项以key=value格式书写。
除最后一个配置项,其他配置项结尾不能有分号。
最后一个配置项结尾必须有分号,在大括号"}"之后也必须加上一个分号。
不符合以上格式要求将导致JAAS配置文件解析出错,典型的JAAS配置文件格式如下(根据实际情况替换以下内容中的xxx):
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="xxx" storeKey=true serviceName="kafka-server" principal="kafka-client@EXAMPLE.COM"; };
配置项
说明
配置项
说明
登录模块
必须配置com.sun.security.auth.module.Krb5LoginModule。
useKeyTab
必须指定为true。
keyTab
可以指定任意路径,在同步任务运行时会自动下载数据源配置时上传的keyTab文件到本地,并使用本地文件路径填充keyTab配置项。
storeKey
决定客户端是否保存密钥,配置true或者false均可,不影响数据同步。
serviceName
对应Kafka服务端server.properties配置文件中的sasl.kerberos.service.name配置项,请根据实际情况配置该项。
principal
Kafka客户端使用的kerberos principal,请根据实际情况配置该项,并确保上传的keyTab文件包含该principal的密钥。
Kerberos配置文件
Kerberos配置文件必须包含两个模块[libdefaults]和[realms]
[libdefaults]模块指定Kerberos认证参数,模块中每个配置项以key=value格式书写。
[realms]模块指定kdc地址,可以包含多个realm子模块,每个realm子模块以realm名称=开头。
后面紧跟一组用大括号包含配置项,每个配置项也以key=value格式书写,典型的Kerberos配置文件格式如下(根据实际情况替换以下内容中的xxx):
[libdefaults] default_realm = xxx [realms] xxx = { kdc = xxx }
配置项
说明
配置项
说明
[libdefaults].default_realm
访问Kafka集群节点时默认使用的realm,一般情况下与JAAS配置文件中指定客户端principal所在realm一致。
[libdefaults]其他参数
[libdefaults]模块可以指定其他一些Kerberos认证参数,例如ticket_lifetime等,请根据实际需要配置。
[realms].realm名称
需要与JAAS配置文件中指定客户端principal所在realm,以及[libdefaults].default_realm一致,如果JAAS配置文件中指定客户端principal所在realm和[libdefaults].default_realm不一致,则需要包含两组realms子模块分别对应JAAS配置文件中指定客户端principal所在realm和[libdefaults].default_realm。
[realms].realm名称.kdc
以ip:port格式指定kdc地址和端口,例如kdc=10.0.0.1:88,端口如果省略默认使用88端口,例如kdc=10.0.0.1。
Keytab文件
Keytab文件需要包含JAAS配置文件指定principal的密钥,并且能够通过kdc的验证。例如本地当前工作目录有名为client.keytab的文件,可以通过以下命令验证Keytab文件是否包含指定principal的密钥。
klist -ket ./client.keytab Keytab name: FILE:client.keytab KVNO Timestamp Principal ---- ------------------- ------------------------------------------------------ 7 2018-07-30T10:19:16 te**@demo.com (des-cbc-md5)
独享资源组DNS、HOST配置
开启Kerberos认证的Kafka集群,会使用Kafka集群中节点的hostname作为节点在kdc(Kerberos的服务端程序,即密钥分发中心)中注册的principal的一部分,而客户端访问Kafka集群节点时,会根据本地的DNS、HOST设置,推导Kafka集群节点的principal,进而从kdc获取节点的访问凭证。使用独享资源组访问开启Kerberos认证的Kafka集群时,需要正确配置DNS、HOST,以确保顺利从kdc获取Kafka集群节点的访问凭证:
DNS设置
当独享资源组绑定的VPC中,使用PrivateZone实例进行了Kafka集群节点的域名解析设置,则可以在DataWorks管控台的独享资源组对应的VPC绑定项增加100.100.2.136和100.100.2.138两个IP的自定义路由,以使PrivateZone针对Kafka集群节点的域名解析设置对独享资源组生效。
HOST设置
当独享资源组绑定的VPC中,未使用PrivateZone实例进行了Kafka集群节点的域名解析设置,则需要在DataWorks管控台,独享资源组网络设置中逐个将Kafka集群节点的IP地址与域名映射添加到Host配置中。
PLAIN
配置Kafka数据源时,当Sasl机制选择PLAIN时,JAAS文件必须以KafkaClient开头,之后使用一个大括号包含所有配置项。
大括号内第一行定义使用的登录组件类,对于各类Sasl认证机制,登录组件类是固定的,后续的每个配置项以key=value格式书写。
除最后一个配置项,其他配置项结尾不能有分号。
最后一个配置项结尾必须有分号,在大括号"}"之后也必须加上一个分号。
不符合以上格式要求将导致JAAS配置文件解析出错,典型的JAAS配置文件格式如下(根据实际情况替换以下内容中的xxx):
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxx"
password="xxx";
};
配置项 | 说明 |
配置项 | 说明 |
登录模块 | 必须配置org.apache.kafka.common.security.plain.PlainLoginModul |
username | 用户名,请根据实际情况配置该项。 |
password | 密码,请根据实际情况配置该项。 |
常见问题
附录:脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo
从Kafka读取数据的JSON配置,如下所示。
{
"type": "job",
"steps": [
{
"stepType": "kafka",
"parameter": {
"server": "host:9093",
"column": [
"__key__",
"__value__",
"__partition__",
"__offset__",
"__timestamp__",
"'123'",
"event_id",
"tag.desc"
],
"kafkaConfig": {
"group.id": "demo_test"
},
"topic": "topicName",
"keyType": "ByteArray",
"valueType": "ByteArray",
"beginDateTime": "20190416000000",
"endDateTime": "20190416000006",
"skipExceedRecord": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {
"print": false,
"fieldDelimiter": ","
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent": 1,//并发数
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
}
}
Reader脚本参数
参数 | 描述 | 是否必选 |
参数 | 描述 | 是否必选 |
datasource | 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。 | 是 |
server | Kafka的broker server地址,格式为ip:port。 您可以只配置一个server,但请务必保证Kafka集群中所有broker的IP地址都可以连通DataWorks。 | 是 |
topic | Kafka的Topic,是Kafka处理资源的消息源(feeds of messages)的聚合。 | 是 |
column | 需要读取的Kafka数据,支持常量列、数据列和属性列:
| 是 |
keyType | Kafka的Key的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 | 是 |
valueType | Kafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 | 是 |
beginDateTime | 数据消费的开始时间位点,为时间范围(左闭右开)的左边界。yyyymmddhhmmss格式的时间字符串,可以配合调度参数使用。详情请参见调度参数支持的格式。 Kafka 0.10.2及以上的版本支持该功能。 | 需要和beginOffset二选一。 beginDateTime和endDateTime配合使用。 |
endDateTime | 数据消费的结束时间位点,为时间范围(左闭右开)的右边界。yyyymmddhhmmss格式的时间字符串,可以配合调度参数使用。详情请参见调度参数支持的格式。 Kafka 0.10.2及以上的版本支持该功能。 | 需要和endOffset二选一。 endDateTime和beginDateTime配合使用。 |
beginOffset | 数据消费的开始时间位点,您可以配置以下形式:
| 需要和beginDateTime二选一。 |
endOffset | 数据消费的结束位点,用于控制结束数据消费任务退出的时间。 | 需要和endDateTime二选一。 |
skipExceedRecord | Kafka使用
| 否,默认值为false。 |
partition | Kafka的一个Topic有多个分区(partition),正常情况下数据同步任务是读取Topic(多个分区)一个点位区间的数据。您也可以指定partition,仅读取一个分区点位区间的数据。 | 否,无默认值。 |
kafkaConfig | 创建Kafka数据消费客户端KafkaConsumer可以指定扩展参数,例如bootstrap.servers、auto.commit.interval.ms、session.timeout.ms等,您可以基于kafkaConfig控制KafkaConsumer消费数据的行为。 | 否 |
encoding | 当keyType或valueType配置为STRING时,将使用该配置项指定的编码解析字符串。 | 否,默认为UTF-8。 |
waitTIme | 消费者对象从Kafka拉取一次数据的最大等待时间,单位为秒。 | 否,默认为60。 |
stopWhenPollEmpty | 该配置项可选值为true/false。当配置为true时,如果消费者从Kafka拉取数据返回为空(一般是已经读完主题中的全部数据,也可能是网络或者Kafka集群可用性问题),则立即停止任务,否则持续重试直到再次读到数据。 | 否,默认为true。 |
stopWhenReachEndOffset | 该配置项只在stopWhenPollEmpty为true时生效,可选值为true/false。
| 否,默认为false。 兼容历史逻辑,Kafka版本低于V0.10.2无法执行已经读取Kafka Topic所有分区中的最新位点数据检查,但线上可能存在个别脚本模式任务是读取的版本低于V0.10.2的Kafka数据。 |
kafkaConfig参数说明如下。
参数 | 描述 |
参数 | 描述 |
fetch.min.bytes | 指定消费者从broker获取消息的最小字节数,即有足够的数据时,才将其返回给消费者。 |
fetch.max.wait.ms | 等待broker返回数据的最大时间,默认500毫秒。fetch.min.bytes和fetch.max.wait.ms先满足哪个条件,便按照该方式返回数据。 |
max.partition.fetch.bytes | 指定broker从每个partition中返回给消费者的最大字节数,默认为1 MB。 |
session.timeout.ms | 指定消费者不再接收服务之前,可以与服务器断开连接的时间,默认是30秒。 |
auto.offset.reset | 消费者在读取没有偏移量或者偏移量无效的情况下(因为消费者长时间失效,包含偏移量的记录已经过时并被删除)的处理方式。默认为none(意味着不会自动重置位点),您可以更改为earliest(消费者从起始位置读取partition的记录)。 |
max.poll.records | 单次调用poll方法能够返回的消息数量。 |
key.deserializer | 消息Key的反序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer。 |
value.deserializer | 数据Value的反序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer。 |
ssl.truststore.location | SSL根证书的路径。 |
ssl.truststore.password | 根证书Store的密码。如果是Aliyun Kafka,则配置为KafkaOnsClient。 |
security.protocol | 接入协议,目前支持使用SASL_SSL协议接入。 |
sasl.mechanism | SASL鉴权方式,如果是Aliyun Kafka,使用PLAIN。 |
java.security.auth.login.config | SASL鉴权文件路径。 |
Writer脚本Demo
向Kafka写入数据的JSON配置,如下所示。
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"Kafka",//插件名。
"parameter":{
"server": "ip:9092", //Kafka的server地址。
"keyIndex": 0, //作为Key的列。需遵循驼峰命名规则,k小写
"valueIndex": 1, //作为Value的某列。目前只支持取来源端数据的一列或者该参数不填(不填表示取来源所有数据)
//例如想取odps的第2、3、4列数据作为kafkaValue,请新建odps表将原odps表数据做清洗整合写新odps表后使用新表同步。
"keyType": "Integer", //Kafka的Key的类型。
"valueType": "Short", //Kafka的Value的类型。
"topic": "t08", //Kafka的topic。
"batchSize": 1024 //向kafka一次性写入的数据量,单位是字节。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Writer脚本参数
参数 | 描述 | 是否必选 |
参数 | 描述 | 是否必选 |
datasource | 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。 | 是 |
server | Kafka的server地址,格式为ip:port。 | 是 |
topic | Kafka的topic,是Kafka处理资源的消息源(feeds of messages)的不同分类。 每条发布至Kafka集群的消息都有一个类别,该类别被称为topic,一个topic是对一组消息的归纳。 | 是 |
valueIndex | Kafka Writer中作为Value的那一列。如果不填写,默认将所有列拼起来作为Value,分隔符为fieldDelimiter。 | 否 |
writeMode | 当未配置valueIndex时,该配置项决定将源端读取记录的所有列拼接作为写入Kafka记录Value的格式,可选值为text和JSON,默认值为text。
例如源端记录有三列,值为a、b和c,writeMode配置为text、fieldDelimiter配置为#时,写入Kafka的记录Value为字符串a#b#c;writeMode配置为JSON、column配置为[{"name":"col1"},{"name":"col2"},{"name":"col3"}]时,写入Kafka的记录Value为字符串{"col1":"a","col2":"b","col3":"c"}。 如果配置了valueIndex,该配置项无效。 | 否 |
column | 目标表需要写入数据的字段,字段间用英文逗号分隔。例如:"column": ["id", "name", "age"]。 当未配置valueIndex,并且writeMode选择JSON时,该配置项定义源端读取记录的列值在JSON结构中的字段名称。例如,"column": [{"name":id"}, {"name":"name"}, {"name":"age"}]。
如果配置了valueIndex,或者writeMode配置为text,该配置项无效。 | 当未配置valueIndex,并且writeMode配置为JSON时必选 |
partition | 指定写入Kafka topic指定分区的编号,是一个大于等于0的整数。 | 否 |
keyIndex | Kafka Writer中作为Key的那一列。 keyIndex参数取值范围是大于等于0的整数,否则任务会出错。 | 否 |
keyIndexes | 源端读取记录中作为写入Kafka记录Key的列的序号数组。 列序号从0开始,例如[0,1,2],会将配置的所有列序号的值用逗号连接作为写入Kafka记录的Key。如果不填写,写入Kafka记录Key为null,数据轮流写入topic的各个分区中,与keyIndex参数只能二选一。 | 否 |
fieldDelimiter | 当writeMode配置为text,并且未配置valueIndex时,将源端读取记录的所有列按照该配置项指定列分隔符拼接作为写入Kafka记录的Value,支持配置单个或者多个字符作为分隔符,支持以\u0001格式配置Unicode字符,支持\t、\n等转义字符。默认值为\t。 如果writeMode未配置为text或者配置了valueIndex,该配置项无效。 | 否 |
keyType | Kafka的Key的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 | 是 |
valueType | Kafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 | 是 |
nullKeyFormat | keyIndex或者keyIndexes指定的源端列值为null时,替换为该配置项指定的字符串,如果不配置,则不做替换。 | 否 |
nullValueFormat | 当源端列值为null时,组装写入kafka记录Value时替换为该配置项指定的字符串,如果不配置不作替换。 | 否 |
acks | 初始化Kafka Producer时的acks配置,决定写入成功的确认方式。默认acks参数为all。acks取值如下:
| 否 |
附录:写入Kafka消息格式定义
完成配置实时同步任务的操作后,执行同步任务会将源端数据库读取的数据,以JSON格式写入到Kafka topic中。除了会将设置的源端表中已有数据全部写入Kafka对应Topic中,还会启动实时同步将增量数据持续写入Kafka对应Topic中,同时源端表增量DDL变更信息也会以JSON格式写入Kafka对应Topic中。您可以通过附录:消息格式获取写入Kafka的消息的状态及变更等信息。
通过离线同步任务写入Kafka的数据JSON结构中的payload.sequenceId、payload.timestamp.eventTIme和payload.timestamp.checkpointTime字段均设置为-1。
- 本页导读 (1)
- 支持的版本
- 实时读取
- 使用限制
- 单表离线读
- 单表实时写
- 整库实时写
- 支持的字段类型
- 离线读数据
- 离线写数据
- 创建数据源
- 数据同步任务开发
- 单表离线同步任务配置指导
- 单表、整库实时同步任务配置指导
- 单表、整库全增量实时同步任务配置指导
- 启用认证配置说明
- SSL
- GSSAPI
- PLAIN
- 常见问题
- 附录:脚本Demo与参数说明
- 离线任务脚本配置方式
- Reader脚本Demo
- Reader脚本参数
- Writer脚本Demo
- Writer脚本参数
- 附录:写入Kafka消息格式定义