消息队列RabbitMQ作为事件源通过事件总线EventBridge函数计算集成后,通过消息队列RabbitMQ触发器(以下简称RabbitMQ触发器)能够触发关联函数执行,通过函数可以对发布到消息队列RabbitMQ中的消息进行自定义处理。本文介绍如何在函数计算控制台创建RabbitMQ触发器、配置函数入口参数和编写代码并测试。

功能简介

您在函数计算的控制台提交触发器创建请求之后,函数计算会根据触发器的配置信息,自动创建与事件总线EventBridge相关的资源。目前提供事件模式事件流模式两种消息推送模式,每个模式创建的资源如下:
创建完成后,您可以在函数计算控制台查看触发器信息,同时也可以在事件总线EventBridge控制台查看自动创建的资源信息。当源消息队列RabbitMQ实例中有消息入队时,将会触发函数计算执行,不同消息推送模式支持的参数内容也不同。
  • 事件模式:每次会将单个消息作为事件参数传入函数中,事件遵循CloudEvents规范。消息内容和CloudEvents的关系,请参见参数内容
  • 事件流模式:根据您的攒批配置,将一个或多个消息事件以批的形式推送到函数中进行处理,适合端到端的流式数据处理场景。

注意事项

  • 作为触发源的消息队列RabbitMQ实例必须和函数计算的函数在相同的地域。
  • 创建的自定义总线以及事件规则的数量超过上限后,将无法再创建事件模式的RabbitMQ触发器。
  • 创建的事件流数量超过上限后,将无法再创建事件流模式的RabbitMQ触发器。

在单个阿里云账号单个地域维度下,关于创建触发器涉及的资源数量的限制,请参见使用限制

前提条件

步骤一:创建触发器

  1. 登录函数计算控制台,在左侧导航栏,单击服务及函数
  2. 在顶部菜单栏,选择地域,然后在服务列表页面,单击目标服务。
  3. 函数管理页面,单击目标函数名称。
  4. 在函数详情页面,单击触发器管理页签,从版本或别名下拉列表选择要创建触发器的版本或别名,然后单击创建触发器
  5. 在创建触发器面板,填写相关信息。然后单击确定
    基础配置项说明如下所示。
    配置项操作本文示例
    触发器类型选择消息队列 RabbitMQ 版消息队列 RabbitMQ 版
    名称填写自定义的触发器名称。rabbitmq-trigger
    版本或别名默认值为LATEST,如果您需要创建其他版本或别名的触发器,首先需要在函数详情页的右上角切换到该版本或别名。关于版本和别名的简介,请参见管理版本管理别名LATEST
    RabbitMQ 实例选择已创建的消息队列RabbitMQ的实例。amqp-cn-i7m2l6m2****
    Vhost选择已创建的消息队列RabbitMQ实例的Vhost。myhost-1
    Queue选择已创建的消息队列RabbitMQ实例的Queue。myqueue-1
    调用方式选择函数调用方式。
    取值说明如下。
    • 同步调用:适用于顺序调用场景。单个(批)事件触发函数调用,等待函数执行完成返回结果后,再由下一个(批)事件继续触发函数调用。同步调用请求正文有效负载最大为32 MB。更多信息,请参见同步调用
    • 异步调用:可以快速消费事件。单个(批)事件触发函数调用,函数计算会立刻返回响应,再由下一个(批)事件继续触发函数调用。该过程中函数会异步执行。异步调用请求正文有效负载最大为128 KB。更多信息,请参见异步调用
    同步调用
    消息推送模式消息数据推送到函数计算时的底层应用模式。
    取值说明如下。
    • 事件模式:每次会将单个消息作为事件参数传入函数中,事件遵循CloudEvents规范。消息内容和CloudEvents的关系,请参见参数内容
    • 事件流模式:会根据您的攒批配置将一个或多个消息事件以批的形式推送到函数中进行处理,适合端到端的流式数据处理场景。
    事件模式
    触发器启用状态创建触发器后是否立即启用。默认勾选启用触发器,即创建触发器后立即启用触发器。不涉及

    关于推送配置、重试和死信等高级配置项说明,请参见触发器高级功能

    创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理

步骤二:配置函数入口参数

