本文介绍使用函数计算作为Transform时的背景信息、注意事项及操作步骤。
背景信息
当Transform选择阿里云的函数计算时,您可以通过编写函数代码对事件进行更复杂、更加定制化的处理。整体流程如图所示。
源端(Source)拉取事件后,先经过攒批,达到攒批的条件后,攒批的事件会进入Filter阶段。
Filter阶段会对攒批的事件中的每条事件进行判断,决定是否过滤该事件。经过攒批和Filter过滤的事件会进入Transform阶段。
Transform会对攒批和Filter过滤的事件进一步处理,以确保发送给函数的多条事件符合Payload限制。如果攒批和Filter过滤的事件总大小超过了Payload限制,则将其拆分成符合Payload限制的事件数量,依次调用函数进行处理。
Transform调用完函数之后,会将函数返回的内容推送给目标端(Sink)。
注意事项
项目 | 说明 |
项目 | 说明 |
函数调用方式 | 仅支持同步方式请求函数计算,函数请求的Payload值限制为16 MB。 |
函数入参协议 |
关于CloudEvents事件格式的更多信息,请参见事件概述。 |
函数执行超时时间 |
|
函数返回值 | 函数可以返回任何值。但需注意以下事项:
|
异常处理机制 | Transform的异常处理策略与当前任务的异常处理策略保持一致,即“任务属性”对Transform也生效。 |
返回值与实际值对比
由于目前函数计算的限制,函数使用不同Runtime时,函数代码中返回的内容与函数“真正”的返回值即事件流接收到的内容存在不一致现象。以Python、Node、Go为例,差异如下。
Runtime | 函数返回的内容 | 事件流接收的内容 |
Runtime | 函数返回的内容 | 事件流接收的内容 |
Python 3.9 | ["test1", "test2"] | ["test1", "test2"] |
"[\"test1\", \"test2\"]" | ["test1", "test2"] | |
"test" | test | |
"\"test\"" | "test" | |
Node 14 | ["test1", "test2"] | ["test1", "test2"] |
"[\"test1\", \"test2\"]" | ["test1", "test2"] | |
"test" | test | |
"\"test\"" | "test" | |
Go 1.x | ["test1", "test2"] | ["test1", "test2"] |
"[\"test1\", \"test2\"]" | "[\"test1\", \"test2\"]" | |
"test" | "test" | |
"\"test\"" | "\"test\"" |
前提条件
步骤一:创建事件流
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流。
- 在顶部菜单栏,选择地域,然后单击创建事件流。
在创建事件流面板,设置任务名称和描述,配置以下参数。
任务创建
在Source(源)配置向导,选择数据提供方,本文以云消息队列 Kafka 版为例,配置以下参数,然后单击下一步。
参数
说明
示例
参数
说明
示例
地域
选择云消息队列 Kafka 版源实例所在的地域。
华北2(北京)
kafka 实例
选择生产云消息队列 Kafka 版消息的源实例。
alikafka_post_115964845466****_ByBeUp3p
Topic
选择生产云消息队列 Kafka 版消息的Topic。
topic
Group ID
选择源实例的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。
快速创建
消费位点
选择开始消费消息的位点。
最新位点
网络配置
选择路由消息的网络类型。
基础网络
专有网络VPC
选择VPC ID。当网络配置设置为自建公网时需要设置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。当网络配置设置为自建公网时需要设置此参数。
alikafka_pre-cn-7mz2****
批量推送条数(可选)
批量推送可帮您批量聚合多个事件,当批量推送条数和批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)(可选)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,配置模式内容,默认值为
{}
,表示不过滤任何事件。更多信息,请参见事件模式。Filtering为可选步骤,可以点击流程中右上角的删除按钮,删除Filtering步骤;也可以点击流程中右上角的+Filtering(过滤)添加Filtering步骤 ,Filtering步骤在链路中的位置不会改变。
在 Transform(转换) 配置向导,设置选择阿里云服务为函数计算,设置以下参数。本示例以新建函数模板为例,使用默认生成的服务、函数及函数模板。
配置方式
参数
说明
配置方式
参数
说明
新建函数模板
服务
函数计算中的服务名。提供了系统生成的随机值,可以根据需要编辑。当任务保存之后,会在函数计算中创建对应名字的服务。
函数
函数计算中的函数名。提供了系统生成的值,可以根据需要编辑。当任务保存之后,会在函数计算中创建对应名字的函数。
函数模板
默认提供四种函数模板:
内容分割
内容映射
内容富化
动态路由
可以使用提供的简易版函数代码编辑框,对函数模板中的代码进行编辑和调试,完成编辑之后,无需保存,当任务保存之后,函数中使用的代码为编辑框中的代码。
绑定现有函数
服务
选择函数计算中已创建的服务。
函数
选择上一步选择的服务中的函数。
版本和别名
选择服务的版本或别名。
在Sink(目标)配置向导,选择服务类型,配置相关参数。本示例选择函数计算。
参数
说明
示例
参数
说明
示例
服务
选择已创建的函数计算的服务。
test
函数
选择已创建的函数计算的函数。
test
服务版本和别名
选择服务版本或服务别名。
默认版本
执行方式
选择同步执行或异步执行。
异步
事件
选择事件内容转换类型。更多信息,请参考事件内容转换。
完整事件
任务属性(可选)
设置事件流的重试策略及死信队列。更多信息,请参见重试和死信。本示例配置重试策略为退避重试、容错策略为允许容错,不使用死信队列。
配置完成后,单击保存。
返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用。
启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。
步骤二:测试事件流
在事件流页面,找到创建好的事件流,单击事件源列的Kafka的Topic,跳转至云消息队列 Kafka 版控制台,单击右上角的体验发送消息。
在快速体验消息收发面板,设置发送方式、消息key、消息内容,单击确定。
查看Transform中使用的函数是否收到了请求。
回到事件流页面单击操作列的详情,单击Transform(转换)区域的函数名称。
单击调用日志,查看是否有对应的日志。如果调用日志中的调用结果列显示调用成功,则表示 Transform执行成功。
如果没有开通日志服务,请先开通日志服务,之后需要重新发送 Kafka 消息。
点击监控指标,查看是否有调用次数。如果有调用次数,且错误次数为0,则表示Sink成功收到了Transform后的内容。
查看Sink中使用的函数是否收到了请求。
回到事件流页面单击操作列的详情,单击事件目标列的函数名称。
单击调用日志,查看是否有对应的日志。如果调用日志中的调用结果列显示调用成功,则表示 Transform执行成功。
如果没有开通日志服务,请先开通日志服务,之后需要重新发送 Kafka 消息。
点击监控指标,查看是否有调用次数。如果有调用次数,且错误次数为0,则表示Sink成功收到了Transform后的内容。
相关文档
- 本页导读 (1)
- 背景信息
- 注意事项
- 返回值与实际值对比
- 前提条件
- 步骤一:创建事件流
- 步骤二:测试事件流
- 相关文档