本文介绍如何通过事件总线EventBridge将云消息队列 RabbitMQ 版的数据推送到函数计算。
前提条件
您已完成以下操作:
事件总线EventBridge
函数计算
云消息队列 RabbitMQ 版
步骤一:添加自定义事件源
- 登录事件总线EventBridge控制台。
- 在左侧导航栏,单击事件总线。
- 在顶部菜单栏,选择地域。
在事件总线页面,单击已创建的自定义事件总线。
- 在左侧导航栏,单击事件源。
在事件源页面,单击添加事件源。
在添加自定义事件源面板,输入名称和描述,事件提供方选择消息队列 RabbitMQ 版,并选择已创建的云消息队列 RabbitMQ 版的资源信息等,然后单击确定。
步骤二:创建事件规则
重要 目标服务和事件规则必须处于同一地域。
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件总线。
- 在顶部菜单栏,选择地域,在事件总线页面,单击目标总线名称。
- 在左侧导航栏,单击事件规则,然后单击创建规则。
- 在创建规则页面,完成以下操作。
- 在配置基本信息配置向导,在名称文本框输入规则名称,在描述文本框输入规则的描述,然后单击下一步。
- 在配置事件模式配置向导,事件源类型选择自定义事件源,事件源选择步骤一添加的自定义事件源,在事件模式内容代码框输入事件模式,然后单击下一步。
如需了解更多信息,请参见事件模式。
- 在配置事件目标配置向导,配置事件目标,然后单击创建。
步骤三:发布事件
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.UUID;
public class ProducerTest {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 设置接入点,在消息队列RabbitMQ版控制台实例详情页面查看。
factory.setHost("xxx.xxx.aliyuncs.com");
// 用户名,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
factory.setUsername("${UserName}");
// 密码,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
factory.setPassword("${PassWord}");
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// 设置Vhost名称,请确保已在消息队列RabbitMQ版控制台上创建完成。
factory.setVirtualHost("${VhostName}");
// 默认端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
// 基于网络环境合理设置超时时间。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
// 开始发送消息。
for (int i = 0; i < 100; i++ ) {
// ${ExchangeName}必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
// BindingKey根据业务需求填入相应的BindingKey。
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props,
("消息发送Body" + i).getBytes(StandardCharsets.UTF_8));
}
connection.close();
}
}
结果验证
您可以在函数计算控制台使用表盘解读数据指标。
- 登录函数计算控制台。
- 在左侧导航栏,单击服务及函数。
- 在顶部菜单栏,选择地域。
- 在服务列表页面,找到目标服务,在其右侧操作列单击函数管理。
- 在函数管理页面,找到目标函数,单击目标函数名称。
- 在函数详情页面,单击调用日志页签,查看日志。
FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6**** 2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject. 2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****
常见问题
事件发布失败,我该如何定位问题?
如果事件发布失败,您可以查看事件轨迹,在事件轨迹页面的事件投递区域查看投递详情,获取投递响应。针对不同投递响应提示,采取相应的解决措施。
发布到函数计算的事件发布失败,且投递响应为[500]ConnectErrorconnectiontimedout,我该如何处理?
文档内容是否对您有帮助?