本文通过示例为您介绍E-MapReduce中的Flume组件,如何配置拦截器(Interceptor)、Channel选择器(Channel Selector)和Sink组逻辑处理器(Sink Processor)。

拦截器

拦截器的位置在Source和Channel之间,用于修改或丢弃Event。拦截图示意图如下。interceptor
拦截器的主要类型如下表。
类型 描述
时间戳拦截器 在Event Header中添加Unix时间戳属性。
Host拦截器 在Event Header中添加Host属性。
静态拦截器 在Event Header中添加一个固定键值对属性。
Header拦截器 在Event Header中删除一个或多个属性。
UUID拦截器 在Event中设置一个UUID。如果应用层没有UUID,则可以使用该拦截器来默认添加。
Morphline拦截器 通过Morphline配置文件过滤Event或修改插入Event Header。
查找拦截器 使用Java正则表达式查找Event Body。
替换拦截器 使用Java正则表达式替换Event Body。
正则过滤拦截器 过滤配置匹配或者没有匹配上正则表达式的Event Body。
相关示例如下:
  • 示例1:Event Body包含1:2:3.4foobar5,如果想配置正则过滤器,则配置如下。
    a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
    a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
    a1.sources.r1.interceptors.i1.serializers.s1.name = one
    a1.sources.r1.interceptors.i1.serializers.s2.name = two
    a1.sources.r1.interceptors.i1.serializers.s3.name = three

    修改后的Event Body不变,后续的Header中增加了one=>1, two=>2, three=>3

  • 示例2:Event Body包含2012-10-18 18:47:57,614 some log line,如果想配置时间过滤器,则配置如下。
    a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
    a1.sources.r1.interceptors.i1.serializers = s1
    a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
    a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
    a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

    修改后的Event Body不变,后续的Header中增加了timestamp=>1350611220000

Channel选择器

Channel选择器用于在Source与Channel一对多场景下选择Channel。Channel选择器示意图如下。Selector
Flume内置复制选择器(Replicating)和多路复用选择器(Multiplexing)两种选择器,默认为复制选择器。复制选择器会把所有Event发送到每个Channel,而多路复用选择器,则会按照一定的规则发送。多路复用选择器的示例如下。
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

上述示例中,r1会选择性地将Event发送给c1、c2、c3和c4四个Channel。如果Header中的state属性值为CZ,则发送给c1,如果属性值为US,则发送给c2和c3,其他情况默认发送给c4。

Sink组逻辑处理器

Sink组逻辑处理器示意图请参见Channel选择器的示意图。

Sink组逻辑处理器用于多个Sink一同消费Channel队列中的数据,并把这些Sink配置为负载均衡或故障转移的工作方式。默认Sink与Channel是一对一的。配置为负载均衡方式,则根据配置的负载均衡机制,将Event分发到Sink中。配置为故障转移方式,则表示多个Sink是一主多备的工作方式,当工作的Sink中止后,Event会被转移到备用的Sink上。Selector
相关示例如下:
  • 示例1:故障转移方式
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000

    上述示例中有k1和k2两个Sink,权重分别是5和10,最大故障转移时间是10000毫秒。

  • 示例2:负载均衡方式
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = random

    上述示例中有k1和k2两个Sink,通过随机选择(random)方法进行负载分配,您也可以使用轮询(round_robin)方法。示例中的a1.sinkgroups.g1.processor.backoff参数表示是否以指数的形式退避失败的Sinks,设置为true,则Sink Processor会屏蔽故障的Sink。