路由到表格存储Tablestore

本文以云消息队列 Kafka 版作为事件源为例,介绍如何在事件总线EventBridge控制台添加表格存储(Tablestore)作为事件流中的事件目标以实现将云消息队列 Kafka 版中的消息写入到表格存储(Tablestore)中进行数据存储和管理。

前提条件

注意事项

  1. 事件总线(EventBridge)目前仅支持宽表模型、时序表作为表格存储(Tablestore)的数据存储模型,如有其他模型需求提交工单申请。

  2. 写入Tablestore的单条内容,最大为20KB。更多Tablestore的限制,请参见通用限制

计费说明

Connector任务运行在阿里云函数计算平台,任务加工传输消耗的计算资源将按函数计算的单价计费。函数计算计费信息,请参见计费概述【产品变更】函数计算定向减免消息类产品和云工作流的函数调用次数费用

工作流程

事件总线云消息队列 Kafka 版数据路由到表格存储(Tablestore)的过程如下:

  1. 配置事件链路:用户配置事件源为云消息队列 Kafka 版,添加事件目标为表格存储(Tablestore),建立起从Kafka到Tablestore的数据传输通道。

  2. 数据流动:事件总线云消息队列 Kafka 版中拉取数据,经过攒批、过滤、转换之后将数据写入表格存储(Tablestore)

  3. 数据存储与管理:用户可基于表格存储(Tablestore)对写入的数据进行存储和管理。

image

这一过程不仅提高了数据处理的时效性,还确保了数据流转的高效性和可靠性,助力用户构建灵活、可扩展的事件驱动架构。

步骤一:创建目标服务资源

创建表格存储实例和数据表。具体操作,请参见通过控制台使用宽表模型

步骤二:创建事件流

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流
  2. 在顶部菜单栏,选择地域,然后单击创建事件流
  3. 创建事件流页面,设置任务名称描述,配置以下参数。

    1. Source(源)配置向导,选择数据提供方消息队列Kafka版,并配置相关参数。

      image

      参数

      说明

      示例

      地域

      选择云消息队列Kafka版源实例所在的地域。

      华北2(北京)

      kafka 实例

      选择生产云消息队列Kafka版消息的源实例。

      alikafka_post-cn-jte3****

      Topic

      选择生产云消息队列Kafka版消息的 Topic。

      topic

      Group ID

      数据源所在的云消息队列 Kafka 版实例的Group ID。

      • 快速创建:自动创建以GID_EVENTBRIDGE_xxx命名的Group ID。

      • 使用已有:选择已创建的Group,请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发。

      使用已有

      消费位点

      选择开始消费消息的位点。

      • 最新位点:从最新位点开始消费。

      • 最早位点:从最初位点开始消费。

      最新位点

      网络配置

      选择路由消息的网络类型。

      • 基础网络:将默认打通实例间的网络连接,仅支持非跨境场景的数据传输使用。

      • 自建公网:若配置项涉及跨境传输则需自行配置VPC网络,请选择带有公网NAT网关的VPC资源。

      基础网络

      专有网络VPC

      选择 VPC ID。仅当网络配置设置为自建公网时需要设置此参数。

      vpc-bp17fapfdj0dwzjkd****

      交换机

      选择 vSwitch ID。仅当网络配置设置为自建公网时需要设置此参数。

      vsw-bp1gbjhj53hdjdkg****

      安全组

      选择安全组。仅当网络配置设置为自建公网时需要设置此参数。

      alikafka_pre-cn-7mz2****

      数据格式

      数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力,支持多种数据格式编码:

      • Json( Json 格式编码,二进制数据按照 utf8 编码为 Json 格式放入 Payload)

      • Text(默认文本格式编码,二进制数据按照 utf8 编码为字符串放入 Payload)

      • Binary(二进制格式编码,二进制数据按照 Base64 编码为字符串放入 Payload)

      Json( Json 格式编码,二进制数据按照 utf8 编码为 Json 格式放入 Payload)

      批量推送条数

      调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为[1,10000]。

      100

      批量推送间隔(单位:秒)

      调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

      3

    2. Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见消息过滤

      image

    3. Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见事件内容转换

    4. Sink(目标)配置向导,选择服务类型表格存储TableStore,配置相关参数,然后单击保存

      image

      参数

      说明

      示例

      实例名称

      选择待写入数据的Tablestore实例。

      ts_sink_test_1

      目标表

      选择待写入数据的Tablestore表。

      table

      主键

      填写Tablestore中主键列的主键值,仅支持配置JsonPath语法。

      $.data.value.key

      属性列

      填写针对Tablestore中属性列名称,及其属性类型属性值存储格式

      1. 属性列名称:支持常量或JsonPath的方式,不支持两者混合使用。如果选择JsonPath的方式,则将JsonPath提取后的内容作为列名。

      2. 属性类型:数值提取后的类型。

      3. 属性值:仅支持JsonPath的方式。

      4. 存储格式:支持默认JSON两种存储格式。

        1. 默认:直接存储提取到的数值,比如通过属性值提取了test1字符串,则直接以字符串类型存储test1。

        2. JSON:如果通过属性值提取到的是JSON内容,则会遍历该JSON内容进行存储。比如属性列名称配置为column,且属性值提取到的是 {"jsonKey":"jsonValue"},则实际存储的列名则是“column_jsonKey”,存储的属性值是“jsonValue”

      属性列名称:column

      属性类型字符串

      属性值$.data.value.name

      存储格式默认

      写入模式

      选择数据写入的模式。

      • put:当两条数据主键相同时,新数据会覆盖老数据。

      • update:当两条数据主键相同时,只会在此行中写入增量列,不会删除存量列。

      put

      网络配置

      选择路由消息的网络类型。

      公网

      专有网络VPC

      选择 VPC ID。当网络配置设置为自建公网时需要设置此参数。

      vpc-bp17fapfdj0dwzjkd****

      交换机

      选择 vSwitch ID。当网络配置设置为自建公网时需要设置此参数。

      vsw-bp1gbjhj53hdjdkg****

      安全组

      选择安全组。当网络配置设置为自建公网时需要设置此参数。

      alikafka_pre-cn-7mz2****

    5. 任务属性。

      配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信

    6. 任务启动。

      启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看是否启动成功。

      hepHJW7QXr

步骤三:测试Tablestore Sink Connector

  1. 事件流页面,在Tablestore Sink Connector任务的事件源列单击源Topic名称。

    kNcfddZXLe

  2. Topic 详情页面,单击体验发送消息

  3. 快速体验消息收发面板,按照下图配置消息内容,然后单击确定

    peWAqSvFzz

  4. 返回事件流页面,在Table Sink Connector任务的事件目标列单击目标Tablestore表名称。

    TRoHPOs2DN

  5. 表管理页面,单击数据管理页签,然后单击查询数据,设置查询范围,单击确定

    5wlOH7znyx