本文介绍如何通过事件总线EventBridge将微消息队列MQTT版的数据推送到函数计算。
前提条件
您已完成以下操作:
- 事件总线EventBridge
- 函数计算
- 消息队列RocketMQ版
- 微消息队列MQTT版
注意事项
事件总线EventBridge不支持直接从微消息队列MQTT版的Topic拉取事件。您可以通过微消息队列MQTT版的数据流出功能,将数据流转到消息队列RocketMQ版的Topic中,并通过添加事件总线EventBridge的自定义事件源消息队列RocketMQ版,实现微消息队列MQTT版与事件总线EventBridge的集成。
步骤一:创建数据流出规则
步骤二:创建自定义事件源
- 登录事件总线EventBridge控制台。
- 在左侧导航栏,选择 。
- 在顶部菜单栏,选择地域。
- 在快速添加自定义事件源区域,单击消息队列RocketMQ版。
- 在添加自定义事件源面板,输入名称,输入描述,选择RocketMQ实例,选择Topic,选择自定义总线,然后单击确定。
步骤三:创建事件规则
注意 目标服务和事件规则必须处于同一地域。
- 登录事件总线EventBridge控制台。
- 在左侧导航栏,选择 。
- 在顶部菜单栏,选择地域。
- 在事件规则页面,选择自定义总线,然后单击创建规则。
- 在配置向导页面,完成以下操作。
步骤四:发布事件
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* 本代码提供签名鉴权模式下MQ4IOT客户端发送消息到MQ4IOT客户端的示例,其中初始化参数请根据实际情况修改。
* 签名模式即使用阿里云账号系统提供的AccessKey和SecretKey对每个客户端计算出一个独立的签名供客户端识别使用。
* 对于实际业务场景使用过程中,考虑到私钥SecretKey的隐私性,可以将签名过程放在受信任的环境完成。
*
* 完整demo工程,参考https://github.com/AliwareMQ/lmq-demo。
*/
public class MQ4IoTProducerDemo {
public static void main(String[] args) throws Exception {
/**
* MQ4IOT实例ID,购买后控制台获取。
*/
String instanceId = "XXXXX";
/**
* 接入点地址,购买MQ4IOT实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用IP地址直接连接,否则可能会导致客户端异常。
*/
String endPoint = "XXXXX.mqtt.aliyuncs.com";
/**
* 账号accesskey,从账号系统控制台获取。
*/
String accessKey = "XXXXX";
/**
* 账号secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置。
*/
String secretKey = "XXXXX";
/**
* MQ4IOT Client Id,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的Client Id会导致连接异常断开。
* Client Id 由两部分组成,格式为GroupID@@@DeviceId,其中groupId在MQ4IOT控制台申请,DeviceId由业务方自己设置,Client Id总长度不得超过64个字符。
*/
String clientId = "GID_XXXXX@@@XXXXX";
/**
* MQ4IOT 消息的一级Topic,需要在控制台申请才能使用。
* 如果使用了没有申请或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。
*/
final String parentTopic = "XXXXX";
/**
* MQ4IOT支持子级Topic,用来做自定义的过滤,此处为示意,可以填写任何字符串。
*/
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
/**
* QoS参数代表传输质量,可选0,1,2。
*/
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 客户端使用的协议和端口必须匹配。
* 如果SSL加密则设置ssl://endpoint:8883。
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* 客户端设置好发送超时时间,防止无限阻塞。
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
/**
* 客户端连接成功后就需要尽快订阅需要的Topic。
*/
System.out.println("connect success");
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
/**
* 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
* 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。超时时间约定参考限制。
*/
System.out.println(
"receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
/**
* 发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
*/
mqttClient.publish(mq4IotTopic, message);
/**
* MQ4IoT支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的Client Id,则可以直接发送点对点消息。
* 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的Topic格式规范是{{parentTopic}}/p2p/{{targetClientId}}。
*/
String receiverId = "xxx";
final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, message);
}
Thread.sleep(Long.MAX_VALUE);
}
}
结果验证
您可以在函数计算控制台使用表盘解读数据指标。
常见问题
如果事件发布失败,您可以查看事件轨迹,在事件轨迹页面的事件投递区域查看投递详情,获取投递响应。针对不同投递响应提示,采取相应的解决措施。
发布到函数计算的事件发布失败,且投递响应为[500]ConnectErrorconnectiontimedout,我该如何处理?