本文以云消息队列 Kafka 版作为事件源为例,介绍如何在事件总线EventBridge控制台添加表格存储(Tablestore)作为事件流中的事件目标以实现将云消息队列 Kafka 版中的消息写入到表格存储(Tablestore)中进行数据存储和管理。
前提条件
开通消息队列 Kafka,并完成kafka实例的创建和部署。具体操作,请参见购买和部署实例。
注意事项
计费说明
Connector任务运行在阿里云函数计算平台,任务加工传输消耗的计算资源将按函数计算的单价计费。函数计算计费信息,请参见计费概述和【产品变更】函数计算定向减免消息类产品和云工作流的函数调用次数费用。
工作流程
事件总线将云消息队列 Kafka 版数据路由到表格存储(Tablestore)的过程如下:
配置事件链路:用户配置事件源为云消息队列 Kafka 版,添加事件目标为表格存储(Tablestore),建立起从Kafka到Tablestore的数据传输通道。
数据流动:事件总线从云消息队列 Kafka 版中拉取数据,经过攒批、过滤、转换之后将数据写入表格存储(Tablestore)。
数据存储与管理:用户可基于表格存储(Tablestore)对写入的数据进行存储和管理。
这一过程不仅提高了数据处理的时效性,还确保了数据流转的高效性和可靠性,助力用户构建灵活、可扩展的事件驱动架构。
步骤一:创建目标服务资源
创建表格存储实例和数据表。具体操作,请参见通过控制台使用宽表模型。
步骤二:创建事件流
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流。
- 在顶部菜单栏,选择地域,然后单击创建事件流。
在创建事件流页面,设置任务名称和描述,配置以下参数。
在Source(源)配置向导,选择数据提供方为消息队列Kafka版,并配置相关参数。
参数
说明
示例
地域
选择云消息队列Kafka版源实例所在的地域。
华北2(北京)
kafka 实例
选择生产云消息队列Kafka版消息的源实例。
alikafka_post-cn-jte3****
Topic
选择生产云消息队列Kafka版消息的 Topic。
topic
Group ID
数据源所在的云消息队列 Kafka 版实例的Group ID。
快速创建:自动创建以GID_EVENTBRIDGE_xxx命名的Group ID。
使用已有:选择已创建的Group,请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发。
使用已有
消费位点
选择开始消费消息的位点。
最新位点:从最新位点开始消费。
最早位点:从最初位点开始消费。
最新位点
网络配置
选择路由消息的网络类型。
基础网络:将默认打通实例间的网络连接,仅支持非跨境场景的数据传输使用。
自建公网:若配置项涉及跨境传输则需自行配置VPC网络,请选择带有公网NAT网关的VPC资源。
基础网络
专有网络VPC
选择 VPC ID。仅当网络配置设置为自建公网时需要设置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择 vSwitch ID。仅当网络配置设置为自建公网时需要设置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。仅当网络配置设置为自建公网时需要设置此参数。
alikafka_pre-cn-7mz2****
数据格式
数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力,支持多种数据格式编码:
Json( Json 格式编码,二进制数据按照 utf8 编码为 Json 格式放入 Payload)
Text(默认文本格式编码,二进制数据按照 utf8 编码为字符串放入 Payload)
Binary(二进制格式编码,二进制数据按照 Base64 编码为字符串放入 Payload)
Json( Json 格式编码,二进制数据按照 utf8 编码为 Json 格式放入 Payload)
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为[1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见消息过滤。
在Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见事件内容转换。
在Sink(目标)配置向导,选择服务类型为表格存储TableStore,配置相关参数,然后单击保存。
参数
说明
示例
实例名称
选择待写入数据的Tablestore实例。
ts_sink_test_1
目标表
选择待写入数据的Tablestore表。
table
主键
填写Tablestore中主键列的主键值,仅支持配置JsonPath语法。
$.data.value.key
属性列
填写针对Tablestore中属性列名称,及其属性类型、属性值、存储格式。
属性列名称:支持常量或JsonPath的方式,不支持两者混合使用。如果选择JsonPath的方式,则将JsonPath提取后的内容作为列名。
属性类型:数值提取后的类型。
属性值:仅支持JsonPath的方式。
存储格式:支持默认和JSON两种存储格式。
默认:直接存储提取到的数值,比如通过属性值提取了test1字符串,则直接以字符串类型存储test1。
JSON:如果通过属性值提取到的是JSON内容,则会遍历该JSON内容进行存储。比如属性列名称配置为column,且属性值提取到的是
{"jsonKey":"jsonValue"}
,则实际存储的列名则是“column_jsonKey”
,存储的属性值是“jsonValue”
。
属性列名称:column
属性类型:字符串
属性值:
$.data.value.name
存储格式:默认
写入模式
选择数据写入的模式。
put:当两条数据主键相同时,新数据会覆盖老数据。
update:当两条数据主键相同时,只会在此行中写入增量列,不会删除存量列。
put
网络配置
选择路由消息的网络类型。
公网
专有网络VPC
选择 VPC ID。当网络配置设置为自建公网时需要设置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择 vSwitch ID。当网络配置设置为自建公网时需要设置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。当网络配置设置为自建公网时需要设置此参数。
alikafka_pre-cn-7mz2****
任务属性。
配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信。
任务启动。
启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看是否启动成功。
步骤三:测试Tablestore Sink Connector
在事件流页面,在Tablestore Sink Connector任务的事件源列单击源Topic名称。
在Topic 详情页面,单击体验发送消息。
在快速体验消息收发面板,按照下图配置消息内容,然后单击确定。
返回事件流页面,在Table Sink Connector任务的事件目标列单击目标Tablestore表名称。
在表管理页面,单击数据管理页签,然后单击查询数据,设置查询范围,单击确定。