RocketMQ触发器

消息队列 RocketMQ 版作为事件源通过事件总线EventBridge函数计算集成后,通过消息队列 RocketMQ 版触发器(以下简称RocketMQ触发器)能够触发关联函数执行,通过函数可以对发布到消息队列 RocketMQ 版中的消息进行自定义处理。本文介绍如何在函数计算控制台创建RocketMQ触发器、配置函数入口参数和编写代码并测试。

功能简介

您在函数计算的控制台提交触发器创建请求之后,函数计算会根据触发器的配置信息,自动在事件总线EventBridge侧创建事件流资源。

创建完成后,您可以在函数计算控制台查看触发器信息,同时也可以在事件总线EventBridge控制台查看自动创建的资源信息。当源消息队列 RocketMQ 版实例中有消息入队时,将会触发函数计算执行。执行时会根据您的攒批配置,将一个或多个消息事件以批的形式推送到函数中进行处理,适合端到端的流式数据处理场景。

注意事项

  • 作为触发源的消息队列 RocketMQ 版的实例必须和函数计算的函数在相同的地域。

  • 创建的事件流数量超过上限后,将无法再创建RocketMQ触发器。

在单个阿里云账号单个地域维度下,关于创建触发器涉及的资源数量的限制,请参见使用限制

前提条件

步骤一:创建触发器

当您已经创建好RocketMQ的实例,您需要登录函数计算控制台,进入目标函数,选择配置页签,创建触发器,创建完成点击确定如下图所示。

image

上图中的配置项如下所示。

配置项

操作

本文示例

消费位点

选择消息的消费位点,即消息队列 RocketMQ 版从事件总线开始拉取消息的位置。取值说明如下。

  • 最新位点:从最新位点开始消费。

  • 最早位点:从最早位点开始消费。

  • 指定时间戳:从指定时间戳开始消费。

最新位点

调用方式

选择函数调用方式。

取值说明如下。

  • 同步调用:适用于顺序调用场景。单个(批)事件触发函数调用,等待函数执行完成返回结果后,再由下一个(批)事件继续触发函数调用。同步调用请求正文有效负载最大为32 MB。更多信息,请参见同步调用

  • 异步调用:可以快速消费事件。单个(批)事件触发函数调用,函数计算会立刻返回响应,再由下一个(批)事件继续触发函数调用。该过程中函数会异步执行。异步调用请求正文有效负载最大为128 KB。更多信息,请参见异步调用

同步调用

关于推送配置、重试和死信等高级配置项说明,请参见触发器高级功能

步骤二:(可选)配置函数入口参数

消息队列 RocketMQ 版事件源会以event的形式作为输入参数传递给函数,您可以使用代码解析event参数,并对event进行处理。您可以手动将event传给函数模拟触发事件,测试函数代码是否正确。

  1. 在函数详情页面的代码页签,单击测试函数右侧的image.png图标,从下拉列表中,选择配置测试参数

  2. 配置测试参数面板,选择创建新测试事件编辑已有测试事件,填写事件名称和事件内容,然后单击确定。

    event格式如下所示。

    [
        {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
        },
        {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{
    
            },
            "body":"TEST"
        }
        }
    ]

    data字段包含的参数解释如下表所示。关于CloudEvents规范中定义的参数解释,请参见事件概述

    参数

    类型

    示例值

    描述

    topic

    String

    TopicName

    Topic名称。

    systemProperties

    Map

    系统属性。

    MIN_OFFSET

    Int

    0

    最低位点。

    TRACE_ON

    Boolean

    true

    是否有消息轨迹。取值说明如下:

    • true:有消息轨迹。

    • false:无消息轨迹。

    MAX_OFFSET

    Int

    8

    最高位点。

    MSG_REGION

    String

    cn-hangzhou

    发送消息的地域。

    KEYS

    String

    systemProperties.KEYS

    过滤属性。

    CONSUME_START_TIME

    Long

    1628577790396

    开始消费时间。单位:毫秒。

    UNIQ_KEY

    String

    AC14C305069E1B28CDFA3181CDA2****

    消息唯一键。

    TAGS

    String

    systemProperties.TAGS

    过滤属性。

    INSTANCE_ID

    String

    MQ_INST_123456789098****_BXhFHryi

    实例ID。

    userProperties

    Map

    用户属性。

    body

    String

    TEST

    消息内容。

步骤三:编写函数代码并测试

完成触发器创建后,您可以开始编写并测试函数代码,以验证代码的正确性。在实际操作过程中,当消息队列 RocketMQ 版事件通过事件总线EventBridge投递到函数计算时,触发器会自动触发函数的执行。

  1. 在函数详情页面的代码页签,在代码编辑器中编写代码,然后单击部署代码

    本文以Node.js函数代码为例。

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // 解析event参数,对event进行处理。
      callback(null, 'return result');
    }
  2. 测试函数。

    方式一:如果您是配置函数入口参数event模拟事件源,单击测试函数

    方式二:登录消息队列 RocketMQ 版控制台选择您创建的目标Topic,点击发送消息,如下图。

    image

  3. 执行完成后,在实时日志查看结果。

    image

更多信息

如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理