在控制台创建完所有资源之后,您可以调用消息队列RocketMQ版的HTTP协议的SDK收发普通消息。

前提条件

  • 创建资源
    说明 本文以普通消息为例进行说明,因此您创建的普通消息的Topic不能用来收发其他类型的消息,包括定时、延时、顺序和事务消息。换言之,切勿混用不同消息类型的Topic。
  • 创建AccessKey

下载并安装HTTP协议的SDK

消息队列RocketMQ版提供以下语言的HTTP协议SDK,请按需下载并安装相应语言的客户端SDK。

调用HTTP协议的SDK发送普通消息

获取相应语言的客户端SDK后,您即可运行以下示例代码发送普通消息。

                           
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class Producer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // 设置HTTP接入域名。
                "${HTTP_ENDPOINT}",
                // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
                "${ACCESS_KEY}",
                // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
                "${SECRET_KEY}"
        );

        // 所属的Topic。
        final String topic = "${TOPIC}";
        // Topic所属实例ID,默认实例为空。
        final String instanceId = "${INSTANCE_ID}";

        // 获取Topic的生产者。
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            // 循环发送4条消息。
            for (int i = 0; i < 4; i++) {
                TopicMessage pubMsg;
                if (i % 2 == 0) {
                    // 普通消息。
                    pubMsg = new TopicMessage(
                            // 消息内容。
                            "hello mq!".getBytes(),
                            // 消息标签。
                            "A"
                    );
                    // 设置属性。
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // 设置Key。
                    pubMsg.setMessageKey("MessageKey");
                } else {
                    pubMsg = new TopicMessage(
                            // 消息内容。
                            "hello mq!".getBytes(),
                            // 消息标签。
                            "A"
                    );
                    // 设置属性。
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // 定时消息,定时时间为10s后。
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // 同步发送消息,只要不抛异常就是成功。
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // 同步发送消息,只要不抛异常就是成功。
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }

}
                           
package main

import (
    "fmt"
    "time"
    "strconv"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // 设置HTTP接入域名。
    endpoint := "${HTTP_ENDPOINT}"
    // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
    accessKey := "${ACCESS_KEY}"
    // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
    secretKey := "${SECRET_KEY}"
    // 所属的Topic。
    topic := "${TOPIC}"
    // Topic所属实例ID,默认实例为空。
    instanceId := "${INSTANCE_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqProducer := client.GetProducer(instanceId, topic)
    // 循环发送4条消息。
    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest
        if i%2 == 0 {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq!",         //消息内容。
                MessageTag:  "",                  // 消息标签。
                Properties:  map[string]string{}, // 消息属性。
            }
            // 设置Key。
            msg.MessageKey = "MessageKey"
            // 设置属性。
            msg.Properties["a"] = strconv.Itoa(i)
        } else {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq timer!",         //消息内容。
                MessageTag:  "",                  // 消息标签。
                Properties:  map[string]string{}, // 消息属性。
            }
            // 设置属性。
            msg.Properties["a"] = strconv.Itoa(i)
            // 定时消息,定时时间为10s后,值为毫秒级别的Unix时间戳。
            msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
        }
        ret, err := mqProducer.PublishMessage(msg)

        if err != nil {
            fmt.Println(err)
            return
        } else {
            fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
        }
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
}
                           
<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ProducerTest
{
    private $client;
    private $producer;

    public function __construct()
    {
        $this->client = new MQClient(
            // 设置HTTP接入域名。
            "${HTTP_ENDPOINT}",
            // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
            "${ACCESS_KEY}",
            // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
            "${SECRET_KEY}"
        );

        // 所属的Topic。
        $topic = "${TOPIC}";
        // Topic所属实例ID,默认实例为空NULL。
        $instanceId = "${INSTANCE_ID}";

        $this->producer = $this->client->getProducer($instanceId, $topic);
    }

    public function run()
    {
        try
        {
            for ($i=1; $i<=4; $i++)
            {
                $publishMessage = new TopicMessage(
                    "xxxxxxxx"// 消息内容。
                );
                // 设置属性。
                $publishMessage->putProperty("a", $i);
                // 设置消息Key。
                $publishMessage->setMessageKey("MessageKey");
                if ($i % 2 == 0) {
                    // 定时消息,定时时间为10s后。
                    $publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
                }
                $result = $this->producer->publishMessage($publishMessage);

                print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
            }
        } catch (\Exception $e) {
            print_r($e->getMessage() . "\n");
        }
    }
}


