使用函数计算作为Transform

本文介绍使用函数计算作为Transform时的背景信息、注意事项及操作步骤。

背景信息

当Transform选择阿里云的函数计算时,您可以通过编写函数代码对事件进行更复杂、更加定制化的处理。整体流程如图所示。

image
  1. 源端(Source)拉取事件后,先经过攒批,达到攒批的条件后,攒批的事件会进入Filter阶段。

  2. Filter阶段会对攒批的事件中的每条事件进行判断,决定是否过滤该事件。经过攒批和Filter过滤的事件会进入Transform阶段。

  3. Transform会对攒批和Filter过滤的事件进一步处理,以确保发送给函数的多条事件符合Payload限制。如果攒批和Filter过滤的事件总大小超过了Payload限制,则将其拆分成符合Payload限制的事件数量,依次调用函数进行处理。

  4. Transform调用完函数之后,会将函数返回的内容推送给目标端(Sink)。

注意事项

项目

说明

函数调用方式

仅支持同步方式请求函数计算,函数请求的Payload值限制为16 MB。

函数入参协议

  • 多条CloudEvents,CloudEvents的入参格式为[cloudEvents1,cloudEvents2]

  • 单条CloudEvents,CloudEvents的入参格式为[cloudEvents1]

关于CloudEvents事件格式的更多信息,请参见事件概述

函数执行超时时间

  • 如果您通过函数模板创建函数,则新建函数的默认超时时间为60 s。如果60 s不满足业务需求,可在函数创建后跳转至函数计算控制台自定义修改,更多信息,请参见管理函数

  • 如果您选择绑定自己创建的函数,则超时时间由您的函数决定。

函数返回值

函数可以返回任何值。但需注意以下事项:

  • 由于目前函数计算的限制,函数在使用不同Runtime时,函数代码中返回的内容与函数“真正”的返回值(也就是事件流接收到的内容)存在不一致的现象,详见下文返回值与实际值的对比。此时,发送给目标端的数据格式,以“真正”的返回值为准,也就是以事件流接收到的内容为准。

  • 返回值大小受目标端的限制。

  • 针对目标端为非函数计算的产品来说,例如轻量消息队列(原 MNS)云消息队列 RocketMQ 版云消息队列 Kafka 版等产品。返回值中单条数据的大小,不能超过下游服务能接收的最大值。具体来说,如果是非数组格式的数据,则返回值不能超过下游服务能接收的最大值;如果是数组格式的数据,则数组中的每条数据都不能超过下游服务能接收的最大值。

  • 针对目标端为函数计算来说。返回值不能超过16 MB。

异常处理机制

Transform的异常处理策略与当前任务的异常处理策略保持一致,即“任务属性”对Transform也生效。

返回值与实际值对比

由于目前函数计算的限制,函数使用不同Runtime时,函数代码中返回的内容与函数“真正”的返回值即事件流接收到的内容存在不一致现象。以Python、Node、Go为例,差异如下。

Runtime

函数返回的内容

事件流接收的内容

Python 3.9

["test1", "test2"]

["test1", "test2"]

"[\"test1\", \"test2\"]"

["test1", "test2"]

"test"

test

"\"test\""

"test"

Node 14

["test1", "test2"]

["test1", "test2"]

"[\"test1\", \"test2\"]"

["test1", "test2"]

"test"

test

"\"test\""

"test"

Go 1.x

["test1", "test2"]

["test1", "test2"]

"[\"test1\", \"test2\"]"

"[\"test1\", \"test2\"]"

"test"

"test"

"\"test\""

"\"test\""

前提条件

