文档

发送消息(多线程)

更新时间:

SOFAStack 消息队列的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用。

您可以在服务器上(或者多台服务器)部署多个生产者和消费者实例,也可以在同一个生产者或消费者实例里采用多线程发送或接收消息,从而提高消息发送或接收 TPS 的能力。请避免为每个线程创建一个客户端实例。

在多线程之间共享 Producer 的示例代码如下:

import java.util.Properties;
import com.alipay.sofa.sofamq.client.PropertyKeyConst;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;
import io.openmessaging.api.Producer;
import io.openmessaging.api.SendResult;

public class Main {
    public static void main(String... args) {
        Properties credentials = new Properties();
        // 阿里云账号 AccessKey 拥有所有 API 的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。
        // 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
        // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
        credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");        
        credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");
        // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
        MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
                .withCredentials(credentials).build();
        Properties properties = new Properties();
        // 设置用户实例,进入控制台的概览页面查看接入点配置
        properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");

        // 您在控制台创建的 Group ID
        properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
        finalProducer producer = accessPoint.createProducer(properties);
        producer.start();

        //创建的 Producer 和 Consumer 对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。

        //在 thread 和 anotherThread 中共享 Producer 对象,并发地发送消息至消息队列。
        Thread thread = new Thread(newRunnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message(//
                            // Message 所属的 Topic
                            "TopicTestMQ",
                            // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列的服务器过滤
                            "TagA",
                            // Message Body 可以是任何二进制形式的数据,消息队列不做任何干预,
                            // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                            "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // 同步发送消息,只要不抛异常就是成功
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        thread.start();
        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // 同步发送消息,只要不抛异常就是成功
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();
        // Producer 实例若不再使用时,可将 Producer 关闭,进行资源释放
        // producer.shutdown();
    }
}
  • 本页导读 (0)
文档反馈