本文介绍如何使用Confluent-Kafka-C#实现Kafka消费。
环境准备
此处以Centos 7为例进行安装。
安装dotnet core SDK。
yum install dotnet-sdk-3.1
新建dotnet项目。
dotnet new console -n myconsumer -o /home/user/projects/myconsumer
安装confluent kafka依赖。
dotnet add package -v 1.9.4-RC1 Confluent.Kafka
参数说明
参数 | 描述 | 示例 |
BootstrapServers | 初始连接的集群地址,格式为
| etl-dev.cn-hangzhou.log.aliyuncs.com:10012 其中,etl-dev为Project名称。 |
SaslMechanism | 必须使用SaslMechanism.Plain。 | SaslMechanism.Plain |
SecurityProtocol | 为了保证数据传输的安全性,请使用SecurityProtocol.SaslSsl。 | SecurityProtocol.SaslSsl |
SaslUsername | 日志服务Project名称。 | project |
SaslPassword | 阿里云账号AccessKey。格式为 | 无 |
GroupId | 消费组ID,用于指定消费者组的标识符。通过配置消费组ID,将消费组内的消费者分组,可以实现消费者组内的负载均衡,实现数据的处理和分发。 | kafka-test |
AutoOffsetReset | 消费起始点位,常用的值为AutoOffsetReset.Latest和AutoOffsetReset.Earliest。
| AutoOffsetReset.Latest |
代码示例
using System;
using System.Threading;
using Confluent.Kafka;
class Consumer
{
public static void Main(string[] args)
{
// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
// 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。
// 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
string accessKeyId = Environment.GetEnvironmentVariable("SLS_ACCESS_KEY_ID");
string accessKeySecret = Environment.GetEnvironmentVariable("SLS_ACCESS_KEY_SECRET");
string project = "project";
string endpoint = "cn-shenzhen.log.aliyuncs.com";
string port = "10012";
string host = project + "." + endpoint + ":" + port;
string password = accessKeyId + "#" +accessKeySecret;
string groupId = "test002";
string topic = "your logstore";
// 配置Kafka消费者
var conf = new ConsumerConfig {
GroupId = groupId,
BootstrapServers = host,
AutoOffsetReset = AutoOffsetReset.Earliest,
SaslMechanism = SaslMechanism.Plain,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslUsername = project,
SaslPassword = password
};
// 创建Kafka消费者实例
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe(topic);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
c.Close();
}
}
}
}
- 本页导读 (1)