在VPC环境中作为Output接入

更新时间:2024-12-26 08:09:50

云消息队列 Kafka 版可以作为Output接入Logstash。本文说明如何在VPC环境下通过Logstash云消息队列 Kafka 版发送消息。

前提条件

  • 购买并部署云消息队列 Kafka 版实例。具体操作,请参见VPC接入

  • 下载并安装Logstash。具体操作,请参见Download Logstash

  • 下载并安装JDK 8。具体操作,请参见Download JDK 8

步骤一:获取接入点

Logstash通过云消息队列 Kafka 版的接入点与云消息队列 Kafka 版建立连接。

说明

云消息队列 Kafka 版支持以下VPC环境接入点:

  • 默认接入点:端口号为9092。

  • SASL接入点:端口号为9094。如需使用SASL接入点,请开启ACL,具体步骤,请参见开启ACL

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击作为Output接入Logstash的实例。

  4. 实例详情页面的接入点信息区域,获取实例的接入点。在配置信息区域,获取用户名密码

    endpoint

    说明

    不同接入点的差异,请参见接入点对比

步骤二:创建Topic

创建用于存储消息的Topic。

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

    重要

    Topic需要在应用程序所在的地域(即所部署的ECS的所在地域)进行创建。Topic不能跨地域使用。例如Topic创建在华北2(北京)这个地域,那么消息生产端和消费端也必须运行在华北2(北京)的ECS。

  3. 实例列表页面,单击目标实例名称。

  4. 在左侧导航栏,单击Topic 管理

  5. Topic 管理页面,单击创建 Topic

  6. 创建 Topic面板,设置Topic属性,然后单击确定

    参数

    说明

    示例

    参数

    说明

    示例

    名称

    Topic名称。

    demo

    描述

    Topic的简单描述。

    demo test

    分区数

    Topic的分区数量。

    12

    存储引擎

    说明

    当前仅专业版实例支持选择存储引擎类型,标准版暂不支持,默认选择为云存储类型。

    Topic消息的存储引擎。

    云消息队列 Kafka 版支持以下两种存储引擎。

    • 云存储:底层接入阿里云云盘,具有低时延、高性能、持久性、高可靠等特点,采用分布式3副本机制。实例的规格类型标准版(高写版)时,存储引擎只能为云存储

    • Local 存储:使用原生KafkaISR复制算法,采用分布式3副本机制。

    云存储

    消息类型

    Topic消息的类型。

    • 普通消息:默认情况下,保证相同Key的消息分布在同一个分区中,且分区内消息按照发送顺序存储。集群中出现机器宕机时,可能会造成消息乱序。当存储引擎选择云存储时,默认选择普通消息

    • 分区顺序消息:默认情况下,保证相同Key的消息分布在同一个分区中,且分区内消息按照发送顺序存储。集群中出现机器宕机时,仍然保证分区内按照发送顺序存储。但是会出现部分分区发送消息失败,等到分区恢复后即可恢复正常。当存储引擎选择Local 存储时,默认选择分区顺序消息

    普通消息

    日志清理策略

    Topic日志的清理策略。

    存储引擎选择Local 存储(当前仅专业版实例支持选择存储引擎类型为Local存储,标准版暂不支持)时,需要配置日志清理策略

    云消息队列 Kafka 版支持以下两种日志清理策略。

    • Delete:默认的消息清理策略。在磁盘容量充足的情况下,保留在最长保留时间范围内的消息;在磁盘容量不足时(一般磁盘使用率超过85%视为不足),将提前删除旧消息,以保证服务可用性。

    • Compact:使用Kafka Log Compaction日志清理策略。Log Compaction清理策略保证相同Key的消息,最新的value值一定会被保留。主要适用于系统宕机后恢复状态,系统重启后重新加载缓存等场景。例如,在使用Kafka ConnectConfluent Schema Registry时,需要使用Kafka Compact Topic存储系统状态信息或配置信息。

      重要

      Compact Topic一般只用在某些生态组件中,例如Kafka ConnectConfluent Schema Registry,其他情况的消息收发请勿为Topic设置该属性。具体信息,请参见云消息队列 Kafka 版Demo

    Compact

    标签

    Topic的标签。

    demo

    创建完成后,在Topic 管理页面的列表中显示已创建的Topic。

