Flume插件

Flume-DataHub插件是基于Flume开发的DataHub数据订阅/发布插件,可以将采集到的数据写入DataHub,也可以从DataHub读取数据写入其他系统。该插件遵守Flume插件开发规范,安装方便,可以很方便的向DataHub发布/订阅数据。

安装Flume插件

安装限制

  • JDK版本在 1.8及以上版本

  • Apache Maven 版本3.x

  • Flume-NG 版本1.x

安装Flime

  1. 下载Flume(如已下载,可跳过该步骤)

    $ tar zxvf apache-flume-1.11.0-bin.tar.gz
    说明

    为后续方便描述,以下介绍以${FLUME_HOME}表示Flume主目录位置。

  2. 安装Flume-datahub。

    • 直接安装。

      1. 下载Flume-datahub插件aliyun-flume-datahub-sink-2.0.9.tar.gz

      2. 解压flume插件并放在${FLUME_HOME}/plugins.d目录下

        $ tar aliyun-flume-datahub-sink-x.x.x.tar.gz
        $ cd aliyun-flume-datahub-sink-x.x.x
        $ mkdir ${FLUME_HOME}/plugins.d
        $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
    • 源码安装。

      1. 下载源码aliyun-maxcompute-data-collectors

      2. 编译并安装。

        $ cd aliyun-maxcompute-data-collectors
        $ mvn clean package -DskipTests=true  -Dmaven.javadoc.skip=true
        $ cd flume-plugin/target
        $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
        $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d

参数介绍

sink参数介绍

名称

默认值

是否必须

描述

datahub.endPoint

-

必须

阿里云datahub的服务地址

datahub.accessId

-

必须

阿里云access id

datahub.accessKey

-

必须

阿里云access key

datahub.project

-

必须

datahub项目名称

datahub.topic

-

必须

datahub topic名称

datahub.shard.ids

所有shard

可选

写入datahub的指定shard列表,以”,”分隔,例如 0,1,2。每次从shard列表随机选择一个shard写入DataHub。在发生shard分裂或者合并时,如果没有指定该参数,那么shard分裂或者合并后,flume会自动调整shard列表,否则需要用户手动修改配置文件。

datahub.enablePb

true

可选

是否使用pb传输,部分专有云不支持需要手动设置为false

datahub.compressType

none

可选

是否压缩传输,目前支持 LZ4, DEFLATE

datahub.batchSize

1000

可选

datahub每次发送的最大数据量

datahub.maxBufferSize

2*1024*1024

可选

datahub单次请求写入数据量的最大值(单位:Byte)。不建议修改该参数,单次写入数据量过大可能写入失败

datahub.batchTimeout

5

可选

如果数据量没有达到batchSize,向datahub同步数据之前等待的时间(单位:秒)

datahub.retryTimes

3

可选

数据同步失败重试次数

datahub.retryInterval

5

可选

数据同步失败重试间隔(单位:秒)

datahub.dirtyDataContinue

true

可选

遇到脏数据是否继续处理,为true时,会自动将脏数据以,分隔符写入脏数据文件,不影响后续数据的处理

datahub.dirtyDataFile

DataHub-Flume-dirty-file

可选

脏数据文件

serializer

-

必须

