本文介绍如何使用Java SDK中的sample代码,完成创建主题、创建订阅、发布消息和接收消息等操作。

步骤一:准备工作

  1. 下载最新版Java SDK,解压到aliyun-sdk-mns-samples文件夹。
  2. 用Eclipse导入Maven工程,选中aliyun-sdk-mns-samples文件夹。
  3. 在用户目录中创建.aliyun-mns.properties文件,并填写服务地址、AccessKey ID和AccessKey Secret。
    说明 Linux系统用户目录为/home/YOURNAME/,Windows系统用户目录为C:\Users\YOURNAME
    • AccessKeyId、AccessKeySecret
    • Endpoint
      • 访问消息服务MNS的接入地址,登录MNS控制台,单击右上角获取Endpoint查看。
      • 不同地域的接入地址不同,分为公网以及私网域名。

步骤二:创建主题

创建主题的代码示例如下。详细说明,请参见Topic

public class CreateTopicDemo {
    public static void main(String[] args) {
        CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");
        MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

        String topicName = "TestTopic";
        TopicMeta meta = new TopicMeta();
        meta.setTopicName(topicName);

        try {
            CloudTopic topic = client.createTopic(meta);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("create topic error, " + e.getMessage());
        }

        client.close();
    }
}         

步骤三:创建队列

创建队列的代码段如下。

public class CreateQueueDemo {
    public static void main(String[] args) {
        CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");
        MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。
        String queueName = "TestQueue";
        QueueMeta meta = new QueueMeta(); //生成本地QueueMeta属性。详细说明,请参见Queue。

        meta.setQueueName(queueName);  // 设置队列名。
        meta.setPollingWaitSeconds(15);
        meta.setMaxMessageSize(2048L);

        try {
            CloudQueue queue = client.createQueue(meta);
        } catch (ClientException ce)
        {
            System.out.println("Something wrong with the network connection between client and MNS service."
                   + "Please check your network and DNS availablity.");
            ce.printStackTrace();
        } catch (ServiceException se)
        {
            se.printStackTrace();
            logger.error("MNS exception requestId:" + se.getRequestId(), se);
            if (se.getErrorCode() != null) {
                if (se.getErrorCode().equals("QueueNotExist"))
                {
                    System.out.println("Queue is not exist.Please create before use");
                } else if (se.getErrorCode().equals("TimeExpired"))
                {
                    System.out.println("The request is time expired. Please check your local machine timeclock");
                }
            //更多错误码信息,请参见错误码。
            }
        } catch (Exception e)
        {
            System.out.println("Unknown exception happened!");
            e.printStackTrace();
        }

        client.close();  // 程序退出时,需主动调用client的close方法进行资源释放。
    }
}

步骤四:创建订阅

对已创建的主题进行订阅,在订阅时需要设置对应的推送Endpoint地址(目前仅支持队列)、错误重试策略、推送消息格式等。

public class SubscribeDemo {
    public static void main(String[] args) {
        String region = "";
        String accountId = "";
        String queueName = "TestQueue";
        CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");
        MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

        CloudTopic topic = client.getTopicRef("TestTopic");
        try {
            SubscriptionMeta subMeta = new SubscriptionMeta();
            subMeta.setSubscriptionName("QueueEndpoint2");
            subMeta.setEndpoint(String.format("acs:mns:%s:%s:queues/%s", region, accountId, queueName));
            subMeta.setNotifyContentFormat(SubscriptionMeta.NotifyContentFormat.XML);
            String subUrl = topic.subscribe(subMeta);
            System.out.println("subscription url: " + subUrl);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("subscribe/unsubribe error");
        }

        client.close();
    }
}           

步骤五:发布消息

完成创建主题和订阅,即可向主题发布消息。

public class PublishMessageDemo {
    public static void main(String[] args) {
        CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");
        MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

        CloudTopic topic = client.getTopicRef("TestTopic");
        try {
            TopicMessage msg = new Base64TopicMessage(); //可以使用TopicMessage结构,选择不进行Base64加密。
            msg.setMessageBody("hello world!");
            //msg.setMessageTag("filterTag"); //设置该条发布消息的filterTag。
            msg = topic.publishMessage(msg);
            System.out.println(msg.getMessageId());
            System.out.println(msg.getMessageBodyMD5());
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("subscribe error");
        }

        client.close();
    }
}          

步骤六:从队列接收和删除消息

消息从主题推送到队列后,从队列中取出并删除该条消息。

public class ConsumerDemo {
    public static void main(String[] args) {
        CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");

        MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

        try{
            CloudQueue queue = client.getQueueRef("TestQueue");
            Message popMsg = queue.popMessage();
            if (popMsg != null){
                System.out.println("message handle: " + popMsg.getReceiptHandle());
                System.out.println("message body: " + popMsg.getMessageBodyAsString());
                System.out.println("message id: " + popMsg.getMessageId());
                System.out.println("message dequeue count:" + popMsg.getDequeueCount());

                //删除已经取出消费的消息。
                 queue.deleteMessage(popMsg.getReceiptHandle());
                    System.out.println("delete message successfully.\n");
            }
            else{
                System.out.println("message not exist in TestQueue.\n");
            }
        } catch (ClientException ce)
        {
            System.out.println("Something wrong with the network connection between client and MNS service."
                    + "Please check your network and DNS availablity.");
            ce.printStackTrace();
        } catch (ServiceException se)
        {
            se.printStackTrace();
            logger.error("MNS exception requestId:" + se.getRequestId(), se);
            if (se.getErrorCode() != null) {
                if (se.getErrorCode().equals("QueueNotExist"))
                {
                    System.out.println("Queue is not exist.Please create before use");
                } else if (se.getErrorCode().equals("TimeExpired"))
                {
                    System.out.println("The request is time expired. Please check your local machine timeclock");
                }
             //更多错误码信息,请参见错误码。
            }
        } catch (Exception e)
        {
            System.out.println("Unknown exception happened!");
            e.printStackTrace();
        }

        client.close();
    }
}

步骤七:删除主题

删除测试用的主题。

public class DeleteTopicDemo {
    public static void main(String[] args) {
        CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");
        MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

        CloudTopic topic = client.getTopicRef("TestTopic");
        try {
            topic.delete();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("delete topic error");
        }

        client.close();
    }
}          

步骤八:删除队列

删除测试用的队列。

public class DeleteQueueDemo {
    public static void main(String[] args) {
        CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");

        MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

        try{
            CloudQueue queue = client.getQueueRef("TestQueue");
            queue.delete();
        } catch (ClientException ce)
        {
            System.out.println("Something wrong with the network connection between client and MNS service."
                    + "Please check your network and DNS availablity.");
            ce.printStackTrace();
        } catch (ServiceException se)
        {
            se.printStackTrace();
        } catch (Exception e)
        {
            System.out.println("Unknown exception happened!");
            e.printStackTrace();
        }

        client.close();
    }
}

FilterTag使用示例

FilterTag的使用示例如下。

package com.aliyun.mns.samples;


import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.*;

public class TopicSample {

    public static void main(String[] args) {
        CloudAccount account = new CloudAccount(
                ServiceSettings.getMNSAccessKeyId(),
                ServiceSettings.getMNSAccessKeySecret(),
                ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient();

        // 1.创建队列。
        QueueMeta queueMeta = new QueueMeta();
        queueMeta.setQueueName("TestSubForQueue");
        CloudQueue queue = client.createQueue(queueMeta);
        // 2.创建主题。
        TopicMeta topicMeta = new TopicMeta();
        topicMeta.setTopicName("TestTopic");
        CloudTopic topic = client.createTopic(topicMeta);
        // 3.创建订阅。
        SubscriptionMeta subMeta = new SubscriptionMeta();
        subMeta.setSubscriptionName("TestForQueueSub");
        subMeta.setNotifyContentFormat(SubscriptionMeta.NotifyContentFormat.SIMPLIFIED);
        subMeta.setEndpoint(topic.generateQueueEndpoint("TestSubForQueue"));
        subMeta.setFilterTag("filterTag");
        topic.subscribe(subMeta);
        // 4.发布消息。
        TopicMessage msg = new Base64TopicMessage();
        msg.setMessageBody("hello world");
        msg.setMessageTag("filterTag");
        msg = topic.publishMessage(msg);
        // 5.从订阅的队列中获取消息。
        Message msgReceive = queue.popMessage(30);
        System.out.println("ReceiveMessage From TestSubForQueue:");
        System.out.println(msgReceive.getMessageBody());
        System.exit(0);
    }
}