全部产品
云市场

发送消息(多线程)

更新时间:2020-07-06 19:13:20

SOFAStack 消息队列的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用。

您可以在服务器上(或者多台服务器)部署多个生产者和消费者实例,也可以在同一个生产者或消费者实例里采用多线程发送或接收消息,从而提高消息发送或接收 TPS。请避免为每个线程创建一个客户端实例。

在多线程之间共享 Producer 的示例代码如下:

  1. import java.util.Properties;
  2. import com.alipay.sofa.sofamq.client.PropertyKeyConst;
  3. import io.openmessaging.api.Message;
  4. import io.openmessaging.api.MessagingAccessPoint;
  5. import io.openmessaging.api.OMS;
  6. import io.openmessaging.api.OMSBuiltinKeys;
  7. import io.openmessaging.api.Producer;
  8. import io.openmessaging.api.SendResult;
  9. public class Main {
  10. public static void main(String... args) {
  11. Properties credentials = new Properties();
  12. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  13. credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "$accessKey");
  14. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  15. credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "xxxxx");
  16. // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
  17. MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
  18. .withCredentials(credentials).build();
  19. Properties properties = new Properties();
  20. // 设置用户实例,进入控制台的概览页面查看接入点配置
  21. properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
  22. // 您在控制台创建的 Group ID
  23. properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
  24. final Producer producer = accessPoint.createProducer(properties);
  25. producer.start();
  26. //创建的 Producer 和 Consumer 对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。
  27. //在 thread 和 anotherThread 中共享 Producer 对象,并发地发送消息至消息队列。
  28. Thread thread = new Thread(new Runnable() {
  29. @Override
  30. public void run() {
  31. try {
  32. Message msg = new Message( //
  33. // Message 所属的 Topic
  34. "TopicTestMQ",
  35. // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列的服务器过滤
  36. "TagA",
  37. // Message Body 可以是任何二进制形式的数据,消息队列不做任何干预,
  38. // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
  39. "Hello MQ".getBytes());
  40. SendResult sendResult = producer.send(msg);
  41. // 同步发送消息,只要不抛异常就是成功
  42. if (sendResult != null) {
  43. System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
  44. }
  45. } catch (Exception e) {
  46. // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
  47. System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
  48. e.printStackTrace();
  49. }
  50. }
  51. });
  52. thread.start();
  53. Thread anotherThread = new Thread(new Runnable() {
  54. @Override
  55. public void run() {
  56. try {
  57. Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
  58. SendResult sendResult = producer.send(msg);
  59. // 同步发送消息,只要不抛异常就是成功
  60. if (sendResult != null) {
  61. System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
  62. }
  63. } catch (Exception e) {
  64. // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
  65. System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
  66. e.printStackTrace();
  67. }
  68. }
  69. });
  70. anotherThread.start();
  71. // Producer 实例若不再使用时,可将 Producer 关闭,进行资源释放
  72. // producer.shutdown();
  73. }
  74. }