微消息队列MQTT版推送事件到函数计算

本文介绍如何通过事件总线EventBridge云消息队列 MQTT 版的数据推送到函数计算。

前提条件

您已完成以下操作:

注意事项

事件总线EventBridge不支持直接从云消息队列 MQTT 版Topic拉取事件。您可以通过云消息队列 MQTT 版的数据流出功能,将数据流转到云消息队列 RocketMQ 版Topic中,并通过添加事件总线EventBridge的自定义事件源云消息队列 RocketMQ 版,实现云消息队列 MQTT 版事件总线EventBridge的集成。

步骤一:创建数据流出规则

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表

  2. 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。

  3. 在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则
  4. 创建规则页面完成以下操作。
    1. 配置基本信息配置向导页面,填写规则的基本信息,然后单击下一步
      参数取值示例说明
      规则ID111111规则的全局唯一标识,说明如下:
      • 只能包含字母、数字、短划线(-)和下划线(_),至少包含一个字母或数字。
      • 名称长度限制在3~64字符之间,长于64字符将被自动截取。
      • 创建后无法更新。
      描述migrate from rocketmq对规则的描述。
      状态启用是否启用当前规则,取值说明如下:
      • 启用
      • 停用
      规则类型数据流出创建的规则类型,取值说明如下:
      • 数据流出:用于将云消息队列 MQTT 版的数据导出至其他阿里云产品。详细信息,请参见跨云产品的数据流出
      • 数据流入:用于将其他阿里云产品的数据导入至云消息队列 MQTT 版。详细信息,请参见跨云产品数据流入
      • 上下线通知:用于将获取的云消息队列 MQTT 版客户端上下线事件数据导出至其他阿里云产品。详细信息,请参见MQTT客户端上下线事件数据流出
    2. 配置规则源配置向导页面,配置数据源,然后单击下一步
      参数取值示例说明
      TopicTopicA指定您需导出数据的源Topic,即云消息队列 MQTT 版Topic。
    3. 配置规则目标配置向导页面,配置数据的流转目标,然后单击创建
      参数取值示例说明
      目标服务类型消息队列 RocketMQ 版指定您需将源Topic的数据转发至的目标云产品。
      说明 当前仅支持云消息队列 RocketMQ 版
      RocketMQ 实例MQ_INST_13801563067*****_BbyOD2jQ指定目标云产品的实例ID,即云消息队列 RocketMQ 版的实例ID。
      说明 仅支持选择和云消息队列 MQTT 版实例为同一地域的云产品实例。
      TopicTopicB指定目标云产品的资源键值,即云消息队列 RocketMQ 版Topic。源Topic的数据将流转至TopicB。
    您可以在规则管理的规则列表查看到刚创建的数据流出规则。

步骤二:创建自定义事件源

  1. 登录事件总线EventBridge控制台
  2. 在左侧导航栏,单击事件总线
  3. 在顶部菜单栏,选择地域。
  4. 事件总线页面,单击已创建的自定义事件总线。

  5. 在左侧导航栏,单击事件源
  6. 事件源页面,单击添加事件源

  7. 添加自定义事件源面板,输入名称描述事件提供方选择消息队列 RocketMQ 版,并选择已创建的云消息队列 RocketMQ 版的资源信息等,然后单击确定

步骤三:创建事件规则

重要

目标服务和事件规则必须处于同一地域。

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件总线
  2. 在顶部菜单栏,选择地域,在事件总线页面,单击目标总线名称。
  3. 在左侧导航栏,单击事件规则,然后单击创建规则
  4. 创建规则页面,完成以下操作。

    1. 配置基本信息配置向导,在名称文本框输入规则名称,在描述文本框输入规则的描述,然后单击下一步

    2. 配置事件模式配置向导,事件源类型选择自定义事件源事件源选择步骤一添加的自定义事件源,在事件模式内容代码框输入事件模式,然后单击下一步

      如需了解更多信息,请参见事件模式

    3. 配置事件目标配置向导,配置事件目标,然后单击创建

      说明

      1个事件规则最多可以添加5个目标。

      配置项

      说明

      服务类型

      在下拉列表中选择函数计算

      函数

      在下拉列表中选择已创建的函数。

      事件

      支持完整事件部分事件固定值模板四种事件类型,本文以模板类型为例进行介绍说明。具体事件类型的介绍,请参见事件内容转换

      以下提供的是变量模板示例。

      变量示例:

      {
        "source":"$.source",
        "type":"$.type"
      }

      模板示例:

      The event comes from ${source},event type is ${type}.

      版本和别名

      支持指定函数版本或指定函数别名:

      • 如果您选择指定版本,需要选择函数的具体版本。

      • 如果您选择指定别名,需要选择函数的具体别名。

      执行方式

      支持以下两种执行方式,具体信息,请参见同步调用异步调用功能概览

      • 同步:同步调用是事件被函数处理后直接返回结果。

      • 异步:异步调用是函数计算系统接收异步调用请求后,将请求持久化后会立即返回响应,而不是等待请求执行完成后再返回。

      投递方式

      支持以下两种投递方式:

      • Object格式:如果您选用此格式,事件将会以对象(Object) 格式向下游函数进行投递。

      • ObjectList格式:如果您选用此格式,事件将会以对象数组(Array)格式向下游函数进行投递。

      说明

      此功能为非必选项,如果您不选择投递格式,则默认事件将以Object格式向下游函数进行投递。

      重试和死信

      请参见重试和死信文档进行设置。