数据解析方式,目前支持DELIMITED(分,JSON(每行为单层Json)和REGEX(正则表达式)

serializer.delimiter

,

可选

数据字段分割符,如果要使用特殊字符需要添加双引号,例如”\t”

serializer.regex

(.*)

可选

数据解析的正则表达式,每个字段的数据被解析成一个group

serializer.fieldnames

-

必须

输入数据字段到datahub字段的映射,以输入的顺序标示字段,如果要跳过某个字段, 不指定列名即可,例如 c1,c2,,c3,表示将输入数据的第一、二、四字段和datahub的c1,c2,c3字段进行匹配。

serializer.charset

UTF-8

可选

数据解析编码格式

Source 参数

名称

默认值

是否必须

描述

datahub.endPoint

-

必须

阿里云datahub的服务地址

datahub.accessId

-

必须

阿里云access id

datahub.accessKey

-

必须

阿里云access key

datahub.project

-

必须

datahub项目名称

datahub.topic

-

必须

datahub topic名称

datahub.subId

-

必须

datahub 订阅 id

datahub.startTime

-

可选

datahub 指定时间点进行读数据,格式为yyyy-MM-dd HH:mm:ss,使用该参数会首先重置订阅,然后根据订阅读取数据。

datahub.shard.ids

-

可选

读取datahub的指定shard列表,以”,”分隔,例如 0,1,2。每次读数据时会从shard列表随机选择一个shard进行消费。如不指定,则采用协同消费进行数据读取。不建议使用该参数,如果配置了多个source的情况下,不指定该参数时,协同消费会自动分配shard,尽可能保证每个source负载均衡。

datahub.enablePb

true

可选

是否使用pb传输,部分专有云不支持需要手动设置为false

datahub.compressType

none

可选

是否压缩传输,目前支持 LZ4, DEFLATE

datahub.batchSize

1000

可选

DataHub每次读取的最大数据量

datahub.batchTimeout

5

可选

如果数据量没有达到batchSize,向datahub同步数据之前等待的时间(单位:秒)

datahub.retryTimes

3

可选

数据读取失败重试次数,重试间隔默认为1S,不可调整

datahub.autoCommit

true

可选

设为true表示由consumer自动提交点位,可能发生数据未消费但是点位被提交的可能,修改为false表示数据被提交到flume channel之后才会提交该点位

datahub.offsetCommitTimeout

30

可选

自动提交点位时间间隔(单位:秒)

datahub.sessionTimeout

60

可选

source功能采取协同消费实现,协同消费超时没有发送心跳,则session会自动关闭

serializer

-

必须

数据解析方式,目前支持DELIMITED(分隔符),数据的每个字段将会以datahub schema顺序写成一行,并以delimiter进行分隔

serializer.delimiter

,

可选

数据字段分割符,如果要使用特殊字符需要添加双引号,例如”\t”

serializer.charset

UTF-8

可选

数据解析编码格式

案例介绍

Sink使用案例

案例一:DELIMITED serializer

  1. 准备测试数据。

    DELIMITED解析数据时将每一行作为一条Record,并以给定的分隔符对数据进行解析。下面以csv文件为例,说明如何使用flume将批量csv文件准时时上传到DataHub。请将以下内容保存至本地文件/temp/test.csv中。

    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
    1,hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V,false,1254275.1144637289,1573206062763,1254275.1144637289
    2,vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l,true,1254275.1144637289,1573206062763,1254275.1144637289
    3,t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs,false,1254275.1144637289,1573206062763,1254275.1144637289
    4,MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI,true,1254275.1144637289,1573206062763,1254275.1144637289
    5,bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV,false,1254275.1144637289,1573206062763,1254275.1144637289
    6,wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX,true,1254275.1144637289,1573206062763,1254275.1144637289
    7,whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8,false,1254275.1144637289,1573206062763,1254275.1144637289
    8,OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ,true,1254275.1144637289,1573206062763,1254275.1144637289

    测试数据对应的DataHub Schema为以下内容:

    字段名称

    字段类型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. 配置Flume文件。

    在目录 ${FLUME_HOME}/conf 下创建文件名为datahub_basic.conf的文件,然后将以下内容写入文件,本实例采用Exec Source作为数据源,更多Source可以参考Flume官方文档

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.csv
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = ,
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    a1.sinks.k1.datahub.enablePb = true
    a1.sinks.k1.datahub.compressType = DEFLATE
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    说明

    ExecSource源可能发生数据丢失,因为ExecSource无法保证将事件放入Channel,在这种情况下,数据将丢失。例如,tail命令获取数据时,此时flume channel已满,而这部分数据将会丢失。建议使用Spooling Directory Source或者Taildir Source。这里将静态文件/temp/test.csv作为数据源,如果文件为动态写入的日志文件,可使用命令tail -F logFile进行实时采集。

  3. 启动Flume。

    Dflume.root.logger=INFO,console选项可以将日志实时输出到控制台,如需更多信息可采用DEBUG模式。使用如下命令启动Flume,即可完成CSV文件数据采集进入DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

案例二:REGEX serializer

  1. 准备测试数据。

    REGEX解析数据时将每一行作为一条Record,并以给定的正则表达式对数据进行解析,一条Record的信息的多个内容以分组表示。下面以日志文件为例,说明flume如何利用正则表达式准时时上传到DataHub。请将以下测试数据保存在本地文件/temp/test.csv中:

    1. [2019-11-12 15:20:08] 0,j4M6PhzL1DXVTQawdfk306N2KnCDxtR0KK1pke5O,true,1254409.5059812006,1573543208698,1254409.5059819978
    2. [2019-11-12 15:22:35] 0,mYLF8UzIYCCFUm1jYs9wzd2Hl6IMr2N7GPYXZSZy,true,1254409.5645912462,1573543355740,1254409.5645920434
    3. [2019-11-12 15:23:14] 0,MOemUZur37n4SGtdUQyMohgmM6cxZRBXjJ34HzqX,true,1254409.5799291395,1573543394219,1254409.579929538
    4. [2019-11-12 15:23:30] 0,EAFc1VTOvC9rYzPl9zJYa6cc8uJ089EaFd79B25i,true,1254409.5862723626,1573543410134,1254409.5862731598
    5. [2019-11-12 15:23:53] 0,zndVraA4GP7FP8p4CkQFsKJkxwtYK3zXjDdkhmRk,true,1254409.5956010541,1573543433538,1254409.5956018514
    6. [2019-11-12 15:24:00] 0,9YrjjoALEfyZm07J7OuNvDVNyspIzrbOOAGnZtHx,true,1254409.598201082,1573543440061,1254409.5982018793
    7. [2019-11-12 15:24:23] 0,mWsFgFlUnXKQQR6RpbAYDF9OhGYgU8mljvGCtZ26,true,1254409.6073950487,1573543463126,1254409.607395447
    8. [2019-11-12 15:26:51] 0,5pZRRzkW3WDLdYLOklNgTLFX0Q0uywZ8jhw7RYfI,true,1254409.666525653,1573543611475,1254409.6665264503
    9. [2019-11-12 15:29:11] 0,hVgGQrXpBtTJm6sovVK4YGjfNMdQ3z9pQHxD5Iqd,true,1254409.7222845491,1573543751364,1254409.7222853464
    10. [2019-11-12 15:29:52] 0,7wQOQmxoaEl6Cxl1OSo6cr8MAc1AdJWJQaTPT5xs,true,1254409.7387664048,1573543792714,1254409.738767202
    11. [2019-11-12 15:30:30] 0,a3Th5Q6a8Vy2h1zfWLEP7MdPhbKyTY3a4AfcOJs2,true,1254409.7538966285,1573543830673,1254409.7538974257
    12. [2019-11-12 15:34:54] 0,d0yQAugqJ8M8OtmVQYMTYR8hi3uuX5WsH9VQRBpP,true,1254409.8589555968,1573544094247,1254409.8589563938

    以上测试数据对应的DataHub Schema为:

    字段名称

    字段类型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. 配置Flume文件。

    在目录 ${FLUME_HOME}/conf 下创建文件名为datahub_basic.conf的文件,然后将以下内容写入文件。本实例采用Exec Source作为数据源,更多Source可以参考Flume官方文档

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.csv
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = REGEX
    a1.sinks.k1.serializer.regex = \\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\] (\\d+),(\\S+),([a-z]+),([-+]?[0-9]*\\.?[0-9]*),(\\d+),([-+]?[0-9]*\\.?[0-9]*)
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    说明

    ExecSource源可能发生数据丢失,因为EeecSource无法保证将事件放入Channel,在这种情况下,数据将丢失。例如,tail命令获取数据时,此时flume channel已满,而这部分数据将会丢失。建议使用Spooling Directory Source或者Taildir Source。这里将静态文件/temp/test.csv作为数据源,如果文件为动态写入的日志文件,可使用命令tail -F logFile进行实时采集。

  3. 启动Flume。

    Dflume.root.logger=INFO,console选项可以将日志实时输出到控制台,如需更多信息可采用DEBUG模式。使用如下命令启动Flume,即可完成CSV文件数据采集进入DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

