文档

Kafka消费概述

更新时间:

阿里云日志服务兼容Kafka消费组协议,您可以使用原生Kafka客户端对日志服务进行读操作。

概念映射

Kafka

日志服务

描述

Topic

Logstore

  • Topic:Kafka用来区分不同类型信息的主题。

  • Logstore:日志服务中日志的采集、存储和查询单元。

Partition

Shard

数据存储分区。

  • Partition:连续的,只增不减。

  • Shard:支持分裂、合并。

Offset

Cursor

  • Offset:Partition中的消息的顺序ID。

  • Cursor:日志服务中日志的相对偏移量,通过Cursor可以获得一组相对位置的日志。

权限配置

当您使用RAM用户进行操作时,需先赋予RAM用户只读访问日志服务(Log)的权限,即AliyunLogReadOnlyAccess权限。具体操作,请参见为RAM用户授权

如果需要更精细的账号权限要求,可采用自定义权限策略,示例如下:

{
    "Version": "1",
    "Statement": [
        {
            "Action": "log:GetProject",
            "Resource": "acs:log:*:*:project/project名称",
            "Effect": "Allow"
        },
        {
            "Action": [
                "log:GetLogStore",
                "log:ListShards",
                "log:GetCursorOrData"
            ],
            "Resource": "acs:log:*:*:project/project名称/logstore/*",
            "Effect": "Allow"
        }
    ]
}

代码示例

您可以通过Java、GO、Python及.net等SDK实现消费组消费日志。

消费字段格式说明

  • 场景1:如果Logstore的日志中只有一个内容字段,例如只有content字段,那么Kafka消费到的Key为content,Value为content字段值。

  • 场景2:如果Logstore的日志中不止一个内容字段,例如存在urlmethod字段,那么Kafka消费到的Key为null,Value为JSON编码后的内容,例如{"url" : "/", "method" : "get"}

  • 场景3:如果Logstore的日志中包含字段kafka_topickafka_partitionkafka_offsetkeyvalue字段, 则消费程序会认为该数据为您导入的Kafka数据。更多信息,请参见导入Kafka数据。为了与导入的数据前保持一致,消费时key字段和value字段将分别映射为Kafka的Key和Kafka的Value。

说明
  • 当日志中可能存在单个字段或者多个字段时,那么消费到的Value可能是单字段值(场景1)或JSON格式的内容(场景2)。

  • 当设置消费程序所消费的Value为JSON格式,且要消费场景1中的日志时,需确保字段值为JSON格式,否则将导致消费异常。

消费组延迟监控

您可以通过日志服务控制台查看日志消费的状态并设置告警。具体信息,请参见消费组监控与告警

费用说明

使用限制

  • 日志服务的Kafka消费协议兼容版本为2.1.0。

  • 每个消费组支持消费最多50个Logstore。

  • 每个Logstore最多支持被15个消费组消费。

    说明

    Kafka消费受到日志服务的读流量Quota限制,Kafka消费组的数量限制和日志服务消费组的数量限制均单独计数,Quota不共享。具体信息,请参见数据读写

  • 为保证日志传输安全性,目前仅支持SASL_SSL连接协议。

  • Offset由日志服务Cursor特殊编码产生,目前不支持用于计算消费延迟。消费延迟计算请参考消费组监控,详细信息,请参见消费组监控与告警

  • 若单个LogGroup中日志数量超过10万条,在Kafka兼容消费时会自动截断至10万条。

  • 使用Kafka兼容消费Logstore后,对Logstore进行删除重建,然后再消费可能会遇到异常。需要您手动调用代码删除关联的消费组。示例代码如下:

    重要

    日志服务在消费协议兼容对Shard和Partition的做了映射关系,不删除映射关系,再重建同名Logstore,再次消费还会获取以前的映射关系,导致消费异常。

    package org.example;
    
    import org.apache.kafka.clients.admin.*;
    import java.util.*;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args){
    
            Properties props = new Properties();
            String project = "project";
            // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
            // 此处以把AccessKeyID和AccessKeySecret保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。
            // 强烈建议不要把AccessKeyID和AccessKeySecret保存到代码里,会存在密钥泄漏风险
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-hangzhou.log.aliyuncs.com";
            String port = "10012";
    
            //内网endpoint和对应port,可以通过阿里云内部网络访问日志服务,相比公网有更好的链路质量和安全性。
            //String endpoint = "cn-hangzhou-intranet.log.aliyuncs.com";
            //String port = "10011";
            String hosts = project + "." + endpoint + ":" + port;
    
            props.put("bootstrap.servers", hosts);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
    
            List<String> deleteGroupId = new ArrayList<>();
            // 即将删除的消费组名称
            deleteGroupId.add("kafka-test-112");
    
            AdminClient client = KafkaAdminClient.create(props);
    
            try {
                DeleteConsumerGroupsResult deleteConsumerGroupsResult =  client.deleteConsumerGroups(deleteGroupId);
                deleteConsumerGroupsResult.all().get(10, TimeUnit.SECONDS);
    
            } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    }

  • 本页导读 (1)
文档反馈