使用Flink访问

本文介绍使用自建的Apache Flink访问LindormDFS。

准备工作

  • 开通LindormDFS,详情请参见开通指南

  • 在计算节点上安装JDK,版本不能低于1.8。

  • 在计算节点上安装Scala。

    Scala下载地址为官方链接,其版本要与使用的Apache Spark版本相兼容。

  • 下载Apache Hadoop压缩包。

    Apache Hadoop下载地址为官方链接。建议您选用的Apache Hadoop版本不低于2.7.3,本文中使用的Apache Hadoop版本为Apache Hadoop 2.7.3。

  • 下载Apache Flink压缩包。

    在LindormDFS上使用的Flink的版本必须为1.9.0及以上,Apache Flink下载地址为官方链接。本文中使用的Flink版本为官方提供的预编译版本Flink 1.9.0。

说明

本文档的操作步骤中涉及的安装包版本号、文件夹路径,请根据实际情况进行替换。

Apache Hadoop 配置

  1. 解压压缩包到指定目录。

    tar -zxvf hadoop-2.7.3.tar.gz -C /usr/local/
  2. 修改hadoop-env.sh配置文件。

    1. 执行如下命令打开hadoop-env.sh配置文件。

      vim /usr/local/hadoop-2.7.3/etc/hadoop/hadoop-env.sh
    2. 配置JAVA_HOME

      export JAVA_HOME=${jdk安装目录}
  3. 修改core-site.xml

    1. 执行如下命令打开core-site.xml

      vim /usr/local/hadoop-2.7.3/etc/hadoop/core-site.xml
    2. 修改core-site.xml文件,修改的内容如下所示,其中实例ID需要替换为实际实例ID。

      <configuration>
        <property>
           <name>fs.defaultFS</name>
           <value>hdfs://${实例ID}</value>
        </property>
      </configuration>
  4. 修改mapred-site.xml配置文件。

    1. 执行如下命令打开mapred-site.xml

      vim /usr/local/hadoop-2.7.3/etc/hadoop/mapred-site.xml
    2. 修改mapred-site.xml,修改的内容如下所示。

      <configuration>
      <property>
            <name>mapreduce.framework.name</name>
            <value>yarn</value>
      </property>
      </configuration>
  5. 修改yarn-site.xml配置文件。

    1. 执行如下命令打开yarn-site.xml配置文件。

      vim /usr/local/hadoop-2.7.3/etc/hadoop/yarn-site.xml
    2. 修改yarn-site.xml,修改的内容如下所示。

      <configuration>
      <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>xxxx</value>
        <!-- 该地址填写集群中yarn的resourcemanager的hostname -->
      </property>
      <property>
          <name>yarn.nodemanager.aux-services</name>
          <value>mapreduce_shuffle</value>
      </property>
      <property>
        <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
      </property>
      <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>16384</value>
          <!-- 根据您当前的集群能力进行配置此项 -->
      </property>
      <property>
        <name>yarn.nodemanager.resource.cpu-vcores</name>
        <value>4</value>
           <!-- 根据您当前的集群能力进行配置此项 -->
      </property>
      <property>
        <name>yarn.scheduler.maximum-allocation-vcores</name>
        <value>4</value>
          <!-- 根据您当前的集群能力进行配置此项 -->
      </property>
      <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>3584</value>
          <!-- 根据您当前的集群能力进行配置此项 -->
      </property>
      <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>14336</value>
          <!-- 根据您当前的集群能力进行配置此项 -->
      </property>
      </configuration>
  6. 修改slaves配置文件。

    1. 执行如下命令打开slaves配置文件。

       vim /usr/local/hadoop-2.7.3/etc/hadoop/slaves 
    2. 修改slaves,修改的内容如下所示,假设Spark集群有两个节点node1,node2。

      node1
      node2
      说明

      node1和node2为Spark集群节点的机器名。

  7. 配置环境变量。

    1. 执行如下命令打开/etc/profile配置文件。

      vim /etc/profile
    2. 修改/etc/profile,在文件末尾添加如下内容。

      export HADOOP_HOME=/usr/local/hadoop-2.7.3
      export HADOOP_CLASSPATH=/usr/local/hadoop-2.7.3/etc/hadoop:/usr/local/hadoop-2.7.3/share/hadoop/common/lib/*:/usr/local/hadoop-2.7.3/share/hadoop/common/*:/usr/local/hadoop-2.7.3/share/hadoop/hdfs:/usr/local/hadoop-2.7.3/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.7.3/share/hadoop/hdfs/*:/usr/local/hadoop-2.7.3/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.7.3/share/hadoop/yarn/*:/usr/local/hadoop-2.7.3/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.7.3/share/hadoop/mapreduce/*:/usr/local/hadoop-2.7.3/contrib/capacity-scheduler/*.jar
      export HADOOP_CONF_DIR=/usr/local/hadoop-2.7.3/etc/hadoop
    3. 执行如下命令以生效配置信息。

  8. 执行如下命令将${HADOOP_HOME}文件夹同步到集群的其他节点。

    scp -r hadoop-2.7.3/ testuser@node2:/usr/local/

验证Apache Hadoop配置

完成Hadoop配置后,不需要格式化namenode,也不需要使用start-dfs.sh来启动HDFS相关服务。如需使用yarn服务,只需在resourcemanager节点启动yarn服务,具体验证Hadoop是否配置成功的方法请参见使用开源HDFS客户端访问

配置Apache Flink

执行如下命令解压Flink压缩包到指定目录。

tar -zxvf flink-1.9.0-bin-scala_2.11.tgz -C /usr/local/
说明
  • 在使用Apache Flink之前必须在您的集群环境变量中配置HADOOP_HOMEHADOOP_CLASSPATHHADOOP_CONF_DIR,详情请参见Apache Hadoop 配置步骤7配置环境变量。

  • 如果您需要对Flink进行额外的配置,请参见官方文档配置操作指南

验证Apache Flink配置

使用Flink自带的WordCount.jar对LindormDFS上的数据进行读取,并将计算结果写入LindormDFS,在测试之前需要先启动yarn服务。

  1. 生成测试数据。

    此处使用Apache Hadoop 2.7.3自带的JAR包hadoop-mapreduce-examples-2.7.3.jar中的randomtextwriter方法在LindormDFS上生成测试数据。

    /usr/local/hadoop-2.7.3/bin/hadoop jar /usr/local/hadoop-2.7.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar randomtextwriter -D mapreduce.randomtextwriter.totalbytes=10240 -D mapreduce.randomtextwriter.bytespermap=1024 -D mapreduce.job.maps=4 -D mapreduce.job.reduces=2 /flink-test/input
  2. 查看在LindormDFS上生成的测试数据。

     /usr/local/hadoop-2.7.3/bin/hadoop fs -cat /flink-test/input/*
  3. 提交wordcount程序。

    /usr/local/flink-1.9.0/bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 \
    /usr/local/flink-1.9.0/examples/batch/WordCount.jar \
    --input hdfs://${实例ID}/flink-test/input \
    --output hdfs://${实例ID}/flink-test/output \
  4. 查看在LindormDFS上的结果文件。

    /usr/local/hadoop-2.7.3/bin/hadoop fs -cat /flink-test/output

    image