本文介绍如何使用E-MapReduce(简称EMR)的Flume实时同步日志服务(LogHub)的数据至E-MapReduce集群的HDFS,并根据数据记录的时间戳将数据存入HDFS相应的分区中。

背景信息

您可以借助日志服务的Logtail工具,将需要同步的数据实时采集并上传到LogHub,再使用E-MapReduce的Flume将LogHub的数据同步至EMR集群的HDFS。

采集数据到日志服务的LogHub的详细步骤参见数据采集概述

前提条件

已创建DataLake集群,并且选择了Flume服务,详情请参见创建集群

操作步骤

  1. 配置Flume。
    1. 进入Flume的配置页面。
      1. 登录EMR on ECS控制台
      2. 在顶部菜单栏处,根据实际情况选择地域和资源组
      3. 集群管理页面,单击目标集群操作列的集群服务
      4. 集群服务页面,单击FLUME服务区域的配置
    2. 单击flume-conf.properties页签。
      本文示例采用的是全局配置方式,如果您想按照节点配置,可以在FLUME服务配置页面的下拉列表中选择独立节点配置
    3. flume-conf.properties的参数值中,添加以下内容。
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
      default-agent.sources.source1.type = org.apache.flume.source.loghub.LogHubSource
      default-agent.sources.source1.endpoint = <yourLogHubEndpoint>
      default-agent.sources.source1.project = canaltest
      default-agent.sources.source1.logstore = canal
      default-agent.sources.source1.accessKeyId = yHiu*******BG2s
      default-agent.sources.source1.accessKey = ABctuw0M***************iKKljZy
      default-agent.sources.source1.useRecordTime = true
      default-agent.sources.source1.consumerGroup = consumer_1
      
      default-agent.sinks.k1.type = hdfs
      default-agent.sinks.k1.hdfs.path = /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H
      default-agent.sinks.k1.hdfs.fileType = DataStream
      default-agent.sinks.k1.hdfs.rollInterval = 3600
      default-agent.sinks.k1.hdfs.round = true
      default-agent.sinks.k1.hdfs.roundValue = 60
      default-agent.sinks.k1.hdfs.roundUnit = minute
      default-agent.sinks.k1.hdfs.rollSize = 0
      default-agent.sinks.k1.hdfs.rollCount = 0
      
      # Use a channel which buffers events in memory
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = 2000
      default-agent.channels.c1.transactionCapacity = 2000
      
      # Bind the source and sink to the channel
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      参数 说明
      default-agent.sources.source1.type 固定值为org.apache.flume.source.loghub.LogHubSource。
      default-agent.sources.source1.endpoint LogHub的Endpoint。
      说明 如果使用VPC或经典网络的Endpoint,需要保证与EMR集群在同一个地区;如果使用公网Endpoint,需要保证运行Flume agent的节点有公网IP。
      default-agent.sources.source1.project LogHub的项目名。
      default-agent.sources.source1.logstore LogStore名称。
      default-agent.sources.source1.accessKeyId 阿里云的AccessKey ID。
      default-agent.sources.source1.accessKey 阿里云的AccessKey Secret。
      default-agent.sources.source1.useRecordTime 设置为true。

      默认值为false。如果Header中没有Timestamp属性,接收Event的时间戳会被加入到Header中。但是在Flume Agent启停或者同步滞后等情况下,会将数据放入错误的时间分区中。为避免这种情况,可以将该值设置为true,使用数据收集到LogHub的时间作为Timestamp。

      default-agent.sources.source1.consumerGroup 消费组名称,默认值为consumer_1。
      default-agent.sources.source1.consumerPosition 消费组在第一次消费LogHub数据时的位置,默认值为end,即从最近的数据开始消费。
      • begin:表示从最早的数据开始消费。
      • special:表示从指定的时间点开始消费。

        在配置为special时,需要配置startTime为开始消费的时间点,单位为秒。

      首次运行后LogHub服务端会记录消费组的消费点,此时如果想更改 consumerPosition,可以清除LogHub的消费组状态,或者更改配置consumerGroup为新的消费组。
      default-agent.sources.source1.heartbeatInterval 消费组与服务端维持心跳的间隔,单位是毫秒,默认为30000毫秒。
      default-agent.sources.source1.fetchInOrder 相同Key的数据是否按序消费,默认值为false。
      default-agent.sources.source1.batchSize 通用的source batch配置,在一个批处理中写入通道的最大消息数。
      default-agent.sources.source1.batchDurationMillis 通用的source batch配置,在将批处理写入通道之前的最大时间。
      default-agent.sources.source1.backoffSleepIncrement 通用的source sleep配置,表示LogHub没有数据时触发Sleep的初始和增量等待时间。
      default-agent.sources.source1.maxBackoffSleep 通用的source sleep配置,表示LogHub没有数据时触发Sleep的最大等待时间。
      default-agent.sinks.k1.hdfs.path HDFS存储路径。例如,/tmp/flume-data/loghub/datetime=%y%m%d/hour=%H。
      default-agent.sinks.k1.hdfs.fileType 保存到HDFS上的文件类型。固定为DataStream。
      default-agent.sinks.k1.hdfs.rollInterval 设置多久生成一个新的文件,单位为秒。例如,3600。
      default-agent.sinks.k1.hdfs.round 用于HDFS文件按照时间分区,时间戳向下取整。默认值为true。
      default-agent.sinks.k1.hdfs.roundValue default-agent.sinks.k1.hdfs.round设置为true,配合default-agent.sinks.k1.hdfs.roundUnit时间单位一起使用。

      例如,default-agent.sinks.k1.hdfs.roundUnit值为minute,该值设置为60,则表示60分钟之内的数据写到一个文件中,相当于每60分钟生成一个文件。

      default-agent.sinks.k1.hdfs.roundUnit 按时间分区使用的时间单位。默认值为minute。
      default-agent.sinks.k1.hdfs.rollSize 当临时文件达到该参数值时,滚动成目标文件,单位:byte。

      该值设置为0,则表示文件不根据文件大小滚动生成。

      default-agent.sinks.k1.hdfs.rollCount 当Event数据达到该数量时,将临时文件滚动生成目标文件。

      该值设置为0,则表示文件不根据Event数滚动生成。

      default-agent.channels.c1.capacity 通道中存储的最大事件数。例如,2000。
      default-agent.channels.c1.transactionCapacity 每次Channel从Source获取事件或推送给Sink的最大事件数。例如,2000。

      配置项请遵循开源Flume内容,详情请参见Avro SourceTaildir SourceSinkChannel

    4. 保存配置。
      1. 单击下方的保存
      2. 在弹出的对话框中,输入执行原因,单击确定
  2. 启动服务。
    1. 在FLUME服务页面,选择更多操作 > 重启
    2. 在弹出的对话框中,输入执行原因,单击确定
    3. 确认对话框中,单击确定
  3. 启动服务。
    1. 在FLUME服务页面,选择更多操作 > 重启
    2. 在弹出的对话框中,输入执行原因,单击确定
    3. 确认对话框中,单击确定
    启动成功后,您可以看到配置的HDFS路径下按照Record Timestamp存储的日志数据。