本文介绍JMS SDK的获取方式、使用限制和示例代码。

获取方式

消息服务MNS JMS库的获取方式如下:

  • 在Maven项目的pom.xml文件中添加依赖
    <dependency>
        <groupId>com.aliyun.mns</groupId>
        <artifactId>java-messaging-lib</artifactId>
        <version>0.2.0</version>
    </dependency>
  • 下载依赖的JAR文件

使用限制

  • 只支持创建队列和收发消息操作。
  • 只支持AUTO_ACKNOWLEDGEMANUAL_ACKNOWLEDGE两种ACK模式:
    • AUTO_ACKNOWLEDGE
      • MessageListener执行完成且无异常抛出时会自动ACK。
      • MessageListener抛出异常就不会ACK,在VisibilityTimeout后消息会重投。
    • MANUAL_ACKNOWLEDGE:您需要在代码中调用acknowledge() 对消息进行ACK。

示例代码

  • 创建队列
    String accessKeyId = "<yourAccessKeyId>";
    String accessKeySecret = "<yourAccessKeySecret>";
    String endpoint = "<yourEndpoint>";
    String queueName = "<yourQueueName>";
    
    MNSConnectionFactory factory = MNSConnectionFactory.builder()
        .withAccessKeyId(accessKeyId)
        .withAccessKeySecret(accessKeySecret)
        .withEndpoint(endpoint)
        .build();
    
    MNSQueueConnection connection = factory.createQueueConnection();
    
    MNSClientWrapper mnsClientWrapper = connection.getMNSClientWrapper();
    mnsClientWrapper.createQueue(queueName);
  • 发送普通消息
    String accessKeyId = "<yourAccessKeyId>";
    String accessKeySecret = "<yourAccessKeySecret>";
    String endpoint = "<yourEndpoint>";
    String queueName = "<yourQueueName>";
    
    MNSConnectionFactory factory = MNSConnectionFactory.builder()
        .withAccessKeyId(accessKeyId)
        .withAccessKeySecret(accessKeySecret)
        .withEndpoint(endpoint)
        .build();
    
    try {
        MNSQueueConnection connection = factory.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);
    
        TextMessage textMessage = session.createTextMessage("Hello JMS! ");
        textMessage.setDoubleProperty("TestFloat", 0.127);
        producer.send(textMessage);
    
        System.out.println(textMessage.getJMSMessageID());
    } catch (JMSException e) {
        e.printStackTrace();
    }
  • 发送延迟消息
    String accessKeyId = "<yourAccessKeyId>";
    String accessKeySecret = "<yourAccessKeySecret>";
    String endpoint = "<yourEndpoint>";
    String queueName = "<yourQueueName>";
    
    MNSConnectionFactory factory = MNSConnectionFactory.builder()
        .withAccessKeyId(accessKeyId)
        .withAccessKeySecret(accessKeySecret)
        .withEndpoint(endpoint)
        .build();
    
    try {
        MNSQueueConnection connection = factory.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);
    
        TextMessage textMessage = session.createTextMessage("Hello JMS! ");
        textMessage.setDoubleProperty("TestFloat", 0.127);
        //设置DelaySeconds。
        MNSMessageHelper.setDelaySeconds(textMessage, 10);
        producer.send(textMessage);
    
        System.out.println(textMessage.getJMSMessageID());
    } catch (JMSException e) {
        e.printStackTrace();
    }
  • AUTO_ACKNOWLEDGE模式消费
    String accessKeyId = "<yourAccessKeyId>";
    String accessKeySecret = "<yourAccessKeySecret>";
    String endpoint = "<yourEndpoint>";
    String queueName = "<yourQueueName>";
    
    MNSConnectionFactory factory = MNSConnectionFactory.builder()
        .withAccessKeyId(accessKeyId)
        .withAccessKeySecret(accessKeySecret)
        .withEndpoint(endpoint)
        .build();
    
    try {
        MNSQueueConnection connection = factory.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, MNSQueueSession.AUTO_ACKNOWLEDGE);
    
        Queue queue = session.createQueue(queueName);
        MessageConsumer consumer = session.createConsumer(queue);
        MessageListener listener = new MessageListener() {
    
            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        MNSTextMessage textMessage = (MNSTextMessage) message;
    
                        System.out.println(new Date() + " Receive in listener: " + textMessage.getText());
                    } else if (message instanceof BytesMessage) {
                        MNSBytesMessage bytesMessage = (MNSBytesMessage) message;
                        char readChar = bytesMessage.readChar();
                        System.out.println("Read Char: " + readChar);
                        int readInt = bytesMessage.readInt();
                        System.out.println("Read Int: " + readInt);
                    } else if (message instanceof ObjectMessage) {
                        MNSObjectMessage objectMessage = (MNSObjectMessage) message;
                        TestSerializable object = (TestSerializable) objectMessage.getObject();
                        System.out.println(object.getName());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        };
    
        consumer.setMessageListener(listener);
        connection.start();
    } catch (Exception e) {
        e.printStackTrace();
    }
  • MANUAL_ACKNOWLEDGE模式消费消息
    String accessKeyId = "<yourAccessKeyId>";
    String accessKeySecret = "<yourAccessKeySecret>";
    String endpoint = "<yourEndpoint>";
    String queueName = "<yourQueueName>";
    
    MNSConnectionFactory factory = MNSConnectionFactory.builder()
        .withAccessKeyId(accessKeyId)
        .withAccessKeySecret(accessKeySecret)
        .withEndpoint(endpoint)
        .build();
    
    try {
        MNSQueueConnection connection = factory.createQueueConnection();
        QueueSession session = connection.createQueueSession(false, MNSQueueSession.MANUAL_ACKNOWLEDGE);
    
        Queue queue = session.createQueue(queueName);
        MessageConsumer consumer = session.createConsumer(queue);
        MessageListener listener = new MessageListener() {
    
            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        MNSTextMessage textMessage = (MNSTextMessage) message;
    
                        System.out.println(new Date() + " Receive in listener: " + textMessage.getText());
                    } else if (message instanceof BytesMessage) {
                        MNSBytesMessage bytesMessage = (MNSBytesMessage) message;
                        char readChar = bytesMessage.readChar();
                        System.out.println("Read Char: " + readChar);
                        int readInt = bytesMessage.readInt();
                        System.out.println("Read Int: " + readInt);
                    } else if (message instanceof ObjectMessage) {
                        MNSObjectMessage objectMessage = (MNSObjectMessage) message;
                        TestSerializable object = (TestSerializable) objectMessage.getObject();
                        System.out.println(object.getName());
                    }
    
                    //消费成功后需手动ACK。
                    message.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        };
    
        consumer.setMessageListener(listener);
        connection.start();
    } catch (Exception e) {
        e.printStackTrace();
    }