步骤三:Logstash发送消息

在安装了Logstash的机器上启动Logstash,向创建的Topic发送消息。

  1. 执行cd命令切换到logstashbin目录。

  2. 创建output.conf配置文件。

    1. 执行命令vim output.conf创建空的配置文件。

    2. i键进入插入模式。

    3. 输入以下内容。

      input {
          input {
            stdin{}
        }
      }
      
      output {
         kafka {
              bootstrap_servers => "alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092"
              topic_id => "logstash_test"
             }
      }

      参数

      描述

      示例值

      参数

      描述

      示例值

      bootstrap_servers

      云消息队列 Kafka 版提供以下VPC接入点:

      • 默认接入点

      • SASL接入点

      alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092

      topic_id

      Topic的名称。

      logstash_test

    4. Esc键回到命令行模式。

    5. 键进入底行模式,输入wq,然后按回车键保存文件并退出。

  3. 向创建的Topic发送消息。

    1. 执行./logstash -f output.conf

    2. 输入test,然后按回车键。

      返回结果如下。

      result

步骤四:查看Topic分区

查看消息发送到Topic的情况。

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击作为Output接入Logstash的实例。

  4. 在左侧导航栏,单击Topic 管理

  5. Topic 管理页面,单击目标Topic名称进入Topic 详情页面,然后单击分区状态页签。

    表 1. 分区状态信息

    参数

    说明

    分区ID

    Topic分区的ID号。

    最小位点

    Topic在当前分区下的最小消费位点。

    最大位点

    Topic在当前分区下的最大消费位点。

    分区消息量

    Topic在当前分区下的消息总量。

    最近更新时间

    本分区中最近一条消息的存储时间。

    分区状态信息

步骤五:按位点查询消息

您可以根据发送的消息的分区ID和位点信息查询该消息。

  1. 登录云消息队列 Kafka 版控制台

  2. 概览页面的资源分布区域,选择地域。

  3. 实例列表页面,单击目标实例名称。

  4. 在左侧导航栏,单击消息查询

  5. 消息查询页面的查询方式列表中,选择按位点查询

  6. Topic列表中,选择消息所属Topic名称;在分区列表中,选择消息所属的分区;在起始位点文本框,输入消息所在分区的位点,然后单击查询

    展示该查询位点及以后连续的消息。例如,指定的分区和位点都为“5”,那么返回的结果从位点“5”开始。

    表 2. 查询结果参数解释

    参数

    描述

    分区

    消息的Topic分区。

    位点

    消息的所在的位点。

    Key

    消息的键(已强制转化为String类型)。

    Value

    消息的值(已强制转化为String类型),即消息的具体内容。

    消息创建时间

    发送消息时,客户端自带的或是您指定的ProducerRecord中的消息创建时间。

    说明
    • 如果配置了该字段,则按配置值显示。

    • 如果未配置该字段,则默认取消息发送时的系统时间。

    • 如果显示值为1970/x/x x:x:x,则说明发送时间配置为0或其他有误的值。

    • 0.9及以前版本的云消息队列 Kafka 版客户端不支持配置该时间。

    操作

    • 单击下载 Key:下载消息的键值。

    • 单击下载 Value:下载消息的具体内容。

    重要
    • 查询到的每条消息在控制台上最多显示1 KB的内容,超过1 KB的部分将自动截断。如需查看完整的消息内容,请下载相应的消息。

    • 下载的消息最大为10 MB。如果消息超过10 MB,则只下载10 MB的内容。

更多信息

更多参数设置,请参见Kafka output plugin

  • 本页导读 (1)
  • 前提条件
  • 步骤一:获取接入点
  • 步骤二:创建Topic
  • 步骤三:Logstash发送消息
  • 步骤四:查看Topic分区
  • 步骤五:按位点查询消息
  • 更多信息
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等