消息队列 RocketMQ 版的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用。

您可以在服务器上(或者多台服务器)部署多个生产者和消费者实例,也可以在同一个生产者或消费者实例里采用多线程发送或接收消息,从而提高消息发送或接收 TPS。请避免为每个线程创建一个客户端实例。

在多线程之间共享 Producer 的示例代码如下。

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;

public class SharedProducer {
    public static void main(String[] args) {
        // producer 实例配置初始化。
        Properties properties = new Properties();
        //您在控制台创建的 Group ID。
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // AccessKeyId 阿里云身份验证,在阿里云服务器管理控制台创建。
        properties.put(PropertyKeyConst.AccessKey,"XXX");
        // AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建。
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        //设置发送超时时间,单位毫秒。 
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 设置 TCP 接入域名,进入控制台的实例详情页面的 TCP 协议客户端接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
        final Producer producer = ONSFactory.createProducer(properties);
        // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可。
        producer.start();

        //创建的 Producer 和 Consumer 对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。

        //在 thread 和 anotherThread 中共享 Producer 对象,并发地发送消息至消息队列 RocketMQ 版。
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message( //
                    // 普通消息所属的 Topic,切勿使用普通消息的 Topic 来收发其他类型的消息。
                    "TopicTestMQ",
                    // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 版的服务器过滤。
                    "TagA",
                    // Message Body 可以是任何二进制形式的数据,消息队列 RocketMQ 版不做任何干预。
                    // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式。
                    "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // 同步发送消息,只要不抛异常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        thread.start();


        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // 同步发送消息,只要不抛异常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();


        //(可选)Producer 实例若不再使用时,可将 Producer 关闭,进行资源释放。
        // producer.shutdown();
    }
}