全部产品

收发顺序消息

更新时间:2020-01-14 15:22:53

顺序消息(FIFO 消息)是 SOFAStack 消息队列提供的一种严格按照顺序来发布和消费的消息类型。本文提供使用 TCP 协议下的 Java SDK 收发顺序消息的示例代码供您参考。

前提条件

您已完成以下操作:

  • 通过 Maven 方式引入依赖。Java SDK 的最新版本号,可参见 SDK 版本说明
    1. <dependencies>
    2. <dependency>
    3. <groupId>com.alipay.sofa</groupId>
    4. <artifactId>sofamq-client-all</artifactId>
    5. <version>"XXX"</version>
    6. //设置为 Java SDK 的最新版本号
    7. </dependency>
    8. </dependencies>
    9. <repositories>
    10. <repository>
    11. <id>antcloudrelease</id>
    12. <name>Ant Cloud</name>
    13. <url>http://mvn.cloud.alipay.com/nexus/content/groups/open</url>
    14. </repository>
    15. </repositories>
  • 准备环境
  • (可选)日志配置

背景信息

  • 分区顺序:对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。

详情请参见 消息类型 > 顺序消息

说明:对于新手用户,建议在正式收发消息前,阅读 Demo 工程(TCP 版) 来了解搭建消息队列工程的具体步骤。

发送顺序消息

具体的示例代码,请以 消息队列代码库 为准。

示例代码如下。

  1. import java.util.Properties;
  2. import com.alipay.sofa.sofamq.client.PropertyKeyConst;
  3. import io.openmessaging.api.Message;
  4. import io.openmessaging.api.MessagingAccessPoint;
  5. import io.openmessaging.api.OMS;
  6. import io.openmessaging.api.OMSBuiltinKeys;
  7. import io.openmessaging.api.SendResult;
  8. import io.openmessaging.api.order.OrderProducer;
  9. public class Main {
  10. public static void main(String... args) {
  11. Properties credentials = new Properties();
  12. // AccessKeyId 阿里云身份验证,在阿里云服务器管理控制台创建
  13. credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "$accessKey");
  14. // AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建
  15. credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "xxxxx");
  16. // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
  17. MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
  18. .withCredentials(credentials).build();
  19. Properties properties = new Properties();
  20. // 设置用户实例,进入控制台的概览页面查看接入点配置
  21. properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
  22. properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
  23. OrderProducer producer = accessPoint.createOrderProducer(properties);
  24. producer.start();
  25. Message message = new Message("$topic", "YOUR_TAG", "hello world".getBytes());
  26. // 分区顺序消息中区分不同分区的关键字段,Sharding Key 与普通消息的 key 是完全不同的概念。
  27. SendResult sendResult = producer.send(message, "YOUR_SHARDING_KEY");
  28. System.out.println(sendResult);
  29. }
  30. }

订阅顺序消息

全局顺序消息和分区顺序消息的订阅方式基本一样,示例代码如下。

  1. import java.util.Properties;
  2. import com.alipay.sofa.sofamq.client.PropertyKeyConst;
  3. import io.openmessaging.api.Message;
  4. import io.openmessaging.api.MessagingAccessPoint;
  5. import io.openmessaging.api.OMS;
  6. import io.openmessaging.api.OMSBuiltinKeys;
  7. import io.openmessaging.api.order.ConsumeOrderContext;
  8. import io.openmessaging.api.order.MessageOrderListener;
  9. import io.openmessaging.api.order.OrderAction;
  10. import io.openmessaging.api.order.OrderConsumer;
  11. public class Main {
  12. public static void main(String... args) {
  13. Properties credentials = new Properties();
  14. // AccessKeyId 阿里云身份验证,在阿里云服务器管理控制台创建
  15. credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "$accessKey");
  16. // AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建
  17. credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "xxxxx");
  18. // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
  19. MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
  20. .withCredentials(credentials).build();
  21. Properties properties = new Properties();
  22. // 设置用户实例,进入控制台的概览页面查看接入点配置
  23. properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
  24. properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
  25. OrderConsumer consumer = accessPoint.createOrderedConsumer(properties);
  26. consumer.subscribe("YOUR_TOPIC", "YOUR_TAG", new MessageOrderListener() {
  27. @Override
  28. public OrderAction consume(Message message, ConsumeOrderContext context) {
  29. System.out.println(new String(message.getBody()));
  30. return OrderAction.Success;
  31. }
  32. });
  33. consumer.start();
  34. }
  35. }