随着时间的积累,消息队列Kafka版中的日志数据会越来越多。当您需要查看并分析庞杂的日志数据时,可通过阿里云Logstash将消息队列Kafka版中的日志数据导入阿里云Elasticsearch,然后通过Kibana进行可视化展示与分析。本文说明如何进行操作。

前提条件

在开始本教程前,请确保您已完成以下操作:

背景信息

通过阿里云Logstash将数据从消息队列Kafka版导入阿里云Elasticsearch的过程如下图所示。elasticsearch
  • 消息队列Kafka版

    消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。详情请参见什么是消息队列Kafka版

  • 阿里云Elasticsearch

    Elasticsearch简称ES,是一个基于Lucene的实时分布式的搜索与分析引擎,是遵从Apache开源条款的一款开源产品,是当前主流的企业级搜索引擎。它提供了一个分布式服务,可以使您快速的近乎于准实时的存储、查询和分析超大数据集,通常被用来作为构建复杂查询特性和需求强大应用的基础引擎或技术。阿里云Elasticsearch支持5.5.3、6.3.2、6.7.0、6.8.0和7.4.0版本,并提供了商业插件X-Pack服务,致力于数据分析、数据搜索等场景服务。在开源Elasticsearch的基础上提供企业级权限管控、安全监控告警、自动报表生成等功能。详情请参见什么是阿里云Elasticsearch

  • 阿里云Logstash

    阿里云Logstash作为服务器端的数据处理管道,提供了100%兼容开源的Logstash功能。Logstash能够动态地从多个来源采集数据、转换数据,并且将数据存储到所选择的位置。通过输入、过滤和输出插件,Logstash可以对任何类型的事件加工和转换。详情请参见什么是阿里云LogstashService

步骤一:获取VPC环境接入点

阿里云Logstash通过消息队列Kafka版的接入点与消息队列Kafka版在VPC环境下建立连接。

  1. 登录消息队列Kafka版控制台
  2. 在顶部菜单栏,选择地域,例如华东1(杭州)。
  3. 在左侧导航栏,单击实例详情
  4. 实例详情页面,选择要将数据导入阿里云Elasticsearch的实例。
  5. 基本信息区域,获取实例的VPC环境接入点。
    endpoint
    消息队列Kafka版支持以下VPC环境接入点:
    • 默认接入点:端口号为9092。
    • SASL接入点:端口号为9094。如需使用SASL接入点,请开启ACL。您可以提交工单申请开启ACL。

    详情请参见接入点对比

步骤二:创建Topic

创建用于存储消息的Topic。

  1. 消息队列Kafka版控制台的左侧导航栏,单击Topic管理
  2. Topic管理页面,单击创建Topic
  3. 创建Topic页面,输入Topic信息,然后单击创建
    create_topic

步骤三:发送消息

向创建的Topic发送消息。

  1. 消息队列Kafka版控制台的Topic管理页面,找到创建的Topic,在其右侧操作列,单击发送消息
  2. 发送消息对话框,输入消息信息,然后单击发送
    send_msg

步骤四:创建Consumer Group

创建阿里云Elasticsearch所属的Consumer Group。

  1. 消息队列Kafka版控制台的左侧导航栏,单击Consumer Group管理
  2. Consumer Group管理页面,单击创建Consumer Group
  3. 创建Consumer Group页面,输入Consumer Group信息,然后单击创建
    create_cg

步骤五:创建索引

通过阿里云Elasticsearch创建索引,接收消息队列Kafka版的数据。

  1. 登录阿里云Elasticsearch控制台
  2. 在顶部菜单栏,选择地域。
  3. 实例列表页面,单击创建的实例。
  4. 在左侧导航栏,单击可视化控制
  5. Kibana区域,单击进入控制台
  6. 在Kibana登录页面,输入Username和Password,然后单击Log in
    说明
    • Username为您创建阿里云Elasticsearch实例时设置的用户名。
    • Password为您创建阿里云Elasticsearch实例时设置的密码。
  7. 在Kibana控制台的左侧导航栏,单击Dev Tools
  8. 执行以下命令创建索引。
    PUT /elastic_test
    {}

步骤六:创建管道

通过阿里云Logstash创建管道。管道部署后,将源源不断地从消息队列Kafka版导入数据进阿里云Elasticsearch。

  1. 登录阿里云Logstash控制台
  2. 在顶部菜单栏,选择地域。
  3. 实例列表页面,单击创建的实例。
  4. 在左侧导航栏,单击管道管理
  5. 管道列表区域,单击创建管道
  6. Config配置中,输入配置。
    配置示例如下。
    input {
        kafka {
        bootstrap_servers => ["192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092"]
        group_id => "elastic_group"
        topics => ["elastic_test"]
        consumer_threads => 12
        decorate_events => true
        }
    }
    output {
        elasticsearch {
        hosts => ["http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200"]
        index => "elastic_test"
        password => "XXX"
        user => "elastic"
        }
    }
    表 1. input参数说明
    参数 描述 示例值
    bootstrap_servers 消息队列Kafka版的VPC环境接入点。 192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092
    group_id Consumer Group的名称。 elastic_group
    topics Topic的名称。 elastic_test
    consumer_threads 消费线程数。建议与Topic的分区数保持一致。 12
    decorate_events 是否包含消息元数据。默认值为false。 true
    表 2. output参数说明
    参数 描述 示例值
    hosts 阿里云Elasticsearch服务的访问地址。您可在阿里云Elasticsearch实例的基本信息页面获取。 http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200
    index 索引的名称。 elastic_test
    password 访问阿里云Elasticsearch服务的密码。您在创建阿里云Elasticsearch实例时设置的密码。 XXX
    user 访问阿里云Elasticsearch服务的用户名。您在创建阿里云Elasticsearch实例时设置的用户名。 elastic
  7. 管道参数配置中,输入配置信息,然后单击保存并部署
    input
  8. 提示对话框,单击确认

步骤七:搜索数据

您可以在Kibana控制台搜索通过管道导入阿里云Elasticsearch的数据,确认数据是否导入成功。

  1. 登录阿里云Elasticsearch控制台
  2. 在顶部菜单栏,选择地域。
  3. 实例列表页面,单击创建的实例。
  4. 在左侧导航栏,单击可视化控制
  5. Kibana区域,单击进入控制台
  6. 在Kibana登录页面,输入Username和Password,然后单击Log in
    说明
    • Username为您创建阿里云Elasticsearch实例时设置的用户名。
    • Password为您创建阿里云Elasticsearch实例时设置的密码。
  7. 在Kibana控制台的左侧导航栏,单击Dev Tools图标。
  8. 执行以下命令搜索数据。
    GET /elastic_test/_search
    {}
    返回结果如下。作为Input接入