使用Kafka协议上传日志

您可以使用Kafka Producer SDK、Beats系列软件、Collectd、Fluentd、Logstash、Telegraf、Vector等采集工具采集日志,并通过Kafka协议上传到日志服务。本文介绍通过采集工具采集到日志后,利用Kafka协议上传日志到日志服务的操作步骤。

相关限制

  • 支持的Kafka协议版本最低为2.1.0。

  • 为保证日志传输安全性,必须使用SASL_SSL连接协议。

权限说明

以下两个权限规则满足其中之一即可。

  • AliyunLogFullAccess

    本策略定义了管理日志服务(Log)的权限。授权方式请参见RAM用户授权RAM角色授权

  • 自定义权限策略

    1. 创建一个自定义权限策略,其中在脚本编辑页签,请使用以下脚本替换配置框中的原有内容。具体操作,请参见创建自定义权限策略

      说明

      脚本中的Project名称请根据实际情况替换。

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": "log:GetProject",
                  "Resource": "acs:log:*:*:project/project名称",
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "log:GetLogStore",
                      "log:ListShards",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "acs:log:*:*:project/project名称/logstore/*",
                  "Effect": "Allow"
              }
          ]
      }
    2. RAM用户添加创建的自定义权限策略。具体操作,请参见RAM用户授权

配置方式

使用Kafka协议上传日志时,您需要配置以下参数。

配置名

配置值

说明

示例

SLS_KAFKA_ENDPOINT

初始连接的集群地址,格式为Project名称.Endpoint:Port,请根据Project所在的Endpoint进行配置。更多信息,请参见服务入口

  • 私网:端口号为10011,例如Project名称.cn-hangzhou-intranet.log.aliyuncs.com:10011

  • 公网:端口号为10012,例如Project名称.cn-hangzhou.log.aliyuncs.com:10012

aliyun-project-testProject名称,cn-hangzhou-xxx.aliyuncs.comEndpoint1001110012分别为私网和公网的端口号。

  • 私网:aliyun-project-test.cn-hangzhou-intranet.log.aliyuncs.com:10011

  • 公网:aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012

SLS_PROJECT

Project名称

日志服务对应的Project名称。

aliyun-project-test

SLS_LOGSTORE

Logstore名称

日志服务对应的Logstore名称。Logstore名称后缀加上.json,则代表尝试JSON解析。

例如Logstore名称是test-logstore

  • 配置值为test-logstore,上报日志内容存储在content字段中。

  • 配置值为test-logstore.json,上报日志内容按JSON解析,解析用户上报的JSON数据中的首层Key为字段名,对应的Value为字段值。

SLS_PASSWORD

具备sls写入权限的AccessKeySecret。

AK的概念和创建步骤,请参见创建AccessKey

值为AccessKey IDAccessKey Secret#号拼接。

LTAI****************#yourAccessKeySecret

说明

如果您要通过Kafka消费组实时消费日志服务的数据,请提交工单咨询阿里云技术支持工程师。

示例一:通过Beats系列软件上传日志

Beats系列软件(MetricBeat、PacketBeat、Winlogbeat、Auditbeat、Filebeat、Heartbeat等)采集到日志后,支持通过Kafka协议将日志上传到日志服务。

  • 配置示例

    示例中用到的SLS_开头的参数配置请参见配置方式

    output.kafka:
      # initial brokers for reading cluster metadata
      hosts: ["SLS_KAFKA_ENDPOINT"]
      username: "SLS_PROJECT"
      password: "SLS_PASSWORD"
      ssl.certificate_authorities:
      # message topic selection + partitioning
      topic: 'SLS_LOGSTORE'
      partition.round_robin:
        reachable_only: false
    
      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000

示例二:通过Collectd上传日志

Collectd是一个守护(daemon)进程,用于定期采集系统和应用程序的性能指标,并支持通过Kafka协议上传到日志服务。更多信息,请参见Write Kafka Plugin

