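With a custom Sink, you can extend Flume to support more data storage components, or trim and optimize the features of an existing Sink to fit your needs. This topic uses an example to show how to create a custom Sink.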

Prerequisites

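  • A cluster is created and the Flume service is selected for it. For more information, see Create a cluster.
  • A file transfer tool (SSH Secure File Transfer Client) is installed on your local machine.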

Procedure

  1. Create a custom Sink.
    1. Add the POM dependency.
      <dependencies>
          <dependency>
              <groupId>org.apache.flume</groupId>
              <artifactId>flume-ng-core</artifactId>
              <version>1.9.0</version>
          </dependency>
      </dependencies>
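      Note: 1.9.0 is the Flume version. Replace it with the Flume version of your cluster.
    2. Write the custom Sink class.
      org.example.MySink is modeled on LoggerSink. It also writes each Event to the log, but by default it dumps more bytes of the Event body (32 instead of LoggerSink's 16).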
      package org.example;
      
      import org.apache.flume.Channel;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.EventDeliveryException;
      import org.apache.flume.Transaction;
      import org.apache.flume.conf.Configurable;
      import org.apache.flume.event.EventHelper;
      import org.apache.flume.sink.AbstractSink;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      public class MySink extends AbstractSink implements Configurable {
          private static final Logger logger = LoggerFactory.getLogger(MySink.class);
      
          // Default maximum number of Event body bytes to dump (LoggerSink uses 16)
          public static final int DEFAULT_MAX_BYTE_DUMP = 32;
      
          // Configuration key, e.g. a1.sinks.k1.maxBytesToLog
          public static final String MAX_BYTES_DUMP_KEY = "maxBytesToLog";
      
          // Maximum number of Event body bytes to dump, set in configure()
          private int maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;
      
          @Override
          public void configure(Context context) {
              // Use the configured value, or fall back to the default if it is not set
              this.maxBytesToLog = context.getInteger(MAX_BYTES_DUMP_KEY, DEFAULT_MAX_BYTE_DUMP);
          }
      
          @Override
          public void start() {
              // Initialize the connection to the external repository (e.g. HDFS) that
              // this Sink will forward Events to, then mark the Sink as started.
              super.start();
          }
      
          @Override
          public void stop() {
              // Disconnect from the external repository and do any additional cleanup
              // (e.g. releasing resources or nulling-out field values), then mark the Sink as stopped.
              super.stop();
          }
      
          @Override
          public Status process() throws EventDeliveryException {
              Status status = Status.READY;
      
              // Start a transaction on the channel
              Channel ch = getChannel();
              Transaction txn = ch.getTransaction();
              Event event = null;
              try {
                  txn.begin();
                  // Take one Event from the channel; take() returns null if the channel is empty
                  event = ch.take();
      
                  if (event != null) {
                      // Send the Event to the external repository here,
                      // e.g. storeSomeData(event) (a hypothetical helper). This example only logs it.
                      if (logger.isInfoEnabled()) {
                          logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
                      }
                  } else {
                      // No event found, request back-off semantics from the sink runner
                      status = Status.BACKOFF;
                  }
                  txn.commit();
              } catch (Exception e) {
                  // Roll back so that the Event stays in the channel and can be redelivered
                  txn.rollback();
                  throw new EventDeliveryException("Failed to log event: " + event, e);
              } finally {
                  txn.close();
              }
              return status;
          }
      }
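process() relies on channel transactions: a take() only becomes permanent when the transaction is committed, and rollback() returns the Event to the channel so it can be delivered again. The JDK-only toy model below is a minimal sketch of that behavior, assuming a single transaction at a time. ToyChannel and ToySink are hypothetical classes for illustration only; they are not Flume's API and skip begin(), close(), and capacity limits.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model, NOT Flume's API: take() is only made permanent by commit();
// rollback() puts the taken events back at the head of the queue.
class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Deque<String> taken = new ArrayDeque<>();

    void put(String event) { queue.addLast(event); }

    // Returns null when the channel is empty, like Channel.take()
    String take() {
        String e = queue.pollFirst();
        if (e != null) taken.addLast(e);
        return e;
    }

    void commit() { taken.clear(); }

    void rollback() {
        // Restore taken events in their original order
        while (!taken.isEmpty()) queue.addFirst(taken.pollLast());
    }

    int size() { return queue.size(); }
}

class ToySink {
    enum Status { READY, BACKOFF }

    // Same shape as MySink.process(): take, deliver, then commit or roll back.
    // deliveryFails simulates an unavailable external repository.
    static Status process(ToyChannel ch, boolean deliveryFails) {
        String event = ch.take();
        try {
            if (event == null) {
                ch.commit();
                return Status.BACKOFF;
            }
            if (deliveryFails) {
                throw new IllegalStateException("external repository unavailable");
            }
            ch.commit();
            return Status.READY;
        } catch (RuntimeException ex) {
            ch.rollback();
            throw ex;
        }
    }
}
```

In this model a failed delivery leaves the channel size unchanged, and an empty channel yields BACKOFF, which matches the roles that Transaction.commit(), Transaction.rollback(), and Status.BACKOFF play in MySink.process().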
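  2. Package the custom code into a JAR file.
    In the directory that contains pom.xml, run the following command to build the JAR file. By default, Maven writes it to the target directory.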
    mvn clean package -DskipTests
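Before uploading, you can check that the JAR actually contains the compiled Sink class. The following is a minimal sketch using only java.util.jar; JarCheck is a hypothetical helper, and the JAR path you pass in depends on the artifactId and version in your pom.xml.

```java
import java.io.IOException;
import java.util.jar.JarFile;

class JarCheck {
    // Returns true if the JAR has an entry for the class, e.g. "org.example.MySink"
    // is looked up as "org/example/MySink.class".
    static boolean containsClass(String jarPath, String className) throws IOException {
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }
}
```

For example, JarCheck.containsClass("target/<your-artifact>.jar", "org.example.MySink") should return true; the file name in that call is a placeholder.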
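  3. Use the file transfer tool to upload the generated JAR file to the /opt/apps/FLUME/flume-current/lib directory of Flume.
    Note: If your cluster is not an EMR cluster, upload the JAR file to the lib directory of your actual Flume installation.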
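  4. Add the configuration.
    1. Log on to the cluster over SSH. For more information, see Log on to a cluster.
    2. Run the following command to go to the conf directory.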
      cd /opt/apps/FLUME/flume-current/conf
    3. Run the following command to create a configuration file.
      vim custom_sink.conf
      Note: In this example, the configuration file is named custom_sink.conf. You can choose a different name.
    4. Add the following content to custom_sink.conf.
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      a1.sources.r1.type = org.apache.flume.source.StressSource
      a1.sources.r1.maxEventsPerSecond = 1
      a1.sources.r1.batchSize = 1
      a1.sources.r1.maxTotalEvents = 100
      
      a1.sinks.k1.type = org.example.MySink
      a1.sinks.k1.maxBytesToLog = 64
      
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
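      Note: maxBytesToLog specifies the maximum number of bytes of each Event body that MySink writes to the log. It is set to 64 here, which overrides the default of 32 in MySink.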
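The configuration file uses the Java properties format, and each component receives only its own keys with the component prefix removed. That is why MySink.configure() reads "maxBytesToLog" rather than "a1.sinks.k1.maxBytesToLog". The sketch below approximates that mapping with java.util.Properties so you can preview which value MySink will see. SinkConfigPreview and its methods are hypothetical helpers, not Flume code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class SinkConfigPreview {
    static final int DEFAULT_MAX_BYTE_DUMP = 32; // same default as MySink

    static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    // Collects the keys under a component prefix with the prefix stripped,
    // e.g. "a1.sinks.k1.maxBytesToLog" -> "maxBytesToLog".
    static Map<String, String> subProperties(Properties props, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key).trim());
            }
        }
        return result;
    }

    // Mirrors context.getInteger(MAX_BYTES_DUMP_KEY, DEFAULT_MAX_BYTE_DUMP) in MySink.configure()
    static int maxBytesToLog(Map<String, String> sinkProps) {
        String value = sinkProps.get("maxBytesToLog");
        return value == null ? DEFAULT_MAX_BYTE_DUMP : Integer.parseInt(value);
    }
}
```

With the configuration above, subProperties(props, "a1.sinks.k1.") yields type, maxBytesToLog, and channel, and maxBytesToLog(...) returns 64. If you delete the maxBytesToLog line, it falls back to 32.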
  5. Start Flume.
    1. Run the following command to go to the flume-current directory.
      cd /opt/apps/FLUME/flume-current
    2. Run the following command to start Flume.
      bin/flume-ng agent --name a1 -c conf -f conf/custom_sink.conf -Dflume.root.logger=INFO,console
      Output similar to the following is returned.
      2021-07-16 14:49:29,024 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
      2021-07-16 14:49:29,024 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Waiting for channel: c1 to start. Sleeping for 500 ms
      2021-07-16 14:49:29,118 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
      2021-07-16 14:49:29,118 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
      2021-07-16 14:49:29,525 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
      2021-07-16 14:49:29,525 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
      2021-07-16 14:49:29,526 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.StressSource.doStart(StressSource.java:169)] Stress source doStart finished
      2021-07-16 14:49:29,529 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:30,006 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:31,007 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:32,007 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:33,006 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
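Each logged Event body above shows 4 rows of 16 bytes, that is, 64 bytes, which matches a1.sinks.k1.maxBytesToLog = 64. With MySink's default of 32, you would expect 2 rows. The sketch below illustrates only the truncation arithmetic; DumpPreview.hexRows is a hypothetical helper that approximates, but does not reproduce exactly, the layout of EventHelper.dumpEvent.

```java
import java.util.ArrayList;
import java.util.List;

class DumpPreview {
    // Formats at most maxBytes bytes of body as hex rows of 16 bytes,
    // each prefixed with an 8-digit hex offset (e.g. "00000010 7F 7F ...").
    static List<String> hexRows(byte[] body, int maxBytes) {
        int n = Math.min(body.length, maxBytes);
        List<String> rows = new ArrayList<>();
        for (int offset = 0; offset < n; offset += 16) {
            StringBuilder sb = new StringBuilder(String.format("%08X", offset));
            for (int i = offset; i < Math.min(offset + 16, n); i++) {
                sb.append(String.format(" %02X", body[i] & 0xFF));
            }
            rows.add(sb.toString());
        }
        return rows;
    }
}
```

The number of rows is ceil(min(body length, maxBytesToLog) / 16), so any body of at least 64 bytes produces the 4 rows shown in the log.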