创建FC Sink Connector

本文介绍如何在云消息队列 RocketMQ 版控制台创建流出目标为函数计算的消息流出任务。

前提条件

什么是函数计算?

函数计算(Function Compute,FC)是一个事件驱动的全托管Serverless计算服务,您无需管理服务器等基础设施,只需编写代码并上传,函数计算会为您准备好计算资源,并以弹性、可靠的方式运行您的代码。更多信息,请参见什么是函数计算

函数计算能做什么?

  • 利用函数实现业务消息的处理,通过函数计算平台开发、运行业务消息处理逻辑,订单处理,任务执行。

  • 利用函数对消息进行快速加工处理,ETL清洗。

  • 利用函数计算灵活扩展将消息转储到指定VPC中的其他下游系统。

  • 利用函数连接消息系统和其他阿里云产品服务,将消息数据发送给其他云服务系统。

创建消息流出任务

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  2. 在左侧导航栏,选择Connector生态集成 > 任务列表

  3. 任务列表页面,单击创建任务

  4. 创建任务面板,设置任务名称描述,配置以下参数,单击保存

    • 任务创建

      1. Source(源)配置向导,选择数据提供方消息队列 Kafka 版,设置以下参数,然后单击下一步

        参数

        说明

        示例

        地域

        选择云消息队列 Kafka 版源实例所在的地域。

        华北2(北京)

        kafka 实例

        选择生产云消息队列 Kafka 版消息的源实例。

        alikafka_post-cn-jte3****

        Topic

        选择生产云消息队列 Kafka 版消息的Topic。

        demo-topic

        Group ID

        选择源实例的消费组名称。

        • 快速创建:推荐方案,自动创建以GID_EVENTBRIDGE_xxx 命名的 Group ID。

        • 使用已有:请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发

        快速创建

        消费位点

        选择开始消费消息的位点。

        • 最新点位

        • 最早点位

        最新位点

        网络配置

        选择路由消息的网络类型。

        • 基础网络

        • 自建公网

        基础网络

        专有网络VPC

        选择VPC ID。当网络配置设置为自建公网时需要设置此参数。

        vpc-bp17fapfdj0dwzjkd****

        交换机

        选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。

        vsw-bp1gbjhj53hdjdkg****

        安全组

        选择安全组。当网络配置设置为自建公网时需要设置此参数。

        alikafka_pre-cn-7mz2****

        数据格式

        数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。

        • Json(Json格式编码,二进制数据按照utf-8 编码为Json格式放入Payload。)

        • Text(默认文本格式编码,二进制数据按照utf-8编码为字符串放入Payload。)

        • Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload。)

        Json

        批量推送条数

        调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见事件模式

      3. Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见数据清洗

      4. Sink(目标)配置向导,选择服务类型函数计算,配置以下参数。

        参数

        说明

        示例

        函数

        选择已创建的函数计算的函数。

        test

        版本和别名

        选择服务版本或服务别名。

        • 指定版本

        • 指定别名

        指定版本

        版本

        函数变更的对应版本,一般选择为最新版。当版本和别名设置为指定版本时配置。

        LATEST

        别名

        函数别名。当版本和别名设置为指定别名时配置。

        test

        执行方式

        选择同步执行或异步执行。

        异步

        投递格式

        • Object 格式:事件将会以对象(Object) 格式向下游函数进行投递。

        • ObjectList 格式:事件将会以对象数组(Array)格式向下游函数进行投递。

        Object 格式

        事件

        事件总线EventBridge通过JSONPath提取消息中的数据,将指定的消息内容路由到目标。

        • 完整数据

        • 数据提取

        • 固定值

        • 模板

        完整数据

    • 任务属性

      设置此任务的重试策略及死信队列。更多信息,请参见重试和死信

  5. 返回任务列表页面,找到创建好的任务,在其右侧操作列,单击启用

  6. 提示对话框,阅读提示信息,然后单击确认

    启用任务后,会有30秒~60秒的延迟时间,您可以在任务列表页面的状态栏查看启动进度。

其他操作

任务列表页面,找到目标任务,在其右侧操作列,执行其他操作。

  • 查看任务详情:单击详情,在任务页面,查看任务的基础信息、任务属性及监控指标。

  • 编辑任务配置:单击编辑,在编辑任务面板,修改任务详情及属性。

  • 启停任务:单击启用或者停用,然后在提示对话框,单击确认

  • 删除任务:单击删除,然后在提示对话框,单击确认