当您需要将Azure Event Hubs事件中心中的数据同步到阿里云Elasticsearch中时,可使用阿里云Logstash的管道配置功能实现。本文介绍具体的实现方法。
操作流程
步骤一:准备环境与实例
- 创建阿里云Elasticsearch实例,并开启自动创建索引功能。本文使用7.10版本的实例。具体操作请参见创建阿里云Elasticsearch实例和配置YML参数。
- 创建阿里云Logstash实例并配置NAT公网数据传输。本文使用7.4版本的实例。具体操作请参见创建阿里云Logstash实例。由于阿里云Logstash实例部署在专有网络VPC下,但在数据同步过程中,Logstash需要连接公网才能与Azure Event Hubs事件中心互通,因此需要通过配置NAT网关实现与公网连通,详情请参见配置NAT公网数据传输。说明 对于自建的Logstash,需要购买与阿里云Elasticsearch在同一VPC下的ECS实例(已符合条件的ECS不需要重复购买,需要绑定弹性公网IP)。
- 准备Azure Event Hubs事件中心的自建环境。具体操作请参见Azure Event Hubs官方文档。
步骤二:创建并配置Logstash管道
- 登录阿里云Elasticsearch控制台。
- 进入目标实例。
- 在顶部菜单栏处,选择地域。
- 在左侧导航栏,单击Logstash实例,然后在Logstash实例中单击目标实例ID。
- 在左侧导航栏,单击管道管理。
- 单击创建管道。
- 在创建管道任务页面,输入管道ID并配置管道。本文使用的管道配置如下。
input { azure_event_hubs { event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"] initial_position => "beginning" threads => 2 decorate_events => true consumer_group => "group-kl" storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn" storage_container => "lettie_container" } } filter { } output { elasticsearch { hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"] index => "test-log" password => "xxxxxx" user => "elastic" } }
表 1. input参数说明 参数 说明 event_hub_connections 标识要读取的事件中心的连接字符串列表。连接字符串包括事件中心的EntityPath。更多详细说明,请参见event_hub_connections。 说明 每一个事件中心都会定义一个event_hub_connections参数,其他参数在各事件中心之间共享。initial_position 从事件中心读取数据的位置,可选值:beginning(默认)、end和look_back。更多详细说明,请参见initial position 。 threads 处理事件的线程总数。更多详细说明,请参见threads。 decorate_events 是否同步事件中心的元数据,包括事件中心名称、consumer_group、processor_host、分区、偏移量、序列、时间戳和event_size。更多详细说明,请参见decorate events。 consumer_group 用于读取事件中心数据的消费者组。您需要专门为Logstash创建一个消费者组,并确保Logstash的所有节点都使用该消费者组,以便它们可以正常协同工作。更多详细说明,请参见consumer group。 storage_connection Blob账户存储的连接字符串。Blob账户存储会保留重启之间的偏移量,并确保Logstash的多个节点处理不同的分区。设置此值后,重启将在处理中断的地方开始。如果未设置此值,重启将从initial_position设置的值的地方开始。更多详细说明,请参见storage connection。 storage_container 用于持久保存偏移量并允许多个Logstash节点一起工作的存储容器的名称。更多详细说明,请参见storage container。 说明 为避免覆盖偏移量,建议使用不同的storage_container名称。如果同一份数据分别写入到不同的服务中,此参数需设置为不同的名称。表 2. output参数说明 参数 说明 hosts Elasticsearch服务的访问地址,需要设置为 http://<阿里云Elasticsearch实例ID>.elasticsearch.aliyuncs.com:9200
。index 迁移后的索引名。 user 访问Elasticsearch服务的用户名,默认为elastic。 password 对应用户的密码。对于阿里云Elasticsearch,elastic用户的密码在创建实例时设定,如果忘记可进行重置,重置密码的注意事项和操作步骤请参见重置实例访问密码。 更多Config配置详情请参见Logstash配置文件说明。
- 单击下一步,配置管道参数。
参数 说明 管道工作线程 并行执行管道的Filter和Output的工作线程数量。当事件出现积压或CPU未饱和时,请考虑增大线程数,更好地使用CPU处理能力。默认值:实例的CPU核数。 管道批大小 单个工作线程在尝试执行Filter和Output前,可以从Input收集的最大事件数目。较大的管道批大小可能会带来较大的内存开销。您可以设置LS_HEAP_SIZE变量,来增大JVM堆大小,从而有效使用该值。默认值:125。 管道批延迟 创建管道事件批时,将过小的批分派给管道工作线程之前,要等候每个事件的时长,单位为毫秒。默认值:50ms。 队列类型 用于事件缓冲的内部排队模型。可选值: - MEMORY:默认值。基于内存的传统队列。
- PERSISTED:基于磁盘的ACKed队列(持久队列)。
队列最大字节数 请确保该值小于您的磁盘总容量。默认值:1024 MB。 队列检查点写入数 启用持久性队列时,在强制执行检查点之前已写入事件的最大数目。设置为0,表示无限制。默认值:1024。 警告 配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。 - 单击保存或者保存并部署。
- 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。
- 保存并部署:保存并且部署后,会触发实例重启,使配置生效。
步骤三:验证结果
- 登录目标阿里云Elasticsearch实例的Kibana控制台,根据页面提示进入Kibana主页。登录Kibana控制台的具体操作,请参见登录Kibana控制台。说明 本文以阿里云Elasticsearch 7.10.0版本为例,其他版本操作可能略有差别,请以实际界面为准。
- 单击右上角的Dev tools。
- 在Console中,执行如下命令,查看同步后数据。
GET test-log3/_search { "query":{ "match":{ "message":"L23" } } }
预期结果如下。