本文介绍使用函数计算作为Transform时的背景信息、注意事项及操作步骤。
背景信息
当Transform选择阿里云的函数计算时,您可以通过编写函数代码对事件进行更复杂、更加定制化的处理。整体流程如图所示。
源端(Source)拉取事件后,先经过攒批,达到攒批的条件后,攒批的事件会进入Filter阶段。
Filter阶段会对攒批的事件中的每条事件进行判断,决定是否过滤该事件。经过攒批和Filter过滤的事件会进入Transform阶段。
Transform会对攒批和Filter过滤的事件进一步处理,以确保发送给函数的多条事件符合Payload限制。如果攒批和Filter过滤的事件总大小超过了Payload限制,则将其拆分成符合Payload限制的事件数量,依次调用函数进行处理。
Transform调用完函数之后,会将函数返回的内容推送给目标端(Sink)。
注意事项
项目 | 说明 |
函数调用方式 | 仅支持同步方式请求函数计算,函数请求的Payload值限制为16 MB。 |
函数入参协议 |
关于CloudEvents事件格式的更多信息,请参见事件概述。 |
函数执行超时时间 |
|
函数返回值 | 函数可以返回任何值。但需注意以下事项:
|
异常处理机制 | Transform的异常处理策略与当前任务的异常处理策略保持一致,即“任务属性”对Transform也生效。 |
返回值与实际值对比
由于目前函数计算的限制,函数使用不同Runtime时,函数代码中返回的内容与函数“真正”的返回值即事件流接收到的内容存在不一致现象。以Python、Node、Go为例,差异如下。
Runtime | 函数返回的内容 | 事件流接收的内容 |
Python 3.9 | ["test1", "test2"] | ["test1", "test2"] |
"[\"test1\", \"test2\"]" | ["test1", "test2"] | |
"test" | test | |
"\"test\"" | "test" | |
Node 14 | ["test1", "test2"] | ["test1", "test2"] |
"[\"test1\", \"test2\"]" | ["test1", "test2"] | |
"test" | test | |
"\"test\"" | "test" | |
Go 1.x | ["test1", "test2"] | ["test1", "test2"] |
"[\"test1\", \"test2\"]" | "[\"test1\", \"test2\"]" | |
"test" | "test" | |
"\"test\"" | "\"test\"" |
前提条件
步骤一:创建事件流
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流。
- 在顶部菜单栏,选择地域,然后单击创建事件流。
在创建事件流面板,设置任务名称和描述,配置以下参数。
任务创建
在Source(源)配置向导,选择数据提供方,本文以云消息队列 Kafka 版为例,配置以下参数,然后单击下一步。
参数
说明
示例
地域
选择云消息队列 Kafka 版源实例所在的地域。
华北2(北京)
kafka 实例
选择生产云消息队列 Kafka 版消息的源实例。
alikafka_post_115964845466****_ByBeUp3p
Topic
选择生产云消息队列 Kafka 版消息的Topic。
topic
Group ID
选择源实例的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。
快速创建
消费位点
选择开始消费消息的位点。
最新位点
网络配置
选择路由消息的网络类型。
基础网络
专有网络VPC
选择VPC ID。当网络配置设置为自建公网时需要设置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。当网络配置设置为自建公网时需要设置此参数。
alikafka_pre-cn-7mz2****
批量推送条数(可选)
批量推送可帮您批量聚合多个事件,当批量推送条数和批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)(可选)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,配置模式内容,默认值为
{}
,表示不过滤任何事件。更多信息,请参见事件模式。说明Filtering为可选步骤,可以点击流程中右上角的删除按钮,删除Filtering步骤;也可以点击流程中右上角的+Filtering(过滤)添加Filtering步骤 ,Filtering步骤在链路中的位置不会改变。
在 Transform(转换) 配置向导,设置选择阿里云服务为函数计算,设置以下参数。本示例以新建函数模板为例,使用默认生成的服务、函数及函数模板。
配置方式
参数
说明
新建函数模板
服务
函数计算中的服务名。提供了系统生成的随机值,可以根据需要编辑。当任务保存之后,会在函数计算中创建对应名字的服务。
函数
函数计算中的函数名。提供了系统生成的值,可以根据需要编辑。当任务保存之后,会在函数计算中创建对应名字的函数。
函数模板
默认提供四种函数模板:
内容分割
内容映射
内容富化
动态路由
可以使用提供的简易版函数代码编辑框,对函数模板中的代码进行编辑和调试,完成编辑之后,无需保存,当任务保存之后,函数中使用的代码为编辑框中的代码。
绑定现有函数
服务
选择函数计算中已创建的服务。
函数
选择上一步选择的服务中的函数。
版本和别名
选择服务的版本或别名。
在Sink(目标)配置向导,选择服务类型,配置相关参数。本示例选择函数计算。
参数
说明
示例
服务
选择已创建的函数计算的服务。
test
函数
选择已创建的函数计算的函数。
test
服务版本和别名
选择服务版本或服务别名。
默认版本
执行方式
选择同步执行或异步执行。
异步
事件
选择事件内容转换类型。更多信息,请参考事件内容转换。
完整事件
任务属性(可选)
设置事件流的重试策略及死信队列。更多信息,请参见重试和死信。本示例配置重试策略为退避重试、容错策略为允许容错,不使用死信队列。
配置完成后,单击保存。
返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用。
启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
步骤二:测试事件流
在事件流页面,找到创建好的事件流,单击事件源列的Kafka的Topic,跳转至云消息队列 Kafka 版控制台,单击右上角的体验发送消息。
在快速体验消息收发面板,设置发送方式、消息key、消息内容,单击确定。
查看Transform中使用的函数是否收到了请求。
回到事件流页面单击操作列的详情,单击Transform(转换)区域的函数名称。
单击调用日志,查看是否有对应的日志。如果调用日志中的调用结果列显示调用成功,则表示 Transform执行成功。
说明如果没有开通日志服务,请先开通日志服务,之后需要重新发送 Kafka 消息。
点击监控指标,查看是否有调用次数。如果有调用次数,且错误次数为0,则表示Sink成功收到了Transform后的内容。
查看Sink中使用的函数是否收到了请求。
回到事件流页面单击操作列的详情,单击事件目标列的函数名称。
单击调用日志,查看是否有对应的日志。如果调用日志中的调用结果列显示调用成功,则表示 Transform执行成功。
说明如果没有开通日志服务,请先开通日志服务,之后需要重新发送 Kafka 消息。
点击监控指标,查看是否有调用次数。如果有调用次数,且错误次数为0,则表示Sink成功收到了Transform后的内容。