案例三:Flume Taildir Source

Flume使用exec source时,可能会导致数据丢失,所以在实际生产环境中并不建议使用。如果想要采集本地日志,可以使用Taildir Source或者Spooling Directory Source。下面以Taildir为例,介绍日志文件的采集。Taildir将会可以指定文件组,然后观察指定的文件,并在检测到新行添加到每个文件后,几乎实时的进行读取。如果正在写入新行,则此源将重试读取它们,以等待写入完成。 Taildir Source会把每个文件的已经读到的位置信息以JSON格式储存在positionFile文件中,source event 放入channel失败,已读位置不会更新,所以Taildir Source是可靠的。

  1. 准备测试数据。

    所有的日志将以如下格式追加到文件末尾。日志文件命名格式为 *.log

    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289

    以上测试数据对应的DataHub Schema为:

    字段名称

    字段类型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. 配置Flume配置文件。

    在目录 ${FLUME_HOME}/conf 下创建文件名为datahub_basic.conf的文件,然后将以下内容写入文件。

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /temp/taildir_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /temp/.*log
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = ,
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    a1.sinks.k1.datahub.enablePb = true
    a1.sinks.k1.datahub.compressType = DEFLATE
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. 启动Flume。

    Dflume.root.logger=INFO,console选项可以将日志实时输出到控制台,如需更多信息可采用DEBUG模式。使用如下命令启动Flume,即可完成CSV文件数据采集进入DataHub:

    1. $ cd ${FLUME_HOME}
    2. $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

