全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
消息队列 MQ

发送消息(多线程)

更新时间:2017-12-15 20:41:34

MQ 的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用。

您可以在服务器上(或者多台服务器)部署多个生产端和消费端实例,也可以在生产端和消费端采用多线程发送或接收消息,从而提高生产端和消费端的消息发送或接收 TPS。请避免为每个线程创建一个客户端实例。

说明:关于 TCP 接入点域名,请参见TCP 接入说明

在多线程之间共享 Producer 的示例代码如下:


import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;

public class SharedProducer {
    public static void main(String[] args) {
        // producer 实例配置初始化
        Properties properties = new Properties();
        //您在控制台创建的 Producer ID
        properties.put(PropertyKeyConst.ProducerId, "XXX");
        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.AccessKey,"XXX");
        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        //设置发送超时时间,单位毫秒 
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 设置 TCP 接入域名(此处以公共云生产环境为例)
        properties.put(PropertyKeyConst.ONSAddr,
          "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
        final Producer producer = ONSFactory.createProducer(properties);
        // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
        producer.start();

        //创建的 Producer 和 Consumer 对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。

        final Message msg = new Message( //
            // Message 所属的 Topic
            "TopicTestMQ",
            // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
            "TagA",
            // Message Body 可以是任何二进制形式的数据, MQ 不做任何干预,
            // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
            "Hello MQ".getBytes());

        //在 thread 和 anotherThread 中共享 producer 对象,并发地发送消息至 MQ。
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    SendResult sendResult = producer.send(message);
                    // 同步发送消息,只要不抛异常就是成功
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        thread.start();


        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    SendResult sendResult = producer.send(message);
                    // 同步发送消息,只要不抛异常就是成功
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();


        // producer 实例若不再使用时,可将 producer 关闭,进行资源释放
        // producer.shutdown();
    }
}
本文导读目录