$instance = new ProducerTest();
$instance->run();

?>
                           
#!/usr/bin/env python
# coding=utf8
import sys

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time

#初始化Client。
mq_client = MQClient(
    #设置HTTP接入域名。
    "${HTTP_ENDPOINT}",
    #AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
    "${ACCESS_KEY}",
    #AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
    "${SECRET_KEY}"
	)
#所属的Topic。
topic_name = "${TOPIC}"
#Topic所属实例ID,默认实例为空None。
instance_id = "${INSTANCE_ID}"

producer = mq_client.get_producer(instance_id, topic_name)

# 循环发布多条消息。
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))

try:
    for i in range(msg_count):
        if i % 2 == 0:
            msg = TopicMessage(
                    # 消息内容。
                    "I am test message %s.你好" % i, 
                    # 消息标签。
                    ""
                    )
            # 设置属性。
            msg.put_property("a", "i")
            # 设置Key。
            msg.set_message_key("MessageKey")
            re_msg = producer.publish_message(msg)
            print("Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
        else:
            msg = TopicMessage(
                    # 消息内容。
                    "I am test message %s." % i, 
                    # 消息标签。
                    ""
                    )
            msg.put_property("a", i)
            # 定时消息,毫秒级绝对时间。
            msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
            re_msg = producer.publish_message(msg)
            print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
        time.sleep(1)
except MQExceptionBase as e:
    if e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)
                           
const {
  MQClient,
  MessageProperties
} = require('@aliyunmq/mq-http-sdk');

// 设置HTTP接入域名。
const endpoint = "${HTTP_ENDPOINT}";
// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
const accessKeyId = "${ACCESS_KEY}";
// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
const accessKeySecret = "${SECRET_KEY}";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// 所属的Topic。
const topic = "${TOPIC}";
// Topic所属实例ID,默认实例为空。
const instanceId = "${INSTANCE_ID}";

const producer = client.getProducer(instanceId, topic);

(async function(){
  try {
    // 循环发送4条消息。
    for(var i = 0; i < 4; i++) {
      let res;
      if (i % 2 == 0) {
        msgProps = new MessageProperties();
        // 设置属性。
        msgProps.putProperty("a", i);
        // 设置Key。
        msgProps.messageKey("MessageKey");
        res = await producer.publishMessage("hello mq.", "", msgProps);
      } else {
        msgProps = new MessageProperties();
        // 设置属性。
        msgProps.putProperty("a", i);
        // 定时消息,定时时间为10s后。
        msgProps.startDeliverTime(Date.now() + 10 * 1000);
        res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
      }
      console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
    }

  } catch(e) {
    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
    console.log(e)
  }
})();
                           
