本文介绍如何通过事件总线EventBridge将云消息队列 MQTT 版的数据推送到函数计算。
前提条件
您已完成以下操作:
事件总线EventBridge
函数计算
云消息队列 RocketMQ 版
云消息队列 MQTT 版
注意事项
事件总线EventBridge不支持直接从云消息队列 MQTT 版的Topic拉取事件。您可以通过云消息队列 MQTT 版的数据流出功能,将数据流转到云消息队列 RocketMQ 版的Topic中,并通过添加事件总线EventBridge的自定义事件源云消息队列 RocketMQ 版,实现云消息队列 MQTT 版与事件总线EventBridge的集成。
步骤一:创建数据流出规则
登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
- 在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则。
 - 在创建规则页面完成以下操作。
- 在配置基本信息配置向导页面,填写规则的基本信息,然后单击下一步。
参数 取值示例 说明 规则ID 111111 规则的全局唯一标识,说明如下: - 只能包含字母、数字、短划线(-)和下划线(_),至少包含一个字母或数字。
 - 名称长度限制在3~64字符之间,长于64字符将被自动截取。
 - 创建后无法更新。
 
描述 migrate from rocketmq 对规则的描述。 状态 启用 是否启用当前规则,取值说明如下: - 启用
 - 停用
 
规则类型 数据流出 创建的规则类型,取值说明如下: - 数据流出:用于将云消息队列 MQTT 版的数据导出至其他阿里云产品。详细信息,请参见跨云产品的数据流出。
 - 数据流入:用于将其他阿里云产品的数据导入至云消息队列 MQTT 版。详细信息,请参见跨云产品数据流入。
 - 上下线通知:用于将获取的云消息队列 MQTT 版客户端上下线事件数据导出至其他阿里云产品。详细信息,请参见MQTT客户端上下线事件数据流出。
 
 - 在配置规则源配置向导页面,配置数据源,然后单击下一步。
参数 取值示例 说明 Topic TopicA 指定您需导出数据的源Topic,即云消息队列 MQTT 版的Topic。  - 在配置规则目标配置向导页面,配置数据的流转目标,然后单击创建。
参数 取值示例 说明 目标服务类型 消息队列 RocketMQ 版 指定您需将源Topic的数据转发至的目标云产品。 说明 当前仅支持云消息队列 RocketMQ 版。RocketMQ 实例 MQ_INST_13801563067*****_BbyOD2jQ 指定目标云产品的实例ID,即云消息队列 RocketMQ 版的实例ID。 说明 仅支持选择和云消息队列 MQTT 版实例为同一地域的云产品实例。Topic TopicB 指定目标云产品的资源键值,即云消息队列 RocketMQ 版的Topic。源Topic的数据将流转至TopicB。  
您可以在规则管理的规则列表查看到刚创建的数据流出规则。 - 在配置基本信息配置向导页面,填写规则的基本信息,然后单击下一步。
 
步骤二:创建自定义事件源
- 登录事件总线EventBridge控制台。
 - 在左侧导航栏,单击事件总线。
 - 在顶部菜单栏,选择地域。
 在事件总线页面,单击已创建的自定义事件总线。
- 在左侧导航栏,单击事件源。
 在事件源页面,单击添加事件源。
在添加自定义事件源面板,输入名称和描述,事件提供方选择消息队列 RocketMQ 版,并选择已创建的云消息队列 RocketMQ 版的资源信息等,然后单击确定。
步骤三:创建事件规则
目标服务和事件规则必须处于同一地域。
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件总线。
 - 在顶部菜单栏,选择地域,在事件总线页面,单击目标总线名称。
 - 在左侧导航栏,单击事件规则,然后单击创建规则。
 在创建规则页面,完成以下操作。
