在控制台创建完所有资源之后,您可以调用消息队列RocketMQ版的HTTP协议的SDK收发普通消息。
前提条件
- 创建资源说明 本文以普通消息为例进行说明,因此您创建的普通消息的Topic不能用来收发其他类型的消息,包括定时、延时、顺序和事务消息。换言之,切勿混用不同消息类型的Topic。
- 创建AccessKey
下载并安装HTTP协议的SDK
消息队列RocketMQ版提供以下语言的HTTP协议SDK,请按需下载并安装相应语言的客户端SDK。
调用HTTP协议的SDK发送普通消息
获取相应语言的客户端SDK后,您即可运行以下示例代码发送普通消息。
import com.aliyun.mq.http.MQClient; import com.aliyun.mq.http.MQProducer; import com.aliyun.mq.http.model.TopicMessage; import java.util.Date; public class Producer { public static void main(String[] args) { MQClient mqClient = new MQClient( // 设置HTTP接入域名。 "${HTTP_ENDPOINT}", // AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 "${ACCESS_KEY}", // AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 "${SECRET_KEY}" ); // 所属的Topic。 final String topic = "${TOPIC}"; // Topic所属实例ID,默认实例为空。 final String instanceId = "${INSTANCE_ID}"; // 获取Topic的生产者。 MQProducer producer; if (instanceId != null && instanceId != "") { producer = mqClient.getProducer(instanceId, topic); } else { producer = mqClient.getProducer(topic); } try { // 循环发送4条消息。 for (int i = 0; i < 4; i++) { TopicMessage pubMsg; if (i % 2 == 0) { // 普通消息。 pubMsg = new TopicMessage( // 消息内容。 "hello mq!".getBytes(), // 消息标签。 "A" ); // 设置属性。 pubMsg.getProperties().put("a", String.valueOf(i)); // 设置Key。 pubMsg.setMessageKey("MessageKey"); } else { pubMsg = new TopicMessage( // 消息内容。 "hello mq!".getBytes(), // 消息标签。 "A" ); // 设置属性。 pubMsg.getProperties().put("a", String.valueOf(i)); // 定时消息,定时时间为10s后。 pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000); } // 同步发送消息,只要不抛异常就是成功。 TopicMessage pubResultMsg = producer.publishMessage(pubMsg); // 同步发送消息,只要不抛异常就是成功。 System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId() + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5()); } } catch (Throwable e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed. Topic is:" + topic); e.printStackTrace(); } mqClient.close(); } }
package main import ( "fmt" "time" "strconv" "github.com/aliyunmq/mq-http-go-sdk" ) func main() { // 设置HTTP接入域名。 endpoint := "${HTTP_ENDPOINT}" // AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 accessKey := "${ACCESS_KEY}" // AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 secretKey := "${SECRET_KEY}" // 所属的Topic。 topic := "${TOPIC}" // Topic所属实例ID,默认实例为空。 instanceId := "${INSTANCE_ID}" client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "") mqProducer := client.GetProducer(instanceId, topic) // 循环发送4条消息。 for i := 0; i < 4; i++ { var msg mq_http_sdk.PublishMessageRequest if i%2 == 0 { msg = mq_http_sdk.PublishMessageRequest{ MessageBody: "hello mq!", //消息内容。 MessageTag: "", // 消息标签。 Properties: map[string]string{}, // 消息属性。 } // 设置Key。 msg.MessageKey = "MessageKey" // 设置属性。 msg.Properties["a"] = strconv.Itoa(i) } else { msg = mq_http_sdk.PublishMessageRequest{ MessageBody: "hello mq timer!", //消息内容。 MessageTag: "", // 消息标签。 Properties: map[string]string{}, // 消息属性。 } // 设置属性。 msg.Properties["a"] = strconv.Itoa(i) // 定时消息,定时时间为10s后,值为毫秒级别的Unix时间戳。 msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000 } ret, err := mqProducer.PublishMessage(msg) if err != nil { fmt.Println(err) return } else { fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5) } time.Sleep(time.Duration(100) * time.Millisecond) } }
<?php require "vendor/autoload.php"; use MQ\Model\TopicMessage; use MQ\MQClient; class ProducerTest { private $client; private $producer; public function __construct() { $this->client = new MQClient( // 设置HTTP接入域名。 "${HTTP_ENDPOINT}", // AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 "${ACCESS_KEY}", // AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 "${SECRET_KEY}" ); // 所属的Topic。 $topic = "${TOPIC}"; // Topic所属实例ID,默认实例为空NULL。 $instanceId = "${INSTANCE_ID}"; $this->producer = $this->client->getProducer($instanceId, $topic); } public function run() { try { for ($i=1; $i<=4; $i++) { $publishMessage = new TopicMessage( "xxxxxxxx"// 消息内容。 ); // 设置属性。 $publishMessage->putProperty("a", $i); // 设置消息Key。 $publishMessage->setMessageKey("MessageKey"); if ($i % 2 == 0) { // 定时消息,定时时间为10s后。 $publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000); } $result = $this->producer->publishMessage($publishMessage); print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n"; } } catch (\Exception $e) { print_r($e->getMessage() . "\n"); } } } $instance = new ProducerTest(); $instance->run(); ?>
#!/usr/bin/env python # coding=utf8 import sys from mq_http_sdk.mq_exception import MQExceptionBase from mq_http_sdk.mq_producer import * from mq_http_sdk.mq_client import * import time #初始化Client。 mq_client = MQClient( #设置HTTP接入域名。 "${HTTP_ENDPOINT}", #AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 "${ACCESS_KEY}", #AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 "${SECRET_KEY}" ) #所属的Topic。 topic_name = "${TOPIC}" #Topic所属实例ID,默认实例为空None。 instance_id = "${INSTANCE_ID}" producer = mq_client.get_producer(instance_id, topic_name) # 循环发布多条消息。 msg_count = 4 print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count)) try: for i in range(msg_count): if i % 2 == 0: msg = TopicMessage( # 消息内容。 "I am test message %s.你好" % i, # 消息标签。 "" ) # 设置属性。 msg.put_property("a", "i") # 设置Key。 msg.set_message_key("MessageKey") re_msg = producer.publish_message(msg) print("Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5)) else: msg = TopicMessage( # 消息内容。 "I am test message %s." % i, # 消息标签。 "" ) msg.put_property("a", i) # 定时消息,毫秒级绝对时间。 msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000) re_msg = producer.publish_message(msg) print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5)) time.sleep(1) except MQExceptionBase as e: if e.type == "TopicNotExist": print("Topic not exist, please create it.") sys.exit(1) print("Publish Message Fail. Exception:%s" % e)
const { MQClient, MessageProperties } = require('@aliyunmq/mq-http-sdk'); // 设置HTTP接入域名。 const endpoint = "${HTTP_ENDPOINT}"; // AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 const accessKeyId = "${ACCESS_KEY}"; // AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 const accessKeySecret = "${SECRET_KEY}"; var client = new MQClient(endpoint, accessKeyId, accessKeySecret); // 所属的Topic。 const topic = "${TOPIC}"; // Topic所属实例ID,默认实例为空。 const instanceId = "${INSTANCE_ID}"; const producer = client.getProducer(instanceId, topic); (async function(){ try { // 循环发送4条消息。 for(var i = 0; i < 4; i++) { let res; if (i % 2 == 0) { msgProps = new MessageProperties(); // 设置属性。 msgProps.putProperty("a", i); // 设置Key。 msgProps.messageKey("MessageKey"); res = await producer.publishMessage("hello mq.", "", msgProps); } else { msgProps = new MessageProperties(); // 设置属性。 msgProps.putProperty("a", i); // 定时消息,定时时间为10s后。 msgProps.startDeliverTime(Date.now() + 10 * 1000); res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps); } console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5); } } catch(e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 console.log(e) } })();
//#include <iostream> #include <fstream> #include <time.h> #include "mq_http_sdk/mq_client.h" using namespace std; using namespace mq::http::sdk; int main() { MQClient mqClient( // 设置HTTP接入域名。 "${HTTP_ENDPOINT}", // AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 "${ACCESS_KEY}", // AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 "${SECRET_KEY}" ); // 所属的Topic。 string topic = "${TOPIC}"; // Topic所属实例ID,默认实例为空。 string instanceId = "${INSTANCE_ID}"; MQProducerPtr producer; if (instanceId == "") { producer = mqClient.getProducerRef(topic); } else { producer = mqClient.getProducerRef(instanceId, topic); } try { for (int i = 0; i < 4; i++) { PublishMessageResponse pmResp; if (i % 4 == 0) { // publish message, only have body. producer->publishMessage("Hello, mq!", pmResp); } else if (i % 4 == 1) { // publish message, only have body and tag. producer->publishMessage("Hello, mq!have tag!", "tag", pmResp); } else if (i % 4 == 2) { // publish message, have body,tag,properties and key. TopicMessage pubMsg("Hello, mq!have key!"); pubMsg.putProperty("a",std::to_string(i)); pubMsg.setMessageKey("MessageKey" + std::to_string(i)); producer->publishMessage(pubMsg, pmResp); } else { // publish timer message, message will be consumed after StartDeliverTime TopicMessage pubMsg("Hello, mq!timer msg!", "tag"); // StartDeliverTime is an absolute time in millisecond. pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000); pubMsg.putProperty("b",std::to_string(i)); pubMsg.putProperty("c",std::to_string(i)); producer->publishMessage(pubMsg, pmResp); } cout << "Publish mq message success. Topic is: " << topic << ", msgId is:" << pmResp.getMessageId() << ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl; } } catch (MQServerException& me) { cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl; return -1; } catch (MQExceptionBase& mb) { cout << "Request Failed: " + mb.ToString() << endl; return -2; } return 0; }
using System; using System.Collections.Generic; using System.Threading; using Aliyun.MQ.Model; using Aliyun.MQ.Model.Exp; using Aliyun.MQ.Util; namespace Aliyun.MQ.Sample { public class ProducerSample { // 设置HTTP接入域名。 private const string _endpoint = "${HTTP_ENDPOINT}"; // AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 private const string _accessKeyId = "${ACCESS_KEY}"; // AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 private const string _secretAccessKey = "${SECRET_KEY}"; // 所属的Topic。 private const string _topicName = "${TOPIC}"; // Topic所属实例ID,默认实例为空。 private const string _instanceId = "${INSTANCE_ID}"; private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint); static MQProducer producer = _client.GetProducer(_instanceId, _topicName); static void Main(string[] args) { try { // 循环发送4条消息。 for (int i = 0; i < 4; i++) { TopicMessage sendMsg; if (i % 2 == 0) { sendMsg = new TopicMessage("dfadfadfadf"); // 设置属性。 sendMsg.PutProperty("a", i.ToString()); // 设置Key。 sendMsg.MessageKey = "MessageKey"; } else { sendMsg = new TopicMessage("dfadfadfadf", "tag"); // 设置属性。 sendMsg.PutProperty("a", i.ToString()); // 定时消息,定时时间为10s后。 sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000; } TopicMessage result = producer.PublishMessage(sendMsg); Console.WriteLine("publis message success:" + result); } } catch (Exception ex) { Console.Write(ex); } } } }
同时,您也可以在消息队列RocketMQ版控制台页面,找到您创建的Topic,在其操作列,单击更多,在下拉列表中,选择快速体验,按需快通过控制台或Docker快速体验。
调用HTTP协议的SDK消费普通消息
消息发送成功后,需要启动消费者来消费普通消息。请按需运行对应语言的示例代码来启动消费者,并测试消费消息的功能。请按照说明正确设置相关参数。
import com.aliyun.mq.http.MQClient; import com.aliyun.mq.http.MQConsumer; import com.aliyun.mq.http.common.AckMessageException; import com.aliyun.mq.http.model.Message; import java.util.ArrayList; import java.util.List; public class Consumer { public static void main(String[] args) { MQClient mqClient = new MQClient( // 设置HTTP接入域名。 "${HTTP_ENDPOINT}", // AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 "${ACCESS_KEY}", // AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 "${SECRET_KEY}" ); // 所属的Topic。 final String topic = "${TOPIC}"; // 您在控制台创建的Group ID(Consumer ID)。 final String groupId = "${GROUP_ID}"; // Topic所属实例ID,默认实例为空。 final String instanceId = "${INSTANCE_ID}"; final MQConsumer consumer; if (instanceId != null && instanceId != "") { consumer = mqClient.getConsumer(instanceId, topic, groupId, null); } else { consumer = mqClient.getConsumer(topic, groupId); } // 在当前线程循环消费消息,建议是多开个几个线程并发消费消息。 do { List<Message> messages = null; try { // 长轮询消费消息。 // 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。 messages = consumer.consumeMessage( 3,// 一次最多消费3条(最多可设置为16条)。 3// 长轮询时间3秒(最多可设置为30秒)。 ); } catch (Throwable e) { e.printStackTrace(); try { Thread.sleep(2000); } catch (InterruptedException e1) { e1.printStackTrace(); } } // 没有消息。 if (messages == null || messages.isEmpty()) { System.out.println(Thread.currentThread().getName() + ": no new message, continue!"); continue; } // 处理业务逻辑。 for (Message message : messages) { System.out.println("Receive message: " + message); } // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费。 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。 { List<String> handles = new ArrayList<String>(); for (Message message : messages) { handles.add(message.getReceiptHandle()); } try { consumer.ackMessage(handles); } catch (Throwable e) { // 某些消息的句柄可能超时了会导致确认不成功。 if (e instanceof AckMessageException) { AckMessageException errors = (AckMessageException) e; System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:"); if (errors.getErrorMessages() != null) { for (String errorHandle :errors.getErrorMessages().keySet()) { System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode() + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage()); } } continue; } e.printStackTrace(); } } } while (true); } }
package main import ( "fmt" "github.com/gogap/errors" "strings" "time" "github.com/aliyunmq/mq-http-go-sdk" ) func main() { // 设置HTTP接入域名。 endpoint := "${HTTP_ENDPOINT}" // AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 accessKey := "${ACCESS_KEY}" // AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 secretKey := "${SECRET_KEY}" // 所属的Topic。 topic := "${TOPIC}" // Topic所属实例ID,默认实例为空。 instanceId := "${INSTANCE_ID}" // 您在控制台创建的Group ID(Consumer ID)。 groupId := "${GROUP_ID}" client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "") mqConsumer := client.GetConsumer(instanceId, topic, groupId, "") for { endChan := make(chan int) respChan := make(chan mq_http_sdk.ConsumeMessageResponse) errChan := make(chan error) go func() { select { case resp := <-respChan: { // 处理业务逻辑。 var handles []string fmt.Printf("Consume %d messages---->\n", len(resp.Messages)) for _, v := range resp.Messages { handles = append(handles, v.ReceiptHandle) fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+ "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+ "\tBody: %s\n"+ "\tProps: %s\n", v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes, v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties) } // NextConsumeTime前若不确认消息消费成功,则消息会重复消费。 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。 ackerr := mqConsumer.AckMessage(handles) if ackerr != nil { // 某些消息的句柄可能超时了会导致确认不成功。 fmt.Println(ackerr) for _, errAckItem := range ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) { fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg) } time.Sleep(time.Duration(3) * time.Second) } else { fmt.Printf("Ack ---->\n\t%s\n", handles) } endChan <- 1 } case err := <-errChan: { // 没有消息。 if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") { fmt.Println("\nNo new message, continue!") } else { fmt.Println(err) time.Sleep(time.Duration(3) * time.Second) } endChan <- 1 } case <-time.After(35 * time.Second): { fmt.Println("Timeout of consumer message ??") endChan <- 1 } } }() // 长轮询消费消息。 // 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。 mqConsumer.ConsumeMessage(respChan, errChan, 3, // 一次最多消费3条(最多可设置为16条)。 3, // 长轮询时间3秒(最多可设置为30秒)。 ) <-endChan } }
<?php require "vendor/autoload.php"; use MQ\Model\TopicMessage; use MQ\MQClient; class ConsumerTest { private $client; private $consumer; public function __construct() { $this->client = new MQClient( // 设置HTTP接入域名。 "${HTTP_ENDPOINT}", // AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 "${ACCESS_KEY}", // AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 "${SECRET_KEY}" ); // 所属的Topic。 $topic = "${TOPIC}"; // 您在控制台创建的Group ID(Consumer ID)。 $groupId = "${GROUP_ID}"; // Topic所属实例ID,默认实例为空NULL。 $instanceId = "${INSTANCE_ID}"; $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId); } public function run() { // 在当前线程循环消费消息,建议是多开个几个线程并发消费消息。 while (True) { try { // 长轮询消费消息。 // 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。 $messages = $this->consumer->consumeMessage( 3, // 一次最多消费3条(最多可设置为16条)。 3 // 长轮询时间3秒(最多可设置为30秒)。 ); } catch (\Exception $e) { if ($e instanceof MQ\Exception\MessageNotExistException) { // 没有消息可以消费,接着轮询。 printf("No message, contine long polling!RequestId:%s\n", $e->getRequestId()); continue; } print_r($e->getMessage() . "\n"); sleep(3); continue; } print "consume finish, messages:\n"; // 处理业务逻辑。 $receiptHandles = array(); foreach ($messages as $message) { $receiptHandles[] = $message->getReceiptHandle(); printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n", $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(), $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(), $message->getMessageKey()); print_r($message->getProperties()); } // $message->getNextConsumeTime() 前若不确认消息消费成功,则消息会重复消费。 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。 print_r($receiptHandles); try { $this->consumer->ackMessage($receiptHandles); } catch (\Exception $e) { if ($e instanceof MQ\Exception\AckMessageException) { // 某些消息的句柄可能超时了会导致确认不成功。 printf("Ack Error, RequestId:%s\n", $e->getRequestId()); foreach ($e->getAckMessageErrorItems() as $errorItem) { printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode()); } } } print "ack finish\n"; } } } $instance = new ConsumerTest(); $instance->run(); ?>
#!/usr/bin/env python # coding=utf8 from mq_http_sdk.mq_exception import MQExceptionBase from mq_http_sdk.mq_consumer import * from mq_http_sdk.mq_client import * #初始化client mq_client = MQClient( #设置HTTP接入域名。 "", #AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 "${ACCESS_KEY}", #AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 "${SECRET_KEY}" ) #所属的Topic。 topic_name = "${TOPIC}" #您在控制台创建的Group ID。 group_id = "GID_test" #Topic所属实例ID,默认实例为空None。 instance_id = "MQ_INST_1380156306793859_BbXbx0Y4" consumer = mq_client.get_consumer(instance_id, topic_name, group_id) #长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。 #长轮询时间3秒(最多可设置为30秒)。 wait_seconds = 3 #一次最多消费3条(最多可设置为16条)。 batch = 3 print "%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds) while True: try: #长轮询消费消息。 recv_msgs = consumer.consume_message(batch, wait_seconds) for msg in recv_msgs: print "Receive, MessageId: %s\nMessageBodyMD5: %s \ \nMessageTag: %s\nConsumedTimes: %s \ \nPublishTime: %s\nBody: %s \ \nNextConsumeTime: %s \ \nReceiptHandle: %s" % \ (msg.message_id, msg.message_body_md5, msg.message_tag, msg.consumed_times, msg.publish_time, msg.message_body, msg.next_consume_time, msg.receipt_handle) except MQExceptionBase, e: if e.type == "MessageNotExist": print "No new message! RequestId: %s" % e.req_id continue print "Consume Message Fail! Exception:%s\n" % e time.sleep(2) continue #msg.next_consume_time前若不确认消息消费成功,则消息会重复消费。 #消息句柄有时间戳,同一条消息每次消费拿到的都不一样。 try: receipt_handle_list = [msg.receipt_handle for msg in recv_msgs] consumer.ack_message(receipt_handle_list) print "Ak %s Message Succeed.\n\n" % len(receipt_handle_list) except MQExceptionBase, e: print "\nAk Message Fail! Exception:%s" % e #某些消息的句柄可能超时了会导致确认不成功。 if e.sub_errors: for sub_error in e.sub_errors: print "\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])
const { MQClient } = require('@aliyunmq/mq-http-sdk'); // 设置HTTP接入域名。 const endpoint = "${HTTP_ENDPOINT}"; // AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 const accessKeyId = "${ACCESS_KEY}"; // AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 const accessKeySecret = "${SECRET_KEY}"; var client = new MQClient(endpoint, accessKeyId, accessKeySecret); // 所属的Topic。 const topic = "${TOPIC}"; // 您在控制台创建的Group ID(Consumer ID)。 const groupId = "${GROUP_ID}"; // Topic所属实例ID,默认实例为空。 const instanceId = "${INSTANCE_ID}"; const consumer = client.getConsumer(instanceId, topic, groupId); (async function(){ // 循环消费消息。 while(true) { try { // 长轮询消费消息。 // 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。 res = await consumer.consumeMessage( 3, // 一次最多消费3条(最多可设置为16条)。 3 // 长轮询时间3秒(最多可设置为30秒)。 ); if (res.code == 200) { // 消费消息,处理业务逻辑。 console.log("Consume Messages, requestId:%s", res.requestId); const handles = res.body.map((message) => { console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" + ",Props:%j,MessageKey:%s,Prop-A:%s", message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes, message.MessageBody,message.Properties,message.MessageKey,message.Properties.a); return message.ReceiptHandle; }); // message.NextConsumeTime前若不确认消息消费成功,则消息会重复消费。 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。 res = await consumer.ackMessage(handles); if (res.code != 204) { // 某些消息的句柄可能超时了会导致确认不成功。 console.log("Ack Message Fail:"); const failHandles = res.body.map((error)=>{ console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage); return error.ReceiptHandle; }); handles.forEach((handle)=>{ if (failHandles.indexOf(handle) < 0) { console.log("\tSucHandle:%s\n", handle); } }); } else { // 消息确认消费成功。 console.log("Ack Message suc, RequestId:%s\n\t", res.requestId, handles.join(',')); } } } catch(e) { if (e.Code.indexOf("MessageNotExist") > -1) { // 没有消息,则继续长轮询服务器。 console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code); } else { console.log(e); } } } })();
using System; using System.Collections.Generic; using System.Threading; using Aliyun.MQ.Model; using Aliyun.MQ.Model.Exp; using Aliyun.MQ; namespace Aliyun.MQ.Sample { public class ConsumerSample { // 设置HTTP接入域名。 private const string _endpoint = "${HTTP_ENDPOINT}"; // AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 private const string _accessKeyId = "${ACCESS_KEY}"; // AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 private const string _secretAccessKey = "${SECRET_KEY}"; // 所属的Topic。 private const string _topicName = "${TOPIC}"; // Topic所属实例ID,默认实例为空。 private const string _instanceId = "${INSTANCE_ID}"; // 您在控制台创建的Group ID(Consumer ID)。 private const string _groupId = "${GROUP_ID}"; private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint); static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null); static void Main(string[] args) { // 在当前线程循环消费消息,建议是多开个几个线程并发消费消息。 while (true) { try { // 长轮询消费消息。 // 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。 List<Message> messages = null; try { messages = consumer.ConsumeMessage( 3, // 一次最多消费3条(最多可设置为16条)。 3 // 长轮询时间3秒(最多可设置为30秒)。 ); } catch (Exception exp1) { if (exp1 is MessageNotExistException) { Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId); continue; } Console.WriteLine(exp1); Thread.Sleep(2000); } if (messages == null) { continue; } List<string> handlers = new List<string>(); Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:"); // 处理业务逻辑。 foreach (Message message in messages) { Console.WriteLine(message); Console.WriteLine("Property a is:" + message.GetProperty("a")); handlers.Add(message.ReceiptHandle); } // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费。 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。 try { consumer.AckMessage(handlers); Console.WriteLine("Ack message success:"); foreach (string handle in handlers) { Console.Write("\t" + handle); } Console.WriteLine(); } catch (Exception exp2) { // 某些消息的句柄可能超时了会导致确认不成功。 if (exp2 is AckMessageException) { AckMessageException ackExp = (AckMessageException)exp2; Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId); foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems) { Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage); } } } } catch (Exception ex) { Console.WriteLine(ex); Thread.Sleep(2000); } } } } }
#include <vector> #include <fstream> #include "mq_http_sdk/mq_client.h" #ifdef _WIN32 #include <windows.h> #else #include <unistd.h> #endif using namespace std; using namespace mq::http::sdk; int main() { MQClient mqClient( // 设置HTTP接入域名。 "${HTTP_ENDPOINT}", // AccessKey ID,阿里云身份验证标识。获取方式,请参见本文前提条件中的获取AccessKey。 "${ACCESS_KEY}", // AccessKey Secret,阿里云身份验证密钥。获取方式,请参见本文前提条件中的获取AccessKey。 "${SECRET_KEY}" ); // 所属的Topic。 string topic = "${TOPIC}"; // 您在控制台创建的Group ID(Consumer ID)。 string groupId = "${GROUP_ID}"; // Topic所属实例ID,默认实例为空。 string instanceId = "${INSTANCE_ID}"; MQConsumerPtr consumer; if (instanceId == "") { consumer = mqClient.getConsumerRef(topic, groupId); } else { consumer = mqClient.getConsumerRef(instanceId, topic, groupId, ""); } do { try { std::vector<Message> messages; // 长轮询消费消息。 // 长轮询表示如果Topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回。 consumer->consumeMessage( 3,// 一次最多消费3条(最多可设置为16条)。 3,//长轮询时间3秒(最多可设置为30秒) 。 messages ); cout << "Consume: " << messages.size() << " Messages!" << endl; // 处理消息。 std::vector<std::string> receiptHandles; for (std::vector<Message>::iterator iter = messages.begin(); iter != messages.end(); ++iter) { cout << "MessageId: " << iter->getMessageId() << " PublishTime: " << iter->getPublishTime() << " Tag: " << iter->getMessageTag() << " Body: " << iter->getMessageBody() << " FirstConsumeTime: " << iter->getFirstConsumeTime() << " NextConsumeTime: " << iter->getNextConsumeTime() << " ConsumedTimes: " << iter->getConsumedTimes() << " Properties: " << iter->getPropertiesAsString() << " Key: " << iter->getMessageKey() << endl; receiptHandles.push_back(iter->getReceiptHandle()); } // 确认消息消费成功。 // Message.NextConsumeTime前若不确认消息消费成功,则消息会重复消费。 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。 AckMessageResponse bdmResp; consumer->ackMessage(receiptHandles, bdmResp); if (!bdmResp.isSuccess()) { // 某些消息的句柄可能超时了会导致确认不成功。 const std::vector<AckMessageFailedItem>& failedItems = bdmResp.getAckMessageFailedItem(); for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin(); iter != failedItems.end(); ++iter) { cout << "AckFailedItem: " << iter->errorCode << " " << iter->receiptHandle << endl; } } else { cout << "Ack: " << messages.size() << " messages suc!" << endl; } } catch (MQServerException& me) { if (me.GetErrorCode() == "MessageNotExist") { cout << "No message to consume! RequestId: " + me.GetRequestId() << endl; continue; } cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl; #ifdef _WIN32 Sleep(2000); #else usleep(2000 * 1000); #endif } catch (MQExceptionBase& mb) { cout << "Request Failed: " + mb.ToString() << endl; #ifdef _WIN32 Sleep(2000); #else usleep(2000 * 1000); #endif } } while(true); }