消息队列RocketMQ版推送事件到函数计算

更新时间:2025-01-20 03:18:38

本文介绍如何通过事件总线EventBridge将消息队列RocketMQ版的数据推送到函数计算。

前提条件

您已完成以下操作:

步骤一:添加自定义事件源

  1. 登录事件总线EventBridge控制台
  2. 在左侧导航栏,单击事件总线
  3. 在顶部菜单栏,选择地域。
  4. 事件总线页面,单击已创建的自定义事件总线。

  5. 在左侧导航栏,单击事件源
  6. 事件源页面,单击添加事件源

  7. 添加自定义事件源面板,输入名称描述事件提供方选择消息队列 RocketMQ 版,并选择已创建的云消息队列 RocketMQ 版的资源信息等,然后单击确定

步骤二:创建事件规则

重要

目标服务和事件规则必须处于同一地域。

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件总线
  2. 在顶部菜单栏,选择地域,在事件总线页面,单击目标总线名称。
  3. 在左侧导航栏,单击事件规则,然后单击创建规则
  4. 创建规则页面,完成以下操作。

    1. 配置基本信息配置向导,在名称文本框输入规则名称,在描述文本框输入规则的描述,然后单击下一步

    2. 配置事件模式配置向导,事件源类型选择自定义事件源事件源选择步骤一添加的自定义事件源,在事件模式内容代码框输入事件模式,然后单击下一步

      如需了解更多信息,请参见事件模式

    3. 配置事件目标配置向导,配置事件目标,然后单击创建

      说明

      1个事件规则最多可以添加5个目标。

      配置项

      说明

      配置项

      说明

      服务类型

      在下拉列表中选择函数计算

      函数

      在下拉列表中选择已创建的函数。

      事件

      支持完整事件部分事件固定值模板四种事件类型,本文以模板类型为例进行介绍说明。具体事件类型的介绍,请参见事件内容转换

      以下提供的是变量模板示例。

      变量示例:

      {
        "source":"$.source",
        "type":"$.type"
      }

      模板示例:

      The event comes from ${source},event type is ${type}.

      版本和别名

      支持指定函数版本或指定函数别名:

      • 如果您选择指定版本,需要选择函数的具体版本。

      • 如果您选择指定别名,需要选择函数的具体别名。

      执行方式

      支持以下两种执行方式,具体信息,请参见同步调用异步调用功能概览

      • 同步:同步调用是事件被函数处理后直接返回结果。

      • 异步:异步调用是函数计算系统接收异步调用请求后,将请求持久化后会立即返回响应,而不是等待请求执行完成后再返回。

      投递方式

      支持以下两种投递方式:

      • Object格式:如果您选用此格式,事件将会以对象(Object) 格式向下游函数进行投递。

      • ObjectList格式:如果您选用此格式,事件将会以对象数组(Array)格式向下游函数进行投递。

      说明

      此功能为非必选项,如果您不选择投递格式,则默认事件将以Object格式向下游函数进行投递。

      重试和死信

      请参见重试和死信文档进行设置。

步骤三:发布事件

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
        System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
        System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        //设置发送超时时间,单位毫秒。
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 设置TCP协议接入点,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR,
          "XXX");

        Producer producer = ONSFactory.createProducer(properties);
        // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
        producer.start();

        Message msg = new Message(
                // Message所属的Topic。
                "TopicTestMQ",
                // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                "TagA",
                // Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                "Hello MQ".getBytes());

        // 设置代表消息的业务关键属性,请尽可能全局唯一。以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
        // 注意:不设置也不会影响消息正常收发。
        msg.setKey("ORDERID_100");

        // 异步发送消息,发送结果通过callback返回给客户端。
        producer.sendAsync(msg, new SendCallback() {
            @Override
            public void onSuccess(final SendResult sendResult) {
                // 消息发送成功。
                System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
            }

            @Override
            public void onException(OnExceptionContext context) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
            }
        });

        // 在callback返回之前即可取得msgId。
        System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());

        // 在应用退出前,销毁Producer对象。 注意:如果不销毁也没有问题。
        producer.shutdown();
    }
}

结果验证

您可以在函数计算控制台使用表盘解读数据指标。

  1. 登录函数计算控制台

  2. 在左侧导航栏,单击函数,然后在顶部菜单栏选择目标地域。

  3. 函数页面,单击目标函数名称。

  4. 在目标函数详情页面,单击日志页签,然后单击函数日志,即可查看目标函数的日志信息。

    2n968ZJ9Lj

常见问题

事件发布失败,我该如何定位问题?

如果事件发布失败,您可以查看事件轨迹,在事件轨迹页面的事件投递区域查看投递详情,获取投递响应。针对不同投递响应提示,采取相应的解决措施。

发布到函数计算的事件发布失败,且投递响应为[500]ConnectErrorconnectiontimedout,我该如何处理?

您可以按照以下步骤处理:
  1. 登录函数计算控制台,执行目标函数并观察执行时间。
  2. 如果函数执行时间大于15s,请排查网络问题;如果函数执行时间小于15s,请确认您是否可以访问函数计算服务所属地域的Endpoint。
  3. 如您不能访问当前函数计算服务所属地域的Endpoint,请联系函数计算工程师处理。
  • 本页导读 (1)
  • 前提条件
  • 步骤一:添加自定义事件源
  • 步骤二:创建事件规则
  • 步骤三:发布事件
  • 结果验证
  • 常见问题
  • 事件发布失败,我该如何定位问题?
  • 发布到函数计算的事件发布失败,且投递响应为[500]ConnectErrorconnectiontimedout,我该如何处理?
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等