文档

Kafka触发器

更新时间:
一键部署

消息队列 Kafka 版作为事件源通过事件总线EventBridge函数计算集成后,通过消息队列 Kafka 版触发器(以下简称Kafka触发器)能够触发关联函数执行,通过函数可以对发布到消息队列 Kafka 版的消息进行自定义处理。本文介绍如何在函数计算控制台创建Kafka触发器、配置入口参数以及编写代码并测试代码。

功能简介

您在函数计算的控制台提交触发器创建请求之后,函数计算会根据触发器的配置信息,自动在事件总线EventBridge侧创建事件流资源。

创建完成后,您可以在函数计算控制台查看触发器信息,同时也可以在事件总线EventBridge控制台查看自动创建的资源信息。当源消息队列 Kafka 版中有消息入队时,将会触发函数计算执行,触发时会根据您的攒批配置将一个或多个消息事件以批的形式推送到函数中进行处理。

注意事项

  • 作为触发源的消息队列 Kafka 版实例必须和函数计算的函数在相同的地域。

  • 创建的事件流数量超过上限后,将无法再创建Kafka触发器。关于事件流数量的限制,请参见使用限制

前提条件

步骤一:创建Kafka触发器

  1. 登录函数计算控制台,在左侧导航栏,单击函数

  2. 在顶部菜单栏,选择地域,然后在函数页面,单击目标函数。

  3. 在函数配置页面,选择配置页签,在左侧导航栏,单击触发器,然后单击创建触发器

  4. 在创建触发器面板,填写相关信息,然后单击确定

    基础配置项说明如下所示。

    配置项

    操作

    本文示例

    触发器类型

    选择消息队列 Kafka 版

    消息队列 Kafka 版

    名称

    填写自定义的触发器名称。

    kafka-trigger

    版本或别名

    默认值为LATEST,如果您需要创建其他版本或别名的触发器,首先需要在函数详情页的右上角切换到该版本或别名。关于版本和别名的简介,请参见管理版本管理别名

    LATEST

    Kafka 实例

    选择已创建的消息队列 Kafka 版实例。

    alikafka_pre-cn-i7m2t7t1****

    Topic

    选择已创建的消息队列 Kafka 版实例的Topic。

    topic1

    Group ID

    选择已创建的消息队列 Kafka 版实例的Group ID。

    说明

    请使用独立的Group ID来创建触发器,不要与已有的业务混用Group ID,否则会影响已有的消息收发。

    GID_group1

    消费任务并发数

    消费者的并发数量,取值范围为[1,Topic的分区数]。

    2

    消费位点

    选择消息的消费位点,即消息队列 Kafka 版从事件总线开始拉取消息的位置。

    取值说明如下。

    • 最早位点:从最早位点开始消费。

    • 最新位点:从最新位点开始消费。

    最新位点

    网络配置

    选择路由消息的网络类型。

    取值说明如下。

    • 默认网络:默认使用部署Kafka实例时选择的VPC IDvSwitch ID

    • 自建公网:需选择另外的专有网络VPC交换机安全组

    默认网络

    调用方式

    选择函数调用方式。

    取值说明如下。

    • 同步调用:适用于顺序调用场景。单个(批)事件触发函数调用,等待函数执行完成返回结果后,再由下一个(批)事件继续触发函数调用。同步调用请求正文有效负载最大为32 MB。更多信息,请参见同步调用

    • 异步调用:可以快速消费事件。单个(批)事件触发函数调用,函数计算会立刻返回响应,再由下一个(批)事件继续触发函数调用。该过程中函数会异步执行。异步调用请求正文有效负载最大为128 KB。更多信息,请参见功能概览

    同步调用

    触发器启用状态

    创建触发器后是否立即启用。默认勾选启用触发器,即创建触发器后立即启用触发器。

    不涉及

    关于推送配置、重试和死信等高级配置项说明,请参见触发器高级功能

    创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理

步骤二:配置函数入口参数

消息队列 Kafka 版事件源会以event的形式作为输入参数传递给函数,您可以手动将event传给函数模拟触发事件。

  1. 在函数配置页面的代码页签,单击测试函数右侧的image.png图标,从下拉列表中,选择配置测试参数

  2. 配置测试参数面板,选择创建新测试事件编辑已有测试事件,填写事件名称和事件内容,然后单击确定。

    event格式如下所示:

    [
        {
            "specversion":"1.0",
            "id":"8e215af8-ca18-4249-8645-f96c1026****",
            "source":"acs:alikafka",
            "type":"alikafka:Topic:Message",
            "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
            "datacontenttype":"application/json; charset=utf-8",
            "time":"2022-06-23T02:49:51.589Z",
            "aliyunaccountid":"164901546557****",
            "data":{
                "topic":"****",
                "partition":7,
                "offset":25,
                "timestamp":1655952591589,
                "headers":{
                    "headers":[
    
                    ],
                    "isReadOnly":false
                },
                "key":"keytest",
                "value":"hello kafka msg"
            }
        },
        {
            "specversion":"1.0",
            "id":"8e215af8-ca18-4249-8645-f96c1026****",
            "source":"acs:alikafka",
            "type":"alikafka:Topic:Message",
            "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
            "datacontenttype":"application/json; charset=utf-8",
            "time":"2022-06-23T02:49:51.589Z",
            "aliyunaccountid":"164901546557****",
            "data":{
                "topic":"****",
                "partition":7,
                "offset":25,
                "timestamp":1655952591589,
                "headers":{
                    "headers":[
    
                    ],
                    "isReadOnly":false
                },
                "key":"keytest",
                "value":"hello kafka msg"
            }
        }
    ]

    CloudEvents规范中定义的参数解释,请参见事件概述

    data字段包含的参数解释如下表所示。

    参数类型示例值描述
    topicStringTopicNameTopic的名称。
    partitionInt1云消息队列 Kafka 版的消费分区信息。
    offsetInt0云消息队列 Kafka 版的消息位点。
    timestampString1655952591589开始消费时间戳。

步骤三:编写函数代码并测试

触发器创建完成后,您可以开始编写并测试函数代码,以验证代码的正确性。在实际操作过程中,当消息队列 Kafka 版事件发生时,触发器会自动触发函数的执行。

  1. 在函数配置页面的代码页签,在代码编辑器中编写代码,然后单击部署代码

    本文以Node.js函数代码为例。

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      //解析event参数,对event进行处理。
      callback(null, 'return result');
    }
  2. 单击测试函数

更多信息

如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理

  • 本页导读 (1)