Flume-DataHub插件是基于Flume开发的DataHub数据订阅/发布插件,可以将采集到的数据写入DataHub,也可以从DataHub读取数据写入其他系统。该插件遵守Flume插件开发规范,安装方便,可以很方便的向DataHub发布/订阅数据。
安装Flume插件
安装限制
JDK版本在 1.8及以上版本。
Apache Maven 版本3.x。
Flume-NG 版本
1.x
。
安装Flime
下载Flume(如已下载,可跳过该步骤)
$ tar zxvf apache-flume-1.11.0-bin.tar.gz
说明为后续方便描述,以下介绍以
${FLUME_HOME}
表示Flume主目录位置。安装Flume-datahub。
直接安装。
下载Flume-datahub插件aliyun-flume-datahub-sink-2.0.9.tar.gz
解压flume插件并放在
${FLUME_HOME}/plugins.d
目录下$ tar aliyun-flume-datahub-sink-x.x.x.tar.gz $ cd aliyun-flume-datahub-sink-x.x.x $ mkdir ${FLUME_HOME}/plugins.d $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
源码安装。
编译并安装。
$ cd aliyun-maxcompute-data-collectors $ mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true $ cd flume-plugin/target $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
参数介绍
sink参数介绍
Source 参数
案例介绍
Sink使用案例
案例一:DELIMITED serializer
案例二:REGEX serializer
案例三:Flume Taildir Source
案例四:JSON serializer
Source使用案例
读取DataHub中数据至其它系统
Flume metric
DataHub-Flume 支持Flume的内置计数监控器,用户可以利用监控器来监控自己的Flume插件的运行情况。DataHub-Flume插件的Sink和Source都支持metric信息显示,具体参数含义可查看下表(只含DataHub相关的参数,更多参数含义参考:Flume官方文档)。
DatahubSink
DatahubSource
lume监控
lume提供了多种监控方法,本文以HTTP监控为例,介绍Flume监控工具的使用,使用HTTP方式监控,只需要在Flume插件启动时增加两个参数即可,-Dflume.monitoring.type=http -Dflume.monitoring.port=1234
,其中type将监控方式指定为http,port为指定的端口号。使用示例如下:
bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234
插件成功启动之后,便可以登录Web界面进行查看。地址为 https://ip:1234/metrics
更多的监控方法可以参考Flume官方文档 。
常见问题
flume启动报错org.apache.flume.ChannelFullException: Space for commit to queue couldn’t be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
flume默认堆内存20MB,配置的batchSize过大时,flume使用的堆内存会超出20M。
解决方案1:调小batchSize。
解决方案2:调大flume最大堆内存。
$ vim bin/flume-ng
JAV**A_OPTS**="-Xmx20m" ==> JAV**A_OPTS**="-Xmx1024m"
DataHub-Flume插件是否支持JSON格式?
目前不支持,不过用户可以通过自定义正则表达式进行数据解析,或者修改DataHub-Flume插件代码,添加JSONEvent进行支持。
DataHub-Flume插件支持Blob Topic吗?
目前DataHub-Flume插件仅支持Tuple Topic,暂不支持blob。
flume 报错 org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count
channel已满,source数据写入channel失败。可以在配置文件中修改channel capacity解决,并且可以适当降低datahub source的batchSize。
使用旧版本flume时报错,可能会因为jar包冲突导致无法正常启动。
错误场景:使用flume1.6时,启动时报错:
java.lang.NoSuchMethodError:com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader;
。因为新版本的插件依赖的jar包和flume本身依赖的jar包版本不一致,使用了flume依赖的旧版本jar包导致新版本的method找不到。如何处理:删除${FLUME_HOME}/lib目录下的三个jar包即可。
jackson-annotations-2.3.0.jar
jackson-databind-2.3.1.jar
jackson-annotations-2.3.0.jar
使用flume采集数据时,空字符串自动转为null
在flume插件2.0.2中对于非空字符串会做trim,空字符串直接转为null。flume插件2.0.3中已经优化掉,非空字符串写入DataHub依旧为空字符串。
启动报错Cannot invoke "com.google.common.cache.LoadingCache.get(Object)" because"com.aliyun.datahub.client.impl.batch.avro.AvroSchemaCache.schemaCache" is null]
删除Flume lib文件夹中的guava 、zstd的 jar包文件,重新启动。