普通消息是指阿里云RocketMQ中无特性的消息,区别于有特性的定时消息、顺序消息和事务消息。本文提供使用TCP协议下的开源Go SDK收发普通消息的示例代码供您参考。

前提条件

您已完成准备工作。更多信息,请参见准备工作

发送普通消息

发送普通消息的示例代码如下。

package main
import (
  "fmt"
  "github.com/apache/rocketmq-client-go/core"
)
func main() {
  pConfig := &rocketmq.ProducerConfig{
    ClientConfig: rocketmq.ClientConfig{
      //您在阿里云RocketMQ控制台上申请的GID。
      GroupID: "GID_XXXXXXXXXXXX",
      //设置TCP协议接入点,从阿里云RocketMQ控制台的实例详情页面获取。
      NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
      Credentials: &rocketmq.SessionCredentials{
        //您在阿里云账号管理控制台中创建的AccessKey ID,用于身份认证。
        AccessKey: "Your Access Key",
        //您在阿里云账号管理控制台中创建的AccessKey Secret,用于身份认证。
        SecretKey: "Your Secret Key",
        //用户渠道,默认值为:ALIYUN。
        Channel: "ALIYUN",
      },
    },
    //主动设置该实例用于发送普通消息。
    ProducerModel: rocketmq.CommonProducer,
  }
  sendMessage(pConfig)
}
func sendMessage(config *rocketmq.ProducerConfig) {
  producer, err := rocketmq.NewProducer(config)
  if err != nil {
    fmt.Println("create common producer failed, error:", err)
    return
  }
  //请确保参数设置完成之后启动Producer。
  err = producer.Start()
  if err != nil {
    fmt.Println("start common producer error", err)
    return
  }
  defer producer.Shutdown()
  fmt.Printf("Common producer: %s started... \n", producer)
  for i := 0; i < 10; i++ {
    msgBody := fmt.Sprintf("%s-%d", "Hello,Common MQ Message-", i)
    var message = &rocketmq.Message{Topic: "YourTopicXXXXXXXX", Body: msgBody}
    //Message with tag
    //var message = &rocketmq.Message{Topic: "YourTopicXXXXXXXX", Tags: "ExampleTag", Body: msgBody}
    //Message with unique business key
    //var message = &rocketmq.Message{Topic: "YourTopicXXXXXXXX", Keys: "YourUniqueBusinessKey", Body: msgBody}
    //发送消息时请设置您在阿里云RocketMQ控制台上申请的Topic。
    result, err := producer.SendMessageSync(message)
    if err != nil {
      fmt.Println("Error:", err)
    }
    fmt.Printf("send message: %s result: %s\n", msgBody, result)
  }
  fmt.Println("shutdown common producer.")
}

消费普通消息

消费普通消息的示例代码如下。

package main
import (
  "fmt"
  "github.com/apache/rocketmq-client-go/core"
  "sync/atomic"
)
func main() {
  pConfig := &rocketmq.PushConsumerConfig{
    ClientConfig: rocketmq.ClientConfig{
      //您在阿里云RocketMQ控制台上申请的GID。
      GroupID: "GID_XXXXXXXXXXXX",
      //设置TCP协议接入点,从阿里云RocketMQ控制台的实例详情页面获取。
      NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
      Credentials: &rocketmq.SessionCredentials{
        //您在阿里云账号管理控制台中创建的AccessKey ID,用于身份认证。
        AccessKey: "Your Access Key",
        //您在阿里云账号管理控制台中创建的AccessKey Secret,用于身份认证。
        SecretKey: "Your Secret Key",
        //用户渠道,默认值为:ALIYUN。
        Channel: "ALIYUN",
      },
    },
    //设置使用集群模式。
    Model: rocketmq.Clustering,
    //设置该消费者为普通消息消费。
    ConsumerModel: rocketmq.CoCurrently,
  }
  ConsumeWithPush(pConfig)
}
func ConsumeWithPush(config *rocketmq.PushConsumerConfig) {
  consumer, err := rocketmq.NewPushConsumer(config)
  if err != nil {
    println("create Consumer failed, error:", err)
    return
  }
  ch := make(chan interface{})
  var count = (int64)(1000000)
  // ********************************************
  // 1. 确保订阅关系的设置在启动之前完成。
  // 2. 确保相同GID下面的消费者的订阅关系一致。
  // *********************************************
  consumer.Subscribe("YourTopicXXXXXXXX", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
    fmt.Printf("A message received, MessageID:%s, Body:%s \n", msg.MessageID, msg.Body)
    if atomic.AddInt64(&count, -1) <= 0 {
      ch <- "quit"
    }
    //消费成功回复ConsumeSuccess,消费失败回复ReConsumeLater。此时会触发消费重试。
    return rocketmq.ConsumeSuccess
  })
  err = consumer.Start()
  if err != nil {
    println("consumer start failed,", err)
    return
  }
  fmt.Printf("consumer: %s started...\n", consumer)
  <-ch
  //请保持消费者一直处于运行状态。
  err = consumer.Shutdown()
  if err != nil {
    println("consumer shutdown failed")
    return
  }
  println("consumer has shutdown.")
}