//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // 设置HTTP接入域名。
            "${HTTP_ENDPOINT}",
            // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
            "${ACCESS_KEY}",
            // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
            "${SECRET_KEY}"
            );

    // 所属的Topic。
    string topic = "${TOPIC}";
    // Topic所属实例ID,默认实例为空。
    string instanceId = "${INSTANCE_ID}";

    MQProducerPtr producer;
    if (instanceId == "") {
        producer = mqClient.getProducerRef(topic);
    } else {
        producer = mqClient.getProducerRef(instanceId, topic);
    }

    try {
        for (int i = 0; i < 4; i++)
        {
            PublishMessageResponse pmResp;
            if (i % 4 == 0) {
                // publish message, only have body. 
                producer->publishMessage("Hello, mq!", pmResp);
            } else if (i % 4 == 1) {
                // publish message, only have body and tag.
                producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
            } else if (i % 4 == 2) {
                // publish message, have body,tag,properties and key.
                TopicMessage pubMsg("Hello, mq!have key!");
                pubMsg.putProperty("a",std::to_string(i));
                pubMsg.setMessageKey("MessageKey" + std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            } else {
                // publish timer message, message will be consumed after StartDeliverTime
                TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
                // StartDeliverTime is an absolute time in millisecond.
                pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
                pubMsg.putProperty("b",std::to_string(i));
                pubMsg.putProperty("c",std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            }
            cout << "Publish mq message success. Topic is: " << topic 
                << ", msgId is:" << pmResp.getMessageId() 
                << ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
        }
    } catch (MQServerException& me) {
        cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
        return -1;
    } catch (MQExceptionBase& mb) {
        cout << "Request Failed: " + mb.ToString() << endl;
        return -2;
    }

    return 0;
}
                           
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    public class ProducerSample
    {
        // 设置HTTP接入域名。
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
        private const string _accessKeyId = "${ACCESS_KEY}";
        // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
        private const string _secretAccessKey = "${SECRET_KEY}";
        // 所属的Topic。
        private const string _topicName = "${TOPIC}";
        // Topic所属实例ID,默认实例为空。
        private const string _instanceId = "${INSTANCE_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);

        static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

        static void Main(string[] args)
        {
            try
            {
                // 循环发送4条消息。
                for (int i = 0; i < 4; i++)
                {
                    TopicMessage sendMsg;
                    if (i % 2 == 0)
                    {
                        sendMsg = new TopicMessage("dfadfadfadf");
                        // 设置属性。
                        sendMsg.PutProperty("a", i.ToString());
                        // 设置Key。
                        sendMsg.MessageKey = "MessageKey";
                    }
                    else
                    {
                        sendMsg = new TopicMessage("dfadfadfadf", "tag");
                        // 设置属性。
                        sendMsg.PutProperty("a", i.ToString());
                        // 定时消息,定时时间为10s后。
                        sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000;
                    }
                    TopicMessage result = producer.PublishMessage(sendMsg);
                    Console.WriteLine("publis message success:" + result);
                }
            }
            catch (Exception ex)
            {
                Console.Write(ex);
            }
        }
    }
}

同时,您也可以在消息队列RocketMQ版控制台页面,找到您创建的Topic,在其操作列,单击更多,在下拉列表中,选择快速体验,按需快通过控制台或Docker快速体验。

调用HTTP协议的SDK消费普通消息

消息发送成功后,需要启动消费者来消费普通消息。请按需运行对应语言的示例代码来启动消费者,并测试消费消息的功能。请按照说明正确设置相关参数。

                           
import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQConsumer;
 import com.aliyun.mq.http.common.AckMessageException;
 import com.aliyun.mq.http.model.Message;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class Consumer {
 
     public static void main(String[] args) {
         MQClient mqClient = new MQClient(
                 // 设置HTTP接入域名。
                 "${HTTP_ENDPOINT}",
                 // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
                 "${ACCESS_KEY}",
                 // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
                 "${SECRET_KEY}"
         );
 
         // 所属的Topic。
         final String topic = "${TOPIC}";
         // 您在控制台创建的Group ID(Consumer ID)。
         final String groupId = "${GROUP_ID}";
         // Topic所属实例ID,默认实例为空。
         final String instanceId = "${INSTANCE_ID}";
 
         final MQConsumer consumer;
         if (instanceId != null && instanceId != "") {
             consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
         } else {
             consumer = mqClient.getConsumer(topic, groupId);
         }
 
         // 在当前线程循环消费消息,建议是多开个几个线程并发消费消息。
         do {
             List<Message> messages = null;
 
             try {
                 // 长轮询消费消息。
                 // 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
                 messages = consumer.consumeMessage(
                         3,//  一次最多消费3条(最多可设置为16条)。
                         3// 长轮询时间3秒(最多可设置为30秒)。
                 );
             } catch (Throwable e) {
                 e.printStackTrace();
                 try {
                     Thread.sleep(2000);
                 } catch (InterruptedException e1) {
                     e1.printStackTrace();
                 }
             }
             // 没有消息。
             if (messages == null || messages.isEmpty()) {
                 System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
                 continue;
             }
 
             // 处理业务逻辑。
             for (Message message : messages) {
                 System.out.println("Receive message: " + message);
             }
 
             // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费。
             // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
             {
                 List<String> handles = new ArrayList<String>();
                 for (Message message : messages) {
                     handles.add(message.getReceiptHandle());
                 }
 
                 try {
                     consumer.ackMessage(handles);
                 } catch (Throwable e) {
                     // 某些消息的句柄可能超时了会导致确认不成功。
                     if (e instanceof AckMessageException) {
                         AckMessageException errors = (AckMessageException) e;
                         System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                         if (errors.getErrorMessages() != null) {
                             for (String errorHandle :errors.getErrorMessages().keySet()) {
                                 System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                         + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                             }
                         }
                         continue;
                     }
                     e.printStackTrace();
                 }
             }
         } while (true);
     }
 }
 
                           
