文档

使用Confluent-Kafka-C#实现Kafka消费

更新时间:

本文介绍如何使用Confluent-Kafka-C#实现Kafka消费。

环境准备

说明

此处以Centos 7为例进行安装。

  1. 安装dotnet core SDK。

    yum install dotnet-sdk-3.1
  2. 新建dotnet项目。

    dotnet new console -n myconsumer -o /home/user/projects/myconsumer
  3. 安装confluent kafka依赖。

    dotnet add package -v 1.9.4-RC1 Confluent.Kafka

参数说明

参数

描述

示例

BootstrapServers

初始连接的集群地址,格式为${project}.${endpoint}:${port},请根据实际的Project名称及其所在的Endpoint进行配置。更多信息,请参见服务入口

  • 阿里云VPC内网:端口号为10011。

  • 公网:端口号为10012。

etl-dev.cn-hangzhou.log.aliyuncs.com:10012

其中,etl-dev为Project名称。

SaslMechanism

必须使用SaslMechanism.Plain。

SaslMechanism.Plain

SecurityProtocol

为了保证数据传输的安全性,请使用SecurityProtocol.SaslSsl。

SecurityProtocol.SaslSsl

SaslUsername

日志服务Project名称。

project

SaslPassword

阿里云账号AccessKey。格式为{access-key-id}#{access-key-secret}。建议您使用具备日志服务Project写入权限的RAM用户的AccessKey。授予RAM用户向指定Project写入数据权限的具体操作,请参见授权。如何获取AccessKey的具体操作,请参见访问密钥

GroupId

消费组ID,用于指定消费者组的标识符。通过配置消费组ID,将消费组内的消费者分组,可以实现消费者组内的负载均衡,实现数据的处理和分发。

kafka-test

AutoOffsetReset

消费起始点位,常用的值为AutoOffsetReset.Latest和AutoOffsetReset.Earliest。

  • AutoOffsetReset.Earliest:表示使用最早的偏移量,从最早的消息开始读取。当有已提交的偏移量时,从提交的偏移量开始消费;无提交的偏移量时,从头开始消费。

  • AutoOffsetReset.Latest:表示使用最新的偏移量,即从最新消息开始读取。当有已提交的偏移量时,从提交的偏移量开始消费;无提交的偏移量时,消费新产生的数据。

AutoOffsetReset.Latest

代码示例

using System;
using System.Threading;
using Confluent.Kafka;

class Consumer
{
    public static void Main(string[] args)
    {   
      	// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
        // 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。
        // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
        string accessKeyId = Environment.GetEnvironmentVariable("SLS_ACCESS_KEY_ID");
        string accessKeySecret = Environment.GetEnvironmentVariable("SLS_ACCESS_KEY_SECRET");
        string project = "project";
        
        string endpoint = "cn-shenzhen.log.aliyuncs.com";
        string port = "10012";
        string host = project + "." + endpoint + ":" + port;
        string password = accessKeyId + "#" +accessKeySecret;
        string groupId = "test002";
        string topic = "your logstore";
      
      	// 配置Kafka消费者
        var conf = new ConsumerConfig {
            GroupId = groupId,
            BootstrapServers = host,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslUsername = project,
            SaslPassword = password
        };

       	// 创建Kafka消费者实例
        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe(topic);

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {    
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                c.Close();
            }
        }
    }
}

  • 本页导读 (1)
文档反馈