在配置基本信息配置向导,在名称文本框输入规则名称,在描述文本框输入规则的描述,然后单击下一步。
在配置事件模式配置向导,事件源类型选择自定义事件源,事件源选择步骤一添加的自定义事件源,在事件模式内容代码框输入事件模式,然后单击下一步。
如需了解更多信息,请参见事件模式。
在配置事件目标配置向导,配置事件目标,然后单击创建。
说明1个事件规则最多可以添加5个目标。
配置项
说明
服务类型
在下拉列表中选择函数计算。
函数
在下拉列表中选择已创建的函数。
事件
支持完整事件、部分事件、固定值和模板四种事件类型,本文以模板类型为例进行介绍说明。具体事件类型的介绍,请参见事件内容转换。
以下提供的是变量和模板示例。
变量示例:
{ "source":"$.source", "type":"$.type" }模板示例:
The event comes from ${source},event type is ${type}.版本和别名
支持指定函数版本或指定函数别名:
如果您选择指定版本,需要选择函数的具体版本。
如果您选择指定别名,需要选择函数的具体别名。
执行方式
支持以下两种执行方式,具体信息,请参见同步调用和异步调用功能概览。
同步:同步调用是事件被函数处理后直接返回结果。
异步:异步调用是函数计算系统接收异步调用请求后,将请求持久化后会立即返回响应,而不是等待请求执行完成后再返回。
投递方式
支持以下两种投递方式:
Object格式:如果您选用此格式,事件将会以对象(Object) 格式向下游函数进行投递。
ObjectList格式:如果您选用此格式,事件将会以对象数组(Array)格式向下游函数进行投递。
说明此功能为非必选项,如果您不选择投递格式,则默认事件将以Object格式向下游函数进行投递。
重试和死信
请参见重试和死信文档进行设置。
步骤四:发布事件
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * 本代码提供签名鉴权模式下MQ4IOT客户端发送消息到MQ4IOT客户端的示例,其中初始化参数请根据实际情况修改。
 * 签名模式即使用阿里云账号系统提供的AccessKey和SecretKey对每个客户端计算出一个独立的签名供客户端识别使用。
 * 对于实际业务场景使用过程中,考虑到私钥SecretKey的隐私性,可以将签名过程放在受信任的环境完成。
 *
 * 完整demo工程,参考https://github.com/AliwareMQ/lmq-demo。
 */
public class MQ4IoTProducerDemo {
    public static void main(String[] args) throws Exception {
        /**
         * MQ4IOT实例ID,购买后控制台获取。
         */
        String instanceId = "XXXXX";
        /**
         * 接入点地址,购买MQ4IOT实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用IP地址直接连接,否则可能会导致客户端异常。
         */
        String endPoint = "XXXXX.mqtt.aliyuncs.com";
        /**
         * 账号accesskey,从账号系统控制台获取。
         */
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        /**
         * 账号secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置。
         */
        String secretKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        /**
         * MQ4IOT Client Id,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的Client Id会导致连接异常断开。
         * Client Id 由两部分组成,格式为GroupID@@@DeviceId,其中groupId在MQ4IOT控制台申请,DeviceId由业务方自己设置,Client Id总长度不得超过64个字符。
         */
        String clientId = "GID_XXXXX@@@XXXXX";
        /**
         * MQ4IOT 消息的一级Topic,需要在控制台申请才能使用。
         * 如果使用了没有申请或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。
         */
        final String parentTopic = "XXXXX";
        /**
         * MQ4IOT支持子级Topic,用来做自定义的过滤,此处为示意,可以填写任何字符串。
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * QoS参数代表传输质量,可选0,1,2。
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客户端使用的协议和端口必须匹配。
         * 如果SSL加密则设置ssl://endpoint:8883。
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 客户端设置好发送超时时间,防止无限阻塞。
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客户端连接成功后就需要尽快订阅需要的Topic。
                 */
                System.out.println("connect success");
            }
            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }
            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
                 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。超时时间约定参考限制。
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }
            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             *  发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
             */
            mqttClient.publish(mq4IotTopic, message);
            /**
             * MQ4IoT支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的Client Id,则可以直接发送点对点消息。
             * 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的Topic格式规范是{{parentTopic}}/p2p/{{targetClientId}}。
             */
            String receiverId = "xxx";
            final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}结果验证
您可以在函数计算控制台使用表盘解读数据指标。
登录函数计算控制台。
在左侧导航栏,单击函数,然后在顶部菜单栏选择目标地域。
在函数页面,单击目标函数名称。
在目标函数详情页面,单击日志页签,然后单击函数日志,即可查看目标函数的日志信息。

常见问题
事件发布失败,我该如何定位问题?
如果事件发布失败,您可以查看事件轨迹,在事件轨迹页面的事件投递区域查看投递详情,获取投递响应。针对不同投递响应提示,采取相应的解决措施。