package main

import (
	"fmt"
	"github.com/gogap/errors"
	"strings"
	"time"

	"github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
	// 设置HTTP接入域名。
	endpoint := "${HTTP_ENDPOINT}"
	// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
	accessKey := "${ACCESS_KEY}"
	// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
	secretKey := "${SECRET_KEY}"
	// 所属的Topic。
	topic := "${TOPIC}"
	// Topic所属实例ID,默认实例为空。
	instanceId := "${INSTANCE_ID}"
	// 您在控制台创建的Group ID(Consumer ID)。
	groupId := "${GROUP_ID}"

	client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

	mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

	for {
		endChan := make(chan int)
		respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
		errChan := make(chan error)
		go func() {
			select {
			case resp := <-respChan:
				{
					// 处理业务逻辑。
					var handles []string
					fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
					for _, v := range resp.Messages {
						handles = append(handles, v.ReceiptHandle)
						fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
							"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
							v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
							v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
					}

                    // NextConsumeTime前若不确认消息消费成功,则消息会重复消费。
                    // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
                    ackerr := mqConsumer.AckMessage(handles)
					if ackerr != nil {
						// 某些消息的句柄可能超时了会导致确认不成功。
						fmt.Println(ackerr)
						for _, errAckItem := range ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
							fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
								errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
						}
						time.Sleep(time.Duration(3) * time.Second)
					} else {
						fmt.Printf("Ack ---->\n\t%s\n", handles)
					}

					endChan <- 1
				}
			case err := <-errChan:
				{
					// 没有消息。
					if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
						fmt.Println("\nNo new message, continue!")
					} else {
						fmt.Println(err)
						time.Sleep(time.Duration(3) * time.Second)
					}
					endChan <- 1
				}
			case <-time.After(35 * time.Second):
				{
					fmt.Println("Timeout of consumer message ??")
					endChan <- 1
				}
			}
		}()

		// 长轮询消费消息。
		// 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
		mqConsumer.ConsumeMessage(respChan, errChan,
			3, //  一次最多消费3条(最多可设置为16条)。
			3, // 长轮询时间3秒(最多可设置为30秒)。
		)
		<-endChan
	}
}
                           
<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ConsumerTest
{
    private $client;
    private $consumer;

    public function __construct()
    {
        $this->client = new MQClient(
            // 设置HTTP接入域名。
            "${HTTP_ENDPOINT}",
            // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
            "${ACCESS_KEY}",
            // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
            "${SECRET_KEY}"
        );

        // 所属的Topic。
        $topic = "${TOPIC}";
        // 您在控制台创建的Group ID(Consumer ID)。
        $groupId = "${GROUP_ID}";
        // Topic所属实例ID,默认实例为空NULL。
        $instanceId = "${INSTANCE_ID}";

        $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }

    public function run()
    {
        // 在当前线程循环消费消息,建议是多开个几个线程并发消费消息。
        while (True) {
            try {
                // 长轮询消费消息。
                // 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
                $messages = $this->consumer->consumeMessage(
                    3, //  一次最多消费3条(最多可设置为16条)。
                    3 // 长轮询时间3秒(最多可设置为30秒)。
                );
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    // 没有消息可以消费,接着轮询。
                    printf("No message, contine long polling!RequestId:%s\n", $e->getRequestId());
                    continue;
                }

                print_r($e->getMessage() . "\n");

                sleep(3);
                continue;
            }

            print "consume finish, messages:\n";

            // 处理业务逻辑。
            $receiptHandles = array();
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
                printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
                    $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getMessageKey());
                print_r($message->getProperties());
            }

            // $message->getNextConsumeTime() 前若不确认消息消费成功,则消息会重复消费。
            // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
            print_r($receiptHandles);
            try {
                $this->consumer->ackMessage($receiptHandles);
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\AckMessageException) {
                    // 某些消息的句柄可能超时了会导致确认不成功。
                    printf("Ack Error, RequestId:%s\n", $e->getRequestId());
                    foreach ($e->getAckMessageErrorItems() as $errorItem) {
                        printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
                    }
                }
            }
            print "ack finish\n";


        }

    }
}


