普通消息是指阿里云 RocketMQ 中无特性的消息,区别于有特性的定时消息、顺序消息和事务消息。本文提供使用 TCP 协议下的开源 Go SDK 收发普通消息的示例代码供您参考。

前提条件

您已完成准备工作。详情请参见准备工作

发送普通消息

发送普通消息的示例代码如下。

package main
import (
  "fmt"
  "github.com/apache/rocketmq-client-go/core"
)
func main() {
  pConfig := &rocketmq.ProducerConfig{
    ClientConfig: rocketmq.ClientConfig{
      //您在阿里云 RocketMQ 控制台上申请的 GID。
      GroupID:    "GID_XXXXXXXXXXXX",
      //设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取。
      NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
      Credentials: &rocketmq.SessionCredentials{
        //您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证。
        AccessKey: "Your Access Key",
        //您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证。
        SecretKey: "Your Secret Key",
        //用户渠道,默认值为:ALIYUN。
        Channel:   "ALIYUN",
      },
    },
    //主动设置该实例用于发送普通消息。
    ProducerModel: rocketmq.CommonProducer,
  }
  sendMessage(pConfig)
}
func sendMessage(config *rocketmq.ProducerConfig) {
  producer, err := rocketmq.NewProducer(config)
  if err != nil {
    fmt.Println("create common producer failed, error:", err)
    return
  }
  //请确保参数设置完成之后启动 Producer。
  err = producer.Start()
  if err != nil {
    fmt.Println("start common producer error", err)
    return
  }
  defer producer.Shutdown()
  fmt.Printf("Common producer: %s started... \n", producer)
  for i := 0; i < 10; i++ {
    msg := fmt.Sprintf("%s-%d", "Hello,Common MQ Message-", i)
    //发送消息时请设置您在阿里云 RocketMQ 控制台上申请的 Topic。
    result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "YourTopicXXXXXXXX", Body: msg})
    if err != nil {
      fmt.Println("Error:", err)
    }
    fmt.Printf("send message: %s result: %s\n", msg, result)
  }
  fmt.Println("shutdown common producer.")
}

消费普通消息

消费普通消息的示例代码如下。

package main
import (
  "fmt"
  "github.com/apache/rocketmq-client-go/core"
  "sync/atomic"
)
func main() {
  pConfig := &rocketmq.PushConsumerConfig{
    ClientConfig: rocketmq.ClientConfig{
      //您在阿里云 RocketMQ 控制台上申请的 GID。
      GroupID:    "GID_XXXXXXXXXXXX",
      //设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取。
      NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
      Credentials: &rocketmq.SessionCredentials{
        //您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证。
        AccessKey: "Your Access Key",
        //您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证。
        SecretKey: "Your Secret Key",
        //用户渠道,默认值为:ALIYUN。
        Channel:   "ALIYUN",
      },
    },
    //设置使用集群模式。
    Model:         rocketmq.Clustering,
    //设置该消费者为普通消息消费。
    ConsumerModel: rocketmq.CoCurrently,
  }
  ConsumeWithPush(pConfig)
}
func ConsumeWithPush(config *rocketmq.PushConsumerConfig) {
  consumer, err := rocketmq.NewPushConsumer(config)
  if err != nil {
    println("create Consumer failed, error:", err)
    return
  }
  ch := make(chan interface{})
  var count = (int64)(1000000)
  // ********************************************
  // 1. 确保订阅关系的设置在启动之前完成。
  // 2. 确保相同 GID 下面的消费者的订阅关系一致。
  // *********************************************
  consumer.Subscribe("YourTopicXXXXXXXX", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
    fmt.Printf("A message received, MessageID:%s, Body:%s \n", msg.MessageID, msg.Body)
    if atomic.AddInt64(&count, -1) <= 0 {
      ch <- "quit"
    }
    //消费成功回复 ConsumeSuccess,消费失败回复 ReConsumeLater。此时会触发消费重试。
    return rocketmq.ConsumeSuccess
  })
  err = consumer.Start()
  if err != nil {
    println("consumer start failed,", err)
    return
  }
  fmt.Printf("consumer: %s started...\n", consumer)
  <-ch
  //请保持消费者一直处于运行状态。
  err = consumer.Shutdown()
  if err != nil {
    println("consumer shutdown failed")
    return
  }
  println("consumer has shutdown.")
}