文档

示例代码

更新时间:
一键部署

云消息队列 RocketMQ 版5.x版本实例可兼容Java ONS 1.x SDK客户端接入,您可以使用ONS 1.x SDK的接入5.x实例进行消息收发。本文为您介绍Java ONS 1.x SDK消息收发示例代码。

重要
  • 推荐您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作为主力研发版本,和云消息队列 RocketMQ 版5.x服务端完全兼容,提供了更全面的功能并支持更多增强特性。更多信息,请参见5.x系列SDK
  • RocketMQ 4.x/3.x系列SDK和ONS系列SDK后续仅做功能维护,建议仅存量业务使用。

Serverless版实例公网访问版本说明

Serverless版实例使用公网访问接入云消息队列 RocketMQ 版时,需要保证使用的Java ONS 1.x SDK版本为1.9.0.Final及以上版本,并在消息收发代码中补充如下内容:

properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");

说明

其中,InstanceId需要替换为您实际使用的实例ID。

普通消息收发示例

发送普通消息(同步发送)

    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;

    import java.util.Properties;

    public class ProducerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            /**
            * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
            * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
            * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
            * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
            */
            // 设置为消息队列RocketMQ版控制台实例详情页的实例用户名。
            properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
            // 设置为消息队列RocketMQ版控制台实例详情页的实例密码。
            properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
            //注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
 
            // 设置发送超时时间,单位:毫秒。
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
            Producer producer = ONSFactory.createProducer(properties);
            // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
            producer.start();

            // 循环发送消息。
            for (int i = 0; i < 100; i++){
                Message msg = new Message( 
                    // 设置为您在消息队列RocketMQ版控制台上创建的Topic。
                    // 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
                    "TopicTestMQ",
                    // Message Tag可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                    // Tag的具体格式和设置方法,请参见消息过滤。
                    "TagA",
                    // Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预。
                    // 需要Producer与Consumer协商好一致的序列化和反序列化方式。
                    "Hello MQ".getBytes());
                // 设置代表消息的业务关键属性,请尽可能全局唯一。
                // 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
                // 注意:不设置也不会影响消息正常收发。
                msg.setKey("ORDERID_" + i);

                try {
                    SendResult sendResult = producer.send(msg);
                    // 同步发送消息,只要不抛异常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                    }
                }
                catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                    e.printStackTrace();
                }
            }

            // 在应用退出前,销毁Producer对象。
            // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
            producer.shutdown();
        }
    }               

发送普通消息(异步发送)

    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.OnExceptionContext;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.SendCallback;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    public class ProducerTest {
        public static void main(String[] args) throws InterruptedException {
            Properties properties = new Properties();
            /**
            * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
            * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
            * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
            * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
            */
            // 设置为消息队列RocketMQ版控制台实例详情页的实例用户名。
            properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
            // 设置为消息队列RocketMQ版控制台实例详情页的实例密码。
            properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
            //注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
      
            //设置发送超时时间,单位毫秒。
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");

            Producer producer = ONSFactory.createProducer(properties);
            // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
            producer.start();

            Message msg = new Message(
                    // 设置为您在消息队列RocketMQ版控制台上创建的Topic。
                    // 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
                    "TopicTestMQ",
                    // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                    "TagA",
                    // Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                    "Hello MQ".getBytes());

            // 设置代表消息的业务关键属性,请尽可能全局唯一。 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
            // 注意:不设置也不会影响消息正常收发。
            msg.setKey("ORDERID_100");

            // 异步发送消息, 发送结果通过callback返回给客户端。
            producer.sendAsync(msg, new SendCallback() {
                @Override
                public void onSuccess(final SendResult sendResult) {
                    // 消息发送成功。
                    System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
                }

                @Override
                public void onException(OnExceptionContext context) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
                }
            });

            // 阻塞当前线程3秒,等待异步发送结果。
            TimeUnit.SECONDS.sleep(3);

            // 在应用退出前,销毁Producer对象。
            // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
            producer.shutdown();
        }
    }                

发送普通消息(单向发送)

