本文介绍如何在云消息队列 RocketMQ 版控制台创建流出目标为云消息队列 Kafka 版的消息流出任务。

前提条件

什么是函数计算?

函数计算(Function Compute,FC)是一个事件驱动的全托管Serverless计算服务,您无需管理服务器等基础设施,只需编写代码并上传,函数计算会为您准备好计算资源,并以弹性、可靠的方式运行您的代码。更多信息,请参见什么是函数计算

函数计算能做什么?

  • 利用函数实现业务消息的处理,通过函数计算平台开发、运行业务消息处理逻辑,订单处理,任务执行。
  • 利用函数对消息进行快速加工处理,ETL清洗。
  • 利用函数计算灵活扩展将消息转储到指定VPC中的其它下游系统。
  • 利用函数链接消息系统和其他阿里云产品服务,将消息数据发送给其他云服务系统。

创建消息流出任务

  1. 登录云消息队列 RocketMQ 版控制台,在左侧导航栏选择消息集成 > 消息流出
  2. 在顶部菜单栏选择地域,如华东1(杭州),然后在消息流出页面中,单击创建任务
  3. 消息流出创建面板,设置任务名称,选择流出类型函数计算,配置以下信息,然后单击确定
    1. 资源配置区域,设置以下参数。
      表 1. 源端消息队列RocketMQ版参数配置
      参数说明示例
      地域默认选择创建消息流出任务时选择的地域且不可更改。华东1(杭州)
      RocketMQ 实例选择生产消息的源云消息队列 RocketMQ 版实例。MQ_INST_115964845466****_ByBeUp3p
      Topic选择生产消息的Topic。topic
      Group ID选择源实例中的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。GID_http_1
      消费位点选择开始消费消息的位点。最新位点
      表 2. 目标端函数计算参数配置
      参数说明示例
      服务选择已创建的函数计算的服务。service
      函数选择已创建的函数计算的函数。test
      执行方式
      • 异步方式:消息都发给函数计算,错误处理由函数计算保证。
      • 同步方式:调用的错误处理及重试由事件总线EventBridge负责。
      异步
    2. 数据处理区域,设置消息的过滤、转换、重试和死信。
      • 消息过滤
        • 无需过滤:默认投递全部事件。
        • 自定义配置:使用事件模式匹配特定事件。详细规则,请参见消息过滤
      • 消息转换
        • 默认配置:默认将消息内容或相关属性映射到目标端。映射关系,请参见默认配置映射关系
        • 自定义配置:根据自身需求配置消息内容参数,将其投递至目标端。配置规则,请参见消息转换
          参数说明示例
          消息体(body)需要投递的事件的消息内容。配置规则,请参见消息转换部分事件
          $.data.body
          自定义属性(Properties)需要投递的事件的属性。配置规则,请参见消息转换部分事件
          $.data.props
          消息索引(Keys)需要投递的事件的过滤属性。配置规则,请参见消息转换部分事件
          $.data.systemProperties.KEYS
          消息标签(Tags)需要投递的事件的过滤属性。配置规则,请参见消息转换部分事件
          $.data.systemProperties.TAGS
      • 重试和死信:配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信

其他操作

消息流出页面,找到目标任务,在其右侧操作列,执行其他操作。

函数计算
  • 查看任务详情:单击详情,在消息流出详情页面,查看任务的源端配置、目标端配置及资源信息。
  • 编辑任务配置:单击编辑,在消息流出编辑面板,修改资源配置规则配置
  • 启停任务:单击启用或者停用,然后在提示对话框,单击确认
  • 删除任务:单击删除,然后在提示对话框,单击确认