随着时间的积累,云消息队列 Kafka 版中的日志数据会越来越多。当您需要查看并分析庞杂的日志数据时,可通过阿里云Logstash将云消息队列 Kafka 版中的日志数据导入阿里云Elasticsearch,然后通过Kibana进行可视化展示与分析。本文介绍将云消息队列 Kafka 版接入阿里云Elasticsearch的操作方法。
前提条件
在开始本教程前,请确保您已完成以下操作:
-
购买并部署云消息队列 Kafka 版实例。具体信息,请参见通过 VPC 接入云消息队列 Kafka 版。
-
创建阿里云Elasticsearch实例。具体信息,请参见创建阿里云Elasticsearch实例。
-
创建阿里云Logstash实例。具体信息,请参见创建阿里云Logstash实例。
背景信息
通过阿里云Logstash将数据从云消息队列 Kafka 版导入阿里云Elasticsearch的过程如下图所示。
-
云消息队列 Kafka 版
云消息队列 Kafka 版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。云消息队列 Kafka 版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。更多信息,请参见什么是云消息队列 Kafka 版?。
-
阿里云Elasticsearch
Elasticsearch简称ES,是一个基于Lucene的实时分布式的搜索与分析引擎,是遵从Apache开源条款的一款开源产品,是当前主流的企业级搜索引擎。它提供了一个分布式服务,可以使您快速的近乎于准实时的存储、查询和分析超大数据集,通常被用来作为构建复杂查询特性和需求强大应用的基础引擎或技术。阿里云Elasticsearch支持5.5.3、6.3.2、6.7.0、6.8.0和7.4.0版本,并提供了商业插件X-Pack服务,致力于数据分析、数据搜索等场景服务。在开源Elasticsearch的基础上提供企业级权限管控、安全监控告警、自动报表生成等功能。更多信息,请参见什么是阿里云Elasticsearch。
-
阿里云Logstash
阿里云Logstash作为服务器端的数据处理管道,提供了100%兼容开源的Logstash功能。Logstash能够动态地从多个来源采集数据、转换数据,并且将数据存储到所选择的位置。通过输入、过滤和输出插件,Logstash可以对任何类型的事件加工和转换。更多信息,请参见什么是阿里云Logstash。
步骤一:获取VPC环境接入点
阿里云Logstash通过云消息队列 Kafka 版的接入点与云消息队列 Kafka 版在VPC环境下建立连接。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
-
在实例详情页面的接入点信息页签,获取实例的VPC环境接入点。
接入点信息包含三种类型:默认接入点(网络VPC,协议PLAINTEXT)、SSL接入点(网络公网,协议SASL_SSL)和SASL接入点(网络VPC,协议SASL_PLAINTEXT)。获取默认接入点对应的域名地址即为VPC环境接入点。
步骤二:创建Topic
创建用于存储消息的Topic。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在左侧导航栏,单击Topic 管理。
在Topic 管理页面,单击创建 Topic。
在创建 Topic面板,设置Topic属性,然后单击确定。
参数
说明
示例
名称
Topic名称。
说明xxx_xxx和xxx.xxx的 Topic名称在 Kafka 中会被认为是同名Topic,重复创建会有错误提示。demo
描述
Topic的简单描述。
demo test
分区数
Topic的分区数量。
12
存储引擎
说明当前仅非Serverless专业版实例支持选择存储引擎类型,其他实例暂不支持选择,默认为云存储类型。
Topic消息的存储引擎。
云消息队列 Kafka 版支持以下两种存储引擎。
云存储:底层接入阿里云云盘,具有低时延、高性能、持久性、高可靠等特点,采用分布式3副本机制。实例的规格类型为标准版(高写版)时,存储引擎只能为云存储。
Local 存储:使用原生Kafka的ISR复制算法,采用分布式3副本机制。
云存储
消息类型
Topic消息的类型。
普通消息:默认情况下,保证相同Key的消息分布在同一个分区中,且分区内消息按照发送顺序存储。集群中出现机器宕机时,可能会造成消息乱序。当存储引擎选择云存储时,默认选择普通消息。
分区顺序消息:默认情况下,保证相同Key的消息分布在同一个分区中,且分区内消息按照发送顺序存储。集群中出现机器宕机时,仍然保证分区内按照发送顺序存储。但是会出现部分分区发送消息失败,等到分区恢复后即可恢复正常。当存储引擎选择Local 存储时,默认选择分区顺序消息。
普通消息
日志清理策略
Topic日志的清理策略。
当存储引擎选择Local 存储(当前仅专业版实例支持选择存储引擎类型为Local存储,标准版暂不支持)时,需要配置日志清理策略。
云消息队列 Kafka 版支持以下两种日志清理策略。
Delete:默认的消息清理策略。在磁盘容量充足的情况下,保留在最长保留时间范围内的消息;在磁盘容量不足时(一般磁盘使用率超过85%视为不足),将提前删除旧消息,以保证服务可用性。
Compact:使用Kafka Log Compaction日志清理策略。Log Compaction清理策略保证相同Key的消息,最新的value值一定会被保留。主要适用于系统宕机后恢复状态,系统重启后重新加载缓存等场景。例如,在使用Kafka Connect或Confluent Schema Registry时,需要使用Kafka Compact Topic存储系统状态信息或配置信息。
重要Compact Topic一般只用在某些生态组件中,例如Kafka Connect或Confluent Schema Registry,其他情况的消息收发请勿为Topic设置该属性。具体信息,请参见云消息队列 Kafka 版Demo库。
Compact
标签
Topic的标签。
demo
创建完成后,在Topic 管理页面的列表中显示已创建的Topic。
步骤三:发送消息
向创建的Topic发送消息。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在左侧导航栏,单击Topic 管理。
在Topic 管理页面,单击目标Topic名称进入Topic 详情页面,然后单击体验发送消息。
在快速体验消息收发面板,发送测试消息。
发送方式选择控制台。
在消息 Key文本框中输入消息的Key值,例如demo。
在消息内容文本框输入测试的消息内容,例如 {"key": "test"}。
设置发送到指定分区,选择是否指定分区。
单击是,在分区 ID文本框中输入分区的ID,例如0。如果您需查询分区的ID,请参见查看分区状态。
单击否,不指定分区。
根据界面提示信息,通过SDK订阅消息,或者执行Docker命令订阅消息。
发送方式选择Docker,运行Docker容器。
执行运行 Docker 容器生产示例消息区域的Docker命令,发送消息。
执行发送后如何消费消息?区域的Docker命令,订阅消息。
发送方式选择SDK,根据您的业务需求,选择需要的语言或者框架的SDK以及接入方式,通过SDK体验消息收发。
步骤四:创建Group
创建阿里云Elasticsearch所属的Group。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在左侧导航栏,单击Group 管理。
在Group 管理页面,单击创建 Group。
在创建 Group面板的Group ID文本框输入Group的名称,在描述文本框简要描述Group,并给Group添加标签,单击确定。
创建完成后,在Group 管理页面的列表中显示已创建的Group。
步骤五:创建索引
通过阿里云Elasticsearch创建索引,接收云消息队列 Kafka 版的数据。
-
说明
-
Username为您创建阿里云Elasticsearch实例时设置的用户名。
-
Password为您创建阿里云Elasticsearch实例时设置的密码。
-
-
在Kibana控制台的左侧导航栏,单击Dev Tools。
-
执行以下命令创建索引。
PUT /elastic_test {}
步骤六:创建管道
通过阿里云Logstash创建管道。管道部署后,将源源不断地从云消息队列 Kafka 版导入数据进阿里云Elasticsearch。
-
登录阿里云Elasticsearch控制台的Logstash实例页面。
-
在顶部菜单栏,选择地域。
-
在Logstash实例页面,单击创建的实例。
-
在左侧导航栏,单击管道管理。
-
在管道列表区域,单击创建管道。
-
在Config配置中,输入配置。
配置示例如下。
input { kafka { bootstrap_servers => ["alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092"] group_id => "elastic_group" topics => ["elastic_test"] consumer_threads => 12 decorate_events => true } } output { elasticsearch { hosts => ["http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200"] index => "elastic_test" password => "XXX" user => "elastic" } }表 1. input参数说明
参数
描述
示例值
bootstrap_servers
云消息队列 Kafka 版的VPC环境接入点。
alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
group_id
Group的名称。
elastic_group
topics
Topic的名称。
elastic_test
consumer_threads
消费线程数。建议与Topic的分区数保持一致。
12
decorate_events
是否包含消息元数据。默认值为false。
true
表 2. output参数说明
参数
描述
示例值
hosts
阿里云Elasticsearch服务的访问地址。您可在阿里云Elasticsearch实例的基本信息页面获取。
http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200
index
索引的名称。
elastic_test
password
访问阿里云Elasticsearch服务的密码。您在创建阿里云Elasticsearch实例时设置的密码。
XXX
user
访问阿里云Elasticsearch服务的用户名。您在创建阿里云Elasticsearch实例时设置的用户名。
elastic
-
在管道参数配置中,输入配置信息,然后单击保存并部署。
-
在提示对话框,单击确认。
步骤七:搜索数据
您可以在Kibana控制台搜索通过管道导入阿里云Elasticsearch的数据,确认数据是否导入成功。
-
说明
-
Username为您创建阿里云Elasticsearch实例时设置的用户名。
-
Password为您创建阿里云Elasticsearch实例时设置的密码。
-
-
在Kibana控制台的左侧导航栏,单击Dev Tools图标。
-
执行以下命令搜索数据。
GET /elastic_test/_search {}返回结果如下。
{ "took" : 2, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "elastic_test", "_type" : "_doc", "_id" : "xxx", "_score" : 1.0, "_source" : { "@version" : "1", "message" : "1", "@timestamp" : "xxxT11:33:44.274Z" } }, { "_index" : "elastic_test", "_type" : "_doc", "_id" : "xxx", "_score" : 1.0, "_source" : { "@version" : "1", "message" : "2", "@timestamp" : "xxxT11:34:44.504Z" } } ] } }