本文介绍如何使用E-MapReduce(简称EMR)的Flume实时同步日志服务(LogHub)的数据至E-MapReduce集群的HDFS,并根据数据记录的时间戳将数据存入HDFS相应的分区中。
背景信息
您可以借助日志服务的Logtail工具,将需要同步的数据实时采集并上传到LogHub,再使用E-MapReduce的Flume将LogHub的数据同步至EMR集群的HDFS。
采集数据到日志服务的LogHub的详细步骤参见数据采集概述。
前提条件
已创建DataLake集群,并且选择了Flume服务,详情请参见创建集群。
操作步骤
- 配置Flume。
- 进入Flume的配置页面。
- 登录EMR on ECS控制台。
- 在顶部菜单栏处,根据实际情况选择地域和资源组。
- 在集群管理页面,单击目标集群操作列的集群服务。
- 在集群服务页面,单击FLUME服务区域的配置。
- 单击flume-conf.properties页签。本文示例采用的是全局配置方式,如果您想按照节点配置,可以在FLUME服务配置页面的下拉列表中选择独立节点配置。
- 在flume-conf.properties的参数值中,添加以下内容。
default-agent.sources = source1 default-agent.sinks = k1 default-agent.channels = c1 default-agent.sources.source1.type = org.apache.flume.source.loghub.LogHubSource default-agent.sources.source1.endpoint = <yourLogHubEndpoint> default-agent.sources.source1.project = canaltest default-agent.sources.source1.logstore = canal default-agent.sources.source1.accessKeyId = yHiu*******BG2s default-agent.sources.source1.accessKey = ABctuw0M***************iKKljZy default-agent.sources.source1.useRecordTime = true default-agent.sources.source1.consumerGroup = consumer_1 default-agent.sinks.k1.type = hdfs default-agent.sinks.k1.hdfs.path = /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H default-agent.sinks.k1.hdfs.fileType = DataStream default-agent.sinks.k1.hdfs.rollInterval = 3600 default-agent.sinks.k1.hdfs.round = true default-agent.sinks.k1.hdfs.roundValue = 60 default-agent.sinks.k1.hdfs.roundUnit = minute default-agent.sinks.k1.hdfs.rollSize = 0 default-agent.sinks.k1.hdfs.rollCount = 0 # Use a channel which buffers events in memory default-agent.channels.c1.type = memory default-agent.channels.c1.capacity = 2000 default-agent.channels.c1.transactionCapacity = 2000 # Bind the source and sink to the channel default-agent.sources.source1.channels = c1 default-agent.sinks.k1.channel = c1
参数 说明 default-agent.sources.source1.type 固定值为org.apache.flume.source.loghub.LogHubSource。 default-agent.sources.source1.endpoint LogHub的Endpoint。 说明 如果使用VPC或经典网络的Endpoint,需要保证与EMR集群在同一个地区;如果使用公网Endpoint,需要保证运行Flume agent的节点有公网IP。default-agent.sources.source1.project LogHub的项目名。 default-agent.sources.source1.logstore LogStore名称。 default-agent.sources.source1.accessKeyId 阿里云的AccessKey ID。 default-agent.sources.source1.accessKey 阿里云的AccessKey Secret。 default-agent.sources.source1.useRecordTime 设置为true。 默认值为false。如果Header中没有Timestamp属性,接收Event的时间戳会被加入到Header中。但是在Flume Agent启停或者同步滞后等情况下,会将数据放入错误的时间分区中。为避免这种情况,可以将该值设置为true,使用数据收集到LogHub的时间作为Timestamp。
default-agent.sources.source1.consumerGroup 消费组名称,默认值为consumer_1。 default-agent.sources.source1.consumerPosition 消费组在第一次消费LogHub数据时的位置,默认值为end,即从最近的数据开始消费。 - begin:表示从最早的数据开始消费。
- special:表示从指定的时间点开始消费。
在配置为special时,需要配置startTime为开始消费的时间点,单位为秒。
default-agent.sources.source1.heartbeatInterval 消费组与服务端维持心跳的间隔,单位是毫秒,默认为30000毫秒。 default-agent.sources.source1.fetchInOrder 相同Key的数据是否按序消费,默认值为false。 default-agent.sources.source1.batchSize 通用的source batch配置,在一个批处理中写入通道的最大消息数。 default-agent.sources.source1.batchDurationMillis 通用的source batch配置,在将批处理写入通道之前的最大时间。 default-agent.sources.source1.backoffSleepIncrement 通用的source sleep配置,表示LogHub没有数据时触发Sleep的初始和增量等待时间。 default-agent.sources.source1.maxBackoffSleep 通用的source sleep配置,表示LogHub没有数据时触发Sleep的最大等待时间。 default-agent.sinks.k1.hdfs.path HDFS存储路径。例如,/tmp/flume-data/loghub/datetime=%y%m%d/hour=%H。 default-agent.sinks.k1.hdfs.fileType 保存到HDFS上的文件类型。固定为DataStream。 default-agent.sinks.k1.hdfs.rollInterval 设置多久生成一个新的文件,单位为秒。例如,3600。 default-agent.sinks.k1.hdfs.round 用于HDFS文件按照时间分区,时间戳向下取整。默认值为true。 default-agent.sinks.k1.hdfs.roundValue 当default-agent.sinks.k1.hdfs.round设置为true,配合default-agent.sinks.k1.hdfs.roundUnit时间单位一起使用。 例如,default-agent.sinks.k1.hdfs.roundUnit值为minute,该值设置为60,则表示60分钟之内的数据写到一个文件中,相当于每60分钟生成一个文件。
default-agent.sinks.k1.hdfs.roundUnit 按时间分区使用的时间单位。默认值为minute。 default-agent.sinks.k1.hdfs.rollSize 当临时文件达到该参数值时,滚动成目标文件,单位:byte。 该值设置为0,则表示文件不根据文件大小滚动生成。
default-agent.sinks.k1.hdfs.rollCount 当Event数据达到该数量时,将临时文件滚动生成目标文件。 该值设置为0,则表示文件不根据Event数滚动生成。
default-agent.channels.c1.capacity 通道中存储的最大事件数。例如,2000。 default-agent.channels.c1.transactionCapacity 每次Channel从Source获取事件或推送给Sink的最大事件数。例如,2000。 配置项请遵循开源Flume内容,详情请参见Avro Source、Taildir Source、Sink和Channel。
- 保存配置。
- 单击下方的保存。
- 在弹出的对话框中,输入执行原因,单击确定。
- 进入Flume的配置页面。
- 启动服务。
- 在FLUME服务页面,选择更多操作 > 重启。
- 在弹出的对话框中,输入执行原因,单击确定。
- 在确认对话框中,单击确定。
- 启动服务。
- 在FLUME服务页面,选择更多操作 > 重启。
- 在弹出的对话框中,输入执行原因,单击确定。
- 在确认对话框中,单击确定。
启动成功后,您可以看到配置的HDFS路径下按照Record Timestamp存储的日志数据。