本文以云消息队列 Kafka 版作为事件源为例,介绍如何在事件总线EventBridge控制台添加云原生大数据计算服务 MaxCompute作为事件流中的事件目标以实现将云消息队列 Kafka 版中的消息写入到云原生大数据计算服务 MaxCompute中进行数据计算和分析。
前提条件
计费说明
Connector任务运行在函数计算平台,任务加工传输消耗的计算资源将按函数计算的单价计费。函数计算计费信息,请参见计费概述和【产品变更】函数计算定向减免消息类产品和云工作流的函数调用次数费用。
工作流程
事件总线将云消息队列 Kafka 版路由到云原生大数据计算服务 MaxCompute的过程如下:
配置事件链路:用户配置事件源为云消息队列 Kafka 版,添加事件目标为云原生大数据计算服务 MaxCompute,建立起从Kafka到MaxCompute的数据传输通道。
数据流动:事件总线从云消息队列 Kafka 版中拉取数据,经过攒批、过滤、转换之后将数据写入云原生大数据计算服务 MaxCompute。
数据计算和分析:用户可基于云原生大数据计算服务 MaxCompute对写入的数据进行计算和分析。
这一过程,不仅提高了数据处理的时效性,还确保了数据流转的高效性和可靠性,助力用户构建灵活、可扩展的事件驱动架构。
步骤一:创建事件目标服务资源
在云原生大数据计算服务 MaxCompute控制台创建一个Project并在Project中创建表。
通过MaxCompute控制台创建Project。具体操作,请参见创建MaxCompute项目。
通过MaxCompute控制台创建表。具体操作,请参见创建表。
步骤二:创建事件流
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流。
- 在顶部菜单栏,选择地域,然后单击创建事件流。
在创建事件流页面,设置任务名称和描述,配置以下参数,然后单击保存。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列Kafka版,并配置相关参数。
参数
说明
示例
地域
选择云消息队列Kafka版源实例所在的地域。
华北2(北京)
kafka 实例
选择生产云消息队列Kafka版消息的源实例。
alikafka_post-cn-jte3****
Topic
选择生产云消息队列Kafka版消息的 Topic。
topic
Group ID
数据源所在的云消息队列 Kafka 版实例的Group ID。
快速创建:自动创建以GID_EVENTBRIDGE_xxx命名的Group ID。
使用已有:选择已创建的Group,请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发。
使用已有
消费位点
选择开始消费消息的位点。
最新位点:从最新位点开始消费。
最早位点:从最初位点开始消费。
最新位点
网络配置
选择路由消息的网络类型。
基础网络:将默认打通实例间的网络连接,仅支持非跨境场景的数据传输使用。
自建公网:若配置项涉及跨境传输则需自行配置VPC网络,请选择带有公网NAT网关的VPC资源。
基础网络
专有网络VPC
选择 VPC ID。仅当网络配置设置为自建公网时需要设置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择 vSwitch ID。仅当网络配置设置为自建公网时需要设置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。仅当网络配置设置为自建公网时需要设置此参数。
alikafka_pre-cn-7mz2****
数据格式
数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力,支持多种数据格式编码:
Json( Json 格式编码,二进制数据按照 utf8 编码为 Json 格式放入 Payload)
Text(默认文本格式编码,二进制数据按照 utf8 编码为字符串放入 Payload)
Binary(二进制格式编码,二进制数据按照 Base64 编码为字符串放入 Payload)
Json( Json 格式编码,二进制数据按照 utf8 编码为 Json 格式放入 Payload)
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为[1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见消息过滤。
在Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见事件内容转换。
在Sink(目标)配置向导,选择服务类型为大数据计算服务 Maxcompute,配置相关参数。
参数
说明
示例
账号AccessKey ID
阿里云账号的AccessKey ID,用于访问MaxCompute服务。
LTAI5tHPVCZywsoEVvFu****
账号AccessKey Secret
将数据写入MaxCompute表时使用的 AccessKey Secret。
说明所填AccessKey ID和AccessKey Secret必须要有写入MaxCompute表的权限。具体操作,请参见授权给其他用户。
4RAKUQpZtUntDgvoKu0tvrkrOM****
MaxCompute项目名称
选择待写入数据的MaxCompute项目。
mc_sink_project
MaxCompute表名称
选择待写入数据的MaxCompute表。
table
MaxCompute表入参
选择MaxCompute表后,此处会展示表的列名和类型信息,配置数据提取规则即可。
$.data.value.key
分区配置
关闭:不开启分区能力。
开启:开启分区能力。
仅当分区配置为开启时需配置此参数。
支持{yyyy}、{MM}、{dd}、{HH}、{mm}时间变量参数,分别对应年、月、日、时、分。时间变量大小写敏感。
支持填写常量。
开启
{yyyy}-{MM}-{dd}.{HH}:{mm}.suffix
网络配置
选择路由消息的网络类型。
专有网络:通过专有网络VPC将上游数据投递到MaxCompute。
公网:通过公网将上游数据投递到MaxCompute。
公网
专有网络VPC
选择 VPC ID。仅当网络配置为专有网络时需要配置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择 vSwitch ID。仅当网络配置为专有网络时需要配置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。仅当网络配置为专有网络时需要配置此参数。
alikafka_pre-cn-7mz2****
任务属性
配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信。
任务启动
返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用。
启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
步骤三:测试MaxCompute Sink Connector
在事件流页面,在MaxCompute Sink Connector任务的事件源列单击源Topic。
在Topic详情页面,单击体验发送消息。
在快速体验消息收发面板,按照下图配置消息内容,然后单击确定。
在事件流页面,在 MaxCompute Sink Connector 任务的事件目标列单击MaxCompute项目。
在MaxCompute控制台顶部菜单栏选择地域,然后在左侧导航栏选择 ,输入
SELECT * FROM <table>;
查询语句并单击运行查看结果。