本文介绍如何使用Java SDK中的sample代码,完成创建主题、创建订阅、发布消息和接收消息等操作。

步骤一:准备工作

  1. 下载最新版Java SDK,解压到aliyun-sdk-mns-samples文件夹。
  2. 用Eclipse导入Maven工程,选中aliyun-sdk-mns-samples文件夹。
  3. 在用户目录中创建.aliyun-mns.properties文件,并填写服务地址、AccessKey ID和AccessKey Secret。
    说明 Linux系统的用户目录为/home/YOURNAME/,Windows系统的用户目录为C:\Users\YOURNAME
    • AccessKeyId、AccessKeySecret
    • Endpoint
      • 访问消息服务MNS的接入地址,登录MNS控制台查看。具体操作,请参见获取接入点
      • 不同地域的接入地址不同,分为公网以及私网域名。

步骤二:创建主题

创建主题的代码示例如下。详细说明,请参见Topic

public class CreateTopicDemo {
    public static void main(String[] args) {
        CloudAccount account = new CloudAccount(
                ServiceSettings.getMNSAccessKeyId(),
                ServiceSettings.getMNSAccessKeySecret(),
                ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

        String topicName = "TestTopic";
        TopicMeta meta = new TopicMeta();
        meta.setTopicName(topicName);

        try {
            CloudTopic topic = client.createTopic(meta);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("create topic error, " + e.getMessage());
        }

        client.close();
    }
}

步骤三:创建队列

创建队列的代码段如下。

public class CreateQueueDemo {

    public static void main(String[] args) {
        CloudAccount account = new CloudAccount(
                ServiceSettings.getMNSAccessKeyId(),
                ServiceSettings.getMNSAccessKeySecret(),
                ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient(); 

        try
        {   
            QueueMeta qMeta = new QueueMeta();
            qMeta.setQueueName("queue-demo");
            qMeta.setPollingWaitSeconds(30);
            CloudQueue cQueue = client.createQueue(qMeta);
            System.out.println("Create queue successfully. URL: " + cQueue.getQueueURL());
        } catch (ClientException ce)
        {
            System.out.println("Something wrong with the network connection between client and MNS service."
                    + "Please check your network and DNS availability.");
            ce.printStackTrace();
        } catch (ServiceException se)
        {
            if (se.getErrorCode().equals("QueueNotExist"))
            {
                System.out.println("Queue is not exist.Please create before use");
            } else if (se.getErrorCode().equals("TimeExpired"))
            {
                System.out.println("The request is time expired. Please check your local machine timeclock");
            }
            se.printStackTrace();
        } catch (Exception e)
        {
            System.out.println("Unknown exception happened!");
            e.printStackTrace();
        }

        client.close();
    }

}

步骤四:创建订阅

对已创建的主题进行订阅,在订阅时需要设置对应的推送Endpoint地址(目前仅支持队列)、错误重试策略、推送消息格式等。

public class SubscribeDemo {
    public static void main(String[] args) {
        String region = "";
        String accountId = "";
        String queueName = "TestQueue";
        CloudAccount account = new CloudAccount(
                ServiceSettings.getMNSAccessKeyId(),
                ServiceSettings.getMNSAccessKeySecret(),
                ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

        CloudTopic topic = client.getTopicRef("TestTopic");
        try {
            SubscriptionMeta subMeta = new SubscriptionMeta();
            subMeta.setSubscriptionName("QueueEndpoint2");
            subMeta.setEndpoint(String.format("acs:mns:%s:%s:queues/%s", region, accountId, queueName));
            subMeta.setNotifyContentFormat(SubscriptionMeta.NotifyContentFormat.XML);
            String subUrl = topic.subscribe(subMeta);
            System.out.println("subscription url: " + subUrl);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("subscribe/unsubribe error");
        }

        client.close();
    }
}        

步骤五:发布消息

完成创建主题和订阅,即可向主题发布消息。

public class PublishMessageDemo {
    public static void main(String[] args) {
        CloudAccount account = new CloudAccount(
                ServiceSettings.getMNSAccessKeyId(),
                ServiceSettings.getMNSAccessKeySecret(),
                ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

        CloudTopic topic = client.getTopicRef("TestTopic");
        try {
            TopicMessage msg = new Base64TopicMessage(); //可以使用TopicMessage结构,选择不进行Base64加密。
            msg.setMessageBody("hello world!");
            //msg.setMessageTag("filterTag"); //设置该条发布消息的filterTag。
            msg = topic.publishMessage(msg);
            System.out.println(msg.getMessageId());
            System.out.println(msg.getMessageBodyMD5());
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("subscribe error");
        }

        client.close();
    }
}         

步骤六:从队列接收和删除消息

消息从主题推送到队列后,从队列中取出并删除该条消息。

public class ConsumerDemo {

