消息队列RabbitMQ版推送事件到函数计算

本文介绍如何通过事件总线EventBridge云消息队列 RabbitMQ 版的数据推送到函数计算。

前提条件

您已完成以下操作:

步骤一:添加自定义事件源

  1. 登录事件总线EventBridge控制台
  2. 在左侧导航栏,单击事件总线
  3. 在顶部菜单栏,选择地域。
  4. 事件总线页面,单击已创建的自定义事件总线。

  5. 在左侧导航栏,单击事件源
  6. 事件源页面,单击添加事件源

  7. 添加自定义事件源面板,输入名称描述事件提供方选择消息队列 RabbitMQ 版,并选择已创建的云消息队列 RabbitMQ 版的资源信息等,然后单击确定

步骤二:创建事件规则

重要

目标服务和事件规则必须处于同一地域。

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件总线
  2. 在顶部菜单栏,选择地域,在事件总线页面,单击目标总线名称。
  3. 在左侧导航栏,单击事件规则,然后单击创建规则
  4. 创建规则页面,完成以下操作。

    1. 配置基本信息配置向导,在名称文本框输入规则名称,在描述文本框输入规则的描述,然后单击下一步

    2. 配置事件模式配置向导,事件源类型选择自定义事件源事件源选择步骤一添加的自定义事件源,在事件模式内容代码框输入事件模式,然后单击下一步

      如需了解更多信息,请参见事件模式

    3. 配置事件目标配置向导,配置事件目标,然后单击创建

      说明

      1个事件规则最多可以添加5个目标。

      配置项

      说明

      服务类型

      在下拉列表中选择函数计算

      函数

      在下拉列表中选择已创建的函数。

      事件

      支持完整事件部分事件固定值模板四种事件类型,本文以模板类型为例进行介绍说明。具体事件类型的介绍,请参见事件内容转换

      以下提供的是变量模板示例。

      变量示例:

      {
        "source":"$.source",
        "type":"$.type"
      }

      模板示例:

      The event comes from ${source},event type is ${type}.

      版本和别名

      支持指定函数版本或指定函数别名:

      • 如果您选择指定版本,需要选择函数的具体版本。

      • 如果您选择指定别名,需要选择函数的具体别名。

      执行方式

      支持以下两种执行方式,具体信息,请参见同步调用异步调用功能概览

      • 同步:同步调用是事件被函数处理后直接返回结果。

      • 异步:异步调用是函数计算系统接收异步调用请求后,将请求持久化后会立即返回响应,而不是等待请求执行完成后再返回。

      投递方式

      支持以下两种投递方式:

      • Object格式:如果您选用此格式,事件将会以对象(Object) 格式向下游函数进行投递。

      • ObjectList格式:如果您选用此格式,事件将会以对象数组(Array)格式向下游函数进行投递。

      说明

      此功能为非必选项,如果您不选择投递格式,则默认事件将以Object格式向下游函数进行投递。

      重试和死信

      请参见重试和死信文档进行设置。

步骤三:发布事件

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;

public class Producer {
    //设置为云消息队列 RabbitMQ 版实例的接入点。
    public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
    //设置为云消息队列 RabbitMQ 版实例的静态用户名。
    public static final String userName = "MjoxODgwNzcwODY5MD****";
    //设置为云消息队列 RabbitMQ 版实例的静态用户名密码。
    public static final String password = "NDAxREVDQzI2MjA0OT****";
    //设置为云消息队列 RabbitMQ 版实例的Vhost名称。
    public static final String virtualHost = "vhost_test";

    //如果使用5671端口,需要enableSSL设置为true。
    public static final int port = 5672;
    public static final boolean enableSSL = false;

    private Channel channel;
    private final ConcurrentNavigableMap<Long/*deliveryTag*/, String/*msgId*/> outstandingConfirms;
    private final ConnectionFactory factory;
    private final String exchangeName;
    private final String queueName;
    private final String bindingKey;

    public Producer(ConnectionFactory factory, String exchangeName, String queueName, String bindingKey) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        this.factory = factory;
        this.outstandingConfirms = new ConcurrentSkipListMap<>();
        this.channel = factory.createChannel();
        this.exchangeName = exchangeName;
        this.queueName = queueName;
        this.bindingKey = bindingKey;
    }

    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        //构建连接工厂。
        ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);

        //初始化生产者。
        Producer producer = new Producer(factory, "ExchangeTest", "QueueTest", "BindingKeyTest");

        //declare。
        producer.declare();

        producer.initChannel();

        //发送消息。
        producer.doSend("hello,amqp");
    }

    private void initChannel() throws IOException {
        channel.confirmSelect();

        ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
            if (multiple) {
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);

                for (Long tag : confirmed.keySet()) {
                    String msgId = confirmed.get(tag);
                    System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, tag, true);
                }

                confirmed.clear();
            } else {
                String msgId = outstandingConfirms.remove(deliveryTag);
                System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, false);
            }
        };
        channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {
            String msgId = outstandingConfirms.get(deliveryTag);
            System.err.format("Message with msgId %s has been nack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, multiple);
            // send msg failed, re-publish
        });


        channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
    }

    private void declare() throws IOException {
        channel.exchangeDeclare(exchangeName, "direct", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, bindingKey);
    }
    

    private void doSend(String content) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        try {
            String msgId = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();

            channel.basicPublish(exchangeName, bindingKey, true, props, content.getBytes(StandardCharsets.UTF_8));

            outstandingConfirms.put(channel.getNextPublishSeqNo(), msgId);
        } catch (AlreadyClosedException e) {
            //need reconnect if channel is closed.
            String message = e.getMessage();

            System.out.println(message);

            if (channelClosedByServer(message)) {
                factory.closeCon(channel);
                channel = factory.createChannel();
                this.initChannel();
                doSend(content);
            } else {
                throw e;
            }
        }
    }

    private boolean channelClosedByServer(String errorMsg) {
        if (errorMsg != null
            && errorMsg.contains("channel.close")
            && errorMsg.contains("reply-code=541")
            && errorMsg.contains("reply-text=InternalError")) {
            return true;
        } else {
            return false;
        }
    }
}

结果验证

您可以在函数计算控制台使用表盘解读数据指标。

  1. 登录函数计算控制台

  2. 在左侧导航栏,单击函数,然后在顶部菜单栏选择目标地域。

  3. 函数页面,单击目标函数名称。

  4. 在目标函数详情页面,单击日志页签,然后单击函数日志,即可查看目标函数的日志信息。

    2n968ZJ9Lj

常见问题

事件发布失败,我该如何定位问题?

如果事件发布失败,您可以查看事件轨迹,在事件轨迹页面的事件投递区域查看投递详情,获取投递响应。针对不同投递响应提示,采取相应的解决措施。

发布到函数计算的事件发布失败,且投递响应为[500]ConnectErrorconnectiontimedout,我该如何处理?

您可以按照以下步骤处理:
  1. 登录函数计算控制台,执行目标函数并观察执行时间。
  2. 如果函数执行时间大于15s,请排查网络问题;如果函数执行时间小于15s,请确认您是否可以访问函数计算服务所属地域的Endpoint。
  3. 如您不能访问当前函数计算服务所属地域的Endpoint,请联系函数计算工程师处理。