阿里云 RocketMQ 提供三种方式来发送普通消息:同步发送、异步发送和单向(Oneway)发送。本文介绍了每种发送方式的原理、使用场景、示例代码,以及三种发送方式的对比;此外还提供了订阅普通消息的示例代码。

前提条件

您已完成以下操作:

同步发送

  • 原理
    同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。同步发送
  • 应用场景

    此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

  • 示例代码
    import java.util.Date;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    public class RocketMQProducer {
        //设置为您在阿里云 RocketMQ 控制台上创建的 GID, 以及替换为您的 AccessKeyId 和 AccessKeySecret
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             * 创建 Producer,并开启消息轨迹
             * 如果不想开启消息轨迹,可以按照如下方式创建:
             * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
             */
            DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
            /**
             * 设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项.
             */
            producer.setAccessChannel(AccessChannel.CLOUD);
            /**
             * 设置为您从阿里云控制台获取的接入点信息,类似“http://MQ_INST_XXXX.aliyuncs.com:80”
             */
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // 在应用退出前,销毁 Producer 对象
            // 注意:如果不销毁也没有问题
            producer.shutdown();
        }
    }

异步发送

  • 原理

    异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。阿里云 RocketMQ 的异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。

    异步发送
  • 应用场景

    异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

  • 示例代码
    import java.util.Date;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    public class RocketMQAsyncProducer {
        //设置为您在阿里云 RocketMQ 控制台上创建的 GID, 以及替换为您的 AccessKeyId 和 AccessKeySecret
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             * 创建 Producer,并开启消息轨迹
             * 如果不想开启消息轨迹,可以按照如下方式创建:
             * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
             */
            DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
            /**
             * 设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项.
             */
            producer.setAccessChannel(AccessChannel.CLOUD);
            /**
             * 设置为您从阿里云控制台获取的接入点信息,类似“http://MQ_INST_XXXX.aliyuncs.com:80”
             */
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override public void onSuccess(SendResult result) {
                            // 消费发送成功
                            System.out.println("send message success. msgId= " + result.getMsgId());
                        }
    
                        @Override public void onException(Throwable throwable) {
                            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                            System.out.println("send message failed.");
                            throwable.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // 在应用退出前,销毁 Producer 对象
            // 注意:如果不销毁也没有问题
            producer.shutdown();
        }
    }

单向(Oneway)发送

  • 原理

    发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

  • 应用场景

    适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

  • 示例代码
    import java.util.Date;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    public class RocketMQOnewayProducer {
        //设置为您在阿里云 RocketMQ 控制台上创建的 GID, 以及替换为您的 AccessKeyId 和 AccessKeySecret
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             * 创建 Producer,并开启消息轨迹
             * 如果不想开启消息轨迹,可以按照如下方式创建:
             * DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
             */
            DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
            /**
             * 设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项.
             */
            producer.setAccessChannel(AccessChannel.CLOUD);
            /**
             * 设置为您从阿里云控制台获取的接入点信息,类似“http://MQ_INST_XXXX.aliyuncs.com:80”
             */
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.sendOneway(msg);
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // 在应用退出前,销毁 Producer 对象
            // 注意:如果不销毁也没有问题
            producer.shutdown();
        }
    }

三种发送方式的对比

下表概括了三者的特点和主要区别。

发送方式 发送 TPS 发送结果反馈 可靠性
同步发送 不丢失
异步发送 不丢失
单向发送 最快 可能丢失

订阅普通消息

订阅普通消息的方式只有以下一种。
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    //设置为您在阿里云 RocketMQ 控制台上创建的 GID, 以及替换为您的 AccessKeyId 和 AccessKeySecret
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }
    public static void main(String[] args) throws MQClientException {
        //设置为您在阿里云 RocketMQ 控制台上创建的 GID, 以及替换为您阿里云账号的 AccessKeyId 和 AccessKeySecret
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
        //设置为阿里云 RocketMQ 实例的接入点
        consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        //阿里云上消息轨迹需要设置为 CLOUD 方式
        consumer.setAccessChannel(AccessChannel.CLOUD);
        // 设置为您在阿里云 RocketMQ 控制台上创建的 Topic
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}