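With a custom Sink, you can add support for additional data storage components, or trim and optimize the functionality of an existing Sink to fit your needs. This topic uses an example to show how to write a custom Sink.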
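Prerequisites
- A cluster has been created with the Flume service selected. For details, see Create a cluster.
- A file transfer tool (SSH Secure File Transfer Client) is installed on your local machine.
Procedure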
- Create the custom Sink.
  - Add the pom dependency.
      <dependencies>
        <dependency>
          <groupId>org.apache.flume</groupId>
          <artifactId>flume-ng-core</artifactId>
          <version>1.9.0</version>
        </dependency>
      </dependencies>
    Note: 1.9.0 is the Flume version. Replace it with the Flume version of the cluster you created.
  - Write the custom Sink class.
    org.example.MySink is modeled on LoggerSink and uses a larger default (32 bytes) for the number of event body bytes written to the log.
      package org.example;

      import org.apache.flume.Channel;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.EventDeliveryException;
      import org.apache.flume.Transaction;
      import org.apache.flume.conf.Configurable;
      import org.apache.flume.event.EventHelper;
      import org.apache.flume.sink.AbstractSink;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      public class MySink extends AbstractSink implements Configurable {

        private static final Logger logger = LoggerFactory.getLogger(MySink.class);

        // Default maximum number of bytes to dump
        public static final int DEFAULT_MAX_BYTE_DUMP = 32;

        // Configuration key for the maximum number of bytes to dump
        public static final String MAX_BYTES_DUMP_KEY = "maxBytesToLog";

        // Maximum number of bytes to be dumped
        private int maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;

        @Override
        public void configure(Context context) {
          // Read maxBytesToLog from the agent configuration, falling back to the default
          this.maxBytesToLog = context.getInteger(MAX_BYTES_DUMP_KEY, DEFAULT_MAX_BYTE_DUMP);
        }

        @Override
        public void start() {
          // Initialize the connection to the external repository (e.g. HDFS) that
          // this Sink will forward Events to ..
        }

        @Override
        public void stop() {
          // Disconnect from the external repository and do any
          // additional cleanup (e.g. releasing resources or nulling-out
          // field values) ..
        }

        @Override
        public Status process() throws EventDeliveryException {
          Status status = Status.READY;

          // Start transaction
          Channel ch = getChannel();
          Transaction txn = ch.getTransaction();
          Event event = null;
          try {
            txn.begin();
            // This try clause includes whatever Channel operations you want to do
            event = ch.take();

            // Send the Event to the external repository.
            // storeSomeData(event);
            if (event != null) {
              if (logger.isInfoEnabled()) {
                logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
              }
            } else {
              // No event found, request back-off semantics from the sink runner
              status = Status.BACKOFF;
            }
            txn.commit();
          } catch (Exception e) {
            // Roll back so that the event stays in the channel, then report the failure
            txn.rollback();
            throw new EventDeliveryException("Failed to log event: " + event, e);
          } finally {
            txn.close();
          }
          return status;
        }
      }
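The take, commit, and rollback flow in process() can be easier to follow in isolation. The following is a minimal sketch that models the same contract without any Flume dependency. SketchChannel, Store, and TransactionSketch are hypothetical stand-ins invented for this illustration, not Flume classes, and the real Channel and Transaction APIs are richer than this model.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified, Flume-free model of the take/commit/rollback contract. */
public class TransactionSketch {

    /** Mirrors the two outcomes a sink reports to its runner. */
    enum Status { READY, BACKOFF }

    /** Hypothetical stand-in for a channel that holds string events. */
    static class SketchChannel {
        private final Deque<String> queue = new ArrayDeque<>();
        private String taken; // event taken inside the open transaction

        void put(String event) { queue.addLast(event); }
        int size() { return queue.size(); }

        /** Removes and returns the next event, or null if the channel is empty. */
        String take() {
            taken = queue.pollFirst();
            return taken;
        }

        /** Makes the removal permanent. */
        void commit() { taken = null; }

        /** Puts the taken event back at the head of the queue. */
        void rollback() {
            if (taken != null) {
                queue.addFirst(taken);
                taken = null;
            }
        }
    }

    /** Hypothetical stand-in for the external repository. */
    interface Store {
        void write(String event) throws Exception;
    }

    /** Same control flow as MySink.process(), against the stand-ins above. */
    static Status process(SketchChannel ch, Store store) throws Exception {
        Status status = Status.READY;
        try {
            String event = ch.take();
            if (event != null) {
                store.write(event);
            } else {
                status = Status.BACKOFF; // nothing to deliver, ask the caller to wait
            }
            ch.commit();
        } catch (Exception e) {
            ch.rollback(); // delivery failed, keep the event for a retry
            throw e;
        }
        return status;
    }

    public static void main(String[] args) throws Exception {
        SketchChannel ch = new SketchChannel();
        ch.put("event-1");
        System.out.println(process(ch, e -> System.out.println("stored " + e)));
        System.out.println(process(ch, e -> { })); // channel is now empty
    }
}
```

The point of the sketch is the failure path: when the write throws, the event goes back into the channel instead of being lost, which is why the commit in MySink happens only after the event has been handled.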
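- Package the custom code into a JAR file. In the directory that contains pom.xml, run the following command to build the JAR file.
    mvn clean package -DskipTests
- Use the file transfer tool to upload the generated JAR file to the /opt/apps/FLUME/flume-current/lib directory of Flume.
  Note: For a non-EMR cluster, upload the file to your actual Flume installation directory.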
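- Add the configuration.
  - Log on to the cluster over SSH. For details, see Log on to a cluster.
  - Run the following command to go to the conf directory.
      cd /opt/apps/FLUME/flume-current/conf
  - Run the following command to create the configuration file.
      vim custom_sink.conf
    Note: This example names the configuration file custom_sink.conf. You can use a different file name.
  - Add the following content to custom_sink.conf.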
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1

      a1.sources.r1.type = org.apache.flume.source.StressSource
      a1.sources.r1.maxEventsPerSecond = 1
      a1.sources.r1.batchSize = 1
      a1.sources.r1.maxTotalEvents = 100

      a1.sinks.k1.type = org.example.MySink
      a1.sinks.k1.maxBytesToLog = 64

      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100

      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
    Note: The maxBytesToLog parameter sets the maximum number of bytes of each event body that the Sink writes to the log.
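To see what a maxBytesToLog value of 64 means for the logged output, the following sketch renders only the first maxBytes bytes of a body as hex, 16 bytes per row. HexPreview is a hypothetical helper written for this illustration and is not the code behind EventHelper.dumpEvent. The 500-byte body filled with 0x7F is an assumption chosen to resemble the sample output of this example.

```java
import java.util.Arrays;

/** Illustrative only: shows how a byte limit truncates a hex preview. */
public class HexPreview {

    /** Renders at most maxBytes bytes of body as hex, 16 bytes per row. */
    static String preview(byte[] body, int maxBytes) {
        int n = Math.min(body.length, maxBytes);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            if (i > 0) {
                // Start a new row every 16 bytes, otherwise separate with a space
                sb.append(i % 16 == 0 ? '\n' : ' ');
            }
            sb.append(String.format("%02X", body[i] & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] body = new byte[500];        // assumed body size for the demo
        Arrays.fill(body, Byte.MAX_VALUE);  // 0x7F, as in the sample output
        System.out.println(preview(body, 64));
    }
}
```

With a limit of 64, the preview has four rows of 16 bytes, which matches the four hex rows per event in the output of this example. With the default of 32, it would have two rows.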
- Start Flume.
  - Run the following command to go to the flume-current directory.
      cd /opt/apps/FLUME/flume-current
  - Run the following command to start Flume.
      bin/flume-ng agent --name a1 -c conf -f conf/custom_sink.conf -Dflume.root.logger=INFO,console
    The following information is returned.
      2021-07-16 14:49:29,024 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
      2021-07-16 14:49:29,024 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Waiting for channel: c1 to start. Sleeping for 500 ms
      2021-07-16 14:49:29,118 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
      2021-07-16 14:49:29,118 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
      2021-07-16 14:49:29,525 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
      2021-07-16 14:49:29,525 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
      2021-07-16 14:49:29,526 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.StressSource.doStart(StressSource.java:169)] Stress source doStart finished
      2021-07-16 14:49:29,529 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:30,006 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:31,007 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:32,007 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:33,006 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }