顺序消息(FIFO消息)是云消息队列 RocketMQ 版提供的一种严格按照顺序来发布和消费的消息类型。本文提供使用TCP协议下的社区版Java SDK收发顺序消息的示例代码供您参考。
背景信息
顺序消息分为两类:
全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和消费。
分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。
更多信息,请参见顺序消息。
前提条件
您已完成以下操作:
下载4.5.2或以上版本的社区版Java SDK。
准备工作。
获取阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见创建AccessKey。
发送顺序消息
重要
云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照单一生产者、单一线程并发下消息发送的时序。如果发送方有多个生产者或者有多个线程并发发送消息,则此时只能以到达云消息队列 RocketMQ 版服务端的时序作为消息顺序的依据,和业务侧的发送顺序未必一致。
发送顺序消息的示例代码如下。
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQOrderProducer {
private static RPCHook getAclRPCHook()
{
/**
* 替换为您阿里云账号的AccessKey ID和AccessKey Secret。
* 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
*/
return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
/**
* 创建Producer,并开启消息轨迹。
* 如果不想开启消息轨迹,可以按照以下方式创建:
* DefaultMQProducer producer = new DefaultMQProducer("YOUR ORDER GROUP ID", getAclRPCHook());
*/
DefaultMQProducer producer = new DefaultMQProducer("YOUR ORDER GROUP ID", getAclRPCHook(), true, null);
/**
* 设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。
*/
producer.setAccessChannel(AccessChannel.CLOUD);
/**
* 设置为您从阿里云控制台获取的接入点信息,类似“http://MQ_INST_XXXX.aliyuncs.com:80”。
*/
producer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
producer.start();
for (int i = 0; i < 128; i++) {
try {
int orderId = i % 10;
Message msg = new Message("YOUR ORDER TOPIC",
"YOUR MESSAGE TAG",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
/**
* 注意!!!请务必设置该配置项,顺序消息才能均匀分布到各队列中。
* 如果SDK版本为5.x以上,可以按照以下方式设置:
* msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
*/
msg.putUserProperty("__SHARDINGKEY", orderId + "");
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 选择适合自己的分区选择算法,保证同一个参数得到的结果相同。
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
订阅顺序消息
订阅顺序消息的示例代码如下。
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQOrderConsumer {
private static RPCHook getAclRPCHook()
{
/**
* 替换为您阿里云账号的AccessKey ID和AccessKey Secret。
* 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
*/
return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
/**
* 创建Consumer,并开启消息轨迹。
* 如果不想开启消息轨迹,可以按照如下方式创建:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR ORDER GROUP ID", getAclRPCHook(), null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR ORDER GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
/**
* 设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。
*/
consumer.setAccessChannel(AccessChannel.CLOUD);
/**
* 设置为您从阿里云控制台获取的接入点信息,类似“http://MQ_INST_XXXX.aliyuncs.com:80”。
*/
consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
consumer.subscribe("YOUR ORDER TOPIC", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;// 消费失败则挂起重试返回:ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
文档内容是否对您有帮助?