当您需要将Azure Event Hubs事件中心中的数据同步到阿里云Elasticsearch中时,可使用阿里云Logstash的管道配置功能实现。本文介绍具体的实现方法。

操作流程

  1. 步骤一:准备环境与实例
  2. 步骤二:创建并配置Logstash管道
  3. 步骤三:验证结果

步骤一:准备环境与实例

  1. 创建阿里云Elasticsearch实例,并开启自动创建索引功能。本文使用7.10版本的实例。
  2. 创建阿里云Logstash实例并配置NAT公网数据传输。本文使用7.4版本的实例。
    具体操作请参见创建阿里云Logstash实例
    由于阿里云Logstash实例部署在专有网络VPC下,但在数据同步过程中,Logstash需要连接公网才能与Azure Event Hubs事件中心互通,因此需要通过配置NAT网关实现与公网连通,详情请参见配置NAT公网数据传输
    说明 对于自建的Logstash,需要购买与阿里云Elasticsearch在同一VPC下的ECS实例(已符合条件的ECS不需要重复购买,需要绑定弹性公网IP)。
  3. 准备Azure Event Hubs事件中心的自建环境。
    具体操作请参见Azure Event Hubs官方文档

步骤二:创建并配置Logstash管道

  1. 登录阿里云Elasticsearch控制台
  2. 进入目标实例。
    1. 在顶部菜单栏处,选择地域。
    2. 在左侧导航栏,单击Logstash实例,然后在Logstash实例中单击目标实例ID。
  3. 在左侧导航栏,单击管道管理
  4. 单击创建管道
  5. 创建管道任务页面,输入管道ID并配置管道。
    本文使用的管道配置如下。
    input {
      azure_event_hubs {
         event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"]
         initial_position => "beginning"
         threads => 2
         decorate_events => true
         consumer_group => "group-kl"
         storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn"
         storage_container => "lettie_container"
       }
    }
    filter {
    
    }
    output {
      elasticsearch {
        hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"]
        index => "test-log"
        password => "xxxxxx"
        user => "elastic"
      }
    }
    表 1. input参数说明
    参数 说明
    event_hub_connections 标识要读取的事件中心的连接字符串列表。连接字符串包括事件中心的EntityPath。更多详细说明,请参见event_hub_connections
    说明 每一个事件中心都会定义一个event_hub_connections参数,其他参数在各事件中心之间共享。
    initial_position 从事件中心读取数据的位置,可选值:beginning(默认)、end和look_back。更多详细说明,请参见initial position
    threads 处理事件的线程总数。更多详细说明,请参见threads
    decorate_events 是否同步事件中心的元数据,包括事件中心名称、consumer_group、processor_host、分区、偏移量、序列、时间戳和event_size。更多详细说明,请参见decorate events
    consumer_group 用于读取事件中心数据的消费者组。您需要专门为Logstash创建一个消费者组,并确保Logstash的所有节点都使用该消费者组,以便它们可以正常协同工作。更多详细说明,请参见consumer group
    storage_connection Blob账户存储的连接字符串。Blob账户存储会保留重启之间的偏移量,并确保Logstash的多个节点处理不同的分区。设置此值后,重启将在处理中断的地方开始。如果未设置此值,重启将从initial_position设置的值的地方开始。更多详细说明,请参见storage connection
    storage_container 用于持久保存偏移量并允许多个Logstash节点一起工作的存储容器的名称。更多详细说明,请参见storage container
    说明 为避免覆盖偏移量,建议使用不同的storage_container名称。如果同一份数据分别写入到不同的服务中,此参数需设置为不同的名称。
    表 2. output参数说明
    参数 说明
    hosts Elasticsearch服务的访问地址,需要设置为http://<阿里云Elasticsearch实例ID>.elasticsearch.aliyuncs.com:9200
    index 迁移后的索引名。
    user 访问Elasticsearch服务的用户名,默认为elastic。
    password 对应用户的密码。对于阿里云Elasticsearch,elastic用户的密码在创建实例时设定,如果忘记可进行重置,重置密码的注意事项和操作步骤请参见重置实例访问密码

    更多Config配置详情请参见Logstash配置文件说明

  6. 单击下一步,配置管道参数。
    管道参数配置
    参数 说明
    管道工作线程 并行执行管道的Filter和Output的工作线程数量。当事件出现积压或CPU未饱和时,请考虑增大线程数,更好地使用CPU处理能力。默认值:实例的CPU核数。
    管道批大小 单个工作线程在尝试执行Filter和Output前,可以从Input收集的最大事件数目。较大的管道批大小可能会带来较大的内存开销。您可以设置LS_HEAP_SIZE变量,来增大JVM堆大小,从而有效使用该值。默认值:125。
    管道批延迟 创建管道事件批时,将过小的批分派给管道工作线程之前,要等候每个事件的时长,单位为毫秒。默认值:50ms。
    队列类型 用于事件缓冲的内部排队模型。可选值:
    • MEMORY:默认值。基于内存的传统队列。
    • PERSISTED:基于磁盘的ACKed队列(持久队列)。
    队列最大字节数 请确保该值小于您的磁盘总容量。默认值:1024 MB。
    队列检查点写入数 启用持久性队列时,在强制执行检查点之前已写入事件的最大数目。设置为0,表示无限制。默认值:1024。
    警告 配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。
  7. 单击保存或者保存并部署
    • 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。
    • 保存并部署:保存并且部署后,会触发实例重启,使配置生效。

步骤三:验证结果

  1. 登录目标阿里云Elasticsearch实例的Kibana控制台,根据页面提示进入Kibana主页。
    登录Kibana控制台的具体操作,请参见登录Kibana控制台
    说明 本文以阿里云Elasticsearch 7.10.0版本为例,其他版本操作可能略有差别,请以实际界面为准。
  2. 单击右上角的Dev tools
  3. Console中,执行如下命令,查看同步后数据。
    GET test-log3/_search
    {
      "query":{
        "match":{
          "message":"L23"
         }
       }
    }
    预期结果如下。预期结果