文档

JMS SDK

更新时间:

您可以查看JMS SDK的获取方式、使用限制和示例代码。

准备工作

在本地目录中创建.aliyun-mns.properties文件,并填写服务地址、AccessKeyId和AccessKeySecret。

说明

Linux系统本地目录为/home/YOURNAME/,Windows系统本地目录为C:\Users\YOURNAME

mns.accountendpoint=http://<yourAccountId>.mns.cn-hangzhou.aliyuncs.com
mns.accesskeyid=<yourAccessKeyId>
mns.accesskeysecret=<yourAccessKeySecret>        
  • AccessKeyId、AccessKeySecret

  • Endpoint

    • 访问消息服务MNS的接入地址,登录MNS控制台查看。具体操作,请参见获取接入点

    • 不同地域的接入地址不同,分为公网和私网域名。

获取方式

消息服务MNS JMS库的获取方式如下:

  • 在Maven项目的pom.xml文件中添加依赖

    <dependency>
        <groupId>com.aliyun.mns</groupId>
        <artifactId>java-messaging-lib</artifactId>
        <version>0.2.0</version>
    </dependency>
  • 下载依赖的JAR文件

使用限制

  • 只支持创建队列和收发消息操作。

  • 只支持AUTO_ACKNOWLEDGEMANUAL_ACKNOWLEDGE两种ACK模式:

    • AUTO_ACKNOWLEDGE

      • MessageListener执行完成且无异常抛出时会自动ACK。

      • MessageListener抛出异常就不会ACK,在VisibilityTimeout后消息会重投。

    • MANUAL_ACKNOWLEDGE:您需要在代码中调用acknowledge() 对消息进行ACK。

示例代码

  • 创建队列

    String accessKeyId = ServiceSettings.getMNSAccessKeyId();
    String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
    String endpoint = ServiceSettings.getMNSAccountEndpoint();
    String queueName = "<yourQueueName>";
    
    MNSConnectionFactory factory = MNSConnectionFactory.builder()
        .withAccessKeyId(accessKeyId)
        .withAccessKeySecret(accessKeySecret)
        .withEndpoint(endpoint)
        .build();
    
    MNSQueueConnection connection = factory.createQueueConnection();
    
    MNSClientWrapper mnsClientWrapper = connection.getMNSClientWrapper();
    mnsClientWrapper.createQueue(queueName);
  • 发送普通消息

    String accessKeyId = ServiceSettings.getMNSAccessKeyId();
    String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
    String endpoint = ServiceSettings.getMNSAccountEndpoint();
    String queueName = "<yourQueueName>";
    
    MNSConnectionFactory factory = MNSConnectionFactory.builder()
        .withAccessKeyId(accessKeyId)
        .withAccessKeySecret(accessKeySecret)
        .withEndpoint(endpoint)
        .build();
    
    try {
        MNSQueueConnection connection = factory.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);
    
        TextMessage textMessage = session.createTextMessage("Hello JMS! ");
        textMessage.setDoubleProperty("TestFloat", 0.127);
        producer.send(textMessage);
    
        System.out.println(textMessage.getJMSMessageID());
    } catch (JMSException e) {
        e.printStackTrace();
    }
  • 发送延迟消息

    String accessKeyId = ServiceSettings.getMNSAccessKeyId();
    String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
    String endpoint = ServiceSettings.getMNSAccountEndpoint();
    String queueName = "<yourQueueName>";
    
    MNSConnectionFactory factory = MNSConnectionFactory.builder()
        .withAccessKeyId(accessKeyId)
        .withAccessKeySecret(accessKeySecret)
        .withEndpoint(endpoint)
        .build();
    
    try {
        MNSQueueConnection connection = factory.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);
    
        TextMessage textMessage = session.createTextMessage("Hello JMS! ");
        textMessage.setDoubleProperty("TestFloat", 0.127);
        //设置DelaySeconds。
        MNSMessageHelper.setDelaySeconds(textMessage, 10);
        producer.send(textMessage);
    
        System.out.println(textMessage.getJMSMessageID());
    } catch (JMSException e) {
        e.printStackTrace();
    }
  • AUTO_ACKNOWLEDGE模式消费

    String accessKeyId = ServiceSettings.getMNSAccessKeyId();
    String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
    String endpoint = ServiceSettings.getMNSAccountEndpoint();
    String queueName = "<yourQueueName>";
    
    MNSConnectionFactory factory = MNSConnectionFactory.builder()
        .withAccessKeyId(accessKeyId)
        .withAccessKeySecret(accessKeySecret)
        .withEndpoint(endpoint)
        .build();
    
    try {
        MNSQueueConnection connection = factory.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, MNSQueueSession.AUTO_ACKNOWLEDGE);
    
        Queue queue = session.createQueue(queueName);
        MessageConsumer consumer = session.createConsumer(queue);
        MessageListener listener = new MessageListener() {
    
            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        MNSTextMessage textMessage = (MNSTextMessage) message;
    
                        System.out.println(new Date() + " Receive in listener: " + textMessage.getText());
                    } else if (message instanceof BytesMessage) {
                        MNSBytesMessage bytesMessage = (MNSBytesMessage) message;
                        char readChar = bytesMessage.readChar();
                        System.out.println("Read Char: " + readChar);
                        int readInt = bytesMessage.readInt();
                        System.out.println("Read Int: " + readInt);
                    } else if (message instanceof ObjectMessage) {
                        MNSObjectMessage objectMessage = (MNSObjectMessage) message;
                        TestSerializable object = (TestSerializable) objectMessage.getObject();
                        System.out.println(object.getName());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        };
    
        consumer.setMessageListener(listener);
        connection.start();
    } catch (Exception e) {
        e.printStackTrace();
    }
  • MANUAL_ACKNOWLEDGE模式消费消息

    String accessKeyId = ServiceSettings.getMNSAccessKeyId();
    String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
    String endpoint = ServiceSettings.getMNSAccountEndpoint();
    String queueName = "<yourQueueName>";
    
    MNSConnectionFactory factory = MNSConnectionFactory.builder()
        .withAccessKeyId(accessKeyId)
        .withAccessKeySecret(accessKeySecret)
        .withEndpoint(endpoint)
        .build();
    
    try {
        MNSQueueConnection connection = factory.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, MNSQueueSession.MANUAL_ACKNOWLEDGE);
    
        Queue queue = session.createQueue(queueName);
        MessageConsumer consumer = session.createConsumer(queue);
        MessageListener listener = new MessageListener() {
    
            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        MNSTextMessage textMessage = (MNSTextMessage) message;
    
                        System.out.println(new Date() + " Receive in listener: " + textMessage.getText());
                    } else if (message instanceof BytesMessage) {
                        MNSBytesMessage bytesMessage = (MNSBytesMessage) message;
                        char readChar = bytesMessage.readChar();
                        System.out.println("Read Char: " + readChar);
                        int readInt = bytesMessage.readInt();
                        System.out.println("Read Int: " + readInt);
                    } else if (message instanceof ObjectMessage) {
                        MNSObjectMessage objectMessage = (MNSObjectMessage) message;
                        TestSerializable object = (TestSerializable) objectMessage.getObject();
                        System.out.println(object.getName());
                    }
    
                    //消费成功后需手动ACK。
                    message.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        };
    
        consumer.setMessageListener(listener);
        connection.start();
    } catch (Exception e) {
        e.printStackTrace();
    }
  • 本页导读 (1)
文档反馈