全部产品
云市场

使用开源 Go SDK 访问阿里云 MQ

更新时间:2019-09-13 22:00:01

开源的 Go SDK 使用 cgo 封装 CPP SDK 连接阿里云 MQ,因此只需要更新 CPP 动态库版本和重新修改部分配置信息即可,使用方式和默认配置保持兼容。

前提条件

  • 请确保平台版本满足以下条件:

    • Linux:CentOS 6.X、CentOS 7.X、ReHL 6.x、ReHL 7.x
    • Darwin:macOS Mojave 10.14.x
  • 已下载 Go SDK 1.2.0 版本,以及 rocketmq-client-cpp 1.2.2 或以上版本的 CPP SDK。

    由于从 rocketmq-client-cpp 1.2.2 版本开始,开源版本支持连接阿里云 MQ。因此 Go SDK 将 CPP 动态库依赖升级至 1.2.2 后,也可以无缝上云。

  • 已在阿里云 MQ 创建资源,包括 Topic、Group ID(GID)、接入点(Endpoint)以及 AccessKeyId 和 AccessKeySecret。针对新建实例(有独立命名空间),还包括 Instance ID。

    命名空间详情请参见产品更新日志

设置参数

下文中只涉及部分核心代码的示例,完整的代码请参见 Example

访问阿里云需要使用阿里云的鉴权方式和接入点。下文针对默认实例(无独立命名空间)和新建实例(有独立命名空间),分别列出需要修改的配置信息,其余的配置和开源 RocketMQ 的使用完全兼容。

默认实例(无独立命名空间)

  • 生产者示例代码

    1. producerConfig := &rocketmq.ProducerConfig{ClientConfig: rocketmq.ClientConfig{
    2. //控制台上申请的 GID 或者 PID
    3. GroupID: "GID_XXXX",
    4. //控制台上获取的接入点信息
    5. NameServer: "http://XXXX.mq-internet-access.mq-internet.aliyuncs.com:80",
    6. Credentials:&rocketmq.SessionCredentials{
    7. AccessKey:"mq acesskey",
    8. SecretKey:"mq acesskey",
    9. Channel:"mq acesskey",
    10. },
    11. }}
    12. result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "Topic", Body: msg})
  • 消费者示例代码

    1. config := &rocketmq.PushConsumerConfig{ClientConfig: rocketmq.ClientConfig{
    2. //控制台上申请的 GID 或者 PID
    3. GroupID: "GID_XXXX",
    4. //控制台上获取的接入点信息
    5. NameServer: "http://XXXX.mq-internet-access.mq-internet.aliyuncs.com:80",
    6. Credentials:&rocketmq.SessionCredentials{
    7. AccessKey:"mq acesskey",
    8. SecretKey:"mq acesskey",
    9. Channel:"mq acesskey",
    10. },
    11. }}
    12. consumer, err := rocketmq.NewPushConsumer(config)
    13. //设置订阅的 Topic 和 TAGs,注册回调
    14. consumer.Subscribe("TOPIC", "TAGs", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
    15. fmt.Printf("A message received: \"%s\" \n", msg.Body)
    16. return rocketmq.ConsumeSuccess
    17. })
    18. ........

新建实例(有独立命名空间)

由于新建实例给用户提供了自己的命名空间,用于逻辑资源隔离,所以使用开源 SDK 访问时,需要显式的设置实例 ID,即 InstanceId,从控制台获取,格式为 MQ_INST_XXXXX_XXXX

  • 生产者示例代码

    1. producerConfig := &rocketmq.ProducerConfig{ClientConfig: rocketmq.ClientConfig{
    2. //控制台上申请的 GID 或者 PID
    3. GroupID: "MQ_INST_XXXX_XXXX%GID_XXXX",
    4. //控制台上获取的接入点信息
    5. NameServer: "http://MQ_INST_XXXX_XXXX.mq-internet-access.mq-internet.aliyuncs.com:80",
    6. Credentials:&rocketmq.SessionCredentials{
    7. AccessKey:"mq acesskey",
    8. SecretKey:"mq acesskey",
    9. Channel:"mq acesskey",
    10. }
    11. }}
    12. ......
    13. result, err := producer.SendMessageSync(&rocketmq.Message{
    14. Topic: "MQ_INST_XXXX_XXXX%TOPIC",
    15. Body: msg})
  • 消费者示例代码

    1. config := &rocketmq.PushConsumerConfig{ClientConfig: rocketmq.ClientConfig{
    2. //控制台上申请的 GID 或者 PID
    3. GroupID: "MQ_INST_XXXX_XXXX%GID_XXXX",
    4. //控制台上获取的接入点信息
    5. NameServer: "http://MQ_INST_XXXX_XXXX.mq-internet-access.mq-internet.aliyuncs.com:80",
    6. Credentials:&rocketmq.SessionCredentials{
    7. AccessKey:"mq acesskey",
    8. SecretKey:"mq acesskey",
    9. Channel:"mq acesskey",
    10. }
    11. }}
    12. consumer, err := rocketmq.NewPushConsumer(config)
    13. //设置订阅的 Topic 和 TAGs,注册回调
    14. consumer.Subscribe("MQ_INST_XXXX_XXXX%TOPIC", "TAGs", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
    15. fmt.Printf("A message received: \"%s\" \n", msg.Body)
    16. return rocketmq.ConsumeSuccess
    17. })
    18. ........

更多信息

Go SDK 使用方法详情请参见 Introduction