SAE日志导入Kafka最佳实践

Serverless 应用引擎 SAE(Serverless App Engine)产生的日志导入到云消息队列 Kafka 版之后,您可以将Kafka中的日志统一写入阿里云Elasticsearch,便于后续的统一查看和处理。本文介绍通过SAE日志导入到Kafka,再从Kafka将数据写入Elasticsearch的最佳实践。

前提条件

步骤一:创建Group

本步骤介绍如何创建阿里云Elasticsearch所属的Group。

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击目标实例名称。

  4. 在左侧导航栏,单击Group 管理

  5. Group 管理页面,单击创建 Group

  6. 创建 Group面板的Group ID文本框输入Group的名称,在描述文本框简要描述Group,并给Group添加标签,单击确定

    创建完成后,在Group 管理页面的列表中显示已创建的Group。

步骤二:创建索引

本步骤介绍如何通过阿里云Elasticsearch创建索引,接收云消息队列 Kafka 版的数据。

  1. 登录阿里云Elasticsearch控制台
  2. 在左侧导航栏,单击Elasticsearch实例
  3. 进入目标实例。
    1. 在顶部菜单栏处,选择资源组和地域。
    2. Elasticsearch实例中单击目标实例ID。
  4. 在左侧导航栏,选择配置与管理 > 可视化控制

  5. Kibana区域中,单击私网入口公网入口

    • 私网入口:开启Kibana私网访问后(默认未开启),才会显示。具体操作,请参见配置Kibana公网或私网访问白名单

      说明

      仅以5601为公网访问端口的Kibana实例支持开放私网访问,具体以控制台为准。

    • 公网入口:开启Kibana公网访问后(默认开启),才会显示。具体操作,请参见配置Kibana公网或私网访问白名单

      说明
      • 首次从公网入口进入Kibana控制台且公网访问配置未修改时,系统会提示您修改配置。单击修改配置,进入Kibana配置页面修改Kibana公网访问白名单,在白名单中加入您客户端的IP地址,具体操作请参见配置Kibana公网或私网访问白名单。修改后,再次单击公网入口,即可进入Kibana控制台。

      • 获取客户端的IP地址:如果您的客户端处在家庭网络或公司局域网中,您需要将局域网的公网出口IP地址添加到白名单中,而非客户端机器的内网机制。建议您通过浏览器访问https://myip.ipip.net查询。您也可以将白名单配置为0.0.0.0/0,允许所有IPv4地址访问Kibana,此配置会导致Kibana完全暴露在公网中,增加安全风险,配置前请确认您是否可以接受这个风险。

  6. 在Kibana登录页面,输入用户名和密码,单击Log in

    参数说明如下。

  7. 在Kibana控制台的左侧导航栏,选择Management > Dev Tools

  8. 执行以下命令,创建索引。

    PUT /elastic_test
    {}

步骤三:创建管道

本步骤介绍如何通过阿里云Logstash创建管道。成功部署管道后,数据将持续地从消息队列Kafka版导入到阿里云Elasticsearch。

  1. 登录阿里云Elasticsearch控制台
  2. 进入目标实例。
    1. 在顶部菜单栏处,选择地域。
    2. Logstash实例中单击目标实例ID。
  3. 在左侧导航栏,单击管道管理

  4. 管道列表页面,单击创建管道

  5. 创建管道任务配置向导页面,配置相关参数。

    1. Config配置页签,配置相关参数并单击下一步

      本文的参数配置以Kafka数据导入Elasticsearch为例。参数的更多信息,请参见通过配置文件管理管道

      • 管道ID:自定义。

      • Config配置:配置示例如下。

        input {
            kafka {
            bootstrap_servers => ["alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092"]
            group_id => "elastic_group"
            topics => ["elastic_test"]
            codec => json
            consumer_threads => 12
            decorate_events => true
            }
        }
        output {
            elasticsearch {
            hosts => ["http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200"]
            index => "elastic_test"
            password => "XXX"
            user => "elastic"
            }
        }

        参数说明如下。

        参数

        说明

        示例值

        input

        bootstrap_servers

        Kafka的VPC环境接入点。

        alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092

        group_id

        Group的名称。

        elastic_group

        topics

        Topic的名称。

        elastic_test

        codec

        解码的类型。

        说明

        建议设置为JSON格式,与SAE日志导入到Kafka的数据格式保持一致。

        json

        consumer_threads

        消费线程数。

        说明

        建议与Topic的分区数保持一致。

        12

        decorate_events

        是否包含消息元数据。默认值为false

        true

        output

        hosts

        Elasticsearch的访问地址。您可在Elasticsearch实例的基本信息页面获取。

        http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200

        index

        索引的名称。

        elastic_test

        password

        访问Elasticsearch的密码。您在创建Elasticsearch实例时设置的密码。

        XXX

        user

        访问Elasticsearch的用户名。您在创建Elasticsearch实例时设置的用户名。

        elastic

    2. 管道参数配置页签,配置相关参数,单击保存并部署

      本示例以默认配置为准,您可以按需修改。

  6. 提示对话框,单击确认

步骤四:搜索数据

您可以在Kibana控制台搜索通过管道导入阿里云Elasticsearch的数据,确认数据是否导入成功。

  1. 登录阿里云Elasticsearch控制台
  2. 进入目标实例。
    1. 在顶部菜单栏处,选择资源组和地域。
    2. Elasticsearch实例中单击目标实例ID。
  3. 在左侧导航栏,选择配置与管理 > 可视化控制

  4. 在Kibana控制台的左侧导航栏,选择Management > Dev Tools

    登录Kibana控制台的具体步骤,请参见创建索引步骤5~7。

  5. 执行以下命令,搜索数据。

    GET /elastic_test/_search
    {}

    返回示例如下。sc_elasticsearch_responses