配置说明

启动Tablestore Sink Connector时,您需要通过键值映射向Kafka Connect进程传递参数。通过本文您可以结合配置示例和配置参数说明了解Tablestore Sink Connector的相关配置。

配置示例

当从Kafka同步数据到数据表或者时序表时配置项不同,且不同工作模式下相应配置文件的示例不同。此处以同步数据到数据表中为例介绍配置示例。同步数据到时序表的配置示例中需要增加时序相关配置项。

standalone模式配置示例

如果使用的是standalone模式,您需要通过.properties格式文件进行配置。配置示例如下:

# 设置连接器名称。
name=tablestore-sink
# 指定连接器类。
connector.class=TableStoreSinkConnector
# 设置最大任务数。
tasks.max=1
# 指定导出数据的Kafka的Topic列表。
topics=test
# 以下为Tablestore连接参数的配置。
# Tablestore实例的Endpoint。 
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# 填写AccessKey ID和AccessKey Secret。
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Tablestore实例名称。
tablestore.instance.name=xxx

# 以下为数据映射相关的配置。
# 指定Kafka Record的解析器。
# 默认的DefaulteEventParser已支持Struct和Map类型,您也可以使用自定义的EventParser。
event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser

# 定义目标表名称的格式字符串,字符串中可包含<topic>作为原始Topic的占位符。
# topics.assign.tables配置的优先级更高,如果配置了topics.assign.tables,则忽略table.name.format的配置。
# 例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则将映射到Tablestore的表名为kafka_test。
table.name.format=<topic>
# 指定Topic与目标表的映射关系,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之间的分隔符为英文冒号(:),不同映射之间分隔符为英文逗号(,)。
# 如果缺省,则采取table.name.format的配置。
# topics.assign.tables=test:test_kafka

# 指定主键模式,可选值包括kafka、record_key和record_value,默认值为kafka。
# kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作为数据表的主键。
# record_key表示以Record Key中的字段作为数据表的主键。
# record_value表示以Record Value中的字段作为数据表的主键。
primarykey.mode=kafka

# 定义导入数据表的主键列名和数据类型。
# 属性名格式为tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
# 其中<tablename>为数据表名称的占位符。
# 当主键模式为kafka时,无需配置该属性,默认主键列名为{"topic_partition","offset"},默认主键列数据类型为{string, integer}。
# 当主键模式为record_key或record_value时,必须配置以下两个属性。
# tablestore.test.primarykey.name=A,B
# tablestore.test.primarykey.type=string,integer

# 定义属性列白名单,用于过滤Record Value中的字段获取所需属性列。
# 默认值为空,使用Record Value中的所有字段作为数据表的属性列。
# 属性名格式为tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
# 其中<tablename>为数据表名称的占位符。
# tablestore.test.columns.whitelist.name=A,B
# tablestore.test.columns.whitelist.type=string,integer

# 以下为写入Tablestore相关的配置。
# 指定写入模式,可选值包括put和update,默认值为put。
# put表示覆盖写。
# update表示更新写。
insert.mode=put
# 是否需要保序,默认值为true。如果关闭保序模式,则有助于提升写入效率。
insert.order.enable=true
# 是否自动创建目标表,默认值为false。
auto.create=false

# 指定删除模式,可选值包括none、row、column和row_and_column,默认值为none。
# none表示不允许进行任何删除。
# row表示允许删除行。
# column表示允许删除属性列。
# row_and_column表示允许删除行和属性列。
delete.mode=none

# 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须为2的指数。
buffer.size=1024
# 写入数据表时的回调线程数,默认值为核数+1。
# max.thread.count=
# 写入数据表时的最大请求并发数,默认值为10。
max.concurrency=10
# 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。
bucket.count=3
# 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。
flush.Interval=10000

