全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
消息队列 MQ

发送消息(多线程)

更新时间:2018-06-19 15:39:39

MQ 的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用。

您可以在服务器上(或者多台服务器)部署多个生产者和消费者实例,也可以在同一个生产者或消费者实例里采用多线程发送或接收消息,从而提高消息发送或接收 TPS。 请避免为每个线程创建一个客户端实例。

说明:关于 TCP 接入点域名,请参见TCP 接入说明

在多线程之间共享 Producer 的示例代码如下:

  1. import com.aliyun.openservices.ons.api.Message;
  2. import com.aliyun.openservices.ons.api.Producer;
  3. import com.aliyun.openservices.ons.api.ONSFactory;
  4. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  5. import com.aliyun.openservices.ons.api.SendResult;
  6. import java.util.Properties;
  7. public class SharedProducer {
  8. public static void main(String[] args) {
  9. // producer 实例配置初始化
  10. Properties properties = new Properties();
  11. //您在控制台创建的 Producer ID
  12. properties.put(PropertyKeyConst.ProducerId, "XXX");
  13. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  14. properties.put(PropertyKeyConst.AccessKey,"XXX");
  15. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  16. properties.put(PropertyKeyConst.SecretKey, "XXX");
  17. //设置发送超时时间,单位毫秒
  18. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
  19. // 设置 TCP 接入域名(此处以公共云生产环境为例)
  20. properties.put(PropertyKeyConst.ONSAddr,
  21. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  22. final Producer producer = ONSFactory.createProducer(properties);
  23. // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
  24. producer.start();
  25. //创建的 Producer 和 Consumer 对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。
  26. final Message msg = new Message( //
  27. // Message 所属的 Topic
  28. "TopicTestMQ",
  29. // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
  30. "TagA",
  31. // Message Body 可以是任何二进制形式的数据, MQ 不做任何干预,
  32. // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
  33. "Hello MQ".getBytes());
  34. //在 thread 和 anotherThread 中共享 producer 对象,并发地发送消息至 MQ。
  35. Thread thread = new Thread(new Runnable() {
  36. @Override
  37. public void run() {
  38. try {
  39. SendResult sendResult = producer.send(message);
  40. // 同步发送消息,只要不抛异常就是成功
  41. if (sendResult != null) {
  42. System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
  43. }
  44. } catch (Exception e) {
  45. // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
  46. System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
  47. e.printStackTrace();
  48. }
  49. }
  50. });
  51. thread.start();
  52. Thread anotherThread = new Thread(new Runnable() {
  53. @Override
  54. public void run() {
  55. try {
  56. SendResult sendResult = producer.send(message);
  57. // 同步发送消息,只要不抛异常就是成功
  58. if (sendResult != null) {
  59. System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
  60. }
  61. } catch (Exception e) {
  62. // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
  63. System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
  64. e.printStackTrace();
  65. }
  66. }
  67. });
  68. anotherThread.start();
  69. // producer 实例若不再使用时,可将 producer 关闭,进行资源释放
  70. // producer.shutdown();
  71. }
  72. }
本文导读目录