本文为您提供当前主流编程语言的消息回执示例,您可通过下载相应的SDK安装包拉取队列消息。
所属语言 | SDK下载链接 |
Java | |
Node.js | |
.NET/C# | 说明 依赖.NET语言的阿里云SDK核心库及dybaseapi,其中dybaseapi包用于拉取队列消息。 |
Python | 说明 依赖Python语言的阿里云SDK核心库及dybaseapi,其中dybaseapi包用于拉取队列消息。 |
Go | |
消息回执示例
说明
调用接口前需配置环境变量,通过环境变量读取访问凭证。AccessKey ID和AccessKey Secret的环境变量名分别为ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET。配置详情请参见在Linux、macOS和Windows系统配置环境变量。
C# Demo
using Aliyun.Acs.Core;
using Aliyun.Acs.Core.Profile;
using Aliyun.Acs.Core.Exceptions;
using Aliyun.Acs.Dybaseapi.Model.V20170525;
using Aliyun.Acs.Dybaseapi.MNS;
using Aliyun.Acs.Dybaseapi.MNS.Model;
using System.Threading;
using System.Collections.Generic;
using System.Text;
using System;
using QueryTokenForMnsQueue_MessageTokenDTO = Aliyun.Acs.Dybaseapi.Model.V20170525.QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO;
namespace CommonRpc
{
/// <summary>
/// Console entry point of the message-receipt demo: starts the queue polling
/// threads and, after a key press, queries a queue token once and prints it.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the ACS client from credentials held in environment variables,
    /// starts the polling threads and waits for a key press.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        // An Alibaba Cloud account AccessKey has access to all APIs; using a RAM user for API access and routine O&M is recommended.
        // It is strongly recommended NOT to store the AccessKey ID and AccessKey Secret in project code,
        // otherwise the AccessKey may leak and endanger every resource under your account.
        // This sample reads both values from environment variables to authenticate API access.
        string accessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
        string accessKeySecret = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        if (string.IsNullOrEmpty(accessKeyId) || string.IsNullOrEmpty(accessKeySecret))
        {
            // Stop early with a clear message instead of handing null credentials to the SDK.
            Console.WriteLine("Please set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.");
            return;
        }

        IClientProfile profile = DefaultProfile.GetProfile("cn-hangzhou", accessKeyId, accessKeySecret);
        profile.AddEndpoint("cn-hangzhou", "cn-hangzhou", "Dybaseapi", "dybaseapi.aliyuncs.com");
        DefaultAcsClient client = new DefaultAcsClient(profile);

        String queueName = "<QueueName>"; // TODO: fill in the queue name
        String messageType = "<MessageType>"; // TODO: fill in the message type
        int maxThread = 2;
        for (int i = 0; i < maxThread; i++)
        {
            TestTask testTask = new TestTask("PullMessageTask-thread-" + i, messageType, queueName, client);
            Thread t = new Thread(new ThreadStart(testTask.Handle));
            // Start the polling thread.
            t.Start();
        }

        Console.ReadKey();

        try
        {
            QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest
            {
                MessageType = messageType,
                QueueName = queueName
            };
            QueryTokenForMnsQueueResponse response = client.GetAcsResponse(request);
            Console.WriteLine(response.MessageTokenDTO.SecurityToken);
        }
        catch (ServerException ex)
        {
            Console.WriteLine(ex.ToString());
        }
        catch (ClientException ex)
        {
            Console.WriteLine(ex.ToString());
        }
    }
}
/// <summary>
/// Polling worker: keeps a cached queue token and queue handle, refreshes them
/// shortly before the token expires, and prints every message it receives.
/// </summary>
class TestTask
{
    // Guards the token/queue caches below.
    readonly object cacheLock = new object();
    // Pause between two polling rounds, in milliseconds.
    const int sleepTime = 50;
    // Refresh the token when less than 2 minutes remain, to tolerate server clock drift.
    const long bufferTime = 60 * 2;
    // Endpoint for Alibaba communication messages; fixed.
    const String mnsAccountEndpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/";

    public String name { get; private set; }
    public String messageType { get; private set; }
    public String QueueName { get; private set; }
    // Never assigned anywhere in this class.
    public int TaskID { get; private set; }
    public IAcsClient AcsClient { get; private set; }

