本文介绍使用.NET语言的AMQP SDK接入阿里云物联网平台,接收服务端订阅消息的示例。
前提条件
已获取消费组ID,并订阅Topic消息。
管理消费组:您可使用物联网平台默认消费组(DEFAULT_GROUP)或创建消费组。
配置AMQP服务端订阅:您可通过消费组订阅需要的Topic消息。
开发环境
本示例使用的开发环境要求如下表。
Framework | 支持版本 |
.NET Framework | 3.5、4.0、4.5及以上版本 |
.NET Micro Framework | 4.2及以上版本 |
.NET nanoFramework | 1.0及以上版本 |
.NET Compact Framework | 3.9及以上版本 |
.Net Core on Windows 10 and Ubuntu 14.04 | 1.0及以上版本 |
Mono | 4.2.1及以上版本 |
下载SDK
.NET版本AMQP SDK,推荐使用AMQP.Net Lite库。请访问AMQP.Net Lite下载库和查看使用说明。
添加依赖
在packages.config中添加以下依赖。
<packages>
<package id="AMQPNetLite" version="2.2.0" targetFramework="net47" />
</packages>
代码示例
using System;
using System.Text;
using Amqp;
using Amqp.Sasl;
using Amqp.Framing;
using System.Threading;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using System.Security.Cryptography;
namespace amqp
{
class MainClass
{
//接入域名,请参见AMQP客户端接入说明文档。
static string Host = "${YourHost}";
static int Port = 5671;
// 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考
static string AccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
static string AccessSecret = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
static string consumerGroupId = "${YourConsumerGroupId}";
static string clientId = "${YourClientId}";
//iotInstanceId:实例ID。
static string iotInstanceId = "${YourIotInstanceId}";
static int Count = 0;
static int IntervalTime = 10000;
static Address address;
public static void Main(string[] args)
{
long timestamp = GetCurrentMilliseconds();
string param = "authId=" + AccessKey + "×tamp=" + timestamp;
//userName组装方法,请参见AMQP客户端接入说明文档。
string userName = clientId + "|authMode=aksign,signMethod=hmacmd5,consumerGroupId=" + consumerGroupId
+ ",iotInstanceId=" + iotInstanceId + ",authId=" + AccessKey + ",timestamp=" + timestamp + "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
string password = doSign(param, AccessSecret, "HmacMD5");
DoConnectAmqp(userName, password);
ManualResetEvent resetEvent = new ManualResetEvent(false);
resetEvent.WaitOne();
}
static void DoConnectAmqp(string userName, string password)
{
address = new Address(Host, Port, userName, password);
//创建Connection。
ConnectionFactory cf = new ConnectionFactory();
//如果需要,使用本地TLS。
//cf.SSL.ClientCertificates.Add(GetCert());
//cf.SSL.RemoteCertificateValidationCallback = ValidateServerCertificate;
cf.SASL.Profile = SaslProfile.External;
cf.AMQP.IdleTimeout = 120000;
//cf.AMQP.ContainerId、cf.AMQP.HostName请自定义。
cf.AMQP.ContainerId = "client.1.2";
cf.AMQP.HostName = "contoso.com";
cf.AMQP.MaxFrameSize = 8 * 1024;
var connection = cf.CreateAsync(address).Result;
//Connection Exception已关闭。
connection.AddClosedCallback(ConnClosed);
//接收消息。
DoReceive(connection);
}
static void DoReceive(Connection connection)
{
//创建Session。
var session = new Session(connection);
//创建ReceiverLink并接收消息。
var receiver = new ReceiverLink(session, "queueName", null);
receiver.Start(20, (link, message) =>
{
object messageId = message.ApplicationProperties["messageId"];
object topic = message.ApplicationProperties["topic"];
string body = Encoding.UTF8.GetString((Byte[])message.Body);
//注意:此处不要有耗时的逻辑,如果这里要进行业务处理,请另开线程,否则会堵塞消费。如果消费一直延时,会增加消息重发的概率。
Console.WriteLine("receive message, topic=" + topic + ", messageId=" + messageId + ", body=" + body);
//ACK消息。
link.Accept(message);
});
}
//连接发生异常后,进入重连模式。
//这里只是一个简单重试的示例,您可以采用指数退避方式,来完善异常场景,重连策略。
static void ConnClosed(IAmqpObject _, Error e)
{
Console.WriteLine("ocurr error: " + e);
if(Count < 3)
{
Count += 1;
Thread.Sleep(IntervalTime * Count);
}
else
{
Thread.Sleep(120000);
}
//重连。
DoConnectAmqp(address.User, address.Password);
}
static X509Certificate GetCert()
{
string certPath = Environment.CurrentDirectory + "/root.crt";
X509Certificate crt = new X509Certificate(certPath);
return crt;
}
static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
return true;
}
static long GetCurrentMilliseconds()
{
DateTime dt1970 = new DateTime(1970, 1, 1);
DateTime current = DateTime.Now;
return (long)(current - dt1970).TotalMilliseconds;
}
//签名方法:支持hmacmd5,hmacsha1和hmacsha256。
static string doSign(string param, string accessSecret, string signMethod)
{
//signMethod = HmacMD5
byte[] key = Encoding.UTF8.GetBytes(accessSecret);
byte[] signContent = Encoding.UTF8.GetBytes(param);
var hmac = new HMACMD5(key);
byte[] hashBytes = hmac.ComputeHash(signContent);
return Convert.ToBase64String(hashBytes);
}
}
}
您需按照如下表格中的参数说明,修改代码中的参数值。更多参数说明,请参见AMQP客户端接入说明。
请确保参数值输入正确,否则AMQP客户端接入会失败。
参数 | 说明 |
Host | AMQP接入域名。
|
AccessKey | 登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。 说明 如果使用RAM用户,您需授予该RAM用户管理物联网平台的权限(AliyunIOTFullAccess),否则将连接失败。授权方法请参见RAM用户访问。 |
AccessSecret | |
consumerGroupId | 当前物联网平台对应实例中的消费组ID。 登录物联网平台控制台,在对应实例的 查看您的消费组ID。 |
iotInstanceId | 实例ID。您可在物联网平台控制台的实例概览页面,查看当前实例的ID。
|
clientId | 表示客户端ID,需您自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。 AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的 页签,单击消费组对应的查看,消费组详情页面将显示该参数,方便您识别区分不同的客户端。 |
运行结果示例
成功:返回类似如下日志信息,表示AMQP客户端已接入物联网平台并成功接收消息。
参数
示例
说明
topic
/***********/******/thing/event/property/post
设备属性上报的Topic。
messageId
2**************7
消息的ID。
body
{"deviceType":"CustomCategory","iotId":"4EwuVV***","requestId":"161268***","checkFailedData":{},"productKey":"g4***S","gmtCreate":1612682173249,"deviceName":"Esensor","items":{"temperature":{"value":-1,"time":1612682173247},"humidity":{"value":74,"time":1612682173247}}}
消息的内容。
失败:返回类似如下日志信息,表示AMQP客户端连接物联网平台失败。
相关文档
服务端订阅消息相关错误码,请参见消息相关错误码。