配置说明

更新时间:2025-03-19 01:48:23

启动Tablestore Sink Connector时,您需要通过键值映射向Kafka Connect进程传递参数。通过本文您可以结合配置示例和配置参数说明了解Tablestore Sink Connector的相关配置。

配置示例

当从Kafka同步数据到数据表或者时序表时配置项不同,且不同工作模式下相应配置文件的示例不同。此处以同步数据到数据表中为例介绍配置示例。同步数据到时序表的配置示例中需要增加时序相关配置项。

standalone模式配置示例
distributed模式配置示例

如果使用的是standalone模式,您需要通过.properties格式文件进行配置。配置示例如下:

# 设置连接器名称。
name=tablestore-sink
# 指定连接器类。
connector.class=TableStoreSinkConnector
# 设置最大任务数。
tasks.max=1
# 指定导出数据的Kafka的Topic列表。
topics=test
# 以下为Tablestore连接参数的配置。
# Tablestore实例的Endpoint。 
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# 填写AccessKey ID和AccessKey Secret。
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Tablestore实例名称。
tablestore.instance.name=xxx

# 以下为数据映射相关的配置。
# 指定Kafka Record的解析器。
# 默认的DefaulteEventParser已支持Struct和Map类型,您也可以使用自定义的EventParser。
event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser

# 定义目标表名称的格式字符串,字符串中可包含<topic>作为原始Topic的占位符。
# topics.assign.tables配置的优先级更高,如果配置了topics.assign.tables,则忽略table.name.format的配置。
# 例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则将映射到Tablestore的表名为kafka_test。
table.name.format=<topic>
# 指定Topic与目标表的映射关系,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之间的分隔符为英文冒号(:),不同映射之间分隔符为英文逗号(,)。
# 如果缺省,则采取table.name.format的配置。
# topics.assign.tables=test:test_kafka

# 指定主键模式,可选值包括kafka、record_key和record_value,默认值为kafka。
# kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作为数据表的主键。
# record_key表示以Record Key中的字段作为数据表的主键。
# record_value表示以Record Value中的字段作为数据表的主键。
primarykey.mode=kafka

# 定义导入数据表的主键列名和数据类型。
# 属性名格式为tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
# 其中<tablename>为数据表名称的占位符。
# 当主键模式为kafka时,无需配置该属性,默认主键列名为{"topic_partition","offset"},默认主键列数据类型为{string, integer}。
# 当主键模式为record_key或record_value时,必须配置以下两个属性。
# tablestore.test.primarykey.name=A,B
# tablestore.test.primarykey.type=string,integer

# 定义属性列白名单,用于过滤Record Value中的字段获取所需属性列。
# 默认值为空,使用Record Value中的所有字段作为数据表的属性列。
# 属性名格式为tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
# 其中<tablename>为数据表名称的占位符。
# tablestore.test.columns.whitelist.name=A,B
# tablestore.test.columns.whitelist.type=string,integer

# 以下为写入Tablestore相关的配置。
# 指定写入模式,可选值包括put和update,默认值为put。
# put表示覆盖写。
# update表示更新写。
insert.mode=put
# 是否需要保序,默认值为true。如果关闭保序模式,则有助于提升写入效率。
insert.order.enable=true
# 是否自动创建目标表,默认值为false
auto.create=false

# 指定删除模式,可选值包括none、row、column和row_and_column,默认值为none。
# none表示不允许进行任何删除。
# row表示允许删除行。
# column表示允许删除属性列。
# row_and_column表示允许删除行和属性列。
delete.mode=none

# 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须为2的指数。
buffer.size=1024
# 写入数据表时的回调线程数,默认值为核数+1。
# max.thread.count=
# 写入数据表时的最大请求并发数,默认值为10。
max.concurrency=10
# 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。
bucket.count=3
# 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。
flush.Interval=10000