案例四:JSON serializer

SON解析数据时将每一行作为一条Record,只做一层JSON解析,嵌套的内容直接当作string,第一层的name若在配置的serializer.fieldnames中,就会加入到对应的列中。下面以日志文件为例,说明flume如何利用JSON解析方式准时时上传到DataHub。

  1. 准备测试数据。

    将以下内容保存在本地文件/temp/test.json中。其中需要同步的数据内容为日期后面的详细数据。

    {"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V","id":1,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l","id":2,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs","id":3,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI","id":4,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV","id":5,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX","id":6,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8","id":7,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"gender":true,"name":{"a":"OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ"},"id":8,"salary":1254275.1144637289,"decimal":1254275.1144637289}

    以上测试数据对应的DataHub Schema为:

    字段名称

    字段类型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. 配置Flume文件。

    在目录 ${FLUME_HOME}/conf 下创建文件名为datahub_basic.conf的文件,然后将以下内容写入文件。本实例采用Exec Source作为数据源,更多Source可以参考Flume官方文档

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.json
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = JSON
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. 启动Flume。

    Dflume.root.logger=INFO,console选项可以将日志实时输出到控制台,如需更多信息可采用DEBUG模式。使用如下命令启动Flume,即可完成CSV文件数据采集进入DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

Source使用案例

读取DataHub中数据至其它系统

DataHub-Flume Source可以将DataHub中的数据读取出来,并且移动到另外的系统中,本文以logger(直接输出到控制台)为例,介绍DataHub-Flume Source的使用方法。

  1. 如下字段topic Schema为例。

    字段名称

    字段类型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. 配置Flume文件。

    在目录 ${FLUME_HOME}/conf 下创建文件名为datahub_source.conf的文件,然后将以下内容写入文件。

     # A single-node Flume configuration for DataHub
     # Name the components on this agent
     a1.sources = r1
     a1.sinks = k1
     a1.channels = c1
    
     # Describe/configure the source
     a1.sources.r1.type = com.aliyun.datahub.flume.sink.DatahubSource
     a1.sources.r1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
     a1.sources.r1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
     a1.sources.r1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
     a1.sources.r1.datahub.project = datahub_test
     a1.sources.r1.datahub.topic = test_flume
     a1.sources.r1.datahub.subId = {YOUR_ALIYUN_DATAHUB_SUB_ID}
     a1.sources.r1.serializer = DELIMITED
     a1.sources.r1.serializer.delimiter = ,
     a1.sources.r1.serializer.charset = UTF-8
     a1.sources.r1.datahub.retryTimes = 3
     a1.sources.r1.datahub.batchSize = 1000
     a1.sources.r1.datahub.batchTimeout = 5
     a1.sources.r1.datahub.enablePb = false
    
     # Describe the sink
     a1.sinks.k1.type = logger
    
     # Use a channel which buffers events in memory
     a1.channels.c1.type = memory
     a1.channels.c1.capacity = 10000
     a1.channels.c1.transactionCapacity = 10000
    
     # Bind the source and sink to the channel
     a1.sources.r1.channels = c1
     a1.sinks.k1.channel = c1
  3. 启动Flume。

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_source.conf -Dflume.root.logger=INFO,console

