文档

将云消息队列Kafka版接入阿里云Elasticsearch

更新时间:

随着时间的积累,云消息队列 Kafka 版中的日志数据会越来越多。当您需要查看并分析庞杂的日志数据时,可通过阿里云Logstash将云消息队列 Kafka 版中的日志数据导入阿里云Elasticsearch,然后通过Kibana进行可视化展示与分析。本文介绍将云消息队列 Kafka 版接入阿里云Elasticsearch的操作方法。

前提条件

在开始本教程前,请确保您已完成以下操作:

背景信息

通过阿里云Logstash将数据从云消息队列 Kafka 版导入阿里云Elasticsearch的过程如下图所示。elasticsearch

  • 云消息队列 Kafka 版

    云消息队列 Kafka 版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。云消息队列 Kafka 版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。更多信息,请参见什么是云消息队列 Kafka 版?

  • 阿里云Elasticsearch

    Elasticsearch简称ES,是一个基于Lucene的实时分布式的搜索与分析引擎,是遵从Apache开源条款的一款开源产品,是当前主流的企业级搜索引擎。它提供了一个分布式服务,可以使您快速的近乎于准实时的存储、查询和分析超大数据集,通常被用来作为构建复杂查询特性和需求强大应用的基础引擎或技术。阿里云Elasticsearch支持5.5.3、6.3.2、6.7.0、6.8.0和7.4.0版本,并提供了商业插件X-Pack服务,致力于数据分析、数据搜索等场景服务。在开源Elasticsearch的基础上提供企业级权限管控、安全监控告警、自动报表生成等功能。更多信息,请参见什么是阿里云Elasticsearch

  • 阿里云Logstash

    阿里云Logstash作为服务器端的数据处理管道,提供了100%兼容开源的Logstash功能。Logstash能够动态地从多个来源采集数据、转换数据,并且将数据存储到所选择的位置。通过输入、过滤和输出插件,Logstash可以对任何类型的事件加工和转换。更多信息,请参见什么是阿里云Logstash

步骤一:获取VPC环境接入点

阿里云Logstash通过云消息队列 Kafka 版的接入点与云消息队列 Kafka 版在VPC环境下建立连接。

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击目标实例名称。

  4. 实例详情页面的接入点信息页签,获取实例的VPC环境接入点。

    endpoint

    云消息队列 Kafka 版支持以下VPC环境接入点:

    • 默认接入点:端口号为9092。

    • SASL接入点:端口号为9094。如需使用SASL接入点,请开启ACL。详细步骤,请参见开启ACL

    更多信息,请参见接入点对比

步骤二:创建Topic

创建用于存储消息的Topic。

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击目标实例名称。

  4. 在左侧导航栏,单击Topic 管理

  5. Topic 管理页面,单击创建 Topic

  6. 创建 Topic面板,设置Topic属性,然后单击确定

    参数

    说明

    示例

    名称

    Topic名称。

    demo

    描述

    Topic的简单描述。

    demo test

    分区数

    Topic的分区数量。

    12

    存储引擎

    说明

    当前仅专业版实例支持选择存储引擎类型,标准版暂不支持,默认选择为云存储类型。

    Topic消息的存储引擎。

    云消息队列 Kafka 版支持以下两种存储引擎。

    • 云存储:底层接入阿里云云盘,具有低时延、高性能、持久性、高可靠等特点,采用分布式3副本机制。实例的规格类型标准版(高写版)时,存储引擎只能为云存储

    • Local 存储:使用原生Kafka的ISR复制算法,采用分布式3副本机制。

    云存储

    消息类型

    Topic消息的类型。

    • 普通消息:默认情况下,保证相同Key的消息分布在同一个分区中,且分区内消息按照发送顺序存储。集群中出现机器宕机时,可能会造成消息乱序。当存储引擎选择云存储时,默认选择普通消息

    • 分区顺序消息:默认情况下,保证相同Key的消息分布在同一个分区中,且分区内消息按照发送顺序存储。集群中出现机器宕机时,仍然保证分区内按照发送顺序存储。但是会出现部分分区发送消息失败,等到分区恢复后即可恢复正常。当存储引擎选择Local 存储时,默认选择分区顺序消息

    普通消息

    日志清理策略

    Topic日志的清理策略。

    存储引擎选择Local 存储(当前仅专业版实例支持选择存储引擎类型为Local存储,标准版暂不支持)时,需要配置日志清理策略

    云消息队列 Kafka 版支持以下两种日志清理策略。

    • Delete:默认的消息清理策略。在磁盘容量充足的情况下,保留在最长保留时间范围内的消息;在磁盘容量不足时(一般磁盘使用率超过85%视为不足),将提前删除旧消息,以保证服务可用性。

    • Compact:使用Kafka Log Compaction日志清理策略。Log Compaction清理策略保证相同Key的消息,最新的value值一定会被保留。主要适用于系统宕机后恢复状态,系统重启后重新加载缓存等场景。例如,在使用Kafka Connect或Confluent Schema Registry时,需要使用Kafka Compact Topic存储系统状态信息或配置信息。

      重要

      Compact Topic一般只用在某些生态组件中,例如Kafka Connect或Confluent Schema Registry,其他情况的消息收发请勿为Topic设置该属性。具体信息,请参见云消息队列 Kafka 版Demo库

    Compact

    标签

    Topic的标签。

    demo

    创建完成后,在Topic 管理页面的列表中显示已创建的Topic。

