本文通过示例为您介绍E-MapReduce中的Flume组件,如何配置拦截器(Interceptor)、Channel选择器(Channel Selector)和Sink组逻辑处理器(Sink Processor)。
拦截器
拦截器的位置在Source和Channel之间,用于修改或丢弃Event。拦截图示意图如下。
拦截器的主要类型如下表。
类型 | 描述 |
---|---|
时间戳拦截器 | 在Event Header中添加Unix时间戳属性。 |
Host拦截器 | 在Event Header中添加Host属性。 |
静态拦截器 | 在Event Header中添加一个固定键值对属性。 |
Header拦截器 | 在Event Header中删除一个或多个属性。 |
UUID拦截器 | 在Event中设置一个UUID。如果应用层没有UUID,则可以使用该拦截器来默认添加。 |
Morphline拦截器 | 通过Morphline配置文件过滤Event或修改插入Event Header。 |
查找拦截器 | 使用Java正则表达式查找Event Body。 |
替换拦截器 | 使用Java正则表达式替换Event Body。 |
正则过滤拦截器 | 过滤配置匹配或者没有匹配上正则表达式的Event Body。 |
相关示例如下:
- 示例1:Event Body包含
1:2:3.4foobar5
,如果想配置正则过滤器,则配置如下。a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d) a1.sources.r1.interceptors.i1.serializers = s1 s2 s3 a1.sources.r1.interceptors.i1.serializers.s1.name = one a1.sources.r1.interceptors.i1.serializers.s2.name = two a1.sources.r1.interceptors.i1.serializers.s3.name = three
修改后的Event Body不变,后续的Header中增加了
one=>1, two=>2, three=>3
。 - 示例2:Event Body包含
2012-10-18 18:47:57,614 some log line
,如果想配置时间过滤器,则配置如下。a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d) a1.sources.r1.interceptors.i1.serializers = s1 a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
修改后的Event Body不变,后续的Header中增加了
timestamp=>1350611220000
。
Channel选择器
Channel选择器用于在Source与Channel一对多场景下选择Channel。Channel选择器示意图如下。
Flume内置复制选择器(Replicating)和多路复用选择器(Multiplexing)两种选择器,默认为复制选择器。复制选择器会把所有Event发送到每个Channel,而多路复用选择器,则会按照一定的规则发送。多路复用选择器的示例如下。
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
上述示例中,r1会选择性地将Event发送给c1、c2、c3和c4四个Channel。如果Header中的state属性值为CZ,则发送给c1,如果属性值为US,则发送给c2和c3,其他情况默认发送给c4。
Sink组逻辑处理器
Sink组逻辑处理器示意图请参见Channel选择器的示意图。
Sink组逻辑处理器用于多个Sink一同消费Channel队列中的数据,并把这些Sink配置为负载均衡或故障转移的工作方式。默认Sink与Channel是一对一的。配置为负载均衡方式,则根据配置的负载均衡机制,将Event分发到Sink中。配置为故障转移方式,则表示多个Sink是一主多备的工作方式,当工作的Sink中止后,Event会被转移到备用的Sink上。
相关示例如下:
- 示例1:故障转移方式
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000
上述示例中有k1和k2两个Sink,权重分别是5和10,最大故障转移时间是10000毫秒。
- 示例2:负载均衡方式
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = random
上述示例中有k1和k2两个Sink,通过随机选择(random)方法进行负载分配,您也可以使用轮询(round_robin)方法。示例中的
a1.sinkgroups.g1.processor.backoff
参数表示是否以指数的形式退避失败的Sinks,设置为true,则Sink Processor会屏蔽故障的Sink。