消息队列RabbitMQ事件源会以event的形式作为输入参数传递给函数,您可以手动将event传给函数模拟触发事件,测试函数代码是否正确。

  1. 在函数详情页面,单击函数代码页签,然后单击xialatubiao图标,从下拉列表中,选择配置测试参数
  2. 配置测试参数面板,选择创建新测试事件编辑已有测试事件页签,填写事件名称和事件内容。然后单击确定
    事件模式的event格式如下所示。
      {
        "id":"bj694332-4cj1-389e-9d8c-b137h30b****",
        "source":"RabbitMQ-Function-rabbitmq-trigger",
        "specversion":"1.0",
        "type":"amqp:Queue:SendMessage",
        "datacontenttype":"application/json;charset=utf-8",
        "subject":"acs:amqp:cn-hangzhou:164901546557****:/instances/amqp-cn-tl32e756****/vhosts/eb-connect/queues/housekeeping",
        "time":"2021-08-12T06:56:40.709Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T08:58:55.140Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RabbitMQ-Function-rabbitmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "envelope":{
                "deliveryTag":98,
                "exchange":"",
                "redeliver":false,
                "routingKey":"housekeeping"
            },
            "body":{
                "Hello":"RabbitMQ"
            },
            "props":{
                "contentEncoding":"UTF-8",
                "messageId":"f7622d51-e198-41de-a072-77c1ead7****"
            }
        }
    }
    事件流模式的event格式如下所示。
    [
          {
        "id":"bj694332-4cj1-389e-9d8c-b137h30b****",
        "source":"RabbitMQ-Function-rabbitmq-trigger",
        "specversion":"1.0",
        "type":"amqp:Queue:SendMessage",
        "datacontenttype":"application/json;charset=utf-8",
        "subject":"acs:amqp:cn-hangzhou:164901546557****:/instances/amqp-cn-tl32e756****/vhosts/eb-connect/queues/housekeeping",
        "time":"2021-08-12T06:56:40.709Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T08:58:55.140Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RabbitMQ-Function-rabbitmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "envelope":{
                "deliveryTag":98,
                "exchange":"",
                "redeliver":false,
                "routingKey":"housekeeping"
            },
            "body":{
                "Hello":"RabbitMQ"
            },
            "props":{
                "contentEncoding":"UTF-8",
                "messageId":"f7622d51-e198-41de-a072-77c1ead7****"
            }
        }
        },
        {
        "id":"bj694332-4cj1-389e-9d8c-b137h30b****",
        "source":"RabbitMQ-Function-rabbitmq-trigger",
        "specversion":"1.0",
        "type":"amqp:Queue:SendMessage",
        "datacontenttype":"application/json;charset=utf-8",
        "subject":"acs:amqp:cn-hangzhou:164901546557****:/instances/amqp-cn-tl32e756****/vhosts/eb-connect/queues/housekeeping",
        "time":"2021-08-12T06:56:40.709Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T08:58:55.140Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RabbitMQ-Function-rabbitmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "envelope":{
                "deliveryTag":98,
                "exchange":"",
                "redeliver":false,
                "routingKey":"housekeeping"
            },
            "body":{
                "Hello":"RabbitMQ"
            },
            "props":{
                "contentEncoding":"UTF-8",
                "messageId":"f7622d51-e198-41de-a072-77c1ead7****"
            }
        }
        }
    ]
    data字段包含的参数解释如下表所示。关于CloudEvents规范中定义的参数解释,请参见事件概述
    参数类型示例值描述
    bodyMap消息内容。
    HelloStringEventBridge用户数据。
    propsMap消息属性。
    contentEncodingStringutf-8消息内容编码。
    messageIdStringf7622d51-e198-41de-a072-77c1ead7****消息ID。每条消息的ID取值唯一。
    envelopeMap消息的envelope信息。
    deliveryTagInt98消息的Tag。
    exchangeString消息的Exchange。
    redeliverBooleanfalse是否支持重发消息。取值说明如下:
    • true:支持
    • false:不支持
    routingKeyStringhousekeeping消息的路由规则。

步骤三:编写函数代码并测试

完成触发器创建后,您可以开始编写并测试函数代码,以验证代码的正确性。在实际操作过程中,当消息队列RabbitMQ事件通过事件总线EventBridge投递到函数计算时,触发器会自动触发函数的执行。

  1. 在函数详情页面,单击函数代码页签,在代码编辑器中编写代码,然后单击部署代码
    本文以Node.js函数代码为例。
    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      //解析event参数,对event进行处理。
      callback(null, 'return result');
    }
  2. 单击函数代码页签的测试函数
    执行完成后,您可以在函数代码页签的上方查看执行结果。

更多信息

如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理