全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息队列 MQ

多线程发送消息

更新时间:2017-08-24 16:49:33

MQ 的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用。

您可以在服务器上(或者多台服务器)部署多个生产端和消费端实例,也可以在生产端和消费端采用多线程发送或接收消息,从而提高生产端和消费端的消息发送或接收 TPS。请避免为每个线程创建一个客户端实例。

TCP 接入点域名,请前往查看。

以下是在多线程之间共享 Producer 的示例程序:

  1. public class SharedProducer {
  2. public static void main(String[] args) {
  3. // producer 实例配置初始化
  4. Properties properties = new Properties();
  5. //您在控制台创建的Producer ID
  6. properties.put(PropertyKeyConst.ProducerId, "XXX");
  7. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  8. properties.put(PropertyKeyConst.AccessKey,"XXX");
  9. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  10. properties.put(PropertyKeyConst.SecretKey, "XXX");
  11. //设置发送超时时间,单位毫秒
  12. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
  13. // 设置 TCP 接入域名(此处以公共云生产环境为例)
  14. properties.put(PropertyKeyConst.ONSAddr,
  15. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  16. final Producer producer = ONSFactory.createProducer(properties);
  17. // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可
  18. producer.start();
  19. //创建的Producer和Consumer对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。
  20. final Message msg = new Message( //
  21. // Message所属的Topic
  22. "TopicTestMQ",
  23. // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
  24. "TagA",
  25. // Message Body 可以是任何二进制形式的数据, MQ不做任何干预,
  26. // 需要Producer与Consumer协商好一致的序列化和反序列化方式
  27. "Hello MQ".getBytes());
  28. //在thread和anotherThread中共享producer对象,并发地发送消息至MQ。
  29. Thread thread = new Thread(new Runnable() {
  30. @Override
  31. public void run() {
  32. SendResult sendResult = producer.send(msg);
  33. System.out.println(sendResult);
  34. }
  35. });
  36. thread.start();
  37. Thread anotherThread = new Thread(new Runnable() {
  38. @Override
  39. public void run() {
  40. SendResult sendResult = producer.send(msg);
  41. System.out.println(sendResult);
  42. }
  43. });
  44. anotherThread.start();
  45. // producer 实例若不再使用时,可将 producer 关闭,进行资源释放
  46. // producer.shutdown();
  47. }
  48. }
本文导读目录