顺序消息(FIFO 消息)是阿里云 RocketMQ 提供的一种严格按照顺序来发布和消费的消息类型。本文提供使用 TCP 协议下的开源 Go SDK 收发顺序消息的示例代码供您参考。

顺序消息分为两类:

  • 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(First In First Out,简称 FIFO)的顺序进行发布和消费。
  • 分区顺序:对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。

详情请参见顺序消息

前提条件

您已完成准备工作。详情请参见准备工作

发送顺序消息

发送顺序消息的示例代码如下。

package main

import (
  "fmt"
  "github.com/apache/rocketmq-client-go/core"
  "time"
)

func main() {
  pConfig := &rocketmq.ProducerConfig{
    ClientConfig: rocketmq.ClientConfig{
      //您在阿里云 RocketMQ 控制台上申请的 GID。
      GroupID:    "GID_XXXXXXXXXXXX",
      //设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取。
      NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
      Credentials: &rocketmq.SessionCredentials{
        //您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证。
        AccessKey: "Your Access Key",
        //您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证。
        SecretKey: "Your Secret Key",
        //用户渠道,默认值为:ALIYUN。
        Channel:   "ALIYUN",
      },
    },
    //主动设置该实例用于发送顺序消息。
    ProducerModel: rocketmq.OrderlyProducer,
  }
  sendMessageOrderlyByShardingKey(pConfig)
}
func sendMessageOrderlyByShardingKey(config *rocketmq.ProducerConfig) {
  producer, err := rocketmq.NewProducer(config)
  if err != nil {
    fmt.Println("create Producer failed, error:", err)
    return
  }
  //请确保参数设置完成之后启动 Producer。
  producer.Start()
  defer producer.Shutdown()
  for i := 0; i < 1000; i++ {
    msg := fmt.Sprintf("%s-%d", "Hello Lite Orderly Message", i)

    //发送消息时请设置您在阿里云 RocketMQ 控制台上申请的 Topic。
    r, err := producer.SendMessageOrderlyByShardingKey(
      &rocketmq.Message{Topic: "YourOrderLyTopicXXXXXXXX", Body: msg}, "ShardingKey" /*orderID*/)
    if err != nil {
      println("Send Orderly Message Error:", err)
    }
    fmt.Printf("send orderly message result:%+v\n", r)
    time.Sleep(time.Duration(1) * time.Second)
  }

}           

消费顺序消息

消费顺序消息的示例代码如下。

package main

import (
  "fmt"
  "github.com/apache/rocketmq-client-go/core"
  "math/rand"
  "sync/atomic"
)

func main() {
  pConfig := &rocketmq.PushConsumerConfig{
    ClientConfig: rocketmq.ClientConfig{
      //您在阿里云 RocketMQ 控制台上申请的 GID。
      GroupID:    "GID_XXXXXXXXXXXX",
      //设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取。
      NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
      Credentials: &rocketmq.SessionCredentials{
        //您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证。
        AccessKey: "Your Access Key",
        //您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证。
        SecretKey: "Your Secret Key",
        //用户渠道,默认值为:ALIYUN。
        Channel:   "ALIYUN",
      },
    },
    //设置使用集群模式。
    Model:         rocketmq.Clustering,
    //设置该消费者为顺序消息消费。
    ConsumerModel: rocketmq.Orderly,
  }
  ConsumeWithOrderly(pConfig)
}
func ConsumeWithOrderly(config *rocketmq.PushConsumerConfig) {

  consumer, err := rocketmq.NewPushConsumer(config)
  if err != nil {
    println("create Consumer failed, error:", err)
    return
  }

  ch := make(chan interface{})
  var count = (int64)(1000000)
  // ********************************************
  // 1. 确保订阅关系的设置在启动之前完成。
  // 2. 确保相同 GID 下面的消费者的订阅关系一致。
  // 3. 确保此处设置的 Topic 一定是控制台上申请的顺序类型的。
  // *********************************************
  consumer.Subscribe("YourOrderlyTopicXXXXXXXX", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
    fmt.Printf("A message received, MessageID:%s, Body:%s \n", msg.MessageID, msg.Body)
    if atomic.AddInt64(&count, -1) <= 0 {
      ch <- "quit"
    }
    //消费成功回复 ConsumeSuccess,消费失败回复 ReConsumeLater。此时会触发消费重试。
    if 0 == rand.Int()%7 {
      fmt.Printf("Consumer Later, MessageID:%s \n", msg.MessageID)
      return rocketmq.ReConsumeLater
    }
    return rocketmq.ConsumeSuccess
  })

  // 确保订阅关系的设置在启动之前完成。
  err = consumer.Start()
  if err != nil {
    println("consumer start failed,", err)
    return
  }

  fmt.Printf("consumer: %s started...\n", consumer)
  <-ch
  //请保持消费者一直处于运行状态。
  err = consumer.Shutdown()
  if err != nil {
    println("consumer shutdown failed")
    return
  }
  println("consumer has shutdown.")
}