# 以下为脏数据处理相关配置。
# 在解析Kafka Record或者写入数据表时可能发生错误,您可以可通过以下配置进行处理。
# 指定容错能力,可选值包括none和all,默认值为none。
# none表示任何错误都将导致Sink Task立即失败。
# all表示跳过产生错误的Record,并记录该Record。
runtime.error.tolerance=none
# 指定脏数据记录模式,可选值包括ignore、kafka和tablestore,默认值为ignore。
# ignore表示忽略所有错误。
# kafka表示将产生错误的Record和错误信息存储在Kafka的另一个Topic中。
# tablestore表示将产生错误的Record和错误信息存储在Tablestore另一张数据表中。
runtime.error.mode=ignore

# 当脏数据记录模式为kafka时,需要配置Kafka集群地址和Topic。
# runtime.error.bootstrap.servers=localhost:9092
# runtime.error.topic.name=errors

# 当脏数据记录模式为tablestore时,需要配置Tablestore中数据表名称。
# runtime.error.table.name=errors

如果使用的是distributed模式,您需要通过JSON格式文件进行配置。配置示例如下:

{
  "name": "tablestore-sink",
  "config": {
    // 指定连接器类。
    "connector.class":"TableStoreSinkConnector",
    // 设置最大任务数。
    "tasks.max":"3",
    // 指定导出数据的Kafka的Topic列表。
    "topics":"test",
    // 以下为Tablestore连接参数的配置。
    // Tablestore实例的Endpoint。
    "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
    // 填写AccessKey ID和AccessKey Secret。
    "tablestore.access.key.id":"xxx",
    "tablestore.access.key.secret":"xxx",
    // Tablestore实例名称。
    "tablestore.instance.name":"xxx",

    // 以下为数据映射相关的配置。
    // 指定Kafka Record的解析器。
    // 默认的DefaulteEventParser已支持Struct和Map类型,您也可以使用自定义的EventParser。
    "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",

    // 定义目标表名称的格式字符串,字符串中可包含<topic>作为原始Topic的占位符。
    // topics.assign.tables配置的优先级更高。如果配置了topics.assign.tables,则忽略table.name.format的配置。
    // 例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则将映射到Tablestore的表名为kafka_test。
    "table.name.format":"<topic>",
    // 指定Topic与目标表的映射关系,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之间的分隔符为英文冒号(:),不同映射之间分隔符为英文逗号(,)。
    // 如果缺省,则采取table.name.format的配置。
    // "topics.assign.tables":"test:test_kafka",

    // 指定主键模式,可选值包括kafka、record_key和record_value,默认值为kafka。
    // kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作为数据表的主键。
    // record_key表示以Record Key中的字段作为数据表的主键。
    // record_value表示以Record Value中的字段作为数据表的主键。
    "primarykey.mode":"kafka",

    // 定义导入数据表的主键列名和数据类型。
    // 属性名格式为tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
    // 其中<tablename>为数据表名称的占位符。
    // 当主键模式为kafka时,无需配置该属性,默认主键列名为{"topic_partition","offset"},默认主键列数据类型为{string, integer}。
    // 当主键模式为record_key或record_value时,必须配置以下两个属性。
    // "tablestore.test.primarykey.name":"A,B",
    // "tablestore.test.primarykey.type":"string,integer",

    // 定义属性列白名单,用于过滤Record Value中的字段获取所需属性列。
    // 默认值为空,使用Record Value中的所有字段作为数据表的属性列。
    // 属性名格式为tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
    // 其中<tablename>为数据表名称的占位符。
    // "tablestore.test.columns.whitelist.name":"A,B",
    // "tablestore.test.columns.whitelist.type":"string,integer",

    // 以下为写入Tablestore相关的配置。
    // 指定写入模式,可选值包括put和update,默认值为put。
    // put表示覆盖写。
    // update表示更新写。
    "insert.mode":"put",
    // 是否需要保序,默认值为true。如果关闭保序模式,则有助于提升写入效率。
    "insert.order.enable":"true",
    // 是否自动创建目标表,默认值为false。
    "auto.create":"false",

    // 指定删除模式,可选值包括none、row、column和row_and_column,默认值为none。
    // none表示不允许进行任何删除。
    // row表示允许删除行。
    // column表示允许删除属性列。
    // row_and_column表示允许删除行和属性列。
    "delete.mode":"none",

    // 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须为2的指数。
    "buffer.size":"1024",
    // 写入数据表时的回调线程数,默认值为核数+1。
    // "max.thread.count":
    // 写入数据表时的最大请求并发数,默认值为10。
    "max.concurrency":"10",
    // 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。
    "bucket.count":"3",
    // 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。
    "flush.Interval":"10000",

    // 以下为脏数据处理相关配置。
    // 在解析Kafka Record或者写入数据表时可能发生错误,您可以通过以下配置进行处理。
    // 指定容错能力,可选值包括none和all,默认值为none。
    // none表示任何错误都将导致Sink Task立即失败。
    // all表示跳过产生错误的Record,并记录该Record。
    "runtime.error.tolerance":"none",
    // 指定脏数据记录模式,可选值包括ignore、kafka和tablestore,默认值为ignore。
    // ignore表示忽略所有错误。
    // kafka表示将产生错误的Record和错误信息存储在Kafka的另一个Topic中。
    // tablestore表示将产生错误的Record和错误信息存储在Tablestore另一张数据表中。
    "runtime.error.mode":"ignore"

    // 当脏数据记录模式为kafka时,需要配置Kafka集群地址和Topic。
    // "runtime.error.bootstrap.servers":"localhost:9092",
    // "runtime.error.topic.name":"errors",

    // 当脏数据记录模式为tablestore时,需要配置Tablestore中数据表名称。
    // "runtime.error.table.name":"errors",
  }

配置项说明

配置文件中的配置项说明请参见下表。其中时序相关配置项只有同步数据到时序表时才需要配置。

Kafka Connect常见配置

配置项

类型

是否必选

示例值

描述

配置项

类型

是否必选

示例值

描述

name

string

tablestore-sink

连接器(Connector)名称。连接器名称必须唯一。

connector.class

class

TableStoreSinkConnector

连接器的Java类。

如果您要使用该连接器,请在connector.class配置项中指定Connector类的名称,支持配置为Connector类的全名com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector或别名TableStoreSinkConnector,例如connector.class=com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector

tasks.max

integer

3

连接器支持创建的最大任务数。

如果连接器无法达到此并行度级别,则可能会创建较少的任务。

key.converter

string

org.apache.kafka.connect.json.JsonConverter

覆盖worker设置的默认key转换器。

value.converter

string

org.apache.kafka.connect.json.JsonConverter

覆盖worker设置的默认value转换器。

topics

list

test

连接器输入的Kafka Topic列表,多个Topic之间以半角逗号(,)分隔。

您必须为连接器设置topics来控制连接器输入的Topic。

连接器Connection配置

配置项

类型

是否必选

示例值

描述

配置项

类型

是否必选

示例值

描述

tablestore.endpoint

string

https://xxx.xxx.ots.aliyuncs.com

Tablestore实例的服务地址

tablestore.mode

string

timeseries

根据数据同步到的表类型选择模式。取值范围如下:

  • normal(默认):同步数据到表格存储的数据表。

  • timeseries:同步数据到表格存储的时序表。

tablestore.access.key.id

string

LTAn********************

登录账号的AccessKey IDAccessKey Secret,获取方式请参见创建AccessKey

tablestore.access.key.secret

string

zbnK**************************

tablestore.auth.mode

string

aksk

设置认证方式。取值范围如下:

  • aksk:使用阿里云账号或者RAM用户的AccessKey IDAccess Secret进行认证。请使用此认证方式。

  • sts(默认):使用STS临时访问凭证进行认证。对接云Kafka时使用。

tablestore.instance.name

string

myotstest

Tablestore实例的名称。

连接器Data Mapping配置

配置项

类型

是否必选

示例值

描述

配置项

类型

是否必选

示例值

描述

event.parse.class

class

DefaultEventParser

消息解析器的Java类,默认值为DefaultEventParser。解析器用于从Kafka Record中解析出数据表的主键列和属性列。

重要

Tablestore对列值大小有限制。string类型和binary类型的主键列值限制均为1 KB,属性列列值限制均为2 MB。更多信息,请参见使用限制

如果数据类型转换后列值超出对应限制,则将该Kafka Record作为脏数据处理。

如果使用默认的DefaultEventParser解析器,Kafka RecordKeyValue必须为Kafka ConnectStructMap类型。

Struct中选择的字段必须为支持的数据类型,字段会根据数据类型映射表转换为Tablestore数据类型写入数据表;Map中的值类型也必须为支持的数据类型,支持的数据类型与Struct相同,最终会被转换为binary类型写入数据表。

如果Kafka Record为不兼容的数据格式,则您可以通过实现com.aliyun.tablestore.kafka.connect.parsers.EventParser定义的接口来自定义解析器。

table.name.format

string

kafka_<topic>

目标数据表名称的格式字符串,默认值为<topic>。字符串中可包含<topic>作为原始Topic的占位符。例如当设置table.name.formatkafka_<topic>时,如果KafkaTopic名为test,则映射到Tablestore的表名为kafka_test。

此配置项的优先级低于topics.assign.tables配置项,如果配置了topics.assign.tables,则会忽略table.name.format的配置。

topics.assign.tables

list

test:destTable

指定topicTablestore表之间的映射关系,格式为<topic_1>:<tablename_1>,<topic_2>:<tablename_2>。多个映射关系之间以半角逗号(,)分隔,例如test:destTable表示将Topic名为test的消息记录写入数据表destTable中。

此配置项的优先级高于table.name.format配置项,如果配置了topics.assign.tables,则会忽略table.name.format的配置。

primarykey.mode

string

kafka

数据表的主键模式。取值范围如下:

  • kafka:以<connect_topic>_<connect_partition>(Kafka主题和分区,用下划线"_"分隔)和<connect_offset>(该消息记录在分区中的偏移量)作为数据表的主键。

  • record_key:以Record Key中的字段(Struct类型)或者键(Map类型)作为数据表的主键。

  • record_value:以Record Value中的字段(Struct类型)或者键(Map类型)作为数据表的主键。

请配合tablestore.<tablename>.primarykey.nametablestore.<tablename>.primarykey.type使用。此配置项不区分大小写。

tablestore.<tablename>.primarykey.name

list

A,B

数据表的主键列名,其中<tablename>为数据表名称的占位符,包含1~4个主键列,以半角逗号(,)分隔。

主键模式不同时,主键列名的配置不同。

  • 当设置主键模式为kafka时,以topic_partition,offset作为数据表主键列名称。在该主键模式下,您可以不配置此主键列名。如果配置了主键列名,则不会覆盖默认主键列名。

  • 当设置主键模式为record_key时,从Record Key中提取与配置的主键列名相同的字段(Struct类型)或者键(Map类型)作为数据表的主键。在该主键模式下主键列名必须配置。

  • 当设置主键模式为record_value时,从Record Value中提取与配置的主键列名相同的字段(Struct类型)或者键(Map类型)作为数据表的主键。在该主键模式下主键列名必须配置。

Tablestore数据表的主键列是有顺序的,此属性的配置应注意主键列名顺序,例如PRIMARY KEY(A、B、C)与PRIMARY KEY(A、C、B)是不同的两个主键结构。

tablestore.<tablename>.primarykey.type

list

string, integer

数据表的主键列数据类型,其中<tablename>为数据表名称的占位符,包含1~4个主键列,以半角逗号(,)分隔,顺序必须与tablestore.<tablename>.primarykey.name相对应。此属性配置不区分大小写。数据类型的可选值包括integer、string、binaryauto_increment。

主键数据类型的配置与主键模式相关。

  • 当主键模式为kafka时,以string, integer作为数据表主键数据类型。

    在该主键模式下,您可以不配置此主键列数据类型。如果配置了主键列数据类型,则不会覆盖默认主键列数据类型。

  • 当主键模式为record_keyrecord_value时,指定相应主键列的数据类型。在该主键模式下主键列数据类型必须配置。

    如果指定的数据类型与Kafka Schema中定义的数据类型发生冲突,则会产生解析错误,您可以配置Runtime Error相关属性来应对解析错误。

    当配置此配置项为auto_increment时,表示主键自增列,此时Kafka Record中可以缺省该字段,写入数据表时会自动插入自增列。

tablestore.<tablename>.columns.whitelist.name

list

A,B

数据表的属性列白名单中属性列名称,其中<tablename>为数据表名称的占位符,以半角逗号(,)分隔。

如果配置为空,则使用Record Value中的所有字段(Struct类型)或者键(Map类型)作为数据表的属性列,否则用于过滤得到所需属性列。

tablestore.<tablename>.columns.whitelist.type

list

string, integer

数据表的属性列白名单中属性列数据类型,其中<tablename>为数据表名称的占位符,以半角逗号(,)分隔,顺序必须与tablestore.<tablename>.columns.whitelist.name相对应。此属性配置不区分大小写。数据类型的可选值包括integer、string、binary、booleandouble。

连接器Write配置

配置项

类型

是否必选

示例值

描述

配置项

类型

是否必选

示例值

描述

insert.mode

string

put

写入模式。取值范围如下:

  • put(默认):对应TablestorePutRow操作,即新写入一行数据会覆盖原数据。

  • update:对应TablestoreUpdateRow操作,即更新一行数据,可以增加一行中的属性列,或更新已存在的属性列的值。

此属性配置不区分大小写。

insert.order.enable

boolean

true

写入数据表时是否需要保持顺序。取值范围如下:

  • true(默认):写入时保持Kafka消息记录的顺序。

  • false:写入顺序无保证,但写入效率会提升。

auto.create

boolean

false

是否需要自动创建目标表,支持自动创建数据表或时序表。取值范围如下:

  • true:自动创建目标表。

  • false(默认):不自动创建目标表。

delete.mode

string

none

删除模式,仅当同步数据到数据表且主键模式为record_key时才有效。取值范围如下:

  • none(默认):不允许进行任何删除。

  • row:允许删除行。当Record Value为空时会删除行。

  • column:允许删除属性列。当Record Value中字段值(Struct类型)或者键值(Map类型)为空时会删除属性列。

  • row_and_column:允许删除行和属性列。

此属性配置不区分大小写。

删除操作与insert.mode的配置相关。更多信息,请参见附录:删除语义

buffer.size

integer

1024

写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须是2的指数。

max.thread.count

integer

3

写入数据表时的回调线程数,默认值为CPU核数+1

max.concurrency

integer

10

写入数据表时的最大请求并发数,默认值为10。

bucket.count

integer

3

写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。

flush.Interval

integer

10000

写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。

连接器Runtime Error配置

配置项

类型

是否必选

示例值

描述

配置项

类型

是否必选

示例值

描述

runtime.error.tolerance

string

none

解析Kafka Record或者写入表时产生错误的处理策略。取值范围如下:

  • none(默认):任何错误都将导致Sink Task立即失败。

  • all:跳过产生错误的Record,并记录该Record。

此属性配置不区分大小写。

runtime.error.mode

string

ignore

解析Kafka Record或者写入表时产生错误,对错误的Record的处理策略。取值范围如下:

  • ignore(默认):忽略所有错误。

  • kafka:将产生错误的Record和错误信息存储在Kafka的另一个Topic中,此时需要配置runtime.error.bootstrap.serversruntime.error.topic.name。记录运行错误的Kafka RecordKeyValue与原Record一致,Header中增加ErrorInfo字段来记录运行错误信息。

  • tablestore:将产生错误的Record和错误信息存储在Tablestore另一张数据表中,此时需要配置runtime.error.table.name。记录运行错误的数据表主键列为topic_partition(string类型), offset(integer类型),并且属性列为key(bytes类型)、value(bytes类型)和error_info(string类型)。

kafka模式下需要对Kafka RecordHeader、KeyValue进行序列化转换,tablestore模式下需要对Kafka RecordKeyValue进行序列化转换,此处默认使用org.apache.kafka.connect.json.JsonConverter,并且配置schemas.enabletrue,您可以通过JsonConverter反序列化得到原始数据。关于Converter的更多信息,请参见Kafka Converter

runtime.error.bootstrap.servers

string

localhost:9092

用于记录运行错误的Kafka集群地址。

runtime.error.topic.name

string

errors

用于记录运行错误的Kafka Topic名称。

runtime.error.table.name

string

errors

用于记录运行错误的Tablestore表名称。

时序相关配置

配置项

类型

是否必选

示例值

描述

配置项

类型

是否必选

示例值

描述

tablestore.timeseries.<tablename>.measurement

string

mName

JSON中的key值为指定值对应的value值作为_m_name字段写入对应时序表中。

如果设置此配置项为<topic>,则将Kafka记录的topic作为_m_name字段写入时序表中。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改,例如时序表名称为test,则配置项名称为tablestore.timeseries.test.measurement

tablestore.timeseries.<tablename>.dataSource

string

ds

JSON中的key值为ds对应的value值作为_data_source字段写入对应时序表中。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.tags

list

region,level

JSONkey值为regionlevel所对应的value值作为tags字段写入对应时序表中。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.time

string

timestamp

JSONkey值为timestamp对应的value值作为_time字段写入对应时序表中。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.time.unit

string

MILLISECONDS

tablestore.timeseries.<tablename>.time值的时间戳单位。取值范围为SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.field.name

list

cpu,io

JSONkey值为cpuio的键值对作为_field_name以及_field_name的值写入对应时序表。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.field.type

string

double,integer

tablestore.timeseries.<tablename>.field.name中字段对应的数据类型。取值范围为double、integer、string、binary、boolean。多个数据类型之间用半角逗号(,)分隔。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.mapAll

boolean

false

将输入JSON中的非主键字段和时间字段都作为field存储到时序表中。

当配置项取值为false时,tablestore.timeseries.<tablename>.field.nametablestore.timeseries.<tablename>.field.type必填。

tablestore.timeseries.toLowerCase

boolean

true

field中的key(输入数据中非主键或者时间的key,或配置在tablestore.timeseries.<tablename>.field.name中的key)转换为小写写入时序表。

tablestore.timeseries.rowsPerBatch

integer

50

写入tablestore时,一次请求支持写入的最大行数。最大值为200,默认值为200。

附录:KafkaTablestore数据类型映射

KafkaTablestore数据类型映射关系请参见下表。

Kafka Schema Type

Tablestore数据类型

Kafka Schema Type

Tablestore数据类型

STRING

STRING

INT8、INT16、INT32、INT64

INTEGER

FLOAT32、FLOAT64

DOUBLE

BOOLEAN

BOOLEAN

BYTES

BINARY

附录:删除语义

说明

只有同步数据到数据表时才支持此功能。

当同步数据到数据表且Kafka消息记录的value中存在空值时,根据写入模式(insert.mode)和删除模式(delete.mode)的不同设置,数据写入到表格存储数据表的处理方式不同,详细说明请参见下表。

insert.mode

put

update

insert.mode

put

update

delete.mode

none

row

column

row_and_column

none

row

column

row_and_column

value为空值

覆盖写

删行

覆盖写

删行

脏数据

删行

脏数据

删行

value所有字段值均为空值

覆盖写

覆盖写

覆盖写

覆盖写

脏数据

脏数据

删列

删列

value部分字段值为空值

覆盖写

覆盖写

覆盖写

覆盖写

忽略空值

忽略空值

删列

删列

  • 本页导读 (1)
  • 配置示例
  • 配置项说明
  • Kafka Connect常见配置
  • 连接器Connection配置
  • 连接器Data Mapping配置
  • 连接器Write配置
  • 连接器Runtime Error配置
  • 时序相关配置
  • 附录:Kafka和Tablestore数据类型映射
  • 附录:删除语义