自建Apache RocketMQ触发器

Apache RocketMQ作为事件源通过事件总线EventBridge函数计算集成后,通过Apache RocketMQ触发器能够触发关联函数执行,通过函数可以对发布到Apache RocketMQ的消息进行自定义处理。本文介绍如何在函数计算控制台创建Apache RocketMQ触发器、配置入口参数以及编写代码并测试。

背景信息

您在函数计算的控制台提交触发器创建请求后,函数计算会根据触发器的配置信息,自动在事件总线EventBridge侧创建事件流资源。

创建完成后,您可以在函数计算控制台查看触发器信息,同时也可以在事件总线EventBridge控制台查看自动创建的资源信息。当Apache RocketMQ中有消息入队时,将会触发函数计算执行,触发时会根据您的攒批配置将一个或多个消息事件以批的形式推送到函数中进行处理。

前提条件

使用限制

  • 作为触发源的Apache RocketMQ必须支持公网可访问或者阿里云VPC内可访问。

  • 当Apache RocketMQ支持阿里云VPC内可访问时,VPC实例必须和函数计算的函数在相同的地域。

  • 创建的事件流数量超过上限后,将无法再创建Apache RocketMQ触发器。关于事件流数量的限制,请参见使用限制

步骤一:创建Apache RocketMQ触发器

  1. 登录函数计算控制台,在左侧导航栏,单击服务及函数
  2. 在顶部菜单栏,选择地域,然后在服务列表页面,单击目标服务。
  3. 函数管理页面,单击目标函数名称。
  4. 在函数详情页面,单击触发器管理页签,从版本或别名下拉列表选择要创建触发器的版本或别名,然后单击创建触发器

  5. 在创建触发器面板,填写相关信息,然后单击确定

    配置项

    操作

    本文示例

    触发器类型

    选择自建 Apache RocketMQ

    自建 Apache RocketMQ

    名称

    填写自定义的触发器名称。

    apache-rocketmq-trigger

    版本或别名

    默认值为LATEST,如果您需要创建其他版本或别名的触发器,需要在函数详情页的右上角切换到该版本或别名。关于版本和别名的介绍,请参见管理版本管理别名

    LATEST

    接入点

    填写集群NameServer地址。

    192.168.X.X:9876

    Topic

    选择已创建的Apache RocketMQ实例的Topic。

    testTopic

    Group ID

    选择已创建的Apache RocketMQ实例的Consumer Group ID。

    testGroup

    FilterType

    选择消息过滤类型。取值说明如下:

    • Tag:通过Tag标签进行消息过滤。

    • SQL:通过SQL语句进行消息过滤,可匹配消息的属性标识以及属性值。

    Tag

    Filter

    选择FilterType后,需要配置对应的过滤类型下的过滤语句。

    TagA

    认证模式

    选择认证模式,支持ACL模式。

    ACL

    用户名

    认证模式选择ACL后,需要配置Apache RocketMQ用户名用于身份验证。

    admin

    密码

    认证模式选择ACL后,需要配置Apache RocketMQ密码用于身份验证。

    ******

    消费位点

    选择消息的消费位点,即Apache RocketMQ从事件总线开始拉取消息的位置。取值说明如下。

    • 最新位点:从最新位点开始消费。

    • 最早位点:从最早位点开始消费。

    • 指定时间戳:从指定时间戳开始消费。

    最新位点

    网络配置

    选择路由消息的网络类型。取值说明如下。

    • 公网:通过公网访问Apache RocketMQ集群。

    • 专有网络:通过阿里云专有网络访问Apache RocketMQ集群。您需要选择对应的专有网络VPC交换机安全组

    公网

    调用方式

    选择函数调用方式。

    取值说明如下:

    • 同步调用:默认调用方式。事件触发函数执行,等待函数调用完成后,函数计算返回执行结果。更多信息,请参见同步调用

    • 异步调用:适用于调度延时较长的函数,事件触发函数执行后,函数计算立即返回响应结果并且确保函数至少被成功执行一次,但不会返回具体执行结果。更多信息,请参见异步调用

    同步调用

    触发器启用状态

    创建触发器后是否立即启用。默认勾选启用触发器,即创建触发器后立即启用触发器。

    启用触发器

    关于推送配置、重试和死信等高级配置项说明,请参见触发器高级功能

    创建完成后,在触发器管理页签下会显示已创建的触发器。如果需要对创建的触发器进行修改或删除,请参见触发器管理

步骤二:配置函数入口参数

自建Apache RocketMQ事件源会以event的形式作为输入参数传递给函数,您可以手动将event传给函数模拟触发事件。

  1. 在函数详情页面,单击函数代码页签,然后单击测试函数右侧xialatubiao图标,从下拉列表中,选择配置测试参数

  2. 配置测试参数面板,选择创建新测试事件编辑已有测试事件页签,填写事件名称和事件内容。然后单击确定

    event格式如下所示。

    [
      {
        "msgId": "7F0000010BDD2A84AEE70DA49B57****",
        "topic": "testTopic",
        "systemProperties": {
          "UNIQ_KEY": "7F0000010BDD2A84AEE70DA49B57****",
          "CLUSTER": "DefaultCluster",
          "MIN_OFFSET": "0",
          "TAGS": "TagA",
          "MAX_OFFSET": "128"
        },
        "userProperties": {},
        "body": "Hello RocketMQ"
      }
    ]

    event字段包含的参数解释如下表所示。

    参数

    类型

    示例值

    描述

    msgId

    String

    7F0000010BDD2A84AEE70DA49B57****

    Apache RocketMQ消息 ID。

    topic

    String

    testTopic

    Topic名称。

    systemProperties

    Map

    系统属性。

    UNIQ_KEY

    String

    7F0000010BDD2A84AEE70DA49B57****

    消息唯一键。

    CLUSTER

    String

    DefaultCluster

    Apache RocketMQ集群名称。

    MIN_OFFSET

    Int

    0

    最低位点。

    MAX_OFFSET

    Int

    128

    最高位点。

    TAGS

    String

    TagA

    过滤属性。

    userProperties

    Map

    用户属性。

    body

    String

    Hello RocketMQ

    消息内容。

步骤三:编写函数代码并测试

触发器创建完成后,您可以开始编写并测试函数代码,以验证代码的正确性。在实际操作过程中,当Apache RocketMQ接收到消息后,触发器会自动触发函数执行。

  1. 在函数详情页面,单击函数代码页签,在代码编辑器中编写代码,然后单击部署代码

    本文以Node.js函数代码为例,示例代码如下。

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      //解析event参数,对event进行处理。
      callback(null, 'return result');
    }
  2. 单击函数代码页签的测试函数

    执行完成后,您可以在函数代码页签的上方查看执行结果。

更多信息

除了函数计算控制台,您还可以通过以下方式配置触发器:

  • 通过Serverless Devs工具配置触发器。更多操作,请参见创建触发器

  • 通过SDK配置触发器。更多操作,请参见SDK列表

如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理