本文介绍如何在VPC环境下使用C# SDK接入消息队列Kafka版的默认接入点并收发消息。

前提条件

您已安装.NET。更多信息,请参见安装.NET

安装C#依赖库

执行以下命令安装C#依赖库。
dotnet add package -v 1.5.2 Confluent.Kafka

发送消息

  1. 创建发送消息程序producer.cs
    using System;
    using Confluent.Kafka;
    
    class Producer
    {
        public static void Main(string[] args)
        {
            var conf = new ProducerConfig {
                BootstrapServers = "XXX,XXX,XXX",
                };
    
            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");
    
            string topic ="XXX";
    
            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                for (int i=0; i<100; ++i)
                {
                    p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
                }
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
    参数 描述
    BootstrapServers 默认接入点。您可在消息队列Kafka版控制台的实例详情页面的基本信息区域获取。
    topic Topic名称。您可在消息队列Kafka版控制台的Topic管理页面获取。
  2. 执行以下命令发送消息。
    dotnet run producer.cs

订阅消息

  1. 创建订阅消息程序consumer.cs
    using System;
    using System.Threading;
    using Confluent.Kafka;
    
    class Consumer
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig {
                GroupId = "XXX",
                BootstrapServers = "XXX,XXX,XXX",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
    
            string topic = "XXX";
    
            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe(topic);
    
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true;
                    cts.Cancel();
                };
    
                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occured: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    c.Close();
                }
            }
        }
    }
    参数 描述
    GroupId Consumer Group名称。您可在消息队列Kafka版控制台的Consumer Group管理页面获取。
    BootstrapServers 默认接入点。您可在消息队列Kafka版控制台的实例详情页面的基本信息区域获取。
    topic Topic名称。您可在消息队列Kafka版控制台的Topic管理页面获取。
  2. 执行以下命令消费消息。
    dotnet run consumer.cs