Collectd采集到日志上传到日志服务时,还需安装Kafka插件以及相关依赖。例如:在Linux CentOS中,可以使用yum安装Kafka插件,命令为sudo yum install collectd-write_kafka,安装RPM请参见Collectd-write_kafka

  • 配置示例

    • 示例中将日志输出格式(Format)设置为JSON,除此之外,还支持Command、Graphite类型。更多信息,请参见Collectd配置文档

    • 示例中用到的SLS_开头的参数配置请参见配置方式

    LoadPlugin write_kafka
    
    <Plugin write_kafka>
      Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
      Property "security.protocol" "sasl_ssl"
      Property "sasl.mechanism" "PLAIN"
      Property "sasl.username" "SLS_PROJECT"
      Property "sasl.password" "SLS_PASSWORD"
      Property "broker.address.family" "v4"
      <Topic "SLS_LOGSTORE">
        Format JSON
        Key "content"
      </Topic>
    </Plugin>

示例三:使用Telegraf上传日志

Telegraf是由Go语言编写的代理程序,内存占用小,用于收集、处理、汇总数据指标。Telegraf具有丰富的插件及集成功能,可从其运行的系统中获取各种指标、从第三方API中获取指标以及通过statsdKafka消费者服务监听指标。

Telegraf采集到的日志通过Kafka协议上传到日志服务前,您需要先修改配置文件。

  • 配置示例

    • 示例中将日志输出格式(Format)设置为JSON,除此之外还支持Graphite、Carbon2等类型。更多信息,请参见Telegraf输出格式

      说明

      Telegraf必须配置一个合法的tls_ca路径,使用服务器自带的根证书的路径即可。Linux环境中,根证书CA路径一般为/etc/ssl/certs/ca-bundle.crt

    • 示例中用到的SLS_开头的参数配置请参见配置方式

    # Kafka output plugin configuration
    [[outputs.kafka]]
      ## URLs of kafka brokers
      brokers = ["SLS_KAFKA_ENDPOINT"]
      ## Kafka topic for producer messages
      topic = "SLS_LOGSTORE"
      routing_key = "content"
      ## CompressionCodec represents the various compression codecs recognized by
      ## Kafka in messages.
      ## 0 : No compression
      ## 1 : Gzip compression
      ## 2 : Snappy compression
      ## 3 : LZ4 compression
      compression_codec = 1
      ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt"
      tls_cert = "/etc/ssl/certs/ca-certificates.crt" # tls_key = "/etc/telegraf/key.pem"
      ## Use TLS but skip chain & host verification
      # insecure_skip_verify = false
      ## Optional SASL Config
      sasl_username = "SLS_PROJECT"
      sasl_password = "SLS_PASSWORD"
      ## Data format to output.
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
      data_format = "json"

示例四:使用Fluentd上传日志

Fluentd是一个开源的日志收集器,是云端原生计算基金会(CNCF)的成员项目之一,遵循Apache 2 License协议。

Fluentd支持众多输入、处理、输出插件,支持通过Kafka插件将日志上传到日志服务,您只需安装并配置Kafka插件即可。更多信息,请参见fluent-plugin-kafka

  • 配置示例

    • 示例中将日志输出格式(Format)设置为JSON,除此之外还支持数十种Format类型。更多信息,请参见Fluentd Formatter

    • 示例中用到的SLS_开头的参数配置请参见配置方式

    <match **>
      @type kafka2
      brokers     SLS_KAFKA_ENDPOINT
      default_topic SLS_LOGSTORE
      default_message_key content
      sasl_over_ssl true
      use_event_time true
      username SLS_PROJECT
      password "SLS_PASSWORD"
      ssl_ca_certs_from_system true
      # ruby-kafka producer options
      max_send_retries 1000
      required_acks 1
      compression_codec gzip
      use_event_time true
      max_send_limit_bytes 2097152
    
      <buffer hostlogs>
        flush_interval 10s
      </buffer>
      <format>
        @type json
      </format>
    </match>

