启动Tablestore Sink Connector时,您需要通过键值映射向Kafka Connect进程传递参数。通过本文您可以结合配置示例和配置参数说明了解Tablestore Sink Connector的相关配置。
配置示例
当从Kafka同步数据到数据表或者时序表时配置项不同,且不同工作模式下相应配置文件的示例不同。此处以同步数据到数据表中为例介绍配置示例。同步数据到时序表的配置示例中需要增加时序相关配置项。
如果使用的是standalone模式,您需要通过.properties格式文件进行配置。配置示例如下:
# 设置连接器名称。
name=tablestore-sink
# 指定连接器类。
connector.class=TableStoreSinkConnector
# 设置最大任务数。
tasks.max=1
# 指定导出数据的Kafka的Topic列表。
topics=test
# 以下为Tablestore连接参数的配置。
# Tablestore实例的Endpoint。
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# 填写AccessKey ID和AccessKey Secret。
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Tablestore实例名称。
tablestore.instance.name=xxx
# 以下为数据映射相关的配置。
# 指定Kafka Record的解析器。
# 默认的DefaulteEventParser已支持Struct和Map类型,您也可以使用自定义的EventParser。
event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser
# 定义目标表名称的格式字符串,字符串中可包含<topic>作为原始Topic的占位符。
# topics.assign.tables配置的优先级更高,如果配置了topics.assign.tables,则忽略table.name.format的配置。
# 例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则将映射到Tablestore的表名为kafka_test。
table.name.format=<topic>
# 指定Topic与目标表的映射关系,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之间的分隔符为英文冒号(:),不同映射之间分隔符为英文逗号(,)。
# 如果缺省,则采取table.name.format的配置。
# topics.assign.tables=test:test_kafka
# 指定主键模式,可选值包括kafka、record_key和record_value,默认值为kafka。
# kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作为数据表的主键。
# record_key表示以Record Key中的字段作为数据表的主键。
# record_value表示以Record Value中的字段作为数据表的主键。
primarykey.mode=kafka
# 定义导入数据表的主键列名和数据类型。
# 属性名格式为tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
# 其中<tablename>为数据表名称的占位符。
# 当主键模式为kafka时,无需配置该属性,默认主键列名为{"topic_partition","offset"},默认主键列数据类型为{string, integer}。
# 当主键模式为record_key或record_value时,必须配置以下两个属性。
# tablestore.test.primarykey.name=A,B
# tablestore.test.primarykey.type=string,integer
# 定义属性列白名单,用于过滤Record Value中的字段获取所需属性列。
# 默认值为空,使用Record Value中的所有字段作为数据表的属性列。
# 属性名格式为tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
# 其中<tablename>为数据表名称的占位符。
# tablestore.test.columns.whitelist.name=A,B
# tablestore.test.columns.whitelist.type=string,integer
# 以下为写入Tablestore相关的配置。
# 指定写入模式,可选值包括put和update,默认值为put。
# put表示覆盖写。
# update表示更新写。
insert.mode=put
# 是否需要保序,默认值为true。如果关闭保序模式,则有助于提升写入效率。
insert.order.enable=true
# 是否自动创建目标表,默认值为false。
auto.create=false
# 指定删除模式,可选值包括none、row、column和row_and_column,默认值为none。
# none表示不允许进行任何删除。
# row表示允许删除行。
# column表示允许删除属性列。
# row_and_column表示允许删除行和属性列。
delete.mode=none
# 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须为2的指数。
buffer.size=1024
# 写入数据表时的回调线程数,默认值为核数+1。
# max.thread.count=
# 写入数据表时的最大请求并发数,默认值为10。
max.concurrency=10
# 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。
bucket.count=3
# 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。
flush.Interval=10000
# 以下为脏数据处理相关配置。
# 在解析Kafka Record或者写入数据表时可能发生错误,您可以可通过以下配置进行处理。
# 指定容错能力,可选值包括none和all,默认值为none。
# none表示任何错误都将导致Sink Task立即失败。
# all表示跳过产生错误的Record,并记录该Record。
runtime.error.tolerance=none
# 指定脏数据记录模式,可选值包括ignore、kafka和tablestore,默认值为ignore。
# ignore表示忽略所有错误。
# kafka表示将产生错误的Record和错误信息存储在Kafka的另一个Topic中。
# tablestore表示将产生错误的Record和错误信息存储在Tablestore另一张数据表中。
runtime.error.mode=ignore
# 当脏数据记录模式为kafka时,需要配置Kafka集群地址和Topic。
# runtime.error.bootstrap.servers=localhost:9092
# runtime.error.topic.name=errors
# 当脏数据记录模式为tablestore时,需要配置Tablestore中数据表名称。
# runtime.error.table.name=errors
如果使用的是distributed模式,您需要通过JSON格式文件进行配置。配置示例如下:
{
"name": "tablestore-sink",
"config": {
// 指定连接器类。
"connector.class":"TableStoreSinkConnector",
// 设置最大任务数。
"tasks.max":"3",
// 指定导出数据的Kafka的Topic列表。
"topics":"test",
// 以下为Tablestore连接参数的配置。
// Tablestore实例的Endpoint。
"tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
// 填写AccessKey ID和AccessKey Secret。
"tablestore.access.key.id":"xxx",
"tablestore.access.key.secret":"xxx",
// Tablestore实例名称。
"tablestore.instance.name":"xxx",
// 以下为数据映射相关的配置。
// 指定Kafka Record的解析器。
// 默认的DefaulteEventParser已支持Struct和Map类型,您也可以使用自定义的EventParser。
"event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",
// 定义目标表名称的格式字符串,字符串中可包含<topic>作为原始Topic的占位符。
// topics.assign.tables配置的优先级更高。如果配置了topics.assign.tables,则忽略table.name.format的配置。
// 例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则将映射到Tablestore的表名为kafka_test。
"table.name.format":"<topic>",
// 指定Topic与目标表的映射关系,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之间的分隔符为英文冒号(:),不同映射之间分隔符为英文逗号(,)。
// 如果缺省,则采取table.name.format的配置。
// "topics.assign.tables":"test:test_kafka",
// 指定主键模式,可选值包括kafka、record_key和record_value,默认值为kafka。
// kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作为数据表的主键。
// record_key表示以Record Key中的字段作为数据表的主键。
// record_value表示以Record Value中的字段作为数据表的主键。
"primarykey.mode":"kafka",
// 定义导入数据表的主键列名和数据类型。
// 属性名格式为tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
// 其中<tablename>为数据表名称的占位符。
// 当主键模式为kafka时,无需配置该属性,默认主键列名为{"topic_partition","offset"},默认主键列数据类型为{string, integer}。
// 当主键模式为record_key或record_value时,必须配置以下两个属性。
// "tablestore.test.primarykey.name":"A,B",
// "tablestore.test.primarykey.type":"string,integer",
// 定义属性列白名单,用于过滤Record Value中的字段获取所需属性列。
// 默认值为空,使用Record Value中的所有字段作为数据表的属性列。
// 属性名格式为tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
// 其中<tablename>为数据表名称的占位符。
// "tablestore.test.columns.whitelist.name":"A,B",
// "tablestore.test.columns.whitelist.type":"string,integer",
// 以下为写入Tablestore相关的配置。
// 指定写入模式,可选值包括put和update,默认值为put。
// put表示覆盖写。
// update表示更新写。
"insert.mode":"put",
// 是否需要保序,默认值为true。如果关闭保序模式,则有助于提升写入效率。
"insert.order.enable":"true",
// 是否自动创建目标表,默认值为false。
"auto.create":"false",
// 指定删除模式,可选值包括none、row、column和row_and_column,默认值为none。
// none表示不允许进行任何删除。
// row表示允许删除行。
// column表示允许删除属性列。
// row_and_column表示允许删除行和属性列。
"delete.mode":"none",
// 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须为2的指数。
"buffer.size":"1024",
// 写入数据表时的回调线程数,默认值为核数+1。
// "max.thread.count":
// 写入数据表时的最大请求并发数,默认值为10。
"max.concurrency":"10",
// 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。
"bucket.count":"3",
// 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。
"flush.Interval":"10000",
// 以下为脏数据处理相关配置。
// 在解析Kafka Record或者写入数据表时可能发生错误,您可以通过以下配置进行处理。
// 指定容错能力,可选值包括none和all,默认值为none。
// none表示任何错误都将导致Sink Task立即失败。
// all表示跳过产生错误的Record,并记录该Record。
"runtime.error.tolerance":"none",
// 指定脏数据记录模式,可选值包括ignore、kafka和tablestore,默认值为ignore。
// ignore表示忽略所有错误。
// kafka表示将产生错误的Record和错误信息存储在Kafka的另一个Topic中。
// tablestore表示将产生错误的Record和错误信息存储在Tablestore另一张数据表中。
"runtime.error.mode":"ignore"
// 当脏数据记录模式为kafka时,需要配置Kafka集群地址和Topic。
// "runtime.error.bootstrap.servers":"localhost:9092",
// "runtime.error.topic.name":"errors",
// 当脏数据记录模式为tablestore时,需要配置Tablestore中数据表名称。
// "runtime.error.table.name":"errors",
}
配置项说明
配置文件中的配置项说明请参见下表。其中时序相关配置项只有同步数据到时序表时才需要配置。
Kafka Connect常见配置
配置项 | 类型 | 是否必选 | 示例值 | 描述 |
配置项 | 类型 | 是否必选 | 示例值 | 描述 |
name | string | 是 | tablestore-sink | 连接器(Connector)名称。连接器名称必须唯一。 |
connector.class | class | 是 | TableStoreSinkConnector | 连接器的Java类。 如果您要使用该连接器,请在 |
tasks.max | integer | 是 | 3 | 连接器支持创建的最大任务数。 如果连接器无法达到此并行度级别,则可能会创建较少的任务。 |
key.converter | string | 否 | org.apache.kafka.connect.json.JsonConverter | 覆盖worker设置的默认key转换器。 |
value.converter | string | 否 | org.apache.kafka.connect.json.JsonConverter | 覆盖worker设置的默认value转换器。 |
topics | list | 是 | test | 连接器输入的Kafka Topic列表,多个Topic之间以半角逗号(,)分隔。 您必须为连接器设置topics来控制连接器输入的Topic。 |
连接器Connection配置
配置项 | 类型 | 是否必选 | 示例值 | 描述 |
配置项 | 类型 | 是否必选 | 示例值 | 描述 |
tablestore.endpoint | string | 是 | https://xxx.xxx.ots.aliyuncs.com | Tablestore实例的服务地址。 |
tablestore.mode | string | 是 | timeseries | 根据数据同步到的表类型选择模式。取值范围如下:
|
tablestore.access.key.id | string | 是 | LTAn******************** | 登录账号的AccessKey ID和AccessKey Secret,获取方式请参见创建AccessKey。 |
tablestore.access.key.secret | string | 是 | zbnK************************** | |
tablestore.auth.mode | string | 是 | aksk | 设置认证方式。取值范围如下:
|
tablestore.instance.name | string | 是 | myotstest | Tablestore实例的名称。 |
连接器Data Mapping配置
配置项 | 类型 | 是否必选 | 示例值 | 描述 |
配置项 | 类型 | 是否必选 | 示例值 | 描述 |
event.parse.class | class | 是 | DefaultEventParser | 消息解析器的Java类,默认值为DefaultEventParser。解析器用于从Kafka Record中解析出数据表的主键列和属性列。 Tablestore对列值大小有限制。string类型和binary类型的主键列值限制均为1 KB,属性列列值限制均为2 MB。更多信息,请参见使用限制。 如果数据类型转换后列值超出对应限制,则将该Kafka Record作为脏数据处理。 如果使用默认的DefaultEventParser解析器,Kafka Record的Key或Value必须为Kafka Connect的Struct或Map类型。 Struct中选择的字段必须为支持的数据类型,字段会根据数据类型映射表转换为Tablestore数据类型写入数据表;Map中的值类型也必须为支持的数据类型,支持的数据类型与Struct相同,最终会被转换为binary类型写入数据表。 如果Kafka Record为不兼容的数据格式,则您可以通过实现 |
table.name.format | string | 否 | kafka_<topic> | 目标数据表名称的格式字符串,默认值为 此配置项的优先级低于 |
topics.assign.tables | list | 是 | test:destTable | 指定topic与Tablestore表之间的映射关系,格式为 此配置项的优先级高于 |
primarykey.mode | string | 否 | kafka | 数据表的主键模式。取值范围如下:
请配合 |
tablestore.<tablename>.primarykey.name | list | 否 | A,B | 数据表的主键列名,其中 主键模式不同时,主键列名的配置不同。
Tablestore数据表的主键列是有顺序的,此属性的配置应注意主键列名顺序,例如PRIMARY KEY(A、B、C)与PRIMARY KEY(A、C、B)是不同的两个主键结构。 |
tablestore.<tablename>.primarykey.type | list | 否 | string, integer | 数据表的主键列数据类型,其中 主键数据类型的配置与主键模式相关。
|
tablestore.<tablename>.columns.whitelist.name | list | 否 | A,B | 数据表的属性列白名单中属性列名称,其中 如果配置为空,则使用Record Value中的所有字段(Struct类型)或者键(Map类型)作为数据表的属性列,否则用于过滤得到所需属性列。 |
tablestore.<tablename>.columns.whitelist.type | list | 否 | string, integer | 数据表的属性列白名单中属性列数据类型,其中 |
连接器Write配置
配置项 | 类型 | 是否必选 | 示例值 | 描述 |
配置项 | 类型 | 是否必选 | 示例值 | 描述 |
insert.mode | string | 否 | put | 写入模式。取值范围如下:
此属性配置不区分大小写。 |
insert.order.enable | boolean | 否 | true | 写入数据表时是否需要保持顺序。取值范围如下:
|
auto.create | boolean | 否 | false | 是否需要自动创建目标表,支持自动创建数据表或时序表。取值范围如下:
|
delete.mode | string | 否 | none | 删除模式,仅当同步数据到数据表且主键模式为record_key时才有效。取值范围如下:
此属性配置不区分大小写。 删除操作与 |
buffer.size | integer | 否 | 1024 | 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须是2的指数。 |
max.thread.count | integer | 否 | 3 | 写入数据表时的回调线程数,默认值为 |
max.concurrency | integer | 否 | 10 | 写入数据表时的最大请求并发数,默认值为10。 |
bucket.count | integer | 否 | 3 | 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。 |
flush.Interval | integer | 否 | 10000 | 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。 |
连接器Runtime Error配置
配置项 | 类型 | 是否必选 | 示例值 | 描述 |
配置项 | 类型 | 是否必选 | 示例值 | 描述 |
runtime.error.tolerance | string | 否 | none | 解析Kafka Record或者写入表时产生错误的处理策略。取值范围如下:
此属性配置不区分大小写。 |
runtime.error.mode | string | 否 | ignore | 解析Kafka Record或者写入表时产生错误,对错误的Record的处理策略。取值范围如下:
kafka模式下需要对Kafka Record的Header、Key和Value进行序列化转换,tablestore模式下需要对Kafka Record的Key和Value进行序列化转换,此处默认使用 |
runtime.error.bootstrap.servers | string | 否 | localhost:9092 | 用于记录运行错误的Kafka集群地址。 |
runtime.error.topic.name | string | 否 | errors | 用于记录运行错误的Kafka Topic名称。 |
runtime.error.table.name | string | 否 | errors | 用于记录运行错误的Tablestore表名称。 |
时序相关配置
配置项 | 类型 | 是否必选 | 示例值 | 描述 |
配置项 | 类型 | 是否必选 | 示例值 | 描述 |
tablestore.timeseries.<tablename>.measurement | string | 是 | mName | 将JSON中的key值为指定值对应的value值作为_m_name字段写入对应时序表中。 如果设置此配置项为 配置项名称中 |
tablestore.timeseries.<tablename>.dataSource | string | 是 | ds | 将JSON中的key值为ds对应的value值作为_data_source字段写入对应时序表中。 配置项名称中 |
tablestore.timeseries.<tablename>.tags | list | 是 | region,level | 将JSON中key值为region和level所对应的value值作为tags字段写入对应时序表中。 配置项名称中 |
tablestore.timeseries.<tablename>.time | string | 是 | timestamp | 将JSON中key值为timestamp对应的value值作为_time字段写入对应时序表中。 配置项名称中 |
tablestore.timeseries.<tablename>.time.unit | string | 是 | MILLISECONDS |
配置项名称中 |
tablestore.timeseries.<tablename>.field.name | list | 否 | cpu,io | 将JSON中key值为cpu和io的键值对作为_field_name以及_field_name的值写入对应时序表。 配置项名称中 |
tablestore.timeseries.<tablename>.field.type | string | 否 | double,integer |
配置项名称中 |
tablestore.timeseries.mapAll | boolean | 否 | false | 将输入JSON中的非主键字段和时间字段都作为field存储到时序表中。 当配置项取值为false时, |
tablestore.timeseries.toLowerCase | boolean | 否 | true | 将field中的key(输入数据中非主键或者时间的key,或配置在 |
tablestore.timeseries.rowsPerBatch | integer | 否 | 50 | 写入tablestore时,一次请求支持写入的最大行数。最大值为200,默认值为200。 |
附录:Kafka和Tablestore数据类型映射
Kafka和Tablestore数据类型映射关系请参见下表。
Kafka Schema Type | Tablestore数据类型 |
Kafka Schema Type | Tablestore数据类型 |
STRING | STRING |
INT8、INT16、INT32、INT64 | INTEGER |
FLOAT32、FLOAT64 | DOUBLE |
BOOLEAN | BOOLEAN |
BYTES | BINARY |
附录:删除语义
只有同步数据到数据表时才支持此功能。
当同步数据到数据表且Kafka消息记录的value中存在空值时,根据写入模式(insert.mode)和删除模式(delete.mode)的不同设置,数据写入到表格存储数据表的处理方式不同,详细说明请参见下表。
insert.mode | put | update |
insert.mode | put | update | ||||||
delete.mode | none | row | column | row_and_column | none | row | column | row_and_column |
value为空值 | 覆盖写 | 删行 | 覆盖写 | 删行 | 脏数据 | 删行 | 脏数据 | 删行 |
value所有字段值均为空值 | 覆盖写 | 覆盖写 | 覆盖写 | 覆盖写 | 脏数据 | 脏数据 | 删列 | 删列 |
value部分字段值为空值 | 覆盖写 | 覆盖写 | 覆盖写 | 覆盖写 | 忽略空值 | 忽略空值 | 删列 | 删列 |
- 本页导读 (1)
- 配置示例
- 配置项说明
- Kafka Connect常见配置
- 连接器Connection配置
- 连接器Data Mapping配置
- 连接器Write配置
- 连接器Runtime Error配置
- 时序相关配置
- 附录:Kafka和Tablestore数据类型映射
- 附录:删除语义