本文介绍如何创建使用Tablestore Sink Connector,您可通过Tablestore Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至Tablestore的表中。
前提条件
创建资源及授权策略,请参见创建前提。
创建Connector时生成的函数所在的服务角色需要手动添加
AliyunOTSFullAccess
权限策略,赋予此服务角色管理表格存储服务(OTS)的权限。具体操作, 请参见授予函数计算访问其他云服务的权限。
步骤一:创建目标服务资源
开通表格存储服务。具体操作,请参见步骤一:开通表格存储服务。
创建表格存储实例。具体操作,请参见步骤二:创建实例。
创建一个数据表,具体操作,请参见步骤三:创建数据表。
本文以实例为ots-sink和数据表名为ots_sink_table的资源为例,创建数据表时设置pk1、pk2两个主键。
步骤二:创建Tablestore Sink Connector并启动
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,选择 。
在任务列表页面,单击创建任务。
在创建任务页面,设置任务名称和描述,配置以下信息,然后单击保存。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列 Kafka 版,设置以下参数,然后单击下一步。
参数
说明
示例
地域
选择云消息队列 Kafka 版源实例所在的地域。
华北2(北京)
kafka 实例
选择生产云消息队列 Kafka 版消息的源实例。
MQ_INST_115964845466****_ByBeUp3p
Topic
选择生产云消息队列 Kafka 版消息的Topic。
topic
Group ID
选择源实例的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。
GID_http_1
并发配额(消费者数)
选择源实例的消费者数。
1
消费位点
选择开始消费消息的位点。
最新位点
网络配置
选择路由消息的网络类型。
默认网络
专有网络VPC
选择VPC ID。当网络配置设置为自建公网时需要设置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。当网络配置设置为自建公网时需要设置此参数。
alikafka_pre-cn-7mz2****
批量推送
批量推送可帮您批量聚合多个事件,当批量推送条数和批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。
例如:您设置的推送条数为100 条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等15 s后再推送。
开启
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见消息过滤。
在Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见数据清洗。
在Sink(目标)配置向导,选择服务类型为表格存储TableStore,配置以下参数。
参数
说明
示例
实例名称
已创建的Tablestore实例名称。
ost-sink
目标表
已创建的Tablestore数据表名称。
ost_sink_table
表格存储模型
目前仅支持宽表模型,如有其他模型需求请提交工单申请。
宽表模型
自定义主键生成模式
Tablestore主键值生成方式,需定义每个主键内容的提取规则,提取规则通过JsonPath语法表达。
例如,主键名称为id,数值提取规则配置为
$.value.id
列生成模式
Tablestore列值生成方式。
无
消息Key存储格式
Kafka的Key字段的存储方式。
String模式:将Key字段的内容转换为String类型后存储到一列中,列名为key。
JSON模式:将JSON中的每一个字段存储一列,列名为key_{fieldName}。
String模式
消息Value存储格式
Kafka的Value字段的存储方式。
String模式:将Value字段的内容转换为String类型后存储到一列中,列名为value。
JSON模式:将JSON中的每一个字段存储一列,列名为value_{fieldName}。
String模式
Json消息字段类型转换
选择的各字段值写入Table列中的实际类型。
全部作为String类型写入:所有字段均转换为String类型后存储至Table中。
自动识别字段类型:会自动识别字段类型并存储至Table中。
全部作为String类型写入
写入模式
数据写入Tablestore的方式。
put:覆盖写入行。
update:更新写入行。
put
网络配置
专有网络:通过专有网络VPC将Kafka消息投递到Tablestore。
公网:通过公网将Kafka消息投递到Tablestore。
公网
VPC
选择VPC ID。仅当网络配置为专有网络时需配置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择vSwitch ID。仅当网络配置为专有网络时需配置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。仅当网络配置为专有网络时需配置此参数。
test_group
批量推送
批量推送可帮您批量聚合多个事件,当批量推送条数和批量推送间隔两个条件达到任意一个时即会触发批量推送。例如:您设置的推送条数为100条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等待15 s后再推送。
开启:开启批量推送功能。
关闭:关闭批量推送功能。默认每次仅投递给FC一条消息。
关闭
批量推送条数
一次调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求。取值范围为[1,10000]。仅当批量推送参数设置为开启时需要配置此参数。
100
批量推送间隔
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算。取值范围为[0,15],单位:秒。取值为0表示没有等待时间,直接投递。仅当批量推送参数设置为开启时需要配置此参数。
10
任务属性
配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信。
返回任务列表页面,找到创建好的任务,在其右侧操作列,单击启用。
在提示对话框,阅读提示信息,然后单击确认。
启用任务后,会有30秒~60秒的延迟时间,您可以在任务列表页面的状态栏查看启动进度。
步骤三:测试Tablestore Sink Connector
在任务列表页面,在Tablestore Sink Connector任务的事件源列单击源Topic。
在Topic详情页面,单击体验发送消息。
在快速体验消息收发面板,按照下图配置消息内容,然后单击确定。
在任务列表页面,在Tablestore Sink Connector任务的事件目标列单击目标表名。
在表管理页面,单击数据管理页签,然后单击查询数据,设置查询范围,单击确定。
- 本页导读 (1)