# 以下为脏数据处理相关配置。
# 在解析Kafka Record或者写入数据表时可能发生错误,您可以可通过以下配置进行处理。
# 指定容错能力,可选值包括none和all,默认值为none。
# none表示任何错误都将导致Sink Task立即失败。
# all表示跳过产生错误的Record,并记录该Record。
runtime.error.tolerance=none
# 指定脏数据记录模式,可选值包括ignore、kafka和tablestore,默认值为ignore。
# ignore表示忽略所有错误。
# kafka表示将产生错误的Record和错误信息存储在Kafka的另一个Topic中。
# tablestore表示将产生错误的Record和错误信息存储在Tablestore另一张数据表中。
runtime.error.mode=ignore

# 当脏数据记录模式为kafka时,需要配置Kafka集群地址和Topic。
# runtime.error.bootstrap.servers=localhost:9092
# runtime.error.topic.name=errors

# 当脏数据记录模式为tablestore时,需要配置Tablestore中数据表名称。
# runtime.error.table.name=errors

distributed模式配置示例

如果使用的是distributed模式,您需要通过JSON格式文件进行配置。配置示例如下:

{
  "name": "tablestore-sink",
  "config": {
    // 指定连接器类。
    "connector.class":"TableStoreSinkConnector",
    // 设置最大任务数。
    "tasks.max":"3",
    // 指定导出数据的Kafka的Topic列表。
    "topics":"test",
    // 以下为Tablestore连接参数的配置。
    // Tablestore实例的Endpoint。
    "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
    // 填写AccessKey ID和AccessKey Secret。
    "tablestore.access.key.id":"xxx",
    "tablestore.access.key.secret":"xxx",
    // Tablestore实例名称。
    "tablestore.instance.name":"xxx",

    // 以下为数据映射相关的配置。
    // 指定Kafka Record的解析器。
    // 默认的DefaulteEventParser已支持Struct和Map类型,您也可以使用自定义的EventParser。
    "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",

    // 定义目标表名称的格式字符串,字符串中可包含<topic>作为原始Topic的占位符。
    // topics.assign.tables配置的优先级更高。如果配置了topics.assign.tables,则忽略table.name.format的配置。
    // 例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则将映射到Tablestore的表名为kafka_test。
    "table.name.format":"<topic>",
    // 指定Topic与目标表的映射关系,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之间的分隔符为英文冒号(:),不同映射之间分隔符为英文逗号(,)。
    // 如果缺省,则采取table.name.format的配置。
    // "topics.assign.tables":"test:test_kafka",

    // 指定主键模式,可选值包括kafka、record_key和record_value,默认值为kafka。
    // kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作为数据表的主键。
    // record_key表示以Record Key中的字段作为数据表的主键。
    // record_value表示以Record Value中的字段作为数据表的主键。
    "primarykey.mode":"kafka",

    // 定义导入数据表的主键列名和数据类型。
    // 属性名格式为tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
    // 其中<tablename>为数据表名称的占位符。
    // 当主键模式为kafka时,无需配置该属性,默认主键列名为{"topic_partition","offset"},默认主键列数据类型为{string, integer}。
    // 当主键模式为record_key或record_value时,必须配置以下两个属性。
    // "tablestore.test.primarykey.name":"A,B",
    // "tablestore.test.primarykey.type":"string,integer",

    // 定义属性列白名单,用于过滤Record Value中的字段获取所需属性列。
    // 默认值为空,使用Record Value中的所有字段作为数据表的属性列。
    // 属性名格式为tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
    // 其中<tablename>为数据表名称的占位符。
    // "tablestore.test.columns.whitelist.name":"A,B",
    // "tablestore.test.columns.whitelist.type":"string,integer",

    // 以下为写入Tablestore相关的配置。
    // 指定写入模式,可选值包括put和update,默认值为put。
    // put表示覆盖写。
    // update表示更新写。
    "insert.mode":"put",
    // 是否需要保序,默认值为true。如果关闭保序模式,则有助于提升写入效率。
    "insert.order.enable":"true",
    // 是否自动创建目标表,默认值为false。
    "auto.create":"false",

    // 指定删除模式,可选值包括none、row、column和row_and_column,默认值为none。
    // none表示不允许进行任何删除。
    // row表示允许删除行。
    // column表示允许删除属性列。
    // row_and_column表示允许删除行和属性列。
    "delete.mode":"none",

    // 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须为2的指数。
    "buffer.size":"1024",
    // 写入数据表时的回调线程数,默认值为核数+1。
    // "max.thread.count":
    // 写入数据表时的最大请求并发数,默认值为10。
    "max.concurrency":"10",
    // 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。
    "bucket.count":"3",
    // 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。
    "flush.Interval":"10000",

    // 以下为脏数据处理相关配置。
    // 在解析Kafka Record或者写入数据表时可能发生错误,您可以通过以下配置进行处理。
    // 指定容错能力,可选值包括none和all,默认值为none。
    // none表示任何错误都将导致Sink Task立即失败。
    // all表示跳过产生错误的Record,并记录该Record。
    "runtime.error.tolerance":"none",
    // 指定脏数据记录模式,可选值包括ignore、kafka和tablestore,默认值为ignore。
    // ignore表示忽略所有错误。
    // kafka表示将产生错误的Record和错误信息存储在Kafka的另一个Topic中。
    // tablestore表示将产生错误的Record和错误信息存储在Tablestore另一张数据表中。
    "runtime.error.mode":"ignore"

    // 当脏数据记录模式为kafka时,需要配置Kafka集群地址和Topic。
    // "runtime.error.bootstrap.servers":"localhost:9092",
    // "runtime.error.topic.name":"errors",

    // 当脏数据记录模式为tablestore时,需要配置Tablestore中数据表名称。
    // "runtime.error.table.name":"errors",
  }

配置项说明

配置文件中的配置项说明请参见下表。其中时序相关配置项只有同步数据到时序表时才需要配置。

Kafka Connect常见配置

配置项

类型

是否必选

示例值

描述

name

string

tablestore-sink

连接器(Connector)名称。连接器名称必须唯一。

connector.class

class

TableStoreSinkConnector

连接器的Java类。

如果您要使用该连接器,请在connector.class配置项中指定Connector类的名称,支持配置为Connector类的全名com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector或别名TableStoreSinkConnector,例如connector.class=com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector

tasks.max

integer

3

连接器支持创建的最大任务数。

如果连接器无法达到此并行度级别,则可能会创建较少的任务。

key.converter

string

org.apache.kafka.connect.json.JsonConverter

覆盖worker设置的默认key转换器。

value.converter

string

org.apache.kafka.connect.json.JsonConverter

覆盖worker设置的默认value转换器。

topics

list

test

连接器输入的Kafka Topic列表,多个Topic之间以半角逗号(,)分隔。

您必须为连接器设置topics来控制连接器输入的Topic。

连接器Connection配置

配置项

类型

是否必选

示例值

描述

tablestore.endpoint

string

https://xxx.xxx.ots.aliyuncs.com

Tablestore实例的服务地址

tablestore.mode

string

timeseries

根据数据同步到的表类型选择模式。取值范围如下:

  • normal(默认):同步数据到表格存储的数据表。

  • timeseries:同步数据到表格存储的时序表。

tablestore.access.key.id

string

LTAn********************

登录账号的AccessKey IDAccessKey Secret,获取方式请参见创建AccessKey

tablestore.access.key.secret

string

zbnK**************************

tablestore.auth.mode

string

aksk

设置认证方式。取值范围如下:

  • aksk:使用阿里云账号或者RAM用户的AccessKey IDAccess Secret进行认证。请使用此认证方式。

  • sts(默认):使用STS临时访问凭证进行认证。对接云Kafka时使用。

tablestore.instance.name

string

myotstest

Tablestore实例的名称。

连接器Data Mapping配置

配置项

类型

是否必选

示例值

描述

event.parse.class

class

DefaultEventParser

消息解析器的Java类,默认值为DefaultEventParser。解析器用于从Kafka Record中解析出数据表的主键列和属性列。

重要

Tablestore对列值大小有限制。string类型和binary类型的主键列值限制均为1 KB,属性列列值限制均为2 MB。更多信息,请参见使用限制

如果数据类型转换后列值超出对应限制,则将该Kafka Record作为脏数据处理。

如果使用默认的DefaultEventParser解析器,Kafka RecordKeyValue必须为Kafka ConnectStructMap类型。

Struct中选择的字段必须为支持的数据类型,字段会根据数据类型映射表转换为Tablestore数据类型写入数据表;Map中的值类型也必须为支持的数据类型,支持的数据类型与Struct相同,最终会被转换为binary类型写入数据表。

如果Kafka Record为不兼容的数据格式,则您可以通过实现com.aliyun.tablestore.kafka.connect.parsers.EventParser定义的接口来自定义解析器。

table.name.format

string

kafka_<topic>

目标数据表名称的格式字符串,默认值为<topic>。字符串中可包含<topic>作为原始Topic的占位符。例如当设置table.name.formatkafka_<topic>时,如果KafkaTopic名为test,则映射到Tablestore的表名为kafka_test。

此配置项的优先级低于topics.assign.tables配置项,如果配置了topics.assign.tables,则会忽略table.name.format的配置。

topics.assign.tables

list

test:destTable

指定topicTablestore表之间的映射关系,格式为<topic_1>:<tablename_1>,<topic_2>:<tablename_2>。多个映射关系之间以半角逗号(,)分隔,例如test:destTable表示将Topic名为test的消息记录写入数据表destTable中。

此配置项的优先级高于table.name.format配置项,如果配置了topics.assign.tables,则会忽略table.name.format的配置。

primarykey.mode

string

kafka

数据表的主键模式。取值范围如下:

  • kafka:以<connect_topic>_<connect_partition>(Kafka主题和分区,用下划线"_"分隔)和<connect_offset>(该消息记录在分区中的偏移量)作为数据表的主键。

  • record_key:以Record Key中的字段(Struct类型)或者键(Map类型)作为数据表的主键。

  • record_value:以Record Value中的字段(Struct类型)或者键(Map类型)作为数据表的主键。

请配合tablestore.<tablename>.primarykey.nametablestore.<tablename>.primarykey.type使用。此配置项不区分大小写。

tablestore.<tablename>.primarykey.name

list

A,B

数据表的主键列名,其中<tablename>为数据表名称的占位符,包含1~4个主键列,以半角逗号(,)分隔。

主键模式不同时,主键列名的配置不同。

  • 当设置主键模式为kafka时,以topic_partition,offset作为数据表主键列名称。在该主键模式下,您可以不配置此主键列名。如果配置了主键列名,则不会覆盖默认主键列名。

  • 当设置主键模式为record_key时,从Record Key中提取与配置的主键列名相同的字段(Struct类型)或者键(Map类型)作为数据表的主键。在该主键模式下主键列名必须配置。

  • 当设置主键模式为record_value时,从Record Value中提取与配置的主键列名相同的字段(Struct类型)或者键(Map类型)作为数据表的主键。在该主键模式下主键列名必须配置。

Tablestore数据表的主键列是有顺序的,此属性的配置应注意主键列名顺序,例如PRIMARY KEY(A、B、C)与PRIMARY KEY(A、C、B)是不同的两个主键结构。

tablestore.<tablename>.primarykey.type

list

string, integer

数据表的主键列数据类型,其中<tablename>为数据表名称的占位符,包含1~4个主键列,以半角逗号(,)分隔,顺序必须与tablestore.<tablename>.primarykey.name相对应。此属性配置不区分大小写。数据类型的可选值包括integer、string、binaryauto_increment。

主键数据类型的配置与主键模式相关。

  • 当主键模式为kafka时,以string, integer作为数据表主键数据类型。

    在该主键模式下,您可以不配置此主键列数据类型。如果配置了主键列数据类型,则不会覆盖默认主键列数据类型。

  • 当主键模式为record_keyrecord_value时,指定相应主键列的数据类型。在该主键模式下主键列数据类型必须配置。

    如果指定的数据类型与Kafka Schema中定义的数据类型发生冲突,则会产生解析错误,您可以配置Runtime Error相关属性来应对解析错误。

    当配置此配置项为auto_increment时,表示主键自增列,此时Kafka Record中可以缺省该字段,写入数据表时会自动插入自增列。

tablestore.<tablename>.columns.whitelist.name

list

A,B

数据表的属性列白名单中属性列名称,其中<tablename>为数据表名称的占位符,以半角逗号(,)分隔。

如果配置为空,则使用Record Value中的所有字段(Struct类型)或者键(Map类型)作为数据表的属性列,否则用于过滤得到所需属性列。

tablestore.<tablename>.columns.whitelist.type

list

string, integer

数据表的属性列白名单中属性列数据类型,其中<tablename>为数据表名称的占位符,以半角逗号(,)分隔,顺序必须与tablestore.<tablename>.columns.whitelist.name相对应。此属性配置不区分大小写。数据类型的可选值包括integer、string、binary、booleandouble。

连接器Write配置

配置项

类型

是否必选

示例值

描述

insert.mode

string

put

写入模式。取值范围如下:

  • put(默认):对应TablestorePutRow操作,即新写入一行数据会覆盖原数据。

  • update:对应TablestoreUpdateRow操作,即更新一行数据,可以增加一行中的属性列,或更新已存在的属性列的值。

此属性配置不区分大小写。

insert.order.enable

boolean

true

写入数据表时是否需要保持顺序。取值范围如下:

  • true(默认):写入时保持Kafka消息记录的顺序。

  • false:写入顺序无保证,但写入效率会提升。

auto.create

boolean

false

是否需要自动创建目标表,支持自动创建数据表或时序表。取值范围如下:

  • true:自动创建目标表。

  • false(默认):不自动创建目标表。

delete.mode

string

none

删除模式,仅当同步数据到数据表且主键模式为record_key时才有效。取值范围如下:

  • none(默认):不允许进行任何删除。

  • row:允许删除行。当Record Value为空时会删除行。

  • column:允许删除属性列。当Record Value中字段值(Struct类型)或者键值(Map类型)为空时会删除属性列。

  • row_and_column:允许删除行和属性列。

此属性配置不区分大小写。

删除操作与insert.mode的配置相关。更多信息,请参见附录:删除语义

buffer.size

integer

1024

写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须是2的指数。

max.thread.count

integer

3

写入数据表时的回调线程数,默认值为CPU核数+1

max.concurrency

integer

10

写入数据表时的最大请求并发数,默认值为10。

bucket.count

integer

3

写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。

flush.Interval

integer

10000

写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。

连接器Runtime Error配置

配置项

类型

是否必选

示例值

描述

runtime.error.tolerance

string

none

解析Kafka Record或者写入表时产生错误的处理策略。取值范围如下:

  • none(默认):任何错误都将导致Sink Task立即失败。

  • all:跳过产生错误的Record,并记录该Record。

此属性配置不区分大小写。

runtime.error.mode

string

ignore

解析Kafka Record或者写入表时产生错误,对错误的Record的处理策略。取值范围如下:

  • ignore(默认):忽略所有错误。

  • kafka:将产生错误的Record和错误信息存储在Kafka的另一个Topic中,此时需要配置runtime.error.bootstrap.serversruntime.error.topic.name。记录运行错误的Kafka RecordKeyValue与原Record一致,Header中增加ErrorInfo字段来记录运行错误信息。

  • tablestore:将产生错误的Record和错误信息存储在Tablestore另一张数据表中,此时需要配置runtime.error.table.name。记录运行错误的数据表主键列为topic_partition(string类型), offset(integer类型),并且属性列为key(bytes类型)、value(bytes类型)和error_info(string类型)。

kafka模式下需要对Kafka RecordHeader、KeyValue进行序列化转换,tablestore模式下需要对Kafka RecordKeyValue进行序列化转换,此处默认使用org.apache.kafka.connect.json.JsonConverter,并且配置schemas.enabletrue,您可以通过JsonConverter反序列化得到原始数据。关于Converter的更多信息,请参见Kafka Converter

runtime.error.bootstrap.servers

string

localhost:9092

用于记录运行错误的Kafka集群地址。

runtime.error.topic.name

string

errors

用于记录运行错误的Kafka Topic名称。

runtime.error.table.name

string

errors

用于记录运行错误的Tablestore表名称。

时序相关配置

配置项

类型

是否必选

示例值

描述

tablestore.timeseries.<tablename>.measurement

string

mName

JSON中的key值为指定值对应的value值作为_m_name字段写入对应时序表中。

如果设置此配置项为<topic>,则将Kafka记录的topic作为_m_name字段写入时序表中。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改,例如时序表名称为test,则配置项名称为tablestore.timeseries.test.measurement

tablestore.timeseries.<tablename>.dataSource

string

ds

JSON中的key值为ds对应的value值作为_data_source字段写入对应时序表中。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.tags

list

region,level

JSONkey值为regionlevel所对应的value值作为tags字段写入对应时序表中。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.time

string

timestamp

JSONkey值为timestamp对应的value值作为_time字段写入对应时序表中。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.time.unit

string

MILLISECONDS

tablestore.timeseries.<tablename>.time值的时间戳单位。取值范围为SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.field.name

list

cpu,io

JSONkey值为cpuio的键值对作为_field_name以及_field_name的值写入对应时序表。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.field.type

string

double,integer

tablestore.timeseries.<tablename>.field.name中字段对应的数据类型。取值范围为double、integer、string、binary、boolean。多个数据类型之间用半角逗号(,)分隔。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.mapAll

boolean

false

将输入JSON中的非主键字段和时间字段都作为field存储到时序表中。

当配置项取值为false时,tablestore.timeseries.<tablename>.field.nametablestore.timeseries.<tablename>.field.type必填。

tablestore.timeseries.toLowerCase

boolean

true

field中的key(输入数据中非主键或者时间的key,或配置在tablestore.timeseries.<tablename>.field.name中的key)转换为小写写入时序表。

tablestore.timeseries.rowsPerBatch

integer

50

写入tablestore时,一次请求支持写入的最大行数。最大值为200,默认值为200。

附录:KafkaTablestore数据类型映射

KafkaTablestore数据类型映射关系请参见下表。

Kafka Schema Type

Tablestore数据类型

STRING

STRING

INT8、INT16、INT32、INT64

INTEGER

FLOAT32、FLOAT64

DOUBLE

BOOLEAN

BOOLEAN

BYTES

BINARY

附录:删除语义

说明

只有同步数据到数据表时才支持此功能。

当同步数据到数据表且Kafka消息记录的value中存在空值时,根据写入模式(insert.mode)和删除模式(delete.mode)的不同设置,数据写入到表格存储数据表的处理方式不同,详细说明请参见下表。

insert.mode

put

update

delete.mode

none

row

column

row_and_column

none

row

column

row_and_column

value为空值

覆盖写

删行

覆盖写

删行

脏数据

删行

脏数据

删行

value所有字段值均为空值

覆盖写

覆盖写

覆盖写

覆盖写

脏数据

脏数据

删列

删列

value部分字段值为空值

覆盖写

覆盖写

覆盖写

覆盖写

忽略空值

忽略空值

删列

删列