消息队列RabbitMQ版推送事件到函数计算

本文介绍如何通过事件总线EventBridge云消息队列 RabbitMQ 版的数据推送到函数计算。

前提条件

您已完成以下操作:

步骤一:添加自定义事件源

  1. 登录事件总线EventBridge控制台
  2. 在左侧导航栏,单击事件总线
  3. 在顶部菜单栏,选择地域。
  4. 事件总线页面,单击已创建的自定义事件总线。

  5. 在左侧导航栏,单击事件源
  6. 事件源页面,单击添加事件源

  7. 添加自定义事件源面板,输入名称描述事件提供方选择消息队列 RabbitMQ 版,并选择已创建的云消息队列 RabbitMQ 版的资源信息等,然后单击确定

步骤二:创建事件规则

重要 目标服务和事件规则必须处于同一地域。
  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件总线
  2. 在顶部菜单栏,选择地域,在事件总线页面,单击目标总线名称。
  3. 在左侧导航栏,单击事件规则,然后单击创建规则
  4. 创建规则页面,完成以下操作。
    1. 配置基本信息配置向导,在名称文本框输入规则名称,在描述文本框输入规则的描述,然后单击下一步
    2. 配置事件模式配置向导,事件源类型选择自定义事件源事件源选择步骤一添加的自定义事件源,在事件模式内容代码框输入事件模式,然后单击下一步

      如需了解更多信息,请参见事件模式

    3. 配置事件目标配置向导,配置事件目标,然后单击创建
      说明 1个事件规则最多可以添加5个目标。
      • 服务类型:单击函数计算
      • 服务:选择已创建的函数计算的服务。
      • 函数:选择已创建的函数计算的函数。
      • 事件:单击模板

        以下提供变量和模板的示例。

        变量示例:

        {
          "source":"$.source",
          "type":"$.type"
        }

        模板示例:

        The event comes from ${source},event type is ${type}.

        如需了解更多信息,请参见事件内容转换

      • 服务版本和别名:选择服务版本或服务别名。
        • 默认版本:LATEST。
        • 指定版本:选择服务版本。更多信息,请参见管理版本
        • 指定别名:选择服务别名。更多信息,请参见管理别名
      • 调用方式:选择同步调用异步调用。更多信息,请参见同步调用功能概览

步骤三:发布事件

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //设置实例的接入点。
        String hostName = "xxx.xxx.aliyuncs.com";
        //设置实例的静态用户名密码。
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        //设置实例的Vhost。
        String virtualHost = "${VirtualHost}";

        //在生产环境中,建议提前创建好Connection,并在需要时重复使用,避免频繁创建和关闭Connection,以提高性能、复用连接资源,以及保证系统的稳定性。
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        //设置Exchange、Queue和绑定关系。
        String exchangeName = "${ExchangeName}";
        String queueName = "${QueueName}";
        String routingKey = "${RoutingKey}";
        //设置Exchange类型。
        String exchangeType = "${ExchangeType}";

        //此处为了体验流畅,确保了Exchange和Queue的创建过程。
        //在生产环境中,建议在控制台提前创建,尽量避免在代码中直接声明,否则可能触发单API调用的限流。
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, routingKey);
        //开始发送消息。
        for (int i = 0; i < 10; i++  ) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, routingKey, true, props,
                    ("消息发送示例Body-"  + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + routingKey);
        }
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        //默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        //基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}

结果验证

您可以在函数计算控制台使用表盘解读数据指标。

  1. 登录函数计算控制台

  2. 在左侧导航栏,单击服务及函数
  3. 在顶部菜单栏,选择地域。
  4. 服务列表页面,找到目标服务,在其右侧操作列单击函数管理
  5. 函数管理页面,找到目标函数,单击目标函数名称。
  6. 函数详情页面,单击调用日志页签,查看日志。
    FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6****
    2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject.
    2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** 
    FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****

常见问题

事件发布失败,我该如何定位问题?

如果事件发布失败,您可以查看事件轨迹,在事件轨迹页面的事件投递区域查看投递详情,获取投递响应。针对不同投递响应提示,采取相应的解决措施。

发布到函数计算的事件发布失败,且投递响应为[500]ConnectErrorconnectiontimedout,我该如何处理?

您可以按照以下步骤处理:
  1. 登录函数计算控制台,执行目标函数并观察执行时间。
  2. 如果函数执行时间大于15s,请排查网络问题;如果函数执行时间小于15s,请确认您是否可以访问函数计算服务所属地域的Endpoint。
  3. 如您不能访问当前函数计算服务所属地域的Endpoint,请联系函数计算工程师处理。