import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;

    import java.util.Properties;

    public class ProducerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            /**
            * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
            * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
            * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
            * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
            */
            // 设置为消息队列RocketMQ版控制台实例详情页的实例用户名。
            properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
            // 设置为消息队列RocketMQ版控制台实例详情页的实例密码。
            properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
            //注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
  
            // 设置发送超时时间,单位:毫秒。
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");

            Producer producer = ONSFactory.createProducer(properties);
            // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
            producer.start();
            // 循环发送消息。
            for (int i = 0; i < 100; i++){
                Message msg = new Message(
                        // 设置为您在消息队列RocketMQ版控制台上创建的Topic。
                        // 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
                        "TopicTestMQ",
                        // Message Tag,
                        // 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                        "TagA",
                        // Message Body
                        // 任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                        "Hello MQ".getBytes());

                // 设置代表消息的业务关键属性,请尽可能全局唯一。
                // 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
                // 注意:不设置也不会影响消息正常收发。
                msg.setKey("ORDERID_" + i);

                // 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
                producer.sendOneway(msg);
            }

            // 在应用退出前,销毁Producer对象。
            // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
            producer.shutdown();
        }
    }

发送普通消息(多线程发送)

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;

public class SharedProducer {
    public static void main(String[] args) {
        // producer实例配置初始化。
        Properties properties = new Properties();
        // 设置为您在消息队列RocketMQ版控制台创建的Group ID。
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        /**
        * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
        * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
        * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
        * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
        */
        // 设置为消息队列RocketMQ版控制台实例详情页的实例用户名。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        // 设置为消息队列RocketMQ版控制台实例详情页的实例密码。
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。

        // 设置发送超时时间,单位:毫秒。 
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis,"3000");
        // 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
        // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
        producer.start();

        // 创建的Producer和Consumer对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。

        // thread和anotherThread共享Producer对象,并发地发送消息至消息队列RocketMQ版。
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message(
                    // 设置为您在消息队列RocketMQ版控制台上创建的Topic。
                    // 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
                    "TopicTestMQ",
                    // Message Tag可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                    "TagA",
                    // Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预。
                    // 需要Producer与Consumer协商好一致的序列化和反序列化方式。
                    "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // 同步发送消息,只要不抛异常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        thread.start();


        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // 同步发送消息,只要不抛异常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();


        // (可选)Producer实例若不再使用时,可将Producer关闭,进行资源释放。
        // producer.shutdown();
    }
}

订阅普通消息(PushConsumer)

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ConsumerTest {
   public static void main(String[] args) {
       Properties properties = new Properties();
        // 设置为您在消息队列RocketMQ版控制台创建的Group ID。
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        /**
        * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
        * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
        * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
        * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
        */
        // 设置为消息队列RocketMQ版控制台实例详情页的实例用户名。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        // 设置为消息队列RocketMQ版控制台实例详情页的实例密码。
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。

        // 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
          // 集群订阅方式(默认)。
          // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
          // 广播订阅方式。
          // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

       Consumer consumer = ONSFactory.createConsumer(properties);
       consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个Tag。
           public Action consume(Message message, ConsumeContext context) {
               System.out.println("Receive: " + message);
               return Action.CommitMessage;
           }
       });

        // 订阅另外一个Topic,如需取消订阅该Topic,请删除该部分的订阅代码,重新启动消费端即可。
        consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // 订阅全部Tag。
           public Action consume(Message message, ConsumeContext context) {
               System.out.println("Receive: " + message);
               return Action.CommitMessage;
           }
       });

       consumer.start();
       System.out.println("Consumer Started");
   }
}            

订阅普通消息(PushConsumer批量消费)

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.tcp.example.MqConfig;

public class SimpleBatchConsumer {