$instance = new ConsumerTest();
$instance->run();

?>
                           
#!/usr/bin/env python
# coding=utf8

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

#初始化client
mq_client = MQClient(
    #设置HTTP接入域名。
    "",
    #AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
    "${ACCESS_KEY}",
    #AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
    "${SECRET_KEY}"
  )
#所属的Topic。
topic_name = "${TOPIC}"
#您在控制台创建的Group ID。
group_id = "GID_test"
#Topic所属实例ID,默认实例为空None。
instance_id = "MQ_INST_1380156306793859_BbXbx0Y4"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

#长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
#长轮询时间3秒(最多可设置为30秒)。
wait_seconds = 3
#一次最多消费3条(最多可设置为16条)。
batch = 3
print "%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)
while True:
    try:
        #长轮询消费消息。
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print "Receive, MessageId: %s\nMessageBodyMD5: %s \
                              \nMessageTag: %s\nConsumedTimes: %s \
                              \nPublishTime: %s\nBody: %s \
                              \nNextConsumeTime: %s \
                              \nReceiptHandle: %s" % \
                             (msg.message_id, msg.message_body_md5,
                              msg.message_tag, msg.consumed_times,
                              msg.publish_time, msg.message_body,
                              msg.next_consume_time, msg.receipt_handle)
    except MQExceptionBase, e:
        if e.type == "MessageNotExist":
            print "No new message! RequestId: %s" % e.req_id
            continue

        print "Consume Message Fail! Exception:%s\n" % e
        time.sleep(2)
        continue

    #msg.next_consume_time前若不确认消息消费成功,则消息会重复消费。
    #消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print "Ak %s Message Succeed.\n\n" % len(receipt_handle_list)
    except MQExceptionBase, e:
        print "\nAk Message Fail! Exception:%s" % e
        #某些消息的句柄可能超时了会导致确认不成功。
        if e.sub_errors:
          for sub_error in e.sub_errors:
            print "\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])
                           
const {
  MQClient
} = require('@aliyunmq/mq-http-sdk');

// 设置HTTP接入域名。
const endpoint = "${HTTP_ENDPOINT}";
// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
const accessKeyId = "${ACCESS_KEY}";
// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
const accessKeySecret = "${SECRET_KEY}";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// 所属的Topic。
const topic = "${TOPIC}";
// 您在控制台创建的Group ID(Consumer ID)。
const groupId = "${GROUP_ID}";
// Topic所属实例ID,默认实例为空。
const instanceId = "${INSTANCE_ID}";

const consumer = client.getConsumer(instanceId, topic, groupId);

