.NET/C# Demo

依赖.NET语言的阿里云SDK核心库及dybaseapi,其中dybaseapi包用于拉取轻量消息队列(原MNS)消息。

Demo如下:

说明

调用接口前需配置环境变量,通过环境变量读取访问凭证。

AccessKey ID和AccessKey Secret的环境变量名:SECRET_AK_ENV 、SECRET_SK_ENV。配置详情请参见在Linux、macOS和Windows系统配置环境变量

阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维using System;
using Aliyun.Acs.Core;
using Aliyun.Acs.Core.Profile;
using Aliyun.Acs.Core.Exceptions;
using Aliyun.Acs.Dybaseapi.Model.V20170525;
using Aliyun.Acs.Dybaseapi.MNS;
using Aliyun.Acs.Dybaseapi.MNS.Model;
using System.Threading;
using System.Collections.Generic;
using System.Text;

using System;

using QueryTokenForMnsQueue_MessageTokenDTO = Aliyun.Acs.Dybaseapi.Model.V20170525.QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO;

namespace CommonRpc
{
    class Program
    {
        static void Main(string[] args)
        {
                
            // 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
            // 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
            // 本示例以把AccessKey ID和AccessKey Secret保存在环境变量为例说明,来实现API访问的身份验证。
            IClientProfile profile = DefaultProfile.GetProfile("cn-hangzhou", Environment.GetEnvironmentVariable("SECRET_AK_ENV"), Environment.GetEnvironmentVariable("SECRET_SK_ENV")); // todo: 补充AK信息

            DefaultProfile.AddEndpoint("cn-hangzhou", "cn-hangzhou", "Dybaseapi", "dybaseapi.aliyuncs.com");

            DefaultAcsClient client = new DefaultAcsClient(profile);

            String queueName = "<QueueName>"; // todo: 补充队列名称
            String messageType = "<MessageType>"; // todo: 补充消息类型

            int maxThread = 2;

            for (int i = 0; i < maxThread; i++)
            {
                TestTask testTask = new TestTask("PullMessageTask-thread-" + i, messageType, queueName, client);
                Thread t = new Thread(new ThreadStart(testTask.Handle));
                //启动线程
                t.Start();
            }
            Console.ReadKey();

            try
            {
                QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest
                {
                    MessageType = messageType,
                    QueueName = queueName
                };

                QueryTokenForMnsQueueResponse response = client.GetAcsResponse(request);
                Console.WriteLine(response.MessageTokenDTO.SecurityToken);
            }
            catch (ServerException ex)
            {
                Console.WriteLine(ex.ToString());
            }
            catch (ClientException ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }
    }

    class TestTask
    {
        object o = new object();
        const int sleepTime = 50;
        const long bufferTime = 60 * 2; // 过期时间小于2分钟则重新获取,防止服务器时间误差
        const String mnsAccountEndpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/"; // 阿里通信消息的endpoint,固定

        public String name { get; private set; }
        public String messageType { get; private set; }
        public String QueueName { get; private set; }
        public int TaskID { get; private set; }
        public IAcsClient AcsClient { get; private set; }

        public TestTask(String name, String messageType, String queueName, IAcsClient acsClient)
        {
            this.name = name;
            this.messageType = messageType;
            this.QueueName = queueName;
            this.AcsClient = acsClient;
        }

        readonly Dictionary<string, QueryTokenForMnsQueue_MessageTokenDTO> tokenMap = new Dictionary<string, QueryTokenForMnsQueue_MessageTokenDTO>();
        readonly Dictionary<string, Queue> queueMap = new Dictionary<string, Queue>();

        public QueryTokenForMnsQueue_MessageTokenDTO GetTokenByMessageType(IAcsClient acsClient, String messageType)
        {
            QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest
            {
                MessageType = messageType
            };
            QueryTokenForMnsQueueResponse queryTokenForMnsQueueResponse = acsClient.GetAcsResponse(request);
            QueryTokenForMnsQueue_MessageTokenDTO token = queryTokenForMnsQueueResponse.MessageTokenDTO;
            return token;
        }

        /// 处理消息
        public void Handle()
        {
            while (true)
            {
                try
                {
                    QueryTokenForMnsQueue_MessageTokenDTO token = null;
                    Queue queue = null;
                    lock (o)
                    {
                        if (tokenMap.ContainsKey(messageType))
                        {
                            token = tokenMap[messageType];
                        }

                        if (queueMap.ContainsKey(QueueName))
                        {
                            queue = queueMap[QueueName];
                        }

                        TimeSpan ts = new TimeSpan(0);

                        if (token != null)
                        {
                            DateTime b = Convert.ToDateTime(token.ExpireTime);
                            DateTime c = Convert.ToDateTime(DateTime.Now);
                            ts = b - c;
                        }

                        if (token == null || ts.TotalSeconds < bufferTime || queue == null)
                        {
                            token = GetTokenByMessageType(AcsClient, messageType);
                            IMNS client = new MNSClient(token.AccessKeyId, token.AccessKeySecret, mnsAccountEndpoint, token.SecurityToken);
                            queue = client.GetNativeQueue(QueueName);
                            if (tokenMap.ContainsKey(messageType))
                            {
                                tokenMap.Remove(messageType);
                            }
                            if (queueMap.ContainsKey(QueueName))
                            {
                                queueMap.Remove(QueueName);
                            }
                            tokenMap.Add(messageType, token);
                            queueMap.Add(QueueName, queue);
                        }
                    }

                    BatchReceiveMessageResponse batchReceiveMessageResponse = queue.BatchReceiveMessage(16);
                    List<Message> messages = batchReceiveMessageResponse.Messages;

                    for (int i = 0; i <= messages.Count - 1; i++)
                    {
                        try
                        {
                            byte[] outputb = Convert.FromBase64String(messages[i].Body);
                            string orgStr = Encoding.UTF8.GetString(outputb);
                            Console.WriteLine(orgStr);
                            // TODO 具体消费逻辑,待客户自己实现.
                            // 消费成功的前提下删除消息
                            // queue.DeleteMessage(messages[i].ReceiptHandle);
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(e.ToString());
                        }
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.ToString());
                }
                Thread.Sleep(sleepTime);
            }
        }
    }
}