本文介绍如何通过事件总线EventBridge将消息队列RocketMQ版的数据推送到函数计算。
前提条件
您已完成以下操作:
- 事件总线EventBridge
- 函数计算
- 消息队列RocketMQ版
步骤一:添加自定义事件源
- 登录事件总线EventBridge控制台。
- 在左侧导航栏,单击事件总线。
- 在顶部菜单栏,选择地域。
- 在事件总线页面,单击已创建的自定义事件总线。
- 在左侧导航栏,单击事件源。
- 在事件源页面,单击添加事件源。
- 在添加自定义事件源面板,输入名称和描述,事件提供方选择消息队列 RocketMQ 版,并选择已创建的消息队列RocketMQ版的资源信息等,然后单击确定。
步骤二:创建事件规则
注意 目标服务和事件规则必须处于同一地域。
- 登录事件总线EventBridge控制台。
- 在左侧导航栏,单击事件总线。
- 在顶部菜单栏,选择地域。
- 在事件总线页面,单击目标总线名称。
- 在左侧导航栏,单击事件规则。
- 在事件规则页面,单击创建规则。
- 在创建规则页面,完成以下操作。
步骤三:发布事件
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.OnExceptionContext; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.SendCallback; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ProducerTest { public static void main(String[] args) { Properties properties = new Properties(); // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。 properties.put(PropertyKeyConst.AccessKey, "XXX"); // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。 properties.put(PropertyKeyConst.SecretKey, "XXX"); //设置发送超时时间,单位毫秒。 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 设置TCP协议接入点,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Producer producer = ONSFactory.createProducer(properties); // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。 producer.start(); Message msg = new Message( // Message所属的Topic。 "TopicTestMQ", // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。 "TagA", // Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。 "Hello MQ".getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一。以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。 // 注意:不设置也不会影响消息正常收发。 msg.setKey("ORDERID_100"); // 异步发送消息,发送结果通过callback返回给客户端。 producer.sendAsync(msg, new SendCallback() { @Override public void onSuccess(final SendResult sendResult) { // 消息发送成功。 System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId()); } @Override public void onException(OnExceptionContext context) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId()); } }); // 在callback返回之前即可取得msgId。 System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID()); // 在应用退出前,销毁Producer对象。 注意:如果不销毁也没有问题。 producer.shutdown(); } }
using System; using ons; public class ProducerExampleForEx { public ProducerExampleForEx() { } static void Main(string[] args) { // 配置账号,从控制台获取设置。 ONSFactoryProperty factoryInfo = new ONSFactoryProperty(); // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。 factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key"); // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。 factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret"); // 您在控制台创建的Group ID。 factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example"); // 您在控制台创建的Topic。 factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name"); // 设置TCP协议接入点,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。 factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr"); // 设置日志路径。 factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log"); // 创建生产者实例。 // 说明:生产者实例是线程安全的,可用于发送不同Topic的消息。基本上,您每一个线程只需要一个生产者实例。 Producer producer = ONSFactory.getInstance().createProducer(factoryInfo); // 启动客户端实例。 producer.start(); // 创建消息对象。 Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body"); msg.setKey(Guid.NewGuid().ToString()); for (int i = 0; i < 32; i++) { try { SendResultONS sendResult = producer.send(msg); Console.WriteLine("send success {0}", sendResult.getMessageId()); } catch (Exception ex) { Console.WriteLine("send failure{0}", ex.ToString()); } } // 在您的线程即将退出时,关闭生产者实例。 producer.shutdown(); } }
#include "ONSFactory.h" #include "ONSClientException.h" using namespace ons; int main() { //创建Producer,并配置发送消息所必需的信息。 ONSFactoryProperty factoryInfo; factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");//您在控制台创建的Group ID。 factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); //设置TCP协议接入点,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。 factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// 在控制台创建的Topic。 factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");//消息内容。 factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");//AccessKeyId阿里云身份验证,在阿里云服务器管理控制台创建。 factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX" );//AccessKeySecret阿里云身份验证,在阿里云服务器管理控制台创建。 //create producer; Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo); //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。 pProducer->start(); Message msg( //Message Topic factoryInfo.getPublishTopics(), //Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。 "TagA", //Message Body,不能为空,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。 factoryInfo.getMessageContent() ); // 设置代表消息的业务关键属性,请尽可能全局唯一。 // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。 // 注意:不设置也不会影响消息正常收发。 msg.setKey("ORDERID_100"); //发送消息,只要不抛出异常,就代表发送成功。 try { SendResultONS sendResult = pProducer->send(msg); } catch(ONSClientException & e) { //自定义处理exception的细节。 } // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。 pProducer->shutdown(); return 0; }
结果验证
您可以在函数计算控制台使用表盘解读数据指标。
常见问题
如果事件发布失败,您可以查看事件轨迹,在事件轨迹页面的事件投递区域查看投递详情,获取投递响应。针对不同投递响应提示,采取相应的解决措施。
发布到函数计算的事件发布失败,且投递响应为[500]ConnectErrorconnectiontimedout,我该如何处理?