全部产品
云市场

发送消息(多线程)

更新时间:2019-09-13 20:45:14

消息队列 MQ 的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用。

您可以在服务器上(或者多台服务器)部署多个生产者和消费者实例,也可以在同一个生产者或消费者实例里采用多线程发送或接收消息,从而提高消息发送或接收 TPS。 请避免为每个线程创建一个客户端实例。

在多线程之间共享 Producer 的示例代码如下:

  1. import com.aliyun.openservices.ons.api.Message;
  2. import com.aliyun.openservices.ons.api.Producer;
  3. import com.aliyun.openservices.ons.api.ONSFactory;
  4. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  5. import com.aliyun.openservices.ons.api.SendResult;
  6. import java.util.Properties;
  7. public class SharedProducer {
  8. public static void main(String[] args) {
  9. // producer 实例配置初始化
  10. Properties properties = new Properties();
  11. //您在控制台创建的 Group ID
  12. properties.put(PropertyKeyConst.GROUP_ID, "XXX");
  13. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  14. properties.put(PropertyKeyConst.AccessKey,"XXX");
  15. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  16. properties.put(PropertyKeyConst.SecretKey, "XXX");
  17. //设置发送超时时间,单位毫秒
  18. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
  19. // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
  20. properties.put(PropertyKeyConst.NAMESRV_ADDR,
  21. "XXX");
  22. final Producer producer = ONSFactory.createProducer(properties);
  23. // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
  24. producer.start();
  25. //创建的 Producer 和 Consumer 对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。
  26. //在 thread 和 anotherThread 中共享 Producer 对象,并发地发送消息至消息队列 MQ。
  27. Thread thread = new Thread(new Runnable() {
  28. @Override
  29. public void run() {
  30. try {
  31. Message msg = new Message( //
  32. // Message 所属的 Topic
  33. "TopicTestMQ",
  34. // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 MQ 的服务器过滤
  35. "TagA",
  36. // Message Body 可以是任何二进制形式的数据, 消息队列 MQ 不做任何干预,
  37. // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
  38. "Hello MQ".getBytes());
  39. SendResult sendResult = producer.send(msg);
  40. // 同步发送消息,只要不抛异常就是成功
  41. if (sendResult != null) {
  42. System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
  43. }
  44. } catch (Exception e) {
  45. // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
  46. System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
  47. e.printStackTrace();
  48. }
  49. }
  50. });
  51. thread.start();
  52. Thread anotherThread = new Thread(new Runnable() {
  53. @Override
  54. public void run() {
  55. try {
  56. Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
  57. SendResult sendResult = producer.send(msg);
  58. // 同步发送消息,只要不抛异常就是成功
  59. if (sendResult != null) {
  60. System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
  61. }
  62. } catch (Exception e) {
  63. // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
  64. System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
  65. e.printStackTrace();
  66. }
  67. }
  68. });
  69. anotherThread.start();
  70. // Producer 实例若不再使用时,可将 Producer 关闭,进行资源释放
  71. // producer.shutdown();
  72. }
  73. }