步骤三:发送消息

向创建的Topic发送消息。

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击目标实例名称。

  4. 在左侧导航栏,单击Topic 管理

  5. Topic 管理页面,找到目标Topic,在其操作列中,选择更多 > 体验发送消息

  6. 快速体验消息收发面板,发送测试消息。

    • 发送方式选择控制台

      1. 消息 Key文本框中输入消息的Key值,例如demo。

      2. 消息内容文本框输入测试的消息内容,例如 {"key": "test"}。

      3. 设置发送到指定分区,选择是否指定分区。

        • 单击,在分区 ID文本框中输入分区的ID,例如0。如果您需查询分区的ID,请参见查看分区状态

        • 单击,不指定分区。

      4. 根据界面提示信息,通过SDK订阅消息,或者执行Docker命令订阅消息。

    • 发送方式选择Docker,运行Docker容器。

      1. 执行运行 Docker 容器生产示例消息区域的Docker命令,发送消息。

      2. 执行发送后如何消费消息?区域的Docker命令,订阅消息。

    • 发送方式选择SDK,根据您的业务需求,选择需要的语言或者框架的SDK以及接入方式,通过SDK体验消息收发。

步骤四:创建Group

创建阿里云Elasticsearch所属的Group

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击目标实例名称。

  4. 在左侧导航栏,单击Group 管理

  5. Group 管理页面,单击创建 Group

  6. 创建 Group面板的Group ID文本框输入Group的名称,在描述文本框简要描述Group,并给Group添加标签,单击确定

    创建完成后,在Group 管理页面的列表中显示已创建的Group。

步骤五:创建索引

通过阿里云Elasticsearch创建索引,接收云消息队列 Kafka 版的数据。

  1. 登录Kibana控制台

    说明
    • Username为您创建阿里云Elasticsearch实例时设置的用户名。

    • Password为您创建阿里云Elasticsearch实例时设置的密码。

  2. 在Kibana控制台的左侧导航栏,单击Dev Tools

  3. 执行以下命令创建索引。

    PUT /elastic_test
    {}

步骤六:创建管道

通过阿里云Logstash创建管道。管道部署后,将源源不断地从云消息队列 Kafka 版导入数据进阿里云Elasticsearch。

  1. 登录阿里云Elasticsearch控制台的Logstash实例页面

  2. 在顶部菜单栏,选择地域。

  3. Logstash实例页面,单击创建的实例。

  4. 在左侧导航栏,单击管道管理

  5. 管道列表区域,单击创建管道

  6. Config配置中,输入配置。

    配置示例如下。

    input {
        kafka {
        bootstrap_servers => ["alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092"]
        group_id => "elastic_group"
        topics => ["elastic_test"]
        consumer_threads => 12
        decorate_events => true
        }
    }
    output {
        elasticsearch {
        hosts => ["http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200"]
        index => "elastic_test"
        password => "XXX"
        user => "elastic"
        }
    }

    表 1. input参数说明

    参数

    描述

    示例值

    bootstrap_servers

    云消息队列 Kafka 版的VPC环境接入点。

    alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092

    group_id

    Group的名称。

    elastic_group

    topics

    Topic的名称。

    elastic_test

    consumer_threads

    消费线程数。建议与Topic的分区数保持一致。

    12

    decorate_events

    是否包含消息元数据。默认值为false。

    true

    表 2. output参数说明

    参数

    描述

    示例值

    hosts

    阿里云Elasticsearch服务的访问地址。您可在阿里云Elasticsearch实例的基本信息页面获取。

    http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200

    index

    索引的名称。

    elastic_test

    password

    访问阿里云Elasticsearch服务的密码。您在创建阿里云Elasticsearch实例时设置的密码。

    XXX

    user

    访问阿里云Elasticsearch服务的用户名。您在创建阿里云Elasticsearch实例时设置的用户名。

    elastic

  7. 管道参数配置中,输入配置信息,然后单击保存并部署

  8. 提示对话框,单击确认

步骤七:搜索数据

您可以在Kibana控制台搜索通过管道导入阿里云Elasticsearch的数据,确认数据是否导入成功。

  1. 登录Kibana控制台

    说明
    • Username为您创建阿里云Elasticsearch实例时设置的用户名。

    • Password为您创建阿里云Elasticsearch实例时设置的密码。

  2. 在Kibana控制台的左侧导航栏,单击Dev Tools图标。

  3. 执行以下命令搜索数据。

    GET /elastic_test/_search
    {}

    返回结果如下。作为Input接入