    public static void main(String[] args) {
        Properties consumerProperties = new Properties();
        // 设置为您在消息队列RocketMQ版控制台创建的Group ID。
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        /**
        * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
        * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
        * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
        * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
        */
        // 设置为消息队列RocketMQ版控制台实例详情页的实例用户名。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        // 设置为消息队列RocketMQ版控制台实例详情页的实例密码。
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
        
        // 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");

        // 设置批量消费最大消息数量,当指定Topic的消息数量已经攒够128条,SDK立即执行回调进行消费。默认值:32,取值范围:1~1024。
        consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
        // 设置批量消费最大等待时长,当等待时间达到10秒,SDK立即执行回调进行消费。默认值:0,取值范围:0~450,单位:秒。
        consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));

        BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
        batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {

             @Override
            public Action consume(final List<Message> messages, ConsumeContext context) {
                System.out.printf("Batch-size: %d\n", messages.size());
           // 批量消息处理。
                return Action.CommitMessage;
            }
        });
        // 启动batchConsumer。
        batchConsumer.start();
        System.out.println("Consumer start success.");

        // 等待固定时间防止进程退出。
        try {
            Thread.sleep(200000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}         

订阅普通消息(PullConsumer)

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PullConsumer;
import com.aliyun.openservices.ons.api.TopicPartition;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class PullConsumerClient {
    public static void main(String[] args){
        Properties properties = new Properties();
        // 设置为您在消息队列RocketMQ版控制台创建的Group ID。
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
        /**
        * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
        * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
        * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
        * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
        */
        // 设置为消息队列RocketMQ版控制台实例详情页的实例用户名。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        // 设置为消息队列RocketMQ版控制台实例详情页的实例密码。
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。

        // 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
        PullConsumer consumer = ONSFactory.createPullConsumer(properties);
        // 启动Consumer。
        consumer.start();
        // 获取topic-xxx下的所有分区。
        Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
        // 指定需要拉取消息的分区。
        consumer.assign(topicPartitions);

        while (true) {
            // 拉取消息,超时时间为3000 ms。
            List<Message> messages = consumer.poll(3000);
            System.out.printf("Received message: %s %n", messages);
        }
    }
}

顺序消息收发示例

发送顺序消息

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;

import java.util.Properties;


public class ProducerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 设置为您在消息队列RocketMQ版控制台创建的Group ID。
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        /**
        * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
        * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
        * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
        * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
        */
        // 设置为消息队列RocketMQ版控制台实例详情页的实例用户名。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        // 设置为消息队列RocketMQ版控制台实例详情页的实例密码。
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。

        // 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
        OrderProducer producer = ONSFactory.createOrderProducer(properties);
        // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
        producer.start();
        for (int i = 0; i < 1000; i++) {
            String orderId = "biz_" + i % 10;
            Message msg = new Message(
                    // 设置为您在消息队列RocketMQ版控制台上创建的Topic。
                    "Order_global_topic",
                    // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                    "TagA",
                    // Message Body,可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                    "send order global msg".getBytes()
            );
            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
            // 注意:不设置也不会影响消息正常收发。
            msg.setKey(orderId);
            // 分区顺序消息中区分不同分区的关键字段,Sharding Key与普通消息的key是完全不同的概念。
            // 全局顺序消息,该字段可以设置为任意非空字符串。
            String shardingKey = String.valueOf(orderId);
            try {
                SendResult sendResult = producer.send(msg, shardingKey);
                // 发送消息,只要不抛异常就是成功。
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }
        // 在应用退出前,销毁Producer对象。
        // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
        producer.shutdown();
    }

}

订阅顺序消息

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;

import java.util.Properties;


public class ConsumerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 设置为您在消息队列RocketMQ版控制台创建的Group ID。
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        /**
        * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
        * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
        * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
        * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
        */
        // 设置为消息队列RocketMQ版控制台实例详情页的实例用户名。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        // 设置为消息队列RocketMQ版控制台实例详情页的实例密码。
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。

        // 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
        // 顺序消息消费失败进行重试前的等待时间,单位(毫秒),取值范围:10毫秒~30000毫秒。
        properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
        // 消息消费失败时的最大重试次数。
        properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");

        // 在订阅消息前,必须调用start方法来启动Consumer,只需调用一次即可。
        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

        consumer.subscribe(
                // 设置为您在消息队列RocketMQ版控制台上创建的Topic。
                "Order_global_topic",
                // 订阅指定Topic下的Tags:
                // 1. * 表示订阅所有消息。
                // 2. TagA || TagB || TagC表示订阅TagA或TagB或TagC的消息。
                "*",
                new MessageOrderListener() {
                    /**
                     * 1. 消息消费处理失败或者处理出现异常,返回OrderAction.Suspend。
                     * 2. 消息处理成功,返回OrderAction.Success。
                     */
                    @Override
                    public OrderAction consume(Message message, ConsumeOrderContext context) {
                        System.out.println(message);
                        return OrderAction.Success;
                    }
                });

        consumer.start();
    }
}

定时消息收发示例