示例五:使用Logstash上传日志

Logstash是一个具备实时处理能力、开源的日志采集引擎,可以动态采集不同来源的日志。

说明

Kafka协议上传日志功能要求Logstash的版本号至少为7.10.1。

Logstash内置Kafka输出插件,您可以配置Logstash实现日志通过Kafka协议上传到日志服务。由于日志服务使用SASL_SSL连接协议,因此还需要配置SSL证书和jaas文件。

  • 配置示例

    • 示例中将日志输出格式(Format)设置为JSON,除此之外还支持数十种Format类型。更多信息,请参见Logstash Codec

      说明

      本示例为连通性测试的配置,您的生产环境中建议删除stdout的输出配置。

    • 示例中用到的SLS_开头的参数配置请参见配置方式

    
    output {
      stdout { codec => rubydebug }
      kafka {
        topic_id => "SLS_LOGSTORE"
        bootstrap_servers => "SLS_KAFKA_ENDPOINT"
        security_protocol => "SASL_SSL"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
        sasl_mechanism => "PLAIN"
        codec => "json"
        client_id => "kafka-logstash"
      }
    }

示例六:通过Fluent-bit上传日志

Fluent-bit是一个轻量级、高可扩展的日志与指标的处理器、转发器,支持众多输入、处理和输出插件,支持通过Kafka插件将日志上传到日志服务。更多信息,请参见Kafka output plugin

  • 配置示例

    示例中用到的SLS_开头的参数配置请参见配置方式

    [Output]
        Name    kafka
        Match    *
        Brokers   SLS_KAFKA_ENDPOINT
        Topics    SLS_LOGSTORE
        Format    json
        rdkafka.sasl.username   SLS_PROJECT
        rdkafka.sasl.password   SLS_PASSWORD
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism      PLAIN

示例七 :Vector配置Kafka协议上传

Vector是一款轻量级、高性能的日志处理软件,它支持Kafka协议的方式上报日志。下面是Vector通过Kafka兼容模式写入SLS的配置方法。

  • 配置示例

    示例中用到的SLS_开头的参数配置请参见配置方式

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["test_logs"]
      bootstrap_servers = "SLS_KAFKA_ENDPOINT"
      compression = "gzip"
      healthcheck = true
      topic = "SLS_LOGSTORE"
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "SLS_PROJECT"
      sasl.password = "SLS_PASSWORD"
      tls.enabled = true

示例八:通过Kafka生产者(produce)上传日志

Java

  • 依赖

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
  • 代码示例

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            //配置信息。
            Properties props = new Properties();
            String project = "etl-shanghai-b";
            String logstore = "testlog";
            // 如果希望produce的内容被json解析展开,则设置为true
            boolean parseJson = false;
            // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
            // 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。
            // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-shanghai.log.aliyuncs.com"; // 根据实际project所在的endpoint配置
            String port = "10012"; // 公网用10012,私网用10011
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
    
            //创建生产者实例。
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            //发送记录
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
                // 如果有需要可以用下面的方式设置消息的时间戳
                // long timestamp = System.currentTimeMillis();
                // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content);
    
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("ERROR: Failed to send message: " + exception.getMessage());
                        exception.printStackTrace();
                    } else {
                        System.out.println("Message sent successfully to topic: " + metadata.topic() +
                                         ", partition: " + metadata.partition() +
                                         ", offset: " + metadata.offset() +
                                         ", timestamp: " + metadata.timestamp());
                    }
                });
            }
    
            producer.close();
        }
    }
    

