本文介绍如何使用MNS实现一对多拉取消息消费模型,以解决同时满足一对多订阅、主动拉取的场景。

背景描述

MNS已经提供队列(Queue)和主题(Topic)两种模型,基本能满足大多数应用场景。其中:
  • 队列提供的是一对多的共享消息消费模型,采用客户端主动拉取(Pull)模式;
  • 主题模型提供一对多的广播消息消费模型,并且采用服务端主动推送(Push)模式。

推送模式的好处是即时性能较好,但需暴露客户端地址来接收服务端的消息推送。有些情况下有的信息,例如企业内网,无法暴露推送地址,希望改用拉取(Pull)的方式。虽然MNS不直接提供这种消费模型,但可以结合主题和队列来实现一对多的拉取消息消费模型。

解决方案

通过创建订阅,让主题将消息先推送到队列,然后由消费者从队列拉取消息。这样既可以做到一对多的广播消息,又可避免暴露消费者的地址,如下图所示。消息流

接口说明

最新的Java SDK(1.1.5)中的CloudPullTopic默认支持上述解决方案。其中MNSClient提供以下接口来快速创建CloudPullTopic:

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)
参数说明如下:
  • TopicMeta表示创建主题的Meta设置;
  • queueNameList里指定主题消息推送的队列名列表;
  • needCreateQueue表明queueNameList是否需要创建;
  • queueMetaTemplate是创建队列需要的QueueMeta参数设置。

示例代码

CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
        MNSClient client = account.getMNSClient();

        // build consumer name list.
        Vector<String> consumerNameList = new Vector<String>();
        String consumerName1 = "consumer001";
        String consumerName2 = "consumer002";
        String consumerName3 = "consumer003";
        consumerNameList.add(consumerName1);
        consumerNameList.add(consumerName2);
        consumerNameList.add(consumerName3);
        QueueMeta queueMetaTemplate = new QueueMeta();
        queueMetaTemplate.setPollingWaitSeconds(30);

        try{
            //producer code:
            // create pull topic which will send message to 3 queues for consumer.
            String topicName = "demo-topic-for-pull";
            TopicMeta topicMeta = new TopicMeta();
            topicMeta.setTopicName(topicName);
            CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);

            //publish message and consume message.
            String messageBody = "broadcast message to all the consumers:hello the world.";
            // if we sent raw message,then should use getMessageBodyAsRawString to parse the message body correctly.
            TopicMessage tMessage = new RawTopicMessage(); 
            tMessage.setBaseMessageBody(messageBody);
            pullTopic.publishMessage(tMessage);

            // consumer code:
            //3 consumers receive the message.
            CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
            CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
            CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);

            Message consumer1Msg = queueForConsumer1.popMessage(30);
            if(consumer1Msg != null) 
            {
                System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString());
            }else{
                System.out.println("the queue is empty");
            }

            Message consumer2Msg = queueForConsumer2.popMessage(30);
            if(consumer2Msg != null) 
            {
                System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString());
            }else{
                System.out.println("the queue is empty");
            }

            Message consumer3Msg = queueForConsumer3.popMessage(30);
            if(consumer3Msg != null) 
            {
                System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString());
            }else{
                System.out.println("the queue is empty");
            }

            // delete the pullTopic.
            pullTopic.delete();
        }catch(ClientException ce)
        {
            System.out.println("Something wrong with the network connection between client and MNS service."
                    + "Please check your network and DNS availablity.");
            ce.printStackTrace();
        }
        catch(ServiceException se)
        {
            /*you can get more MNS service error code in following link.
              https://help.aliyun.com/document_detail/mns/api_reference/error_code/error_code.html?spm=5176.docmns/api_reference/error_code/error_response
            */
            se.printStackTrace();
        }

        client.close();