本文介绍如何通过事件总线EventBridge微消息队列MQTT版的数据推送到函数计算。

注意事项

事件总线EventBridge不支持直接从微消息队列MQTT版的Topic拉取事件。您可以通过微消息队列MQTT版的数据流出功能,将数据流转到消息队列RocketMQ版的Topic中,并通过事件总线EventBridge消息队列RocketMQ版连接器实现与微消息队列MQTT版的集成。

步骤一:创建数据流出规则

  1. 在左侧导航栏,选择规则管理 > 创建规则
  2. 创建规则面板,单击数据流出页签。
  3. 数据流出页签,按提示填写以下参数,单击确定
    参数 取值示例 说明
    规则ID 111111 规则的全局唯一标识,说明如下:
    • 只能包含字母、数字、横划线(-)和下划线(_),至少包含一个字母或数字。
    • 名称长度限制在3~64字符之间,长于64字符将被自动截取。
    • 创建后无法更新。
    规则描述 migrate to rocketmq 对规则的描述。
    状态 启用 是否启用当前规则,取值说明如下:
    • 启用
    • 禁用
    数据源
    Topic TopicA 指定您需导出数据的源Topic。
    流转目标
    说明 当前仅支持消息队列RocketMQ版
    云产品 消息队列RocketMQ 指定您需将源Topic的数据转发至的目标云产品。
    实例ID MQ_INST_13801563067*****_BbyOD2jQ 指定的目标云产品的实例ID。
    Topic Topic_test 指定的目标云产品的资源键值,即消息队列RocketMQ版的Topic。源Topic的数据将流转至Topic_test。
    您可以在规则管理的规则列表查看到刚创建的数据流出规则。

步骤二:创建自定义事件源

  1. 登录事件总线EventBridge控制台
  2. 在左侧导航栏,选择事件驱动 > 事件源
  3. 在顶部菜单栏,选择地域。
  4. 快速添加自定义事件源区域,单击消息队列RocketMQ版
  5. 添加自定义事件源面板,输入名称,输入描述,选择RocketMQ实例,选择Topic,选择自定义总线,然后单击确定

步骤三:创建事件规则

注意 目标服务和事件规则必须处于同一地域。
  1. 登录事件总线EventBridge控制台
  2. 在左侧导航栏,选择事件驱动 > 事件规则
  3. 在顶部菜单栏,选择地域。
  4. 事件规则页面,选择自定义总线,然后单击创建规则
  5. 创建规则页面,完成以下操作。
    1. 配置基本信息页面,在名称文本框输入规则名称,在描述文本框输入规则的描述,然后单击下一步
    2. 配置事件模式页面,事件源类型选择自定义事件源事件源选择步骤一的自定义事件源,在事件模式内容代码框输入事件模式,然后单击下一步

      如需了解更多信息,请参见事件模式

    3. 配置事件目标页面,配置事件目标,然后单击创建
      说明 1个事件规则最多可以添加5个目标。
      • 服务类型:单击函数计算
      • 服务:选择已创建的服务。
      • 函数:选择已创建的函数。
      • 事件:单击模板

        以下提供变量和模板的示例。

        变量示例:

        {
          "source":"$.source",
          "type":"$.type"
        }

        模板示例:

        The event comes from ${source},event type is ${type}.

        如需了解更多信息,请参见事件内容转换

      • 服务版本和别名:选择服务版本或服务别名。
        • 默认版本:LATEST。
        • 指定版本:选择服务版本。更多信息,请参见版本简介
        • 指定别名:选择服务别名。更多信息,请参见别名简介

步骤四:发布事件

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 本代码提供签名鉴权模式下MQ4IOT客户端发送消息到MQ4IOT客户端的示例,其中初始化参数请根据实际情况修改。
 * 签名模式即使用阿里云账号系统提供的AccessKey和SecretKey对每个客户端计算出一个独立的签名供客户端识别使用。
 * 对于实际业务场景使用过程中,考虑到私钥SecretKey的隐私性,可以将签名过程放在受信任的环境完成。
 *
 * 完整demo工程,参考https://github.com/AliwareMQ/lmq-demo。
 */
public class MQ4IoTProducerDemo {
    public static void main(String[] args) throws Exception {
        /**
         * MQ4IOT实例ID,购买后控制台获取。
         */
        String instanceId = "XXXXX";
        /**
         * 接入点地址,购买MQ4IOT实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用IP地址直接连接,否则可能会导致客户端异常。
         */
        String endPoint = "XXXXX.mqtt.aliyuncs.com";
        /**
         * 账号accesskey,从账号系统控制台获取。
         */
        String accessKey = "XXXXX";
        /**
         * 账号secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置。
         */
        String secretKey = "XXXXX";
        /**
         * MQ4IOT Client Id,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的Client Id会导致连接异常断开。
         * Client Id 由两部分组成,格式为GroupID@@@DeviceId,其中groupId在MQ4IOT控制台申请,DeviceId由业务方自己设置,Client Id总长度不得超过64个字符。
         */
        String clientId = "GID_XXXXX@@@XXXXX";
        /**
         * MQ4IOT 消息的一级Topic,需要在控制台申请才能使用。
         * 如果使用了没有申请或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。
         */
        final String parentTopic = "XXXXX";
        /**
         * MQ4IOT支持子级Topic,用来做自定义的过滤,此处为示意,可以填写任何字符串。
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * QoS参数代表传输质量,可选0,1,2。
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客户端使用的协议和端口必须匹配。
         * 如果SSL加密则设置ssl://endpoint:8883。
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 客户端设置好发送超时时间,防止无限阻塞。
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客户端连接成功后就需要尽快订阅需要的Topic。
                 */
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
                 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。超时时间约定参考限制。
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             *  发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
             */
            mqttClient.publish(mq4IotTopic, message);
            /**
             * MQ4IoT支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的Client Id,则可以直接发送点对点消息。
             * 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的Topic格式规范是{{parentTopic}}/p2p/{{targetClientId}}。
             */
            String receiverId = "xxx";
            final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

结果验证

您可以在函数计算控制台使用表盘解读数据指标。

  1. 登录函数计算控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击服务及函数
  4. 服务及函数页面,在服务列表中单击目标服务。
  5. 函数列表页签,找到目标函数,然后单击函数名称列下的目标函数名称。
  6. 单击日志查询页签,查看日志。
    FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6****
    2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject.
    2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** 
    FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****