(async function(){
  // 循环消费消息。
  while(true) {
    try {
      // 长轮询消费消息。
      // 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
      res = await consumer.consumeMessage(
          3, //  一次最多消费3条(最多可设置为16条)。
          3 // 长轮询时间3秒(最多可设置为30秒)。
          );

      if (res.code == 200) {
        // 消费消息,处理业务逻辑。
        console.log("Consume Messages, requestId:%s", res.requestId);
        const handles = res.body.map((message) => {
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" + 
            ",Props:%j,MessageKey:%s,Prop-A:%s",
              message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
              message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
          return message.ReceiptHandle;
        });

        // message.NextConsumeTime前若不确认消息消费成功,则消息会重复消费。
        // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
        res = await consumer.ackMessage(handles);
        if (res.code != 204) {
          // 某些消息的句柄可能超时了会导致确认不成功。
          console.log("Ack Message Fail:");
          const failHandles = res.body.map((error)=>{
            console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
            return error.ReceiptHandle;
          });
          handles.forEach((handle)=>{
            if (failHandles.indexOf(handle) < 0) {
              console.log("\tSucHandle:%s\n", handle);
            }
          });
        } else {
          // 消息确认消费成功。
          console.log("Ack Message suc, RequestId:%s\n\t", res.requestId, handles.join(','));
        }
      }
    } catch(e) {
      if (e.Code.indexOf("MessageNotExist") > -1) {
        // 没有消息,则继续长轮询服务器。
        console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
      } else {
        console.log(e);
      }
    }
  }
})();
                           
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
	public class ConsumerSample
    {
        // 设置HTTP接入域名。
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
        private const string _accessKeyId = "${ACCESS_KEY}";
        // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
        private const string _secretAccessKey = "${SECRET_KEY}";
        // 所属的Topic。
        private const string _topicName = "${TOPIC}";
        // Topic所属实例ID,默认实例为空。
        private const string _instanceId = "${INSTANCE_ID}";
        // 您在控制台创建的Group ID(Consumer ID)。
        private const string _groupId = "${GROUP_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
        static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);

        static void Main(string[] args)
        {
            // 在当前线程循环消费消息,建议是多开个几个线程并发消费消息。
            while (true)
            {
                try
                {
                    // 长轮询消费消息。
                    // 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
                    List<Message> messages = null;

                    try
                    {
                        messages = consumer.ConsumeMessage(
                            3, //  一次最多消费3条(最多可设置为16条)。
                            3 // 长轮询时间3秒(最多可设置为30秒)。
                        );
                    }
                    catch (Exception exp1)
                    {
                        if (exp1 is MessageNotExistException)
                        {
                            Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId);
                            continue;
                        }
                        Console.WriteLine(exp1);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
                    // 处理业务逻辑。
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Property a is:" + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }
                    // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费。
                    // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Ack message success:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (Exception exp2)
                    {
                        // 某些消息的句柄可能超时了会导致确认不成功。
                        if (exp2 is AckMessageException)
                        {
                            AckMessageException ackExp = (AckMessageException)exp2;
                            Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
                            foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
                            {
                                Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}
                           
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // 设置HTTP接入域名。
            "${HTTP_ENDPOINT}",
            // AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。
            "${ACCESS_KEY}",
            // AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。
            "${SECRET_KEY}"
            );

    // 所属的Topic。
    string topic = "${TOPIC}";
    // 您在控制台创建的Group ID(Consumer ID)。
    string groupId = "${GROUP_ID}";
    // Topic所属实例ID,默认实例为空。
    string instanceId = "${INSTANCE_ID}";

    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }

    do {
        try {
            std::vector<Message> messages;
            // 长轮询消费消息。
            // 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
            consumer->consumeMessage(
                    3,// 一次最多消费3条(最多可设置为16条)。
                    3,//长轮询时间3秒(最多可设置为30秒) 。
                    messages
            );
            cout << "Consume: " << messages.size() << " Messages!" << endl;

            // 处理消息。
            std::vector<std::string> receiptHandles;
            for (std::vector<Message>::iterator iter = messages.begin();
                    iter != messages.end(); ++iter)
            {
                cout << "MessageId: " << iter->getMessageId()
                    << " PublishTime: " << iter->getPublishTime()
                    << " Tag: " << iter->getMessageTag()
                    << " Body: " << iter->getMessageBody()
                    << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                    << " NextConsumeTime: " << iter->getNextConsumeTime()
                    << " ConsumedTimes: " << iter->getConsumedTimes() 
                    << " Properties: " << iter->getPropertiesAsString() 
                    << " Key: " << iter->getMessageKey() << endl;
                receiptHandles.push_back(iter->getReceiptHandle());
            }

            // 确认消息消费成功。
            // Message.NextConsumeTime前若不确认消息消费成功,则消息会重复消费。
            // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (!bdmResp.isSuccess()) {
                // 某些消息的句柄可能超时了会导致确认不成功。
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
                        iter != failedItems.end(); ++iter)
                {
                    cout << "AckFailedItem: " << iter->errorCode
                        << "  " << iter->receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " messages suc!" << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
                continue;
            }
            cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        } catch (MQExceptionBase& mb) {
            cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        }

    } while(true);
}

后续步骤

您可通过查询消息及其轨迹的方式验证消息是否消费成功。详情请参见消息查询查询消息轨迹