本文介绍如何在公网环境下使用C# SDK接入消息队列Kafka版的SSL接入点并使用PLAIN机制收发消息。
前提条件
您已安装.NET。更多信息,请参见安装.NET。
安装C#依赖库
执行以下命令安装C#依赖库。
dotnet add package -v 1.5.2 Confluent.Kafka
准备配置
发送消息
- 创建发送消息程序producer.cs。
using System; using Confluent.Kafka; class Producer { public static void Main(string[] args) { var conf = new ProducerConfig { BootstrapServers = "XXX,XXX,XXX", SslCaLocation = "XXX/ca-cert.pem", SaslMechanism = SaslMechanism.Plain, SecurityProtocol = SecurityProtocol.SaslSsl, SaslUsername = "XXX", SaslPassword = "XXX", }; Action<DeliveryReport<Null, string>> handler = r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}"); string topic ="XXX"; using (var p = new ProducerBuilder<Null, string>(conf).Build()) { for (int i=0; i<100; ++i) { p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler); } p.Flush(TimeSpan.FromSeconds(10)); } } }
参数 描述 BootstrapServers SSL接入点。您可在消息队列Kafka版控制台的实例详情页面的基本信息区域获取。 SslCaLocation 下载的SSL根证书的路径。 SaslUsername 用户名。 - 如果实例未开启ACL,您可以在消息队列Kafka版控制台的实例详情页面获取默认用户的用户名。
- 如果实例已开启ACL,请确保要使用的SASL用户为PLAIN类型且已授权收发消息的权限。更多信息,请参见SASL用户授权。
SaslPassword 密码。 - 如果实例未开启ACL,您可以在消息队列Kafka版控制台的实例详情页面获取默认用户的密码。
- 如果实例已开启ACL,请确保要使用的SASL用户为PLAIN类型且已授权收发消息的权限。更多信息,请参见SASL用户授权。
topic Topic名称。您可在消息队列Kafka版控制台的Topic管理页面获取。 - 执行以下命令发送消息。
dotnet run producer.cs
订阅消息
- 创建订阅消息程序consumer.cs。
using System; using System.Threading; using Confluent.Kafka; class Consumer { public static void Main(string[] args) { var conf = new ConsumerConfig { GroupId = "XXX", BootstrapServers = "XXX,XXX,XXX", SslCaLocation = "XXX/ca-cert.pem", SaslMechanism = SaslMechanism.Plain, SecurityProtocol = SecurityProtocol.SaslSsl, SaslUsername = "XXX", SaslPassword = "XXX", AutoOffsetReset = AutoOffsetReset.Earliest }; string topic = "XXX"; using (var c = new ConsumerBuilder<Ignore, string>(conf).Build()) { c.Subscribe(topic); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; try { while (true) { try { var cr = c.Consume(cts.Token); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { c.Close(); } } } }
参数 描述 GroupId Consumer Group名称。您可在消息队列Kafka版控制台的Consumer Group管理页面获取。 BootstrapServers SSL接入点。您可在消息队列Kafka版控制台的实例详情页面的基本信息区域获取。 SslCaLocation 下载的SSL根证书的路径。 SaslUsername 用户名。 - 如果实例未开启ACL,您可以在消息队列Kafka版控制台的实例详情页面获取默认用户的用户名。
- 如果实例已开启ACL,请确保要使用的SASL用户为PLAIN类型且已授权收发消息的权限。更多信息,请参见SASL用户授权。
SaslPassword 密码。 - 如果实例未开启ACL,您可以在消息队列Kafka版控制台的实例详情页面获取默认用户的密码。
- 如果实例已开启ACL,请确保要使用的SASL用户为PLAIN类型且已授权收发消息的权限。更多信息,请参见SASL用户授权。
topic Topic名称。您可在消息队列Kafka版控制台的Topic管理页面获取。 - 执行以下命令消费消息。
dotnet run consumer.cs
在文档使用中是否遇到以下问题
更多建议
匿名提交