    /// <summary>Creates a polling task bound to one message type and one queue.</summary>
    /// <param name="name">Display name of the task.</param>
    /// <param name="messageType">Message type to request tokens for.</param>
    /// <param name="queueName">Name of the queue to poll.</param>
    /// <param name="acsClient">Client used to request queue tokens.</param>
    public TestTask(String name, String messageType, String queueName, IAcsClient acsClient)
    {
        this.name = name;
        this.messageType = messageType;
        this.QueueName = queueName;
        this.AcsClient = acsClient;
    }

    // Caches keyed by message type / queue name.
    readonly Dictionary<string, QueryTokenForMnsQueue_MessageTokenDTO> tokenMap = new Dictionary<string, QueryTokenForMnsQueue_MessageTokenDTO>();
    readonly Dictionary<string, Queue> queueMap = new Dictionary<string, Queue>();

    /// <summary>
    /// Requests a queue token for the given message type and this task's queue.
    /// </summary>
    /// <param name="acsClient">Client used to send the request.</param>
    /// <param name="messageType">Message type to request the token for.</param>
    /// <returns>The <c>MessageTokenDTO</c> of the response.</returns>
    public QueryTokenForMnsQueue_MessageTokenDTO GetTokenByMessageType(IAcsClient acsClient, String messageType)
    {
        QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest
        {
            MessageType = messageType,
            // Send the queue name as well, as the token request in Program.Main does.
            QueueName = this.QueueName
        };
        QueryTokenForMnsQueueResponse response = acsClient.GetAcsResponse(request);
        return response.MessageTokenDTO;
    }

    /// <summary>
    /// Tells whether the cached token is missing or expires within <c>bufferTime</c> seconds.
    /// </summary>
    private static bool NeedsRefresh(QueryTokenForMnsQueue_MessageTokenDTO token)
    {
        if (token == null)
        {
            return true;
        }
        // NOTE(review): ExpireTime is compared with the machine's local time;
        // confirm the service reports it in the same time zone.
        TimeSpan remaining = Convert.ToDateTime(token.ExpireTime) - DateTime.Now;
        return remaining.TotalSeconds < bufferTime;
    }

