您可以使用Kafka Producer SDK、Beats系列软件、Collectd、Fluentd、Logstash、Telegraf、Vector等采集工具采集日志,并通过Kafka协议上传到日志服务。本文介绍通过采集工具采集到日志后,利用Kafka协议上传日志到日志服务的操作步骤。
相关限制
支持的Kafka协议版本最低为2.1.0。
为保证日志传输安全性,必须使用SASL_SSL连接协议。
权限说明
以下两个权限规则满足其中之一即可。
自定义权限策略
创建一个自定义权限策略,其中在脚本编辑页签,请使用以下脚本替换配置框中的原有内容。具体操作,请参见创建自定义权限策略。
说明脚本中的
Project名称
请根据实际情况替换。{ "Version": "1", "Statement": [ { "Action": "log:GetProject", "Resource": "acs:log:*:*:project/project名称", "Effect": "Allow" }, { "Action": [ "log:GetLogStore", "log:ListShards", "log:PostLogStoreLogs" ], "Resource": "acs:log:*:*:project/project名称/logstore/*", "Effect": "Allow" } ] }
为RAM用户添加创建的自定义权限策略。具体操作,请参见为RAM用户授权。
配置方式
使用Kafka协议上传日志时,您需要配置以下参数。
配置名 | 配置值 | 说明 | 示例 |
SLS_KAFKA_ENDPOINT | 初始连接的集群地址,格式为 |
| aliyun-project-test为Project名称,
|
SLS_PROJECT | Project名称 | 日志服务对应的Project名称。 | aliyun-project-test |
SLS_LOGSTORE | Logstore名称 | 日志服务对应的Logstore名称。Logstore名称后缀加上 | 例如Logstore名称是
|
SLS_PASSWORD | 具备sls写入权限的AccessKeySecret。 | AK的概念和创建步骤,请参见创建AccessKey。 值为AccessKey ID和AliyunKey Secret用
| LTaI5xxxxxxxxxxxxindexp2#CZO8XXXXXXXXXXpKSG 说明
|
如果您要通过Kafka消费组实时消费日志服务的数据,请提交工单咨询阿里云技术支持工程师。
示例一:通过Beats系列软件上传日志
Beats系列软件(MetricBeat、PacketBeat、Winlogbeat、Auditbeat、Filebeat、Heartbeat等)采集到日志后,支持通过Kafka协议将日志上传到日志服务。更多信息,请参见Beats-Kafka-Output。
配置示例
示例中用到的
SLS_
开头的参数配置请参见配置方式。output.kafka: # initial brokers for reading cluster metadata hosts: ["SLS_KAFKA_ENDPOINT"] username: "SLS_PROJECT" password: "SLS_PASSWORD" ssl.certificate_authorities: # message topic selection + partitioning topic: 'SLS_LOGSTORE' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
示例二:通过Collectd上传日志
Collectd是一个守护(daemon)进程,用于定期采集系统和应用程序的性能指标,并支持通过Kafka协议上传到日志服务。更多信息,请参见Write Kafka Plugin。
将Collectd采集到日志上传到日志服务时,还需安装Kafka插件以及相关依赖。例如:在linux Centos中,可以使用yum安装Kafka插件,命令为sudo yum install collectd-write_kafka
,安装RPM请参见Collectd-write_kafka。
配置示例
示例中将日志输出格式(Format)设置为JSON,除此之外,还支持Command、Graphite类型。更多信息,请参见Collectd配置文档。
示例中用到的
SLS_
开头的参数配置请参见配置方式。
LoadPlugin write_kafka <Plugin write_kafka> Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT" Property "security.protocol" "sasl_ssl" Property "sasl.mechanism" "PLAIN" Property "sasl.username" "SLS_PROJECT" Property "sasl.password" "SLS_PASSWORD" Property "broker.address.family" "v4" <Topic "SLS_LOGSTORE"> Format JSON Key "content" </Topic> </Plugin>
示例三:使用Telegraf上传日志
Telegraf是由Go语言编写的代理程序,内存占用小,用于收集、处理、汇总数据指标。Telegraf具有丰富的插件及集成功能,可从其运行的系统中获取各种指标、从第三方API中获取指标以及通过statsd和Kafka消费者服务监听指标。
将Telegraf采集到的日志通过Kafka协议上传到日志服务前,您需要先修改配置文件。
配置示例
示例中将日志输出格式(Format)设置为JSON,除此之外还支持Graphite、Carbon2等类型。更多信息,请参见Telegraf输出格式。
说明Telegraf必须配置一个合法的tls_ca路径,使用服务器自带的根证书的路径即可。Linux环境中,根证书CA路径一般为/etc/ssl/certs/ca-bundle.crt。
示例中用到的
SLS_
开头的参数配置请参见配置方式。
# Kafka output plugin configuration [[outputs.kafka]] ## URLs of kafka brokers brokers = ["SLS_KAFKA_ENDPOINT"] ## Kafka topic for producer messages topic = "SLS_LOGSTORE" routing_key = "content" ## CompressionCodec represents the various compression codecs recognized by ## Kafka in messages. ## 0 : No compression ## 1 : Gzip compression ## 2 : Snappy compression ## 3 : LZ4 compression compression_codec = 1 ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt" tls_cert = "/etc/ssl/certs/ca-certificates.crt" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false ## Optional SASL Config sasl_username = "SLS_PROJECT" sasl_password = "SLS_PASSWORD" ## Data format to output. ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "json"
示例四:使用Fluentd上传日志
Fluentd是一个开源的日志收集器,是云端原生计算基金会(CNCF)的成员项目之一,遵循Apache 2 License协议。
Fluentd支持众多输入、处理、输出插件,支持通过Kafka插件将日志上传到日志服务,您只需安装并配置Kafka插件即可。更多信息,请参见fluent-plugin-kafka。
配置示例
示例中将日志输出格式(Format)设置为JSON,除此之外还支持数十种Format类型。更多信息,请参见Fluentd Formatter。
示例中用到的
SLS_
开头的参数配置请参见配置方式。
<match **> @type kafka2 brokers SLS_KAFKA_ENDPOINT default_topic SLS_LOGSTORE default_message_key content sasl_over_ssl true use_event_time true username SLS_PROJECT password "SLS_PASSWORD" ssl_ca_certs_from_system true # ruby-kafka producer options max_send_retries 1000 required_acks 1 compression_codec gzip use_event_time true max_send_limit_bytes 2097152 <buffer hostlogs> flush_interval 10s </buffer> <format> @type json </format> </match>
示例五:使用Logstash上传日志
Logstash是一个具备实时处理能力、开源的日志采集引擎,可以动态采集不同来源的日志。
Logstash内置Kafka输出插件,您可以配置Logstash实现日志通过kafka协议上传到日志服务。由于日志服务使用SASL_SSL连接协议,因此还需要配置SSL证书和jaas文件。
配置示例
示例中将日志输出格式(Format)设置为JSON,除此之外还支持数十种Format类型。更多信息,请参见Logstash Codec。
说明本示例为连通性测试的配置,您的生产环境中建议删除stdout的输出配置。
示例中用到的
SLS_
开头的参数配置请参见配置方式。
output { stdout { codec => rubydebug } kafka { topic_id => "SLS_LOGSTORE" bootstrap_servers => "SLS_KAFKA_ENDPOINT" security_protocol => "SASL_SSL" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';" sasl_mechanism => "PLAIN" codec => "json" client_id => "kafka-logstash" } }
示例六:通过Fluent-bit上传日志
Fluent-bit是一个轻量级、高可扩展的日志与指标的处理器、转发器,支持众多输入、处理和输出插件,支持通过Kafka插件将日志上传到日志服务。更多信息,请参见Kafka output plugin。
配置示例
示例中用到的
SLS_
开头的参数配置请参见配置方式。[Output] Name kafka Match * Brokers SLS_KAFKA_ENDPOINT Topics SLS_LOGSTORE Format json rdkafka.sasl.username SLS_PROJECT rdkafka.sasl.password SLS_PASSWORD rdkafka.security.protocol SASL_SSL rdkafka.sasl.mechanism PLAIN
示例七 :Vector配置Kafka协议上传
Vector是一款轻量级、高性能的日志处理软件,它支持Kafka协议的方式上报日志。下面是Vector通过Kafka兼容模式写入SLS的配置方法。
配置示例
示例中用到的
SLS_
开头的参数配置请参见配置方式。[sinks.aliyun_sls] type = "kafka" inputs = ["test_logs"] bootstrap_servers = "SLS_KAFKA_ENDPOINT" compression = "gzip" healthcheck = true topic = "SLS_LOGSTORE" encoding.codec = "json" sasl.enabled = true sasl.mechanism = "PLAIN" sasl.username = "SLS_PROJECT" sasl.password = "SLS_PASSWORD" tls.enabled = true
示例八:通过Kafka生产者(produce)上传日志
代码示例
package org.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProduceExample { public static void main(String[] args) { //配置信息。 Properties props = new Properties(); String project = "etl-dev"; String logstore = "testlog"; // 如果希望produce的内容被json解析展开,则设置为true boolean parseJson = true; // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。 // 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。 // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险 String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET"); String endpoint = "cn-huhehaote.log.aliyuncs.com"; // 根据实际project所在的endpoint配置 String port = "10012"; // 公网用10012,私网用10011 String hosts = project + "." + endpoint + ":" + port; String topic = logstore; if(parseJson) { topic = topic + ".json"; } props.put("bootstrap.servers", hosts); props.put("security.protocol", "sasl_ssl"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";"); props.put("enable.idempotence", "false"); // SLS的Kafka写入接口不支持事务 //设置数据key和value的序列化处理类。 props.put("key.serializer", StringSerializer.class); props.put("value.serializer", StringSerializer.class); //创建生产者实例。 KafkaProducer<String,String> producer = new KafkaProducer<>(props); //发送记录 for(int i=0;i<1;i++){ String content = "{\"msg\": \"Hello World\"}"; ProducerRecord record = new ProducerRecord<String, String>(topic, content); producer.send(record); } producer.close(); } }
pom依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency>
错误信息
使用Kafka协议上传日志失败时,会按照Kafka的错误信息返回对应的错误信息,如下表所示,Kafka协议错误信息详情,请参见error list。
错误信息 | 说明 | 推荐解决方式 |
NetworkException | 出现网络错误时返回该错误信息。 | 一般等待1秒后重试即可。 |
TopicAuthorizationException | 鉴权失败时返回该错误信息。 | 一般是您提供的AccessKey错误或没有写入对应Project、Logstore的权限。请填写正确的且具备写入权限的AccessKey。 |
UnknownTopicOrPartitionException | 出现该错误可能有两种原因:
| 请确保已创建对应的Project和Logstore。如果已创建还是提示该错误,请检查Project所在地域是否和填入的Endpoint一致。 |
KafkaStorageException | 服务端出现异常时返回该错误信息。 | 一般等待1秒后重试即可。 |