本文提供使用 TCP 协议下的开源 Go SDK 来收发定时和延时消息的示例代码供您参考。

概念介绍

  • 定时消息:Producer 将消息发送到消息队列 RocketMQ 版服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息。
  • 延时消息:Producer 将消息发送到消息队列 RocketMQ 版服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。

定时消息与延时消息在代码配置上存在一些差异,但是最终达到的效果相同。消息在发送到阿里云 RocketMQ 服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。详情请参见定时和延时消息

注意 开源版本的 Apache RocketMQ 支持延时消息,但不支持定时消息,因此没有专门的定时消息接口。阿里云 RocketMQ 的延时消息是通过设置定时时间来实现的。如需使用云上定时消息,请参照以下步骤。

前提条件

您已完成准备工作。详情请参见准备工作

发送定时消息

发送定时消息的示例代码如下。
package main

import (
    "fmt"
    "github.com/apache/rocketmq-client-go/core"
    "time"
)

func main() {
    pConfig := &rocketmq.ProducerConfig{
        ClientConfig: rocketmq.ClientConfig{
            //您在阿里云 RocketMQ 的控制台上获取的 GID
            GroupID: "GID_XXXXX",
            //设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取
            NameServer: "http://XXXXXXXX:80",
            Credentials: &rocketmq.SessionCredentials{
                //您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证
                AccessKey: "AK",
                //您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证
                SecretKey: "SK",
                //用户渠道,默认值为:ALIYUN
                Channel: "ALIYUN",
            },
        },
        //主动设置该实例用于发送普通消息
        ProducerModel: rocketmq.CommonProducer,
    }
    sendMessage(pConfig)
}
func sendMessage(config *rocketmq.ProducerConfig) {
    producer, err := rocketmq.NewProducer(config)

    if err != nil {
        fmt.Println("create common producer failed, error:", err)
        return
    }
    //请确保参数设置完成之后启动 Producer
    err = producer.Start()
    if err != nil {
        fmt.Println("start common producer error", err)
        return
    }
    defer producer.Shutdown()

    fmt.Printf("Common producer: %s started... \n", producer)
    for i := 0; i < 10; i++ {
        key := time.Now().String()
        msg := fmt.Sprintf("%s---[[%s]]---%d", "Hello,Delay MQ Message-", key, i)

        // 设置需要发送或者延迟的时间,此处以延迟 20s 为例。
        // 单位毫秒(ms),在指定时间戳(当前时间之后)进行投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者
        startDeliverTime := time.Now().Unix()*1000 + 20*1000
        property := map[string]string{"__STARTDELIVERTIME": strconv.FormatInt(startDeliverTime, 10)}
        // 发送消息时请设置您在阿里云 RocketMQ 控制台上获取的Topic
        result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "Your delay time topic", Body: msg, Property: property})
        if err != nil {
            fmt.Println("Error:", err)
        }
        fmt.Printf("send delay message: %s result: %s\n", msg, result)
    }
    fmt.Println("shutdown common producer.")
}

消费定时消息

消费定时消息的示例代码如下。

package main
import (
  "fmt"
  "github.com/apache/rocketmq-client-go/core"
  "sync/atomic"
)
func main() {
  pConfig := &rocketmq.PushConsumerConfig{
    ClientConfig: rocketmq.ClientConfig{
      //您在阿里云 RocketMQ 控制台上申请的 GID
      GroupID:    "GID_XXXXXXXXXXXX",
      //设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取
      NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
      Credentials: &rocketmq.SessionCredentials{
        //您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证
        AccessKey: "Your Access Key",
        //您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证
        SecretKey: "Your Secret Key",
        //用户渠道,默认值为:ALIYUN
        Channel:   "ALIYUN",
      },
    },
    //设置使用集群模式
    Model:         rocketmq.Clustering,
    //设置该消费者为普通消息消费
    ConsumerModel: rocketmq.CoCurrently,
  }
  ConsumeWithPush(pConfig)
}
func ConsumeWithPush(config *rocketmq.PushConsumerConfig) {
  consumer, err := rocketmq.NewPushConsumer(config)
  if err != nil {
    println("create Consumer failed, error:", err)
    return
  }
  ch := make(chan interface{})
  var count = (int64)(1000000)
  // ********************************************
  // 1. 确保订阅关系的设置在启动之前完成
  // 2. 确保相同 GID 下面的消费者的订阅关系一致
  // *********************************************
  consumer.Subscribe("YourTopicXXXXXXXX", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
    fmt.Printf("A message received, MessageID:%s, Body:%s \n", msg.MessageID, msg.Body)
    if atomic.AddInt64(&count, -1) <= 0 {
      ch <- "quit"
    }
    //消费成功回复 ConsumeSuccess,消费失败回复 ReConsumeLater。此时会触发消费重试
    return rocketmq.ConsumeSuccess
  })
  err = consumer.Start()
  if err != nil {
    println("consumer start failed,", err)
    return
  }
  fmt.Printf("consumer: %s started...\n", consumer)
  <-ch
  //请保持消费者一直处于运行状态
  err = consumer.Shutdown()
  if err != nil {
    println("consumer shutdown failed")
    return
  }
  println("consumer has shutdown.")
}