步骤四:发布事件

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 本代码提供签名鉴权模式下MQ4IOT客户端发送消息到MQ4IOT客户端的示例,其中初始化参数请根据实际情况修改。
 * 签名模式即使用阿里云账号系统提供的AccessKey和SecretKey对每个客户端计算出一个独立的签名供客户端识别使用。
 * 对于实际业务场景使用过程中,考虑到私钥SecretKey的隐私性,可以将签名过程放在受信任的环境完成。
 *
 * 完整demo工程,参考https://github.com/AliwareMQ/lmq-demo。
 */
public class MQ4IoTProducerDemo {
    public static void main(String[] args) throws Exception {
        /**
         * MQ4IOT实例ID,购买后控制台获取。
         */
        String instanceId = "XXXXX";
        /**
         * 接入点地址,购买MQ4IOT实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用IP地址直接连接,否则可能会导致客户端异常。
         */
        String endPoint = "XXXXX.mqtt.aliyuncs.com";
        /**
         * 账号accesskey,从账号系统控制台获取。
         */
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        /**
         * 账号secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置。
         */
        String secretKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        /**
         * MQ4IOT Client Id,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的Client Id会导致连接异常断开。
         * Client Id 由两部分组成,格式为GroupID@@@DeviceId,其中groupId在MQ4IOT控制台申请,DeviceId由业务方自己设置,Client Id总长度不得超过64个字符。
         */
        String clientId = "GID_XXXXX@@@XXXXX";
        /**
         * MQ4IOT 消息的一级Topic,需要在控制台申请才能使用。
         * 如果使用了没有申请或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。
         */
        final String parentTopic = "XXXXX";
        /**
         * MQ4IOT支持子级Topic,用来做自定义的过滤,此处为示意,可以填写任何字符串。
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * QoS参数代表传输质量,可选0,1,2。
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客户端使用的协议和端口必须匹配。
         * 如果SSL加密则设置ssl://endpoint:8883。
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 客户端设置好发送超时时间,防止无限阻塞。
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客户端连接成功后就需要尽快订阅需要的Topic。
                 */
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
                 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。超时时间约定参考限制。
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             *  发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
             */
            mqttClient.publish(mq4IotTopic, message);
            /**
             * MQ4IoT支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的Client Id,则可以直接发送点对点消息。
             * 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的Topic格式规范是{{parentTopic}}/p2p/{{targetClientId}}。
             */
            String receiverId = "xxx";
            final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

结果验证

您可以在函数计算控制台使用表盘解读数据指标。

  1. 登录函数计算控制台

  2. 在左侧导航栏,单击函数,然后在顶部菜单栏选择目标地域。

  3. 函数页面,单击目标函数名称。

  4. 在目标函数详情页面,单击日志页签,然后单击函数日志,即可查看目标函数的日志信息。

    2n968ZJ9Lj

常见问题

事件发布失败,我该如何定位问题?

如果事件发布失败,您可以查看事件轨迹,在事件轨迹页面的事件投递区域查看投递详情,获取投递响应。针对不同投递响应提示,采取相应的解决措施。

发布到函数计算的事件发布失败,且投递响应为[500]ConnectErrorconnectiontimedout,我该如何处理?

您可以按照以下步骤处理:
  1. 登录函数计算控制台,执行目标函数并观察执行时间。
  2. 如果函数执行时间大于15s,请排查网络问题;如果函数执行时间小于15s,请确认您是否可以访问函数计算服务所属地域的Endpoint。
  3. 如您不能访问当前函数计算服务所属地域的Endpoint,请联系函数计算工程师处理。