阿里云日志服务兼容Kafka消费组协议,您可以使用原生Kafka客户端对日志服务进行读操作。
概念映射
Kafka | 日志服务 | 描述 |
Topic | Logstore |
|
Partition | Shard | 数据存储分区。
|
Offset | Cursor |
|
权限配置
当您使用RAM用户进行操作时,需先赋予RAM用户只读访问日志服务(Log)的权限,即AliyunLogReadOnlyAccess权限。具体操作,请参见为RAM用户授权。
如果需要更精细的账号权限要求,可采用自定义权限策略,示例如下:
{
"Version": "1",
"Statement": [
{
"Action": "log:GetProject",
"Resource": "acs:log:*:*:project/project名称",
"Effect": "Allow"
},
{
"Action": [
"log:GetLogStore",
"log:ListShards",
"log:GetCursorOrData"
],
"Resource": "acs:log:*:*:project/project名称/logstore/*",
"Effect": "Allow"
}
]
}
代码示例
您可以通过Java、GO、Python及.net等SDK实现消费组消费日志。
消费字段格式说明
场景1:如果Logstore的日志中只有一个内容字段,例如只有
content
字段,那么Kafka消费到的Key为content
,Value为content
字段值。场景2:如果Logstore的日志中不止一个内容字段,例如存在
url
和method
字段,那么Kafka消费到的Key为null,Value为JSON编码后的内容,例如{"url" : "/", "method" : "get"}
。场景3:如果Logstore的日志中包含字段
kafka_topic
、kafka_partition
、kafka_offset
、key
和value
字段, 则消费程序会认为该数据为您导入的Kafka数据。更多信息,请参见导入Kafka数据。为了与导入的数据前保持一致,消费时key
字段和value
字段将分别映射为Kafka的Key和Kafka的Value。
当日志中可能存在单个字段或者多个字段时,那么消费到的Value可能是单字段值(场景1)或JSON格式的内容(场景2)。
当设置消费程序所消费的Value为JSON格式,且要消费场景1中的日志时,需确保字段值为JSON格式,否则将导致消费异常。
消费组延迟监控
您可以通过日志服务控制台查看日志消费的状态并设置告警。具体信息,请参见消费组监控与告警。
费用说明
当涉及的Logstore的计费模式为按使用功能计费模式,Kafka消费涉及多个计费项,包括读写流量、请求费用等。具体内容,请参见按使用功能计费模式计费项。
当涉及的Logstore的计费模式为按写入数据量计费模式,消费将不产生费用。具体内容,请参见按写入数据量计费模式计费项。
使用限制
日志服务的Kafka消费协议兼容版本为2.1.0。
每个消费组支持消费最多50个Logstore。
每个Logstore最多支持被15个消费组消费。
说明Kafka消费受到日志服务的读流量Quota限制,Kafka消费组的数量限制和日志服务消费组的数量限制均单独计数,Quota不共享。具体信息,请参见数据读写。
为保证日志传输安全性,目前仅支持SASL_SSL连接协议。
Offset由日志服务Cursor特殊编码产生,目前不支持用于计算消费延迟。消费延迟计算请参考消费组监控,详细信息,请参见消费组监控与告警。
若单个LogGroup中日志数量超过10万条,在Kafka兼容消费时会自动截断至10万条。
使用Kafka兼容消费Logstore后,对Logstore进行删除重建,然后再消费可能会遇到异常。需要您手动调用代码删除关联的消费组。示例代码如下:
重要日志服务在消费协议兼容对Shard和Partition的做了映射关系,不删除映射关系,再重建同名Logstore,再次消费还会获取以前的映射关系,导致消费异常。
package org.example; import org.apache.kafka.clients.admin.*; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args){ Properties props = new Properties(); String project = "project"; // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。 // 此处以把AccessKeyID和AccessKeySecret保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。 // 强烈建议不要把AccessKeyID和AccessKeySecret保存到代码里,会存在密钥泄漏风险 String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET"); String endpoint = "cn-hangzhou.log.aliyuncs.com"; String port = "10012"; //内网endpoint和对应port,可以通过阿里云内部网络访问日志服务,相比公网有更好的链路质量和安全性。 //String endpoint = "cn-hangzhou-intranet.log.aliyuncs.com"; //String port = "10011"; String hosts = project + "." + endpoint + ":" + port; props.put("bootstrap.servers", hosts); props.put("security.protocol", "sasl_ssl"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";"); List<String> deleteGroupId = new ArrayList<>(); // 即将删除的消费组名称 deleteGroupId.add("kafka-test-112"); AdminClient client = KafkaAdminClient.create(props); try { DeleteConsumerGroupsResult deleteConsumerGroupsResult = client.deleteConsumerGroups(deleteGroupId); deleteConsumerGroupsResult.all().get(10, TimeUnit.SECONDS); } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) { e.printStackTrace(); } } }
- 本页导读 (1)