Python

  • 依赖

    pip install confluent-kafka
  • 代码示例

    #!/bin/env python3
    import time
    import os
    from confluent_kafka import Producer
    
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}] at offset {}'.format(msg.topic(), msg.partition(), msg.offset()))
    
    def main():
        project = "etl-shanghai-b"
        logstore = "testlog"
        parse_json = False
    
        # Get credentials from environment variables
        access_key_id = os.getenv("SLS_ACCESS_KEY_ID")
        access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET")
        endpoint = "cn-shanghai.log.aliyuncs.com"
        port = "10012" # 公网用10012,私网用10011
    
        hosts = f"{project}.{endpoint}:{port}"
        topic = logstore
        if parse_json:
            topic = topic + ".json"
    
        # Configure Kafka producer
        conf = {
            'bootstrap.servers': hosts,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'PLAIN',
            'sasl.username': project,
            'sasl.password': f"{access_key_id}#{access_key_secret}",
            'enable.idempotence': False,
        }
    
        # Create producer instance
        producer = Producer(conf)
    
        # Send message
        content = "{\"msg\": \"Hello World\"}"
        producer.produce(topic=topic,
                         value=content.encode('utf-8'),
                         #timestamp=int(time.time() * 1000),  # (可选) 设置record时间戳, 单位毫秒
                         callback=delivery_report)
    
        # Wait for any outstanding messages to be delivered and delivery report
        # callbacks to be triggered.
        producer.flush()
    
    if __name__ == '__main__':
        main()
    

Golang

  • 依赖

    go get github.com/confluentinc/confluent-kafka-go/kafka
  • 代码示例

    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    //	"time"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
    	project := "etl-shanghai-b"
    	logstore := "testlog"
    	parseJson := false
    
    	// Get credentials from environment variables
    	accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
    	accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")
    	endpoint := "cn-shanghai.log.aliyuncs.com"
    	port := "10012" // 公网用10012,私网用10011
    
    	hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port)
    	topic := logstore
    	if parseJson {
    		topic = topic + ".json"
    	}
    
    	// Configure Kafka producer
    	config := &kafka.ConfigMap{
    		"bootstrap.servers":  hosts,
    		"security.protocol":  "sasl_ssl",
    		"sasl.mechanisms":    "PLAIN",
    		"sasl.username":      project,
    		"sasl.password":      accessKeyID + "#" + accessKeySecret,
    		"enable.idempotence": false,
    	}
    
    	// Create producer instance
    	producer, err := kafka.NewProducer(config)
    	if err != nil {
    		log.Fatalf("Failed to create producer: %v", err)
    	}
    	defer producer.Close()
    
    	// 批量发送消息
    	messages := []string{
    		"{\"msg\": \"Hello World 1\"}",
    		"{\"msg\": \"Hello World 2\"}",
    		"{\"msg\": \"Hello World 3\"}",
    	}
    
    	for _, content := range messages {
    		err := producer.Produce(&kafka.Message{
    			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    			Value:          []byte(content),
    			//Timestamp:      time.Now(), // 如有需要可以设置时间
    		}, nil)
    
    		if err != nil {
    			log.Printf("Failed to produce message: %v", err)
    		}
    	}
    
    	// 启用一个go routine 监听producer发送是否成功或者失败
    	go func() {
    		for e := range producer.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
    				} else {
    					fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
    						*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
    				}
    			}
    		}
    	}()
    
    	producer.Flush(5 * 1000)
    }

错误信息

使用Kafka协议上传日志失败时,会按照Kafka的错误信息返回对应的错误信息,如下表所示,Kafka协议错误信息详情,请参见error list

错误信息

说明

推荐解决方式

NetworkException

出现网络错误时返回该错误信息。

一般等待1秒后重试即可。

TopicAuthorizationException

鉴权失败时返回该错误信息。

一般是您提供的AccessKey错误或没有写入对应Project、Logstore的权限。请填写正确的且具备写入权限的AccessKey。

UnknownTopicOrPartitionException

出现该错误可能有两种原因:

  • 不存在对应的ProjectLogstore。

  • Project所在地域与填入的Endpoint不一致。

请确保已创建对应的ProjectLogstore。如果已创建还是提示该错误,请检查Project所在地域是否和填入的Endpoint一致。

KafkaStorageException

服务端出现异常时返回该错误信息。

一般等待1秒后重试即可。