.NET SDK接入示例

本文介绍使用.NET语言的AMQP SDK接入阿里云物联网平台,接收服务端订阅消息的示例。

前提条件

已获取消费组ID,并订阅Topic消息。

开发环境

本示例使用的开发环境要求如下表。

Framework

支持版本

.NET Framework

3.5、4.0、4.5及以上版本

.NET Micro Framework

4.2及以上版本

.NET nanoFramework

1.0及以上版本

.NET Compact Framework

3.9及以上版本

.Net Core on Windows 10 and Ubuntu 14.04

1.0及以上版本

Mono

4.2.1及以上版本

下载SDK

.NET版本AMQP SDK,推荐使用AMQP.Net Lite库。请访问AMQP.Net Lite下载库和查看使用说明。

添加依赖

packages.config中添加以下依赖。

<packages>
  <package id="AMQPNetLite" version="2.2.0" targetFramework="net47" />
</packages>

代码示例

using System;
using System.Text;
using Amqp;
using Amqp.Sasl;
using Amqp.Framing;
using System.Threading;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using System.Security.Cryptography;

namespace amqp
{
    class MainClass
    {
        //接入域名,请参见AMQP客户端接入说明文档。
        static string Host = "${YourHost}";
        static int Port = 5671;
        // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考
        static string AccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
        static string AccessSecret = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        static string consumerGroupId = "${YourConsumerGroupId}";
        static string clientId = "${YourClientId}";
        //iotInstanceId:实例ID。
        static string iotInstanceId = "${YourIotInstanceId}"; 
        static int Count = 0;
        static int IntervalTime = 10000;

        static Address address;

        public static void Main(string[] args)
        {
            long timestamp = GetCurrentMilliseconds();
            string param = "authId=" + AccessKey + "&timestamp=" + timestamp;
            //userName组装方法,请参见AMQP客户端接入说明文档。
            string userName = clientId + "|authMode=aksign,signMethod=hmacmd5,consumerGroupId=" + consumerGroupId
               + ",iotInstanceId=" + iotInstanceId + ",authId=" + AccessKey + ",timestamp=" + timestamp + "|";
            //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
            string password = doSign(param, AccessSecret, "HmacMD5");

            DoConnectAmqp(userName, password);

            ManualResetEvent resetEvent = new ManualResetEvent(false);
            resetEvent.WaitOne();
        }

        static void DoConnectAmqp(string userName, string password)
        {
            address = new Address(Host, Port, userName, password);
            //创建Connection。
            ConnectionFactory cf = new ConnectionFactory();
            //如果需要,使用本地TLS。
            //cf.SSL.ClientCertificates.Add(GetCert());
            //cf.SSL.RemoteCertificateValidationCallback = ValidateServerCertificate;
            cf.SASL.Profile = SaslProfile.External;
            cf.AMQP.IdleTimeout = 120000;
            //cf.AMQP.ContainerId、cf.AMQP.HostName请自定义。
            cf.AMQP.ContainerId = "client.1.2"; 
            cf.AMQP.HostName = "contoso.com";
            cf.AMQP.MaxFrameSize = 8 * 1024;
            var connection = cf.CreateAsync(address).Result;

            //Connection Exception已关闭。
            connection.AddClosedCallback(ConnClosed);

            //接收消息。
            DoReceive(connection);
        }

        static void DoReceive(Connection connection)
        {
            //创建Session。
            var session = new Session(connection);

            //创建ReceiverLink并接收消息。
            var receiver = new ReceiverLink(session, "queueName", null);


            receiver.Start(20, (link, message) =>
            {
                object messageId = message.ApplicationProperties["messageId"];
                object topic = message.ApplicationProperties["topic"];
                string body = Encoding.UTF8.GetString((Byte[])message.Body);
                //注意:此处不要有耗时的逻辑,如果这里要进行业务处理,请另开线程,否则会堵塞消费。如果消费一直延时,会增加消息重发的概率。
                Console.WriteLine("receive message, topic=" + topic + ", messageId=" + messageId + ", body=" + body);

                //ACK消息。
                link.Accept(message);
            });


        }

        //连接发生异常后,进入重连模式。
        //这里只是一个简单重试的示例,您可以采用指数退避方式,来完善异常场景,重连策略。
        static void ConnClosed(IAmqpObject _, Error e)
        {
            Console.WriteLine("ocurr error: " + e);
            if(Count < 3)
            {
                Count += 1;
                Thread.Sleep(IntervalTime * Count);
            }
            else
            {
                Thread.Sleep(120000);
            }

            //重连。
            DoConnectAmqp(address.User, address.Password);
        }

        static X509Certificate GetCert()
        {
            string certPath = Environment.CurrentDirectory + "/root.crt";
            X509Certificate crt = new X509Certificate(certPath);

            return crt;
        }

        static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            return true;
        }

        static long GetCurrentMilliseconds()
        {
            DateTime dt1970 = new DateTime(1970, 1, 1);
            DateTime current = DateTime.Now;
            return (long)(current - dt1970).TotalMilliseconds;
        }

        //签名方法:支持hmacmd5,hmacsha1和hmacsha256。
        static string doSign(string param, string accessSecret, string signMethod)
        {
            //signMethod = HmacMD5
            byte[] key = Encoding.UTF8.GetBytes(accessSecret);
            byte[] signContent = Encoding.UTF8.GetBytes(param);
            var hmac = new HMACMD5(key);
            byte[] hashBytes = hmac.ComputeHash(signContent);
            return Convert.ToBase64String(hashBytes);
        }
    }
}

您需按照如下表格中的参数说明,修改代码中的参数值。更多参数说明,请参见AMQP客户端接入说明

重要

请确保参数值输入正确,否则AMQP客户端接入会失败。

参数

说明

Host

AMQP接入域名。

${YourHost}对应的AMQP接入域名信息,请参见管理实例终端节点

AccessKey

登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。

说明

如果使用RAM用户,您需授予该RAM用户管理物联网平台的权限(AliyunIOTFullAccess),否则将连接失败。授权方法请参见RAM用户访问

AccessSecret

consumerGroupId

当前物联网平台对应实例中的消费组ID。

登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表查看您的消费组ID。

iotInstanceId

实例ID。您可在物联网平台控制台实例概览页面,查看当前实例的ID。

  • 若有ID值,必须传入该ID值。

  • 若无实例概览页面或ID值,传入空值,即iotInstanceId = ""

clientId

表示客户端ID,需您自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。

AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表页签,单击消费组对应的查看消费组详情页面将显示该参数,方便您识别区分不同的客户端。

运行结果示例

  • 成功:返回类似如下日志信息,表示AMQP客户端已接入物联网平台并成功接收消息。成功

    参数

    示例

    说明

    topic

    /***********/******/thing/event/property/post

    设备属性上报的Topic。

    messageId

    2**************7

    消息的ID。

    body

    {"deviceType":"CustomCategory","iotId":"4EwuVV***","requestId":"161268***","checkFailedData":{},"productKey":"g4***S","gmtCreate":1612682173249,"deviceName":"Esensor","items":{"temperature":{"value":-1,"time":1612682173247},"humidity":{"value":74,"time":1612682173247}}}

    消息的内容。

  • 失败:返回类似如下日志信息,表示AMQP客户端连接物联网平台失败。失败

相关文档

服务端订阅消息相关错误码,请参见消息相关错误码