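A custom Source lets you extend Flume to read from additional data sources, such as encrypted data streams, self-managed service ports, and proprietary data stores. This topic uses an example to show how to write a custom Source.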
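Prerequisites
- A cluster has been created with the Flume service selected. For details, see Create a cluster.
- A file transfer tool (SSH Secure File Transfer Client) is installed on your local machine.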
Procedure
- Create a custom Source.
- Add the pom dependency.
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
Note: 1.9.0 is the Flume version. Replace it with the version of your Flume installation.
- Write the custom Source class.
org.example.MySource implements a Source that, at a fixed interval, emits an event whose body is the current time in a configurable format.

package org.example;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {
    private String myDateFormat;
    private int myIntervalMS;

    @Override
    public void configure(Context context) {
        // Read the source properties from the agent configuration, falling back to defaults.
        // Validate or convert the values here if needed; process() uses them later.
        this.myDateFormat = context.getString("dateFormat", "HH:mm:ss.SSS");
        this.myIntervalMS = context.getInteger("intervalMS", 1000);
    }

    @Override
    public void start() {
        // Initialize the connection to the external client here.
        super.start();
    }

    @Override
    public void stop() {
        // Disconnect from the external client and do any additional cleanup
        // (for example, releasing resources or nulling out field values).
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status;
        try {
            // Build an event whose body is the current time in the configured format.
            Event event = new SimpleEvent();
            SimpleDateFormat sdf = new SimpleDateFormat(myDateFormat);
            event.setBody(sdf.format(new Date()).getBytes(StandardCharsets.UTF_8));
            // Store the event in this Source's associated Channel(s).
            getChannelProcessor().processEvent(event);
            status = Status.READY;
        } catch (Exception e) {
            // Log the exception and handle individual exceptions as needed.
            status = Status.BACKOFF;
            e.printStackTrace();
        }
        try {
            // Wait for the configured interval before the next poll.
            Thread.sleep(myIntervalMS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that the caller can observe it.
            Thread.currentThread().interrupt();
        }
        return status;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}
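Both back-off getters in MySource return 0. The sketch below illustrates what that likely means in practice. It assumes that Flume's runner for pollable sources sleeps for min(consecutive back-offs × getBackOffSleepIncrement(), getMaxBackOffSleepInterval()) whenever process() returns BACKOFF. Under that assumption the runner adds no delay of its own here, and pacing comes only from Thread.sleep(myIntervalMS). The class BackoffSketch and the method sleepMillis are hypothetical, JDK-only names used for illustration, not Flume API.

```java
/** Hypothetical, JDK-only illustration; not part of the Flume API. */
final class BackoffSketch {

    /**
     * Delay applied after the n-th consecutive BACKOFF, assuming the runner
     * computes min(n * increment, maxInterval).
     */
    static long sleepMillis(long consecutiveBackoffs, long incrementMs, long maxIntervalMs) {
        return Math.min(consecutiveBackoffs * incrementMs, maxIntervalMs);
    }

    public static void main(String[] args) {
        // Values used by MySource in this topic: both getters return 0.
        System.out.println("MySource: " + sleepMillis(3, 0, 0) + " ms");

        // Assumed alternative values: 1000 ms increment, capped at 5000 ms.
        for (long n = 1; n <= 6; n++) {
            System.out.println(n + " consecutive back-offs: " + sleepMillis(n, 1000, 5000) + " ms");
        }
    }
}
```

If the source should slow down on repeated failures, returning non-zero values from the two getters is one way to do it, provided the assumption about the runner holds for your Flume version.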
- Package the custom code into a JAR. In the directory that contains pom.xml, run the following command to build the JAR:
mvn clean package -DskipTests
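- Use the file transfer tool to upload the generated JAR to the Flume directory /opt/apps/FLUME/flume-current/lib.
Note: For a non-EMR cluster, upload the JAR to your actual Flume installation directory.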
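- Add the configuration.
- Log on to the cluster over SSH. For details, see Log on to a cluster.
- Run the following command to go to the conf directory: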
cd /opt/apps/FLUME/flume-current/conf
- Run the following command to create a configuration file:
vim mysource.conf
Note: This example uses mysource.conf as the configuration file name. You can use a different file name.
- Add the following content to mysource.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = org.example.MySource
a1.sources.r1.dateFormat = HH:mm:ss.SSS
a1.sources.r1.intervalMS = 2000

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Note: In the configuration, dateFormat specifies the date format and intervalMS specifies the interval in milliseconds.
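The following sketch illustrates how the keys under a1.sources.r1. relate to the names that configure() reads: the component prefix is dropped, and the remaining key (dateFormat, intervalMS) is looked up with a default. It uses only java.util.Properties to approximate this behavior and does not call Flume. The class SourceConfigSketch and the method subProperties are hypothetical names introduced for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical, JDK-only illustration of prefix-stripped source properties. */
final class SourceConfigSketch {

    /** Returns the entries whose key starts with prefix, with the prefix removed. */
    static Map<String, String> subProperties(String conf, String prefix) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(conf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
            "a1.sources.r1.type = org.example.MySource",
            "a1.sources.r1.dateFormat = HH:mm:ss.SSS",
            "a1.sources.r1.intervalMS = 2000",
            "a1.sources.r1.channels = c1");

        Map<String, String> ctx = subProperties(conf, "a1.sources.r1.");

        // Same keys and defaults as MySource.configure() in this topic.
        String dateFormat = ctx.getOrDefault("dateFormat", "HH:mm:ss.SSS");
        int intervalMs = Integer.parseInt(ctx.getOrDefault("intervalMS", "1000"));

        System.out.println(dateFormat + " every " + intervalMs + " ms");
    }
}
```

If intervalMS is omitted from the configuration file, the lookup falls back to 1000, which matches the default in configure().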
- Start Flume.
- Run the following command to go to the flume-current directory:
cd /opt/apps/FLUME/flume-current
- Run the following command to start Flume:
bin/flume-ng agent --name a1 -c conf -f conf/mysource.conf -Dflume.root.logger=INFO,console
Output similar to the following is returned:
2021-07-16 14:44:27,620 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2021-07-16 14:44:27,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2021-07-16 14:44:27,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2021-07-16 14:44:27,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2021-07-16 14:44:27,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2021-07-16 14:44:27,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 32 37 2E 37 30 35 14:44:27.705 }
2021-07-16 14:44:29,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 32 39 2E 37 30 39 14:44:29.709 }
2021-07-16 14:44:31,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 31 2E 37 30 39 14:44:31.709 }
2021-07-16 14:44:33,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 33 2E 37 31 30 14:44:33.710 }
2021-07-16 14:44:35,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 35 2E 37 31 30 14:44:35.710 }
2021-07-16 14:44:37,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 37 2E 37 31 30 14:44:37.710 }
2021-07-16 14:44:39,711 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 39 2E 37 31 30 14:44:39.710 }
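In each Event line, the logger sink shows the event body as hexadecimal bytes followed by the same bytes as text. The events arrive about 2 seconds apart, which matches intervalMS = 2000. As a quick check that the body holds the timestamp produced by MySource, the sketch below decodes one hex dump copied from the output. EventBodySketch and decodeHex are hypothetical helper names, not part of Flume.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that turns a space-separated hex dump back into text. */
final class EventBodySketch {

    static String decodeHex(String hexDump) {
        String[] parts = hexDump.trim().split("\\s+");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            // Each token is one byte written as two hexadecimal digits.
            bytes[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Body of the first event in the sample output.
        System.out.println(decodeHex("31 34 3A 34 34 3A 32 37 2E 37 30 35")); // prints 14:44:27.705
    }
}
```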