Flume使用JindoSDK写入OSS-HDFS服务

Apache Flume是一个分布式、可靠和高可用的系统,用于从大量不同的数据源有效地收集、聚合和移动大量日志数据,进行集中式的数据存储。Flume通过调用flush()保证事务性写入,并通过JindoSDK写入OSS-HDFS服务,确保flush后的数据立刻可见,保证数据不丢失。

前提条件

操作步骤

  1. 连接ECS实例。具体操作,请参见连接ECS实例

  2. 配置JindoSDK。

    1. 下载最新版本的Jindo SDK JAR包。下载地址,请参见GitHub

    2. 可选:如果您的环境中未包含Kerberos和SASL相关依赖,则需要在部署JindoSDK的所有节点安装以下依赖。

      • Ubuntu或Debian

        sudo apt-get install libkrb5-dev krb5-admin-server krb5-kdc krb5-user libsasl2-dev libsasl2-modules libsasl2-modules-gssapi-mit
      • Red Hat Enterprise Linux或CentOS

        sudo yum install krb5-server krb5-workstation cyrus-sasl-devel cyrus-sasl-gssapi cyrus-sasl-plain
      • macOS

        brew install krb5
    3. 解压下载的安装包。

      以下以解压jindosdk-x.x.x-linux.tar.gz为例,如使用其他版本的JindoSDK,请替换为对应的JAR包名称。

      tar -zxvf jindosdk-x.x.x-linux.tar.gz -C/usr/lib
    4. 配置JINDOSDK_HOME

      export JINDOSDK_HOME=/usr/lib/jindosdk-x.x.x-linux
      export PATH=$JINDOSDK_HOME/bin:$PATH
    5. 配置HADOOP_CLASSPATH

      export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${FLUME_HOME}/lib/*
      重要

      请将安装目录和环境变量部署到每个节点Flume根目录下的lib文件夹。

    6. 配置FLUME_CLASSPATH

      cp ${FLUME_HOME}/conf/flume-env.sh.template ${FLUME_HOME}/conf/flume-env.sh
      echo "FLUME_CLASSPATH=/usr/lib/jindosdk-x.x.x-linux/lib/*" >>  ${FLUME_HOME}/conf/flume-env.sh
  3. 配置Sink。

    配置Sink示例如下:

    # 配置OSS Sink。your_bucket填写为已开启OSS-HDFS服务的Bucket。
    xxx.sinks.oss_sink.hdfs.path = oss://${your_bucket}/flume_dir/%Y-%m-%d/%H
    
    # 写入Flume事务中最大的event数量。推荐每次Flush的量在32 MB以上,避免Flush过小影响整体性能以及产生大量的staging文件。
    # batchSize单位为event数量(即日志行数),设置时需要先评估event平均大小(例如200 Byte),假设每次Flush的大小预期为32 MB,则batchSize约为160000(32 MB / 200 Byte)。
    xxx.sinks.oss_sink.hdfs.batchSize = 100000
    
    ...
    # 用于HDFS文件按照时间分区,时间戳向下取整。默认值为true。
    xxx.sinks.oss_sink.hdfs.round = true
    # 当xxx.sinks.oss_sink.hdfs.round设置为true时,配合xxx.sinks.oss_sink.hdfs.roundUnit时间单位一起使用。例如,xxx.sinks.oss_sink.hdfs.roundUnit值为minute,该值设置为60,则表示60分钟之内的数据写到一个文件中,相当于每60分钟生成一个文件。
    xxx.sinks.oss_sink.hdfs.roundValue = 15
    # 按时间分区使用的时间单位。默认值为minute,可选值为second、minute和hour。
    xxx.sinks.oss_sink.hdfs.roundUnit = minute
    # Flume在HDFS文件夹下创建新文件的固定前缀。
    xxx.sinks.oss_sink.hdfs.filePrefix = your_topic
    # 当前文件写入达到该大小后触发滚动创建新文件,单位为字节。0表示不根据文件大小来分割文件。
    xxx.sinks.oss_sink.rollSize = 3600
    # 每个Sink实例操作HDFS IO时开启的线程数(例如open、write等)。
    xxx.sinks.oss_sink.threadsPoolSize = 30
    ...

    关于配置Sink涉及的各参数含义,请参见Apache Flume

常见问题

报错“org.apache.flume.conf.ConfigurationException: Component has no type. Cannot configure.user_sink”

在Hadoop的core-site.xml配置文件中添加以下配置解决该问题。

配置JindoOSS实现类。
fs.AbstractFileSystem.oss.impl com.aliyun.jindodata.oss.OSS 
fs.oss.impl com.aliyun.jindodata.oss.OssFileSystem
配置AccessKey及Endpoint。
fs.oss.credentials.provider com.aliyun.jindodata.auth.SimpleAliyunCredentialsProvider 
fs.oss.accessKeyId LTAI******** 
fs.oss.accessKeySecret KZo1********
fs.oss.endpoint {regionId}.oss-dls.aliyuncs.com