步骤一:创建事件流

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流
  2. 在顶部菜单栏,选择地域,然后单击创建事件流
  3. 创建事件流面板,设置任务名称描述,配置以下参数。

    • 任务创建

      1. Source(源)配置向导,选择数据提供方,本文以云消息队列 Kafka 版为例,配置以下参数,然后单击下一步

        参数

        说明

        示例

        地域

        选择云消息队列 Kafka 版源实例所在的地域。

        华北2(北京)

        kafka 实例

        选择生产云消息队列 Kafka 版消息的源实例。

        alikafka_post_115964845466****_ByBeUp3p

        Topic

        选择生产云消息队列 Kafka 版消息的Topic。

        topic

        Group ID

        选择源实例的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。

        快速创建

        消费位点

        选择开始消费消息的位点。

        最新位点

        网络配置

        选择路由消息的网络类型。

        基础网络

        专有网络VPC

        选择VPC ID。当网络配置设置为自建公网时需要设置此参数。

        vpc-bp17fapfdj0dwzjkd****

        交换机

        选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。

        vsw-bp1gbjhj53hdjdkg****

        安全组

        选择安全组。当网络配置设置为自建公网时需要设置此参数。

        alikafka_pre-cn-7mz2****

        批量推送条数(可选)

        批量推送可帮您批量聚合多个事件,当批量推送条数批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)(可选)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)配置向导,配置模式内容,默认值为{},表示不过滤任何事件。更多信息,请参见事件模式

        说明

        Filtering为可选步骤,可以点击流程中右上角的删除按钮,删除Filtering步骤;也可以点击流程中右上角的+Filtering(过滤)添加Filtering步骤 ,Filtering步骤在链路中的位置不会改变。

      3. Transform(转换) 配置向导,设置选择阿里云服务函数计算,设置以下参数。本示例以新建函数模板为例,使用默认生成的服务、函数及函数模板。

        配置方式

        参数

        说明

        新建函数模板

        服务

        函数计算中的服务名。提供了系统生成的随机值,可以根据需要编辑。当任务保存之后,会在函数计算中创建对应名字的服务。

        函数

        函数计算中的函数名。提供了系统生成的值,可以根据需要编辑。当任务保存之后,会在函数计算中创建对应名字的函数。

        函数模板

        默认提供四种函数模板:

        • 内容分割

        • 内容映射

        • 内容富化

        • 动态路由

        可以使用提供的简易版函数代码编辑框,对函数模板中的代码进行编辑和调试,完成编辑之后,无需保存,当任务保存之后,函数中使用的代码为编辑框中的代码。

        绑定现有函数

        服务

        选择函数计算中已创建的服务。

        函数

        选择上一步选择的服务中的函数。

        版本和别名

        选择服务的版本或别名。

      4. Sink(目标)配置向导,选择服务类型,配置相关参数。本示例选择函数计算

        参数

        说明

        示例

        服务

        选择已创建的函数计算的服务。

        test

        函数

        选择已创建的函数计算的函数。

        test

        服务版本和别名

        选择服务版本或服务别名。

        默认版本

        执行方式

        选择同步执行或异步执行。

        异步

        事件

        选择事件内容转换类型。更多信息,请参考事件内容转换

        完整事件

    • 任务属性(可选)

      设置事件流的重试策略及死信队列。更多信息,请参见重试和死信。本示例配置重试策略为退避重试、容错策略为允许容错,不使用死信队列。

  4. 配置完成后,单击保存

  5. 返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用

    启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。

步骤二:测试事件流

  1. 在事件流页面,找到创建好的事件流,单击事件源列的Kafka的Topic,跳转至云消息队列 Kafka 版控制台,单击右上角的体验发送消息

  2. 快速体验消息收发面板,设置发送方式消息key消息内容,单击确定

  3. 查看Transform中使用的函数是否收到了请求。

    回到事件流页面单击操作列的详情,单击Transform(转换)区域的函数名称。

    • 单击调用日志,查看是否有对应的日志。如果调用日志中的调用结果列显示调用成功,则表示 Transform执行成功。

      说明

      如果没有开通日志服务,请先开通日志服务,之后需要重新发送 Kafka 消息。

    • 点击监控指标,查看是否有调用次数。如果有调用次数,且错误次数为0,则表示Sink成功收到了Transform后的内容。transform-cn

  4. 查看Sink中使用的函数是否收到了请求。

    回到事件流页面单击操作列的详情,单击事件目标列的函数名称。

    • 单击调用日志,查看是否有对应的日志。如果调用日志中的调用结果列显示调用成功,则表示 Transform执行成功。

      说明

      如果没有开通日志服务,请先开通日志服务,之后需要重新发送 Kafka 消息。

    • 点击监控指标,查看是否有调用次数。如果有调用次数,且错误次数为0,则表示Sink成功收到了Transform后的内容。sink-cn

相关文档

使用函数计算实现消息数据清洗