    public static void main(String[] args) {
        CloudAccount account = new CloudAccount(
                ServiceSettings.getMNSAccessKeyId(),
                ServiceSettings.getMNSAccessKeySecret(),
                ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient();

        try{
            CloudQueue queue = client.getQueueRef("queue-demo");
            for (int i = 0; i < 10; i++)
            {
                Message popMsg = queue.popMessage();
                if (popMsg != null){
                    System.out.println("message handle: " + popMsg.getReceiptHandle());
                    System.out.println("message body: " + popMsg.getMessageBodyAsString());
                    System.out.println("message id: " + popMsg.getMessageId());
                    System.out.println("message dequeue count:" + popMsg.getDequeueCount());

                    queue.deleteMessage(popMsg.getReceiptHandle());
                    System.out.println("delete message successfully.\n");
                }
            }
        } catch (ClientException ce)
        {
            System.out.println("Something wrong with the network connection between client and MNS service."
                    + "Please check your network and DNS availability.");
            ce.printStackTrace();
        } catch (ServiceException se)
        {
            if (se.getErrorCode().equals("QueueNotExist"))
            {
                System.out.println("Queue is not exist.Please create queue before use");
            } else if (se.getErrorCode().equals("TimeExpired"))
            {
                System.out.println("The request is time expired. Please check your local machine timeclock");
            }

            se.printStackTrace();
        } catch (Exception e)
        {
            System.out.println("Unknown exception happened!");
            e.printStackTrace();
        }

        client.close();
    }
}

步骤七:删除主题

删除测试用的主题。

public class DeleteTopicDemo {
    public static void main(String[] args) {
        CloudAccount account = new CloudAccount(
                ServiceSettings.getMNSAccessKeyId(),
                ServiceSettings.getMNSAccessKeySecret(),
                ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全。

        CloudTopic topic = client.getTopicRef("TestTopic");
        try {
            topic.delete();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("delete topic error");
        }

        client.close();
    }
}        

步骤八:删除队列

删除测试用的队列。

public class DeleteQueueDemo {

    public static void main(String[] args) {
        CloudAccount account = new CloudAccount(
                ServiceSettings.getMNSAccessKeyId(),
                ServiceSettings.getMNSAccessKeySecret(),
                ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient();

        try
        {
            CloudQueue queue = client.getQueueRef("queue-demo");
            queue.delete();
            System.out.println("Delete cloud-queue-demo successfully!");
        } catch (ClientException ce)
        {
            System.out.println("Something wrong with the network connection between client and MNS service."
                    + "Please check your network and DNS availability.");
            ce.printStackTrace();
        } catch (ServiceException se)
        {
            if (se.getErrorCode().equals("QueueNotExist"))
            {
                System.out.println("Queue is not exist.Please create before use");
            } else if (se.getErrorCode().equals("TimeExpired"))
            {
                System.out.println("The request is time expired. Please check your local machine timeclock");
            }
            se.printStackTrace();
        } catch (Exception e)
        {
            System.out.println("Unknown exception happened!");
            e.printStackTrace();
        }

        client.close();
    }

}

FilterTag使用示例

FilterTag的使用示例如下。

package com.aliyun.mns.samples;


import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.*;

public class TopicSample {

    public static void main(String[] args) {
        CloudAccount account = new CloudAccount(
                ServiceSettings.getMNSAccessKeyId(),
                ServiceSettings.getMNSAccessKeySecret(),
                ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient();

        // 1.创建队列。
        QueueMeta queueMeta = new QueueMeta();
        queueMeta.setQueueName("TestSubForQueue");
        CloudQueue queue = client.createQueue(queueMeta);
        // 2.创建主题。
        TopicMeta topicMeta = new TopicMeta();
        topicMeta.setTopicName("TestTopic");
        CloudTopic topic = client.createTopic(topicMeta);
        // 3.创建订阅。
        SubscriptionMeta subMeta = new SubscriptionMeta();
        subMeta.setSubscriptionName("TestForQueueSub");
        subMeta.setNotifyContentFormat(SubscriptionMeta.NotifyContentFormat.SIMPLIFIED);
        subMeta.setEndpoint(topic.generateQueueEndpoint("TestSubForQueue"));
        subMeta.setFilterTag("filterTag");
        topic.subscribe(subMeta);
        // 4.发布消息。
        TopicMessage msg = new Base64TopicMessage();
        msg.setMessageBody("hello world");
        msg.setMessageTag("filterTag");
        msg = topic.publishMessage(msg);
        // 5.从订阅的队列中获取消息。
        Message msgReceive = queue.popMessage(30);
        System.out.println("ReceiveMessage From TestSubForQueue:");
        System.out.println(msgReceive.getMessageBody());
        System.exit(0);
    }
}