Flume metric

DataHub-Flume 支持Flume的内置计数监控器,用户可以利用监控器来监控自己的Flume插件的运行情况。DataHub-Flume插件的Sink和Source都支持metric信息显示,具体参数含义可查看下表(只含DataHub相关的参数,更多参数含义参考:Flume官方文档)。

DatahubSink

名称

描述

BatchEmptyCount

batch timeout时没有数据需要写入DataHub发生的次数

BatchCompleteCount

Batch处理成功次数,仅包含全部写入成功的情况

EventDrainAttemptCount

尝试写入DataHub的数据数量(解析成功数量)

BatchUnderflowCount

成功写入DataHub的数据数量小于需要写入的数据量发生的次数。数据解析完成,但写入DataHub时部分失败或全部失败。

EventDrainSuccessCount

成功写入DataHub的数据量

DatahubSource

名称

描述

EventReceivedCount

Source接收到的DataHub的数据数量

EventAcceptedCount

Source将DataHub数据成功写入channel的数量

lume监控

lume提供了多种监控方法,本文以HTTP监控为例,介绍Flume监控工具的使用,使用HTTP方式监控,只需要在Flume插件启动时增加两个参数即可,-Dflume.monitoring.type=http -Dflume.monitoring.port=1234,其中type将监控方式指定为http,port为指定的端口号。使用示例如下:

bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234

插件成功启动之后,便可以登录Web界面进行查看。地址为 https://ip:1234/metrics

说明

更多的监控方法可以参考Flume官方文档

常见问题

flume启动报错org.apache.flume.ChannelFullException: Space for commit to queue couldn’t be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight

flume默认堆内存20MB,配置的batchSize过大时,flume使用的堆内存会超出20M。

解决方案1:调小batchSize。

解决方案2:调大flume最大堆内存。

  • $ vim bin/flume-ng

  • JAV**A_OPTS**="-Xmx20m" ==> JAV**A_OPTS**="-Xmx1024m"

DataHub-Flume插件是否支持JSON格式?

目前不支持,不过用户可以通过自定义正则表达式进行数据解析,或者修改DataHub-Flume插件代码,添加JSONEvent进行支持。

DataHub-Flume插件支持Blob Topic吗?

目前DataHub-Flume插件仅支持Tuple Topic,暂不支持blob。

flume 报错 org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count

channel已满,source数据写入channel失败。可以在配置文件中修改channel capacity解决,并且可以适当降低datahub source的batchSize。

使用旧版本flume时报错,可能会因为jar包冲突导致无法正常启动。

  • 错误场景:使用flume1.6时,启动时报错:java.lang.NoSuchMethodError:com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader;。因为新版本的插件依赖的jar包和flume本身依赖的jar包版本不一致,使用了flume依赖的旧版本jar包导致新版本的method找不到。

  • 如何处理:删除${FLUME_HOME}/lib目录下的三个jar包即可。

    • jackson-annotations-2.3.0.jar

    • jackson-databind-2.3.1.jar

    • jackson-annotations-2.3.0.jar

使用flume采集数据时,空字符串自动转为null

在flume插件2.0.2中对于非空字符串会做trim,空字符串直接转为null。flume插件2.0.3中已经优化掉,非空字符串写入DataHub依旧为空字符串。

启动报错Cannot invoke "com.google.common.cache.LoadingCache.get(Object)" because"com.aliyun.datahub.client.impl.batch.avro.AvroSchemaCache.schemaCache" is null]

删除Flume lib文件夹中的guava 、zstd的 jar包文件,重新启动。