云消息队列 RocketMQ 版5.x版本实例可兼容Java ONS 1.x SDK客户端接入,您可以使用ONS 1.x SDK的接入5.x实例进行消息收发。本文为您介绍Java ONS 1.x SDK消息收发示例代码。
重要
- 推荐您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作为主力研发版本,和云消息队列 RocketMQ 版5.x服务端完全兼容,提供了更全面的功能并支持更多增强特性。更多信息,请参见5.x系列SDK。
- RocketMQ 4.x/3.x系列SDK和ONS系列SDK后续仅做功能维护,建议仅存量业务使用。
Serverless版实例公网访问版本说明
Serverless版实例使用公网访问接入云消息队列 RocketMQ 版时,需要保证使用的Java ONS 1.x SDK版本为1.9.0.Final及以上版本,并在消息收发代码中补充如下内容:
properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");
说明
其中,InstanceId
需要替换为您实际使用的实例ID。
普通消息收发示例
发送普通消息(同步发送)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Date;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverlesss实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置发送超时时间,单位:毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
// 循环发送消息。
for (int i = 0; i < 100; i++){
Message msg = new Message(
// 设置为您在消息队列RocketMQ版控制台上创建的Topic。
// 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
"TopicTestMQ",
// Message Tag可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
// Tag的具体格式和设置方法,请参见消息过滤。
"TagA",
// Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预。
// 需要Producer与Consumer协商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
msg.setKey("ORDERID_" + i);
try {
SendResult sendResult = producer.send(msg);
// 同步发送消息,只要不抛异常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
// 在应用退出前,销毁Producer对象。
// 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
producer.shutdown();
}
}
发送普通消息(异步发送)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class ProducerTest {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverlesss实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
//设置发送超时时间,单位毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
Message msg = new Message(
// 设置为您在消息队列RocketMQ版控制台上创建的Topic。
// 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
"TopicTestMQ",
// Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
"TagA",
// Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一。 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
msg.setKey("ORDERID_100");
// 异步发送消息, 发送结果通过callback返回给客户端。
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// 消息发送成功。
System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});
// 阻塞当前线程3秒,等待异步发送结果。
TimeUnit.SECONDS.sleep(3);
// 在应用退出前,销毁Producer对象。
// 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
producer.shutdown();
}
}
发送普通消息(单向发送)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverlesss实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置发送超时时间,单位:毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
// 循环发送消息。
for (int i = 0; i < 100; i++){
Message msg = new Message(
// 设置为您在消息队列RocketMQ版控制台上创建的Topic。
// 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
"TopicTestMQ",
// Message Tag,
// 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
"TagA",
// Message Body
// 任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
msg.setKey("ORDERID_" + i);
// 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
producer.sendOneway(msg);
}
// 在应用退出前,销毁Producer对象。
// 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
producer.shutdown();
}
}
发送普通消息(多线程发送)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Date;
import java.util.Properties;
public class SharedProducer {
public static void main(String[] args) {
// producer实例配置初始化。
Properties properties = new Properties();
// 设置为您在消息队列RocketMQ版控制台创建的Group ID。
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverlesss实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
properties.put(PropertyKeyConst.AccessKey, "INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置发送超时时间,单位:毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
Producer producer = ONSFactory.createProducer(properties);
producer.start();
// 创建的Producer和Consumer对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。
// thread和anotherThread共享Producer对象,并发地发送消息至消息队列RocketMQ版。
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Message msg = new Message(
// 设置为您在消息队列RocketMQ版控制台上创建的Topic。
// 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
"TopicTestMQ",
// Message Tag可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
"TagA",
// Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预。
// 需要Producer与Consumer协商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
try {
SendResult sendResult = producer.send(msg);
// 同步发送消息,只要不抛异常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
});
thread.start();
Thread anotherThread = new Thread(new Runnable() {
@Override
public void run() {
Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
try {
SendResult sendResult = producer.send(msg);
// 同步发送消息,只要不抛异常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
});
anotherThread.start();
// (可选)Producer实例若不再使用时,可将Producer关闭,进行资源释放。
// producer.shutdown();
}
}
订阅普通消息(PushConsumer)
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 设置为您在消息队列RocketMQ版控制台创建的Group ID。
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverlesss实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
properties.put(PropertyKeyConst.AccessKey, "INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 集群订阅方式(默认)。
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// 广播订阅方式。
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个Tag。
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
// 订阅另外一个Topic,如需取消订阅该Topic,请删除该部分的订阅代码,重新启动消费端即可。
consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // 订阅全部Tag。
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
订阅普通消息(PushConsumer批量消费)
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
public class SimpleBatchConsumer {
public static void main(String[] args) {
Properties consumerProperties = new Properties();
// 设置为您在消息队列RocketMQ版控制台创建的Group ID。
consumerProperties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverlesss实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
consumerProperties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
consumerProperties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
consumerProperties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 设置批量消费最大消息数量,当指定Topic的消息数量已经攒够128条,SDK立即执行回调进行消费。默认值:32,取值范围:1~1024。
consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
// 设置批量消费最大等待时长,当等待时间达到10秒,SDK立即执行回调进行消费。默认值:0,取值范围:0~450,单位:秒。
consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));
BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
batchConsumer.subscribe("TopicTestMQ", "TagA", new BatchMessageListener() {
@Override
public Action consume(final List<Message> messages, ConsumeContext context) {
System.out.printf("Batch-size: %d\n", messages.size());
// 批量消息处理。
return Action.CommitMessage;
}
});
// 启动batchConsumer。
batchConsumer.start();
System.out.println("Consumer start success.");
// 等待固定时间防止进程退出。
try {
Thread.sleep(200000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
订阅普通消息(PullConsumer)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PullConsumer;
import com.aliyun.openservices.ons.api.TopicPartition;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class PullConsumerClient {
public static void main(String[] args){
Properties properties = new Properties();
// 设置为您在消息队列RocketMQ版控制台创建的Group ID。
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverlesss实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
PullConsumer consumer = ONSFactory.createPullConsumer(properties);
// 启动Consumer。
consumer.start();
// 获取topic-xxx下的所有分区。
Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
// 指定需要拉取消息的分区。
consumer.assign(topicPartitions);
while (true) {
// 拉取消息,超时时间为3000 ms。
List<Message> messages = consumer.poll(3000);
System.out.printf("Received message: %s %n", messages);
}
}
}
顺序消息收发示例
发送顺序消息
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import java.util.Date;
import java.util.Properties;
public class ProducerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// 设置为您在消息队列RocketMQ版控制台创建的Group ID。
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverlesss实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
OrderProducer producer = ONSFactory.createOrderProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
for (int i = 0; i < 1000; i++) {
String orderId = "biz_" + i % 10;
Message msg = new Message(
// 设置为您在消息队列RocketMQ版控制台上创建的Topic。
"Order_global_topic",
// Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
"TagA",
// Message Body,可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
"send order global msg".getBytes()
);
// 设置代表消息的业务关键属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
msg.setKey(orderId);
// 分区顺序消息中区分不同分区的关键字段,Sharding Key与普通消息的key是完全不同的概念。
// 全局顺序消息,该字段可以设置为任意非空字符串。
String shardingKey = String.valueOf(orderId);
try {
SendResult sendResult = producer.send(msg, shardingKey);
// 发送消息,只要不抛异常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
// 在应用退出前,销毁Producer对象。
// 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
producer.shutdown();
}
}
订阅顺序消息
package com.aliyun.openservices.ons.example.order;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;
import java.util.Properties;
public class ConsumerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// 设置为您在消息队列RocketMQ版控制台创建的Group ID。
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverlesss实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 顺序消息消费失败进行重试前的等待时间,单位(毫秒),取值范围:10毫秒~30000毫秒。
properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
// 消息消费失败时的最大重试次数。
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
// 在订阅消息前,必须调用start方法来启动Consumer,只需调用一次即可。
OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
consumer.subscribe(
// 设置为您在消息队列RocketMQ版控制台上创建的Topic。
"Order_global_topic",
// 订阅指定Topic下的Tags:
// 1. * 表示订阅所有消息。
// 2. TagA || TagB || TagC表示订阅TagA或TagB或TagC的消息。
"*",
new MessageOrderListener() {
/**
* 1. 消息消费处理失败或者处理出现异常,返回OrderAction.Suspend。
* 2. 消息处理成功,返回OrderAction.Success。
*/
@Override
public OrderAction consume(Message message, ConsumeOrderContext context) {
System.out.println(message);
return OrderAction.Success;
}
});
consumer.start();
}
}
定时消息收发示例
发送定时消息
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverlesss实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
Message msg = new Message(
// 设置为您在消息队列RocketMQ版控制台上创建的Topic。
"Topic",
// Message Tag可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
"tag",
// Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
msg.setKey("ORDERID_100");
try {
// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如2016-03-07 16:21:00投递。如果被设置成当前时间戳之前的某个时刻,消息将立即被投递给消费者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
// 发送消息,只要不抛异常就是成功。
SendResult sendResult = producer.send(msg);
System.out.println("Message Id:" + sendResult.getMessageId());
}
catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
// 在应用退出前,销毁Producer对象。
// 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
producer.shutdown();
}
}
订阅定时消息
订阅定时消息的示例代码和订阅普通消息一样,请参见订阅普通消息。
延时消息收发示例
发送延时消息
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Date;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverlesss实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
Message msg = new Message(
// 设置为您在消息队列RocketMQ版控制台上创建的Topic。
"Topic",
// Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版服务器过滤。
"tag",
// Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
msg.setKey("ORDERID_100");
try {
// 延时消息,在指定延迟时间(当前时间之后)进行投递。最大可设置延迟40天投递,单位毫秒(ms)。
// 以下示例表示消息在3秒后投递。
long delayTime = System.currentTimeMillis() + 3000;
// 设置消息需要被投递的时间。
msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);
// 同步发送消息,只要不抛异常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
// 在应用退出前,销毁Producer对象。
// 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
producer.shutdown();
}
}
订阅延时消息
订阅延时消息的示例代码和订阅普通消息一样,请参见订阅普通消息。
事务消息收发示例
发送事务消息
package com.aliyun.openservices.tcp.example.producer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import java.util.Date;
import java.util.Properties;
public class SimpleTransactionProducer {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在消息队列RocketMQ版控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverlesss实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置为您从消息队列RocketMQ版控制台获取的接入点,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 初始化事务消息Producer时,需要注册一个本地事务状态的Checker。
LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
transactionProducer.start();
Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());
for (int i = 0; i < 3; i++) {
try{
SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
System.out.println("执行本地事务,并根据本地事务的状态提交TransactionStatus。");
return TransactionStatus.CommitTransaction;
}
}, null);
assert sendResult != null;
}catch (ONSClientException e){
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
System.out.println("Send transaction message success.");
}
}
// 本地事务检查器。
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
@Override
public TransactionStatus check(Message msg) {
System.out.println("收到事务消息的回查请求,MsgId: " + msg.getMsgID());
return TransactionStatus.CommitTransaction;
}
}
订阅事务消息
订阅事务消息的示例代码和订阅普通消息一样,请参见订阅普通消息。
文档内容是否对您有帮助?