    /// <summary>
    /// Processes messages: loops forever, receiving up to 16 messages per round,
    /// Base64-decoding each body as UTF-8 and printing it. Exceptions are printed
    /// and the loop continues after a short sleep.
    /// </summary>
    public void Handle()
    {
        while (true)
        {
            try
            {
                QueryTokenForMnsQueue_MessageTokenDTO token;
                Queue queue;
                lock (cacheLock)
                {
                    // TryGetValue leaves the variable null when the key is absent.
                    tokenMap.TryGetValue(messageType, out token);
                    queueMap.TryGetValue(QueueName, out queue);

                    if (NeedsRefresh(token) || queue == null)
                    {
                        token = GetTokenByMessageType(AcsClient, messageType);
                        IMNS client = new MNSClient(token.AccessKeyId, token.AccessKeySecret, mnsAccountEndpoint, token.SecurityToken);
                        queue = client.GetNativeQueue(QueueName);
                        // Indexer assignment replaces any existing entry.
                        tokenMap[messageType] = token;
                        queueMap[QueueName] = queue;
                    }
                }

                BatchReceiveMessageResponse batchReceiveMessageResponse = queue.BatchReceiveMessage(16);
                List<Message> messages = batchReceiveMessageResponse.Messages;
                foreach (Message message in messages)
                {
                    try
                    {
                        byte[] rawBody = Convert.FromBase64String(message.Body);
                        string text = Encoding.UTF8.GetString(rawBody);
                        Console.WriteLine(text);
                        // TODO: implement your own consumption logic here.
                        // Delete the message only after it has been consumed successfully:
                        // queue.DeleteMessage(message.ReceiptHandle);
                    }
                    catch (Exception e)
                    {
                        // One bad message must not abort the rest of the batch.
                        Console.WriteLine(e.ToString());
                    }
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }
            Thread.Sleep(sleepTime);
        }
    }
}
}
Python Demo
#!/usr/bin/env python
# coding=utf8
import os
import time
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkdybaseapi.request.v20170525.QueryTokenForMnsQueueRequest import QueryTokenForMnsQueueRequest
from aliyunsdkcore.profile import region_provider
from datetime import datetime
from aliyunsdkdybaseapi.mns.account import Account
from aliyunsdkdybaseapi.mns.queue import *
from aliyunsdkdybaseapi.mns.mns_exception import *
try:
import json
except ImportError:
import simplejson as json
# TODO: replace with the message type you need to receive
message_type = "<MessageType>"
# TODO: replace with your queue name. After enabling the corresponding business message on the Cloud Communications page, the queueName is shown on that page
queue_name = "<QueueName>"
# Fixed endpoint address for Cloud Communications
endpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/"
# An Alibaba Cloud account AccessKey has access to all APIs; using a RAM user for API access and routine O&M is recommended.
# It is strongly recommended NOT to store the AccessKey ID and AccessKey Secret in project code, otherwise the AccessKey may leak and endanger every resource under your account.
# This sample reads the AccessKey ID and AccessKey Secret from environment variables to authenticate API access.
acs_client = AcsClient(os.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), os.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "cn-hangzhou")
region_provider.add_endpoint("Dybaseapi", "dybaseapi.aliyuncs.com", "cn-hangzhou")
# The Cloud Communications token has an expiry time and must be refreshed dynamically.
class Token():
    """Temporary credentials for the message queue, with expiry tracking."""

    def __init__(self):
        # All fields stay None until refresh() succeeds.
        self.token = None
        self.tmp_access_id = None
        self.tmp_access_key = None
        self.expire_time = None

    def is_refresh(self):
        """Return 1 when the token must be (re)fetched, otherwise 0.

        A refresh is needed when no token has been fetched yet, or when the
        token has expired or expires in less than 2 minutes.
        """
        if self.expire_time is None:
            return 1
        # Compare the expiry time with the current system time; refresh 2 minutes early.
        # NOTE(review): expire_time is compared with the machine's local time;
        # confirm the service reports it in the same time zone.
        now = datetime.now()
        expire = datetime.strptime(self.expire_time, "%Y-%m-%d %H:%M:%S")
        # total_seconds() covers the whole interval (days included, negative once
        # expired); timedelta.seconds is only the seconds component and would
        # force a refresh whenever that component happens to be below 120.
        if (expire - now).total_seconds() < 120:
            return 1
        return 0

    def refresh(self):
        """Fetch a new token from the API and store its fields on this object.

        Raises:
            ServerException: when there is no response or the response code is not "OK".
        """
        print("start refresh token...")
        request = QueryTokenForMnsQueueRequest()
        request.set_MessageType(message_type)
        request.set_QueueName(queue_name)
        response = acs_client.do_action_with_exception(request)
        # print response
        if response is None:
            raise ServerException("GET_TOKEN_FAIL", "获取token时无响应")
        response_body = json.loads(response)
        if response_body.get("Code") != "OK":
            raise ServerException("GET_TOKEN_FAIL", "获取token失败")
        sts_token = response_body.get("MessageTokenDTO")
        self.tmp_access_key = sts_token.get("AccessKeySecret")
        self.tmp_access_id = sts_token.get("AccessKeyId")
        self.expire_time = sts_token.get("ExpireTime")
        self.token = sts_token.get("SecurityToken")
        print("finish refresh token...")
# Initialise token, my_account, my_queue
token, my_account, my_queue = Token(), None, None
# Receive messages in a loop.
# The receive-message request uses long polling; wait_seconds sets the polling time to 3 seconds.
# How long polling works:
## when the queue holds messages, the request returns immediately;
## when the queue is empty, the request is held on the MNS server for 3 seconds; if a message is
## written during that time it is returned immediately, otherwise the request reports an empty queue after 3 seconds.
wait_seconds = 3
print("%sReceive And Delete Message From Queue%s\nQueueName:%s\nWaitSeconds:%s\n" % (
    10 * "=", 10 * "=", queue_name, wait_seconds))
while True:
    receipt_handles = []
    # Receive messages
    try:
        # Refresh the token when it is missing or about to expire
        if token.is_refresh() == 1:
            token.refresh()
            # Drop the old account so it is rebuilt with the new credentials.
            if my_account:
                my_account.mns_client.close_connection()
                my_account = None
        if not my_account:
            my_account = Account(endpoint, token.tmp_access_id, token.tmp_access_key, token.token)
            my_queue = my_account.get_queue(queue_name)
        # Receive up to 10 messages
        recv_msgs = my_queue.batch_receive_message(10, wait_seconds)
        for recv_msg in recv_msgs:
            # TODO: business processing
            # receipt_handles.append(recv_msg.receipt_handle)
            print("Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (
                recv_msg.receipt_handle, recv_msg.message_body, recv_msg.message_id))
    except MNSExceptionBase as e:
        if e.type == "QueueNotExist":
            print("Queue not exist, please create queue before receive message.")
            break
        elif e.type == "MessageNotExist":
            print("Queue is empty! sleep 10s")
            time.sleep(10)
            continue
        # Any other MNS error stops the loop.
        print("Receive Message Fail! Exception:%s\n" % e)
        break
    # Delete messages (receipt_handles stays empty until the append above is enabled)
    try:
        if len(receipt_handles) > 0:
            # my_queue.batch_delete_message(receipt_handles)
            print("Delete Message Succeed! ReceiptHandles:%s" % receipt_handles)
    except MNSExceptionBase as e:
        print("Delete Message Fail! Exception:%s\n" % e)
Go Demo
package main
import (
"os"
"encoding/base64"
"fmt"
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/endpoints"
"github.com/aliyun/alibaba-cloud-sdk-go/services/dybaseapi"
"github.com/aliyun/alibaba-cloud-sdk-go/services/dybaseapi/mns"
"time"
)
const (
mnsDomain = "1943695596114318.mns.cn-hangzhou.aliyuncs.com"
)
// main polls the MNS queue in an endless loop: it fetches a queue token,
// refreshes it when less than two minutes remain before expiry, receives up to
// 10 messages per round and prints their Base64-decoded bodies. Every error panics.
func main() {
	endpoints.AddEndpointMapping("cn-hangzhou", "Dybaseapi", "dybaseapi.aliyuncs.com")
	// An Alibaba Cloud account AccessKey has access to all APIs; using a RAM user for API access and routine O&M is recommended.
	// It is strongly recommended NOT to store the AccessKey ID and AccessKey Secret in project code,
	// otherwise the AccessKey may leak and endanger every resource under your account.
	// This sample reads both values from environment variables to authenticate API access.
	// Create the client instance.
	client, err := dybaseapi.NewClientWithAccessKey(
		"cn-hangzhou", // your region ID
		os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),     // AccessKey ID from the environment
		os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) // AccessKey Secret from the environment
	if err != nil {
		// Error handling
		panic(err)
	}

	queueName := "<QueueName>"
	messageType := "<MessageType>"

	// The token expiry time is parsed in the Asia/Shanghai zone. If the zone
	// database is unavailable, fall back to a fixed UTC+8 offset rather than
	// silently parsing with a nil location.
	cstLoc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		cstLoc = time.FixedZone("CST", 8*60*60)
	}

	var token *dybaseapi.MessageTokenDTO
	var expireTime time.Time

	for {
		if token == nil || expireTime.Unix()-time.Now().Unix() < 2*60 {
			// Create the API request and set its parameters.
			request := dybaseapi.CreateQueryTokenForMnsQueueRequest()
			request.MessageType = messageType
			request.QueueName = queueName
			// Send the request and handle errors.
			response, err := client.QueryTokenForMnsQueue(request)
			if err != nil {
				panic(err)
			}
			token = &response.MessageTokenDTO

			// The expiry time only changes when the token does, so parse it here.
			expireTime, err = time.ParseInLocation("2006-01-02 15:04:05", token.ExpireTime, cstLoc)
			if err != nil {
				panic(err)
			}
		}

		mnsClient, err := mns.NewClientWithStsToken(
			"cn-hangzhou",
			token.AccessKeyId,
			token.AccessKeySecret,
			token.SecurityToken,
		)
		if err != nil {
			panic(err)
		}

		mnsRequest := mns.CreateBatchReceiveMessageRequest()
		mnsRequest.Domain = mnsDomain
		mnsRequest.QueueName = queueName
		mnsRequest.NumOfMessages = "10"
		mnsRequest.WaitSeconds = "5"
		mnsResponse, err := mnsClient.BatchReceiveMessage(mnsRequest)
		if err != nil {
			panic(err)
		}
		// fmt.Println(mnsResponse)

		receiptHandles := make([]string, len(mnsResponse.Message))
		for i, message := range mnsResponse.Message {
			messageBody, decodeErr := base64.StdEncoding.DecodeString(message.MessageBody)
			if decodeErr != nil {
				panic(decodeErr)
			}
			fmt.Println(string(messageBody))
			receiptHandles[i] = message.ReceiptHandle
		}

		if len(receiptHandles) > 0 {
			mnsDeleteRequest := mns.CreateBatchDeleteMessageRequest()
			mnsDeleteRequest.Domain = mnsDomain
			mnsDeleteRequest.QueueName = queueName
			mnsDeleteRequest.SetReceiptHandles(receiptHandles)
			//_, err = mnsClient.BatchDeleteMessage(mnsDeleteRequest) // uncomment to delete the messages from the queue
			// This check only becomes meaningful once the delete call above is uncommented.
			if err != nil {
				panic(err)
			}
		}
	}
}
文档内容是否对您有帮助?