发送定时消息

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /**
        * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
        * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
        * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
        * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
        */
        // 设置为消息队列RocketMQ版控制台实例详情页的实例用户名。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        // 设置为消息队列RocketMQ版控制台实例详情页的实例密码。
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。

        // 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        Producer producer = ONSFactory.createProducer(properties);
        // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
        producer.start();
        Message msg = new Message(
                // 设置为您在消息队列RocketMQ版控制台上创建的Topic。
                "Topic",
                // Message Tag可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                "tag",
                // Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                "Hello MQ".getBytes());
        // 设置代表消息的业务关键属性,请尽可能全局唯一。
        // 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
        // 注意:不设置也不会影响消息正常收发。
        msg.setKey("ORDERID_100");

        try {
            // 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如2016-03-07 16:21:00投递。如果被设置成当前时间戳之前的某个时刻,消息将立即被投递给消费者。
            long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();

            msg.setStartDeliverTime(timeStamp);
            // 发送消息,只要不抛异常就是成功。
            SendResult sendResult = producer.send(msg);
            System.out.println("Message Id:" + sendResult.getMessageId());
        }
        catch (Exception e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            e.printStackTrace();
        }

        // 在应用退出前,销毁Producer对象。
        // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
        producer.shutdown();
    }
}       

订阅定时消息

订阅定时消息的示例代码和订阅普通消息一样,请参见订阅普通消息

延时消息收发示例

发送延时消息

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /**
        * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
        * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
        * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
        * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
        */
        // 设置为消息队列RocketMQ版控制台实例详情页的实例用户名。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        // 设置为消息队列RocketMQ版控制台实例详情页的实例密码。
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。

        // 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");

        Producer producer = ONSFactory.createProducer(properties);
        // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
        producer.start();
        Message msg = new Message( 
                // 设置为您在消息队列RocketMQ版控制台上创建的Topic。
                "Topic",
                // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版服务器过滤。
                "tag",
                // Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                "Hello MQ".getBytes());
        // 设置代表消息的业务关键属性,请尽可能全局唯一。
        // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
        // 注意:不设置也不会影响消息正常收发。
        msg.setKey("ORDERID_100");
        try {
            // 延时消息,在指定延迟时间(当前时间之后)进行投递。最大可设置延迟40天投递,单位毫秒(ms)。
            // 以下示例表示消息在3秒后投递。
            long delayTime = System.currentTimeMillis() + 3000;

            // 设置消息需要被投递的时间。
            msg.setStartDeliverTime(delayTime);

            SendResult sendResult = producer.send(msg);
            // 同步发送消息,只要不抛异常就是成功。
            if (sendResult != null) {
            System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
            }
            } catch (Exception e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            e.printStackTrace();
        }
        // 在应用退出前,销毁Producer对象。
        // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
        producer.shutdown();
    }
}           

订阅延时消息

订阅延时消息的示例代码和订阅普通消息一样,请参见订阅普通消息

事务消息收发示例

发送事务消息

package com.aliyun.openservices.tcp.example.producer;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;

import java.util.Date;
import java.util.Properties;

public class SimpleTransactionProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // 您在消息队列RocketMQ版控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        
        /**
        * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
        * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
        * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
        * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
        */
        // 设置为消息队列RocketMQ版控制台实例详情页的实例用户名。
        properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
        // 设置为消息队列RocketMQ版控制台实例详情页的实例密码。
        properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
        //注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。

        // 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");

        // 初始化事务消息Producer时,需要注册一个本地事务状态的Checker。
        LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
        TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
        transactionProducer.start();

        Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());

        for (int i = 0; i < 3; i++) {
            try{
                SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        System.out.println("执行本地事务,并根据本地事务的状态提交TransactionStatus。");
                        return TransactionStatus.CommitTransaction;
                    }
                }, null);
                assert sendResult != null;
            }catch (ONSClientException e){
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        System.out.println("Send transaction message success.");
    }
}
// 本地事务检查器。
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
   
    @Override
    public TransactionStatus check(Message msg) {
        System.out.println("收到事务消息的回查请求,MsgId: " + msg.getMsgID());
        return TransactionStatus.CommitTransaction;
    }
}

订阅事务消息

订阅事务消息的示例代码和订阅普通消息一样,请参见订阅普通消息

  • 本页导读 (1)
文档反馈