本文介绍如何通过事件总线EventBridge将消息队列RocketMQ版的数据推送到函数计算。
前提条件
您已完成以下操作:
事件总线EventBridge
函数计算
消息队列RocketMQ版
步骤一:添加自定义事件源
- 登录事件总线EventBridge控制台。
- 在左侧导航栏,单击事件总线。
- 在顶部菜单栏,选择地域。
在事件总线页面,单击已创建的自定义事件总线。
- 在左侧导航栏,单击事件源。
在事件源页面,单击添加事件源。
在添加自定义事件源面板,输入名称和描述,事件提供方选择消息队列 RocketMQ 版,并选择已创建的云消息队列 RocketMQ 版的资源信息等,然后单击确定。
步骤二:创建事件规则
重要 目标服务和事件规则必须处于同一地域。
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件总线。
- 在顶部菜单栏,选择地域,在事件总线页面,单击目标总线名称。
- 在左侧导航栏,单击事件规则,然后单击创建规则。
- 在创建规则页面,完成以下操作。
- 在配置基本信息配置向导,在名称文本框输入规则名称,在描述文本框输入规则的描述,然后单击下一步。
- 在配置事件模式配置向导,事件源类型选择自定义事件源,事件源选择步骤一添加的自定义事件源,在事件模式内容代码框输入事件模式,然后单击下一步。
如需了解更多信息,请参见事件模式。
- 在配置事件目标配置向导,配置事件目标,然后单击创建。
步骤三:发布事件
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
//设置发送超时时间,单位毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 设置TCP协议接入点,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
Message msg = new Message(
// Message所属的Topic。
"TopicTestMQ",
// Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
"TagA",
// Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一。以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
msg.setKey("ORDERID_100");
// 异步发送消息,发送结果通过callback返回给客户端。
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// 消息发送成功。
System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});
// 在callback返回之前即可取得msgId。
System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());
// 在应用退出前,销毁Producer对象。 注意:如果不销毁也没有问题。
producer.shutdown();
}
}
结果验证
您可以在函数计算控制台使用表盘解读数据指标。
登录函数计算控制台。
- 在左侧导航栏,单击服务及函数。
- 在顶部菜单栏,选择地域。
- 在服务列表页面,找到目标服务,在其右侧操作列单击函数管理。
- 在函数管理页面,找到目标函数,单击目标函数名称。
- 在函数详情页面,单击调用日志页签,查看日志。
FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6**** 2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject. 2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****
常见问题
事件发布失败,我该如何定位问题?
如果事件发布失败,您可以查看事件轨迹,在事件轨迹页面的事件投递区域查看投递详情,获取投递响应。针对不同投递响应提示,采取相应的解决措施。
发布到函数计算的事件发布失败,且投递响应为[500]ConnectErrorconnectiontimedout,我该如何处理?
文档内容是否对您有帮助?