在控制台创建完所有资源之后,您可以调用云消息队列 RocketMQ 版的HTTP协议的SDK收发普通消息。
前提条件
- 说明
本文以普通消息为例进行说明,因此您创建的普通消息的Topic不能用来收发其他类型的消息,包括定时、延时、顺序和事务消息。换言之,切勿混用不同消息类型的Topic。
下载并安装HTTP协议的SDK
云消息队列 RocketMQ 版提供以下语言的HTTP协议SDK,请按需下载并安装相应语言的客户端SDK。
调用HTTP协议的SDK发送普通消息
获取相应语言的客户端SDK后,您即可运行以下示例代码发送普通消息。
Java
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
public class Producer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 设置HTTP接入域名。
"${HTTP_ENDPOINT}",
// AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
"${ACCESS_KEY}",
// AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
"${SECRET_KEY}"
);
// 所属的Topic。
final String topic = "${TOPIC}";
// Topic所属实例ID,默认实例为空。
final String instanceId = "${INSTANCE_ID}";
// 获取Topic的生产者。
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
// 循环发送4条消息。
for (int i = 0; i < 4; i++) {
TopicMessage pubMsg;
if (i % 2 == 0) {
// 普通消息。
pubMsg = new TopicMessage(
// 消息内容。
"hello mq!".getBytes(),
// 消息标签。
"A"
);
// 设置属性。
pubMsg.getProperties().put("a", String.valueOf(i));
// 设置Key。
pubMsg.setMessageKey("MessageKey");
} else {
pubMsg = new TopicMessage(
// 消息内容。
"hello mq!".getBytes(),
// 消息标签。
"A"
);
// 设置属性。
pubMsg.getProperties().put("a", String.valueOf(i));
// 定时消息,定时时间为10s后。
pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
}
// 同步发送消息,只要不抛异常就是成功。
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
// 同步发送消息,只要不抛异常就是成功。
System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
e.printStackTrace();
}
mqClient.close();
}
}
Go
package main
import (
"fmt"
"time"
"strconv"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// 设置HTTP接入域名。
endpoint := "${HTTP_ENDPOINT}"
// AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
accessKey := "${ACCESS_KEY}"
// AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
secretKey := "${SECRET_KEY}"
// 所属的Topic。
topic := "${TOPIC}"
// Topic所属实例ID,默认实例为空。
instanceId := "${INSTANCE_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
// 循环发送4条消息。
for i := 0; i < 4; i++ {
var msg mq_http_sdk.PublishMessageRequest
if i%2 == 0 {
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!", //消息内容。
MessageTag: "", // 消息标签。
Properties: map[string]string{}, // 消息属性。
}
// 设置Key。
msg.MessageKey = "MessageKey"
// 设置属性。
msg.Properties["a"] = strconv.Itoa(i)
} else {
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq timer!", //消息内容。
MessageTag: "", // 消息标签。
Properties: map[string]string{}, // 消息属性。
}
// 设置属性。
msg.Properties["a"] = strconv.Itoa(i)
// 定时消息,定时时间为10s后,值为毫秒级别的Unix时间戳。
msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
}
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
} else {
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
}
time.Sleep(time.Duration(100) * time.Millisecond)
}
}
PHP
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
class ProducerTest
{
private $client;
private $producer;
public function __construct()
{
$this->client = new MQClient(
// 设置HTTP接入域名。
"${HTTP_ENDPOINT}",
// AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
"${ACCESS_KEY}",
// AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
"${SECRET_KEY}"
);
// 所属的Topic。
$topic = "${TOPIC}";
// Topic所属实例ID,默认实例为空NULL。
$instanceId = "${INSTANCE_ID}";
$this->producer = $this->client->getProducer($instanceId, $topic);
}
public function run()
{
try
{
for ($i=1; $i<=4; $i++)
{
$publishMessage = new TopicMessage(
"xxxxxxxx"// 消息内容。
);
// 设置属性。
$publishMessage->putProperty("a", $i);
// 设置消息Key。
$publishMessage->setMessageKey("MessageKey");
if ($i % 2 == 0) {
// 定时消息,定时时间为10s后。
$publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
}
$result = $this->producer->publishMessage($publishMessage);
print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
}
} catch (\Exception $e) {
print_r($e->getMessage() . "\n");
}
}
}
$instance = new ProducerTest();
$instance->run();
?>
Python
#!/usr/bin/env python
# coding=utf8
import sys
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time
#初始化Client。
mq_client = MQClient(
#设置HTTP接入域名。
"${HTTP_ENDPOINT}",
#AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
"${ACCESS_KEY}",
#AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
"${SECRET_KEY}"
)
#所属的Topic。
topic_name = "${TOPIC}"
#Topic所属实例ID,默认实例为空None。
instance_id = "${INSTANCE_ID}"
producer = mq_client.get_producer(instance_id, topic_name)
# 循环发布多条消息。
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))
try:
for i in range(msg_count):
if i % 2 == 0:
msg = TopicMessage(
# 消息内容。
"I am test message %s.你好" % i,
# 消息标签。
""
)
# 设置属性。
msg.put_property("a", "i")
# 设置Key。
msg.set_message_key("MessageKey")
re_msg = producer.publish_message(msg)
print("Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
else:
msg = TopicMessage(
# 消息内容。
"I am test message %s." % i,
# 消息标签。
""
)
msg.put_property("a", i)
# 定时消息,毫秒级绝对时间。
msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
re_msg = producer.publish_message(msg)
print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
time.sleep(1)
except MQExceptionBase as e:
if e.type == "TopicNotExist":
print("Topic not exist, please create it.")
sys.exit(1)
print("Publish Message Fail. Exception:%s" % e)
Node.js
const {
MQClient,
MessageProperties
} = require('@aliyunmq/mq-http-sdk');
// 设置HTTP接入域名。
const endpoint = "${HTTP_ENDPOINT}";
// AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
const accessKeyId = "${ACCESS_KEY}";
// AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
const accessKeySecret = "${SECRET_KEY}";
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// 所属的Topic。
const topic = "${TOPIC}";
// Topic所属实例ID,默认实例为空。
const instanceId = "${INSTANCE_ID}";
const producer = client.getProducer(instanceId, topic);
(async function(){
try {
// 循环发送4条消息。
for(var i = 0; i < 4; i++) {
let res;
if (i % 2 == 0) {
msgProps = new MessageProperties();
// 设置属性。
msgProps.putProperty("a", i);
// 设置Key。
msgProps.messageKey("MessageKey");
res = await producer.publishMessage("hello mq.", "", msgProps);
} else {
msgProps = new MessageProperties();
// 设置属性。
msgProps.putProperty("a", i);
// 定时消息,定时时间为10s后。
msgProps.startDeliverTime(Date.now() + 10 * 1000);
res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
}
console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
}
} catch(e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
console.log(e)
}
})();
C++
//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// 设置HTTP接入域名。
"${HTTP_ENDPOINT}",
// AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
"${ACCESS_KEY}",
// AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
"${SECRET_KEY}"
);
// 所属的Topic。
string topic = "${TOPIC}";
// Topic所属实例ID,默认实例为空。
string instanceId = "${INSTANCE_ID}";
MQProducerPtr producer;
if (instanceId == "") {
producer = mqClient.getProducerRef(topic);
} else {
producer = mqClient.getProducerRef(instanceId, topic);
}
try {
for (int i = 0; i < 4; i++)
{
PublishMessageResponse pmResp;
if (i % 4 == 0) {
// publish message, only have body.
producer->publishMessage("Hello, mq!", pmResp);
} else if (i % 4 == 1) {
// publish message, only have body and tag.
producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
} else if (i % 4 == 2) {
// publish message, have body,tag,properties and key.
TopicMessage pubMsg("Hello, mq!have key!");
pubMsg.putProperty("a",std::to_string(i));
pubMsg.setMessageKey("MessageKey" + std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
} else {
// publish timer message, message will be consumed after StartDeliverTime
TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
// StartDeliverTime is an absolute time in millisecond.
pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
pubMsg.putProperty("b",std::to_string(i));
pubMsg.putProperty("c",std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
}
cout << "Publish mq message success. Topic is: " << topic
<< ", msgId is:" << pmResp.getMessageId()
<< ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
}
} catch (MQServerException& me) {
cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
return -1;
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
return -2;
}
return 0;
}
C#
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;
namespace Aliyun.MQ.Sample
{
public class ProducerSample
{
// 设置HTTP接入域名。
private const string _endpoint = "${HTTP_ENDPOINT}";
// AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
private const string _accessKeyId = "${ACCESS_KEY}";
// AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
private const string _secretAccessKey = "${SECRET_KEY}";
// 所属的Topic。
private const string _topicName = "${TOPIC}";
// Topic所属实例ID,默认实例为空。
private const string _instanceId = "${INSTANCE_ID}";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQProducer producer = _client.GetProducer(_instanceId, _topicName);
static void Main(string[] args)
{
try
{
// 循环发送4条消息。
for (int i = 0; i < 4; i++)
{
TopicMessage sendMsg;
if (i % 2 == 0)
{
sendMsg = new TopicMessage("dfadfadfadf");
// 设置属性。
sendMsg.PutProperty("a", i.ToString());
// 设置Key。
sendMsg.MessageKey = "MessageKey";
}
else
{
sendMsg = new TopicMessage("dfadfadfadf", "tag");
// 设置属性。
sendMsg.PutProperty("a", i.ToString());
// 定时消息,定时时间为10s后。
sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000;
}
TopicMessage result = producer.PublishMessage(sendMsg);
Console.WriteLine("publis message success:" + result);
}
}
catch (Exception ex)
{
Console.Write(ex);
}
}
}
}
同时,您也可以在控制台页面,找到您创建的Topic,在其操作列,单击更多,在下拉列表中,选择快速体验,按需通过控制台或Docker快速体验。
调用HTTP协议的SDK消费普通消息
消息发送成功后,需要启动消费者来消费普通消息。请按需运行对应语言的示例代码来启动消费者,并测试消费消息的功能。请按照说明正确设置相关参数。
Java
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import java.util.ArrayList;
import java.util.List;
public class Consumer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 设置HTTP接入域名。
"${HTTP_ENDPOINT}",
// AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
"${ACCESS_KEY}",
// AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
"${SECRET_KEY}"
);
// 所属的Topic。
final String topic = "${TOPIC}";
// 您在控制台创建的Group ID(Consumer ID)。
final String groupId = "${GROUP_ID}";
// Topic所属实例ID,默认实例为空。
final String instanceId = "${INSTANCE_ID}";
final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}
// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息。
do {
List<Message> messages = null;
try {
// 长轮询消费消息。
// 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
messages = consumer.consumeMessage(
3,// 一次最多消费3条(最多可设置为16条)。
3// 长轮询时间3秒(最多可设置为30秒)。
);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
// 没有消息。
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
continue;
}
// 处理业务逻辑。
for (Message message : messages) {
System.out.println("Receive message: " + message);
}
// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
try {
consumer.ackMessage(handles);
} catch (Throwable e) {
// 某些消息的句柄可能超时了会导致确认不成功。
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
}
Go
package main
import (
"fmt"
"github.com/gogap/errors"
"strings"
"time"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// 设置HTTP接入域名。
endpoint := "${HTTP_ENDPOINT}"
// AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
accessKey := "${ACCESS_KEY}"
// AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
secretKey := "${SECRET_KEY}"
// 所属的Topic。
topic := "${TOPIC}"
// Topic所属实例ID,默认实例为空。
instanceId := "${INSTANCE_ID}"
// 您在控制台创建的Group ID(Consumer ID)。
groupId := "${GROUP_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
// 处理业务逻辑。
var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
"\tBody: %s\n"+
"\tProps: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
}
// NextConsumeTime前若不确认消息消费成功,则消息会重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
// 某些消息的句柄可能超时了会导致确认不成功。
fmt.Println(ackerr)
for _, errAckItem := range ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
// 没有消息。
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// 长轮询消费消息。
// 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
mqConsumer.ConsumeMessage(respChan, errChan,
3, // 一次最多消费3条(最多可设置为16条)。
3, // 长轮询时间3秒(最多可设置为30秒)。
)
<-endChan
}
}
PHP
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
class ConsumerTest
{
private $client;
private $consumer;
public function __construct()
{
$this->client = new MQClient(
// 设置HTTP接入域名。
"${HTTP_ENDPOINT}",
// AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
"${ACCESS_KEY}",
// AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
"${SECRET_KEY}"
);
// 所属的Topic。
$topic = "${TOPIC}";
// 您在控制台创建的Group ID(Consumer ID)。
$groupId = "${GROUP_ID}";
// Topic所属实例ID,默认实例为空NULL。
$instanceId = "${INSTANCE_ID}";
$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
}
public function run()
{
// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息。
while (True) {
try {
// 长轮询消费消息。
// 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
$messages = $this->consumer->consumeMessage(
3, // 一次最多消费3条(最多可设置为16条)。
3 // 长轮询时间3秒(最多可设置为30秒)。
);
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\MessageNotExistException) {
// 没有消息可以消费,接着轮询。
printf("No message, contine long polling!RequestId:%s\n", $e->getRequestId());
continue;
}
print_r($e->getMessage() . "\n");
sleep(3);
continue;
}
print "consume finish, messages:\n";
// 处理业务逻辑。
$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
$message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
$message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
$message->getMessageKey());
print_r($message->getProperties());
}
// $message->getNextConsumeTime() 前若不确认消息消费成功,则消息会重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
print_r($receiptHandles);
try {
$this->consumer->ackMessage($receiptHandles);
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\AckMessageException) {
// 某些消息的句柄可能超时了会导致确认不成功。
printf("Ack Error, RequestId:%s\n", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
}
}
}
print "ack finish\n";
}
}
}
$instance = new ConsumerTest();
$instance->run();
?>
Python
#!/usr/bin/env python
# coding=utf8
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *
#初始化client
mq_client = MQClient(
#设置HTTP接入域名。
"",
#AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
"${ACCESS_KEY}",
#AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
"${SECRET_KEY}"
)
#所属的Topic。
topic_name = "${TOPIC}"
#您在控制台创建的Group ID。
group_id = "GID_test"
#Topic所属实例ID,默认实例为空None。
instance_id = "MQ_INST_1380156306793859_BbXbx0Y4"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
#长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
#长轮询时间3秒(最多可设置为30秒)。
wait_seconds = 3
#一次最多消费3条(最多可设置为16条)。
batch = 3
print "%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)
while True:
try:
#长轮询消费消息。
recv_msgs = consumer.consume_message(batch, wait_seconds)
for msg in recv_msgs:
print "Receive, MessageId: %s\nMessageBodyMD5: %s \
\nMessageTag: %s\nConsumedTimes: %s \
\nPublishTime: %s\nBody: %s \
\nNextConsumeTime: %s \
\nReceiptHandle: %s" % \
(msg.message_id, msg.message_body_md5,
msg.message_tag, msg.consumed_times,
msg.publish_time, msg.message_body,
msg.next_consume_time, msg.receipt_handle)
except MQExceptionBase, e:
if e.type == "MessageNotExist":
print "No new message! RequestId: %s" % e.req_id
continue
print "Consume Message Fail! Exception:%s\n" % e
time.sleep(2)
continue
#msg.next_consume_time前若不确认消息消费成功,则消息会重复消费。
#消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
try:
receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
consumer.ack_message(receipt_handle_list)
print "Ak %s Message Succeed.\n\n" % len(receipt_handle_list)
except MQExceptionBase, e:
print "\nAk Message Fail! Exception:%s" % e
#某些消息的句柄可能超时了会导致确认不成功。
if e.sub_errors:
for sub_error in e.sub_errors:
print "\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])
Node.js
const {
MQClient
} = require('@aliyunmq/mq-http-sdk');
// 设置HTTP接入域名。
const endpoint = "${HTTP_ENDPOINT}";
// AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
const accessKeyId = "${ACCESS_KEY}";
// AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
const accessKeySecret = "${SECRET_KEY}";
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// 所属的Topic。
const topic = "${TOPIC}";
// 您在控制台创建的Group ID(Consumer ID)。
const groupId = "${GROUP_ID}";
// Topic所属实例ID,默认实例为空。
const instanceId = "${INSTANCE_ID}";
const consumer = client.getConsumer(instanceId, topic, groupId);
(async function(){
// 循环消费消息。
while(true) {
try {
// 长轮询消费消息。
// 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
res = await consumer.consumeMessage(
3, // 一次最多消费3条(最多可设置为16条)。
3 // 长轮询时间3秒(最多可设置为30秒)。
);
if (res.code == 200) {
// 消费消息,处理业务逻辑。
console.log("Consume Messages, requestId:%s", res.requestId);
const handles = res.body.map((message) => {
console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
",Props:%j,MessageKey:%s,Prop-A:%s",
message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
return message.ReceiptHandle;
});
// message.NextConsumeTime前若不确认消息消费成功,则消息会重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
res = await consumer.ackMessage(handles);
if (res.code != 204) {
// 某些消息的句柄可能超时了会导致确认不成功。
console.log("Ack Message Fail:");
const failHandles = res.body.map((error)=>{
console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
return error.ReceiptHandle;
});
handles.forEach((handle)=>{
if (failHandles.indexOf(handle) < 0) {
console.log("\tSucHandle:%s\n", handle);
}
});
} else {
// 消息确认消费成功。
console.log("Ack Message suc, RequestId:%s\n\t", res.requestId, handles.join(','));
}
}
} catch(e) {
if (e.Code.indexOf("MessageNotExist") > -1) {
// 没有消息,则继续长轮询服务器。
console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
} else {
console.log(e);
}
}
}
})();
C++
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// 设置HTTP接入域名。
"${HTTP_ENDPOINT}",
// AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
"${ACCESS_KEY}",
// AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
"${SECRET_KEY}"
);
// 所属的Topic。
string topic = "${TOPIC}";
// 您在控制台创建的Group ID(Consumer ID)。
string groupId = "${GROUP_ID}";
// Topic所属实例ID,默认实例为空。
string instanceId = "${INSTANCE_ID}";
MQConsumerPtr consumer;
if (instanceId == "") {
consumer = mqClient.getConsumerRef(topic, groupId);
} else {
consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
}
do {
try {
std::vector<Message> messages;
// 长轮询消费消息。
// 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
consumer->consumeMessage(
3,// 一次最多消费3条(最多可设置为16条)。
3,//长轮询时间3秒(最多可设置为30秒) 。
messages
);
cout << "Consume: " << messages.size() << " Messages!" << endl;
// 处理消息。
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = messages.begin();
iter != messages.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " PublishTime: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Body: " << iter->getMessageBody()
<< " FirstConsumeTime: " << iter->getFirstConsumeTime()
<< " NextConsumeTime: " << iter->getNextConsumeTime()
<< " ConsumedTimes: " << iter->getConsumedTimes()
<< " Properties: " << iter->getPropertiesAsString()
<< " Key: " << iter->getMessageKey() << endl;
receiptHandles.push_back(iter->getReceiptHandle());
}
// 确认消息消费成功。
// Message.NextConsumeTime前若不确认消息消费成功,则消息会重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
AckMessageResponse bdmResp;
consumer->ackMessage(receiptHandles, bdmResp);
if (!bdmResp.isSuccess()) {
// 某些消息的句柄可能超时了会导致确认不成功。
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "AckFailedItem: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
} else {
cout << "Ack: " << messages.size() << " messages suc!" << endl;
}
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
continue;
}
cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
}
} while(true);
}
C#
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;
namespace Aliyun.MQ.Sample
{
public class ConsumerSample
{
// 设置HTTP接入域名。
private const string _endpoint = "${HTTP_ENDPOINT}";
// AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。
private const string _accessKeyId = "${ACCESS_KEY}";
// AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。
private const string _secretAccessKey = "${SECRET_KEY}";
// 所属的Topic。
private const string _topicName = "${TOPIC}";
// Topic所属实例ID,默认实例为空。
private const string _instanceId = "${INSTANCE_ID}";
// 您在控制台创建的Group ID(Consumer ID)。
private const string _groupId = "${GROUP_ID}";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);
static void Main(string[] args)
{
// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息。
while (true)
{
try
{
// 长轮询消费消息。
// 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。
List<Message> messages = null;
try
{
messages = consumer.ConsumeMessage(
3, // 一次最多消费3条(最多可设置为16条)。
3 // 长轮询时间3秒(最多可设置为30秒)。
);
}
catch (Exception exp1)
{
if (exp1 is MessageNotExistException)
{
Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId);
continue;
}
Console.WriteLine(exp1);
Thread.Sleep(2000);
}
if (messages == null)
{
continue;
}
List<string> handlers = new List<string>();
Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
// 处理业务逻辑。
foreach (Message message in messages)
{
Console.WriteLine(message);
Console.WriteLine("Property a is:" + message.GetProperty("a"));
handlers.Add(message.ReceiptHandle);
}
// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
try
{
consumer.AckMessage(handlers);
Console.WriteLine("Ack message success:");
foreach (string handle in handlers)
{
Console.Write("\t" + handle);
}
Console.WriteLine();
}
catch (Exception exp2)
{
// 某些消息的句柄可能超时了会导致确认不成功。
if (exp2 is AckMessageException)
{
AckMessageException ackExp = (AckMessageException)exp2;
Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
{
Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
}
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Thread.Sleep(2000);
}
}
}
}
}
后续步骤
文档内容是否对您有帮助?