全部产品
云市场

订阅消息

更新时间:2020-01-13 14:01:04

本文介绍如何通过 SOFAStack 消息队列的 Java SDK 订阅消息。

订阅方式

消息队列支持以下两种订阅方式:

  • 集群订阅
    同一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。
    1. // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
    2. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  • 广播订阅
    同一个 Group ID 所标识的所有 Consumer 都会各自消费某条消息一次。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。
    1. // 广播订阅方式设置
    2. properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

说明

  • 请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致,详情请参见 订阅关系一致
  • 两种不同的订阅方式有着不同的功能限制,例如,广播模式不支持顺序消息、不维护消费进度、不支持重置消费位点等,详情请参见 集群消费和广播消费

示例代码

具体的示例代码,请以 消息队列代码库 为准。

  1. import java.util.Properties;
  2. import com.alipay.sofa.sofamq.client.PropertyKeyConst;
  3. import io.openmessaging.api.Action;
  4. import io.openmessaging.api.ConsumeContext;
  5. import io.openmessaging.api.Consumer;
  6. import io.openmessaging.api.Message;
  7. import io.openmessaging.api.MessageListener;
  8. import io.openmessaging.api.MessagingAccessPoint;
  9. import io.openmessaging.api.OMS;
  10. import io.openmessaging.api.OMSBuiltinKeys;
  11. public class MConsumer {
  12. public static void main(String... args) {
  13. Properties credentials = new Properties();
  14. // AccessKeyId 阿里云身份验证,在阿里云服务器管理控制台创建
  15. credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "$accessKey");
  16. // AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建
  17. credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "xxxxx");
  18. // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
  19. MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
  20. .withCredentials(credentials).build();
  21. Properties properties = new Properties();
  22. // 设置用户实例,进入控制台的概览页面查看接入点配置
  23. properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
  24. properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
  25. // 集群订阅方式 (默认)
  26. // properties.put(PropertyKeyConst.MESSAGE_MODEL, PropertyValueConst.CLUSTERING);
  27. // 广播订阅方式
  28. // properties.put(PropertyKeyConst.MESSAGE_MODEL, PropertyValueConst.BROADCASTING);
  29. Consumer consumer = accessPoint.createConsumer(properties);
  30. consumer.subscribe("YOUR_TOPIC", "TAGA||TAGB", new MessageListener() {
  31. @Override
  32. public Action consume(Message message, ConsumeContext context) {
  33. System.out.println(new String(message.getBody()));
  34. return Action.CommitMessage;
  35. }
  36. });
  37. consumer.start();
  38. }
  39. }