本文介绍使用.NET语言的AMQP SDK接入阿里云物联网平台,接收服务端订阅消息的示例。

开发环境

本示例使用的开发环境要求如下表。

Framework 支持版本
.Net Framework 3.5、4.0、4.5及以上版本
.NET Micro Framework 4.2及以上版本
.NET nanoFramework 1.0及以上版本
.NET Compact Framework 3.9及以上版本
.Net Core on Windows 10 and Ubuntu 14.04 1.0及以上版本
Mono 4.2.1及以上版本

下载SDK

.NET版本AMQP SDK,推荐使用AMQP.Net Lite库。请访问AMQP.Net Lite下载库和查看使用说明。

添加依赖

packages.config中添加以下依赖。

<packages>
  <package id="AMQPNetLite" version="2.2.0" targetFramework="net47" />
</packages>

代码示例

以下Demo中涉及的参数说明,请参见AMQP客户端接入说明

using System;
using System.Text;
using Amqp;
using Amqp.Sasl;
using Amqp.Framing;
using System.Threading;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using System.Security.Cryptography;

namespace amqp
{
    class MainClass
    {
        //接入域名,请参见AMQP客户端接入说明文档。
        static string Host = "${YourHost}";
        static int Port = 5671;
        static string AccessKey = "${YourAccessKey}";
        static string AccessSecret = "${YourAccessSecret}";
        static string consumerGroupId = "${YourConsumerGroupId}";
        static string clientId = "${YourClientId}";
        //iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
        static string iotInstanceId = "${iotInstanceId}"; 
        static int Count = 0;
        static int IntervalTime = 10000;

        static Address address;

        public static void Main(string[] args)
        {
            long timestamp = GetCurrentMilliseconds();
            string param = "authId=" + AccessKey + "&timestamp=" + timestamp;
            //userName组装方法,请参见AMQP客户端接入说明文档。
            string userName = clientId + "|authMode=aksign,signMethod=hmacmd5,consumerGroupId=" + consumerGroupId
               + ",iotInstanceId=" + iotInstanceId + ",authId=" + AccessKey + ",timestamp=" + timestamp + "|";
            //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
            string password = doSign(param, AccessSecret, "HmacMD5");

            DoConnectAmqp(userName, password);

            ManualResetEvent resetEvent = new ManualResetEvent(false);
            resetEvent.WaitOne();
        }

        static void DoConnectAmqp(string userName, string password)
        {
            address = new Address(Host, Port, userName, password);
            //Create Connection
            ConnectionFactory cf = new ConnectionFactory();
            //use local tls if neccessary
            //cf.SSL.ClientCertificates.Add(GetCert());
            //cf.SSL.RemoteCertificateValidationCallback = ValidateServerCertificate;
            cf.SASL.Profile = SaslProfile.External;
            cf.AMQP.IdleTimeout = 120000;
            //cf.AMQP.ContainerId、cf.AMQP.HostName请自定义。
            cf.AMQP.ContainerId = "client.1.2"; 
            cf.AMQP.HostName = "contoso.com";
            cf.AMQP.MaxFrameSize = 8 * 1024;
            var connection = cf.CreateAsync(address).Result;

            //Connection Exception Closed
            connection.AddClosedCallback(ConnClosed);

            //Receive Message
            DoReceive(connection);
        }

        static void DoReceive(Connection connection)
        {
            //Create Session
            var session = new Session(connection);

            //Create Link and Receive Message
            var receiver = new ReceiverLink(session, "queueName", null);


            receiver.Start(20, (link, message) =>
            {
                object messageId = message.ApplicationProperties["messageId"];
                object topic = message.ApplicationProperties["topic"];
                string body = Encoding.UTF8.GetString((Byte[])message.Body);
                //注意:此处不要有耗时的逻辑,如果这里要进行业务处理,请另开线程,否则会堵塞消费。如果消费一直延时,会增加消息重发的概率。
                Console.WriteLine("receive message, topic=" + topic + ", messageId=" + messageId + ", body=" + body);

                //Acknowledge Message
                link.Accept(message);
            });


        }

        //连接发生异常后,进入重连模式。
        //这里只是一个简单重试的示例,您可以采用指数退避方式,来完善异常场景,重连策略。
        static void ConnClosed(IAmqpObject _, Error e)
        {
            Console.WriteLine("ocurr error: " + e);
            if(Count < 3)
            {
                Count += 1;
                Thread.Sleep(IntervalTime * Count);
            }
            else
            {
                Thread.Sleep(120000);
            }

            //Reconnection
            DoConnectAmqp(address.User, address.Password);
        }

        static X509Certificate GetCert()
        {
            string certPath = Environment.CurrentDirectory + "/root.crt";
            X509Certificate crt = new X509Certificate(certPath);

            return crt;
        }

        static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            return true;
        }

        static long GetCurrentMilliseconds()
        {
            DateTime dt1970 = new DateTime(1970, 1, 1);
            DateTime current = DateTime.Now;
            return (long)(current - dt1970).TotalMilliseconds;
        }

        //签名方法:支持hmacmd5,hmacsha1和hmacsha256。
        static string doSign(string param, string accessSecret, string signMethod)
        {
            //signMethod = HmacMD5
            byte[] key = Encoding.UTF8.GetBytes(accessSecret);
            byte[] signContent = Encoding.UTF8.GetBytes(param);
            var hmac = new HMACMD5(key);
            byte[] hashBytes = hmac.ComputeHash(signContent);
            return Convert.ToBase64String(hashBytes);
        }
    }
}