Use Apache Flink with Apsara File Storage for HDFS

This topic describes how to install and use Apache Flink on a Hadoop cluster with an Apsara File Storage for HDFS file system.

Prerequisites

Step 1: Configure Hadoop

  1. Extract the Hadoop archive to a specified directory:

    tar -zxf hadoop-2.7.2.tar.gz -C /usr/local/
  2. Modify the hadoop-env.sh configuration file.

    1. Open the hadoop-env.sh configuration file:

      vim /usr/local/hadoop-2.7.2/etc/hadoop/hadoop-env.sh
    2. Set the JAVA_HOME environment variable.

      export JAVA_HOME=/usr/java/default
  3. Modify the core-site.xml configuration file.

    1. Open the core-site.xml configuration file:

      vim /usr/local/hadoop-2.7.2/etc/hadoop/core-site.xml
    2. Add the following properties to the core-site.xml configuration file. For more information, see Mount an Apsara File Storage for HDFS file system.

      <configuration>
          <property>
               <name>fs.defaultFS</name>
               <value>dfs://x-xxxxxxxx.cn-xxxxx.dfs.aliyuncs.com:10290</value>
               <!-- Replace this value with your mount target address. -->
          </property>
          <property>
               <name>fs.dfs.impl</name>
               <value>com.alibaba.dfs.DistributedFileSystem</value>
          </property>
          <property>
               <name>fs.AbstractFileSystem.dfs.impl</name>
               <value>com.alibaba.dfs.DFS</value>
          </property>
      </configuration>
  4. Modify the yarn-site.xml configuration file.

    1. Open the yarn-site.xml configuration file:

      vim /usr/local/hadoop-2.7.2/etc/hadoop/yarn-site.xml
    2. Add the following properties to the yarn-site.xml configuration file.

      <configuration>
          <property>
              <name>yarn.resourcemanager.hostname</name>
              <value>xxxx</value>
              <!-- Replace this value with the hostname of the ResourceManager in your cluster. -->
          </property>
          <property>
              <name>yarn.nodemanager.aux-services</name>
              <value>mapreduce_shuffle</value>
          </property>
          <property>
              <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
              <value>org.apache.hadoop.mapred.ShuffleHandler</value>
          </property>
          <property>
              <name>yarn.nodemanager.resource.memory-mb</name>
              <value>16384</value>
              <!-- Configure this based on the capacity of your cluster. -->
          </property>
          <property>
              <name>yarn.nodemanager.resource.cpu-vcores</name>
              <value>4</value>
              <!-- Configure this based on the capacity of your cluster. -->
          </property>
          <property>
              <name>yarn.scheduler.maximum-allocation-vcores</name>
              <value>4</value>
              <!-- Configure this based on the capacity of your cluster. -->
          </property>
          <property>
              <name>yarn.scheduler.minimum-allocation-mb</name>
              <value>3584</value>
              <!-- Configure this based on the capacity of your cluster. -->
          </property>
          <property>
              <name>yarn.scheduler.maximum-allocation-mb</name>
              <value>14336</value>
              <!-- Configure this based on the capacity of your cluster. -->
          </property>
      </configuration>
  5. Modify the slaves configuration file.

    1. Open the slaves configuration file:

      vim /usr/local/hadoop-2.7.2/etc/hadoop/slaves
    2. Add the hostnames of the compute nodes to the slaves file.

      cluster-header-1
      cluster-worker-1
  6. Configure environment variables.

    1. Open the /etc/profile configuration file:

      vim /etc/profile
    2. Add the following Hadoop environment variables to the /etc/profile file:

      export HADOOP_HOME=/usr/local/hadoop-2.7.2
      export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
      export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
      export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
    3. Apply the changes to the current session:

      source /etc/profile
  7. Install the Java SDK for Apsara File Storage for HDFS.

    Download the latest Java SDK for Apsara File Storage for HDFS and copy the JAR file to a directory on the CLASSPATH of your Hadoop ecosystem components. For more information, see Mount an Apsara File Storage for HDFS file system.

    cp aliyun-sdk-dfs-x.y.z.jar /usr/local/hadoop-2.7.2/share/hadoop/hdfs
  8. Synchronize the ${HADOOP_HOME} directory to the other cluster nodes. Then, on each of those nodes, configure the Hadoop environment variables as described in substep 6 of this step (Configure environment variables).

    scp -r /usr/local/hadoop-2.7.2/ hadoop@cluster-worker-1:/usr/local/
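
When the cluster has more than one worker, the copy in substep 8 can be repeated for each host. The following is a minimal sketch, not part of the official procedure. It assumes that the hosts file lists one hostname per line (as the slaves file does), that the remote user is hadoop as in the scp example, and that the HADOOP_DIR, REMOTE_USER, and DRY_RUN variables are hypothetical names introduced here. With DRY_RUN=1, the function only prints the commands it would run.

```shell
# Sketch: copy the Hadoop directory to every host listed in a hosts file.
# HADOOP_DIR, REMOTE_USER, and DRY_RUN are illustrative names, not Hadoop settings.
HADOOP_DIR="${HADOOP_DIR:-/usr/local/hadoop-2.7.2}"
REMOTE_USER="${REMOTE_USER:-hadoop}"
DRY_RUN="${DRY_RUN:-1}"

sync_hadoop() {
    hosts_file="$1"
    local_host="$(uname -n)"
    while IFS= read -r host || [ -n "$host" ]; do
        # Skip blank lines, comment lines, and the node this script runs on.
        case "$host" in
            ''|'#'*) continue ;;
        esac
        [ "$host" = "$local_host" ] && continue
        if [ "$DRY_RUN" = "1" ]; then
            echo "scp -r $HADOOP_DIR $REMOTE_USER@$host:/usr/local/"
        else
            # Redirect stdin so that scp cannot consume the rest of the hosts file.
            scp -r "$HADOOP_DIR" "$REMOTE_USER@$host:/usr/local/" </dev/null
        fi
    done < "$hosts_file"
}
```

For example, sync_hadoop /usr/local/hadoop-2.7.2/etc/hadoop/slaves prints one scp command per remote host. Set DRY_RUN=0 to perform the copy after you review the printed commands.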

Step 2: Verify Hadoop configuration

Because Apsara File Storage for HDFS provides the file system service in place of a locally deployed HDFS, you do not need to format the NameNode or run the start-dfs.sh script after you configure Hadoop. To verify that Hadoop is configured correctly, see Verify the installation.
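
As a quick complement to the linked verification topic, a round-trip check against the default file system can look like the following sketch. It is not the procedure from Verify the installation. It assumes that hadoop is on the PATH (as set in Step 1) and that /tmp/dfs-smoke-test is a hypothetical scratch path that is safe to create and delete.

```shell
# Sketch: create, list, and remove a probe file on the file system
# that fs.defaultFS points to. The default path is a hypothetical scratch path.
verify_dfs() {
    test_dir="${1:-/tmp/dfs-smoke-test}"
    hadoop fs -mkdir -p "$test_dir" || { echo "mkdir failed" >&2; return 1; }
    hadoop fs -touchz "$test_dir/probe" || { echo "create failed" >&2; return 1; }
    hadoop fs -ls "$test_dir" || { echo "list failed" >&2; return 1; }
    hadoop fs -rm -r "$test_dir" || { echo "cleanup failed" >&2; return 1; }
    echo "file system check passed"
}
```

Run verify_dfs with no arguments to use the default scratch path, or pass a different path as the first argument. A failure at the first command usually points to the mount target address or to the SDK JAR missing from the classpath.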

Step 3: Configure Flink

Extract the Flink archive to a specified directory, such as /usr/local/.

tar -zxf flink-1.12.5-bin-scala_2.11.tgz -C /usr/local/

Important
  • Before you use Flink, make sure that the HADOOP_HOME, HADOOP_CLASSPATH, and HADOOP_CONF_DIR environment variables are set on your cluster nodes. For more information, see Configure environment variables in Step 1.

  • For additional Flink configurations, see the Configuration guide.
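
The environment requirement above can be checked before you start Flink. The following is a minimal sketch under the assumption that the three variables named in this step are the only ones to check. The function name check_hadoop_env is hypothetical.

```shell
# Sketch: report which of the required Hadoop variables are unset or empty.
check_hadoop_env() {
    missing=""
    for name in HADOOP_HOME HADOOP_CLASSPATH HADOOP_CONF_DIR; do
        # Indirect lookup of the variable named by $name (POSIX-compatible).
        eval "value=\${$name:-}"
        [ -n "$value" ] || missing="$missing $name"
    done
    if [ -n "$missing" ]; then
        echo "Not set:$missing" >&2
        return 1
    fi
    echo "Hadoop environment looks complete"
}
```

If the function reports a missing variable, run source /etc/profile in the current session or revisit the environment variable settings from Step 1.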

Step 4: Verify Flink configuration

Use the WordCount.jar example included with Flink to read data from Apsara File Storage for HDFS and write the computation results back to Apsara File Storage for HDFS.

  1. Generate test data in your Apsara File Storage for HDFS file system.

    ${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar \
    randomtextwriter \
    -D mapreduce.randomtextwriter.totalbytes=10240 \
    -D mapreduce.randomtextwriter.bytespermap=1024 \
    dfs://f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com:10290/flink-test/input

    In the command, replace f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com with the mount target address of your Apsara File Storage for HDFS file system.

  2. Check if the HADOOP_CLASSPATH environment variable is set:

    echo $HADOOP_CLASSPATH

    If the command returns no output, HADOOP_CLASSPATH is not set. Run the following command to set it:

    export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
  3. Start a Flink session on YARN:

    /usr/local/flink-1.12.5/bin/yarn-session.sh --detached
  4. Run the WordCount.jar job:

    /usr/local/flink-1.12.5/bin/flink run \
    /usr/local/flink-1.12.5/examples/batch/WordCount.jar \
    --input dfs://f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com:10290/flink-test/input \
    --output dfs://f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com:10290/flink-test/output

    In the command, replace f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com with the mount target address of your Apsara File Storage for HDFS file system.

  5. View a sample of the output in your Apsara File Storage for HDFS file system.

    ${HADOOP_HOME}/bin/hadoop fs -cat dfs://f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com:10290/flink-test/output | tail -20

    If the command returns output similar to the following, Flink is configured correctly.

    visceral 4
    volcano 2
    wandoo 1
    warlike 4
    warriorwise 2
    weism 4
    wemless 2
    whilkut 1
    whitlowwort 3
    whittle 1
    wingable 1
    wvsportswomanship 1
    x 1
    y 2
    yawler 1
    yeat 1
    yeelaman 2
    yote 2
    z 3
    zoonitic 1
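
The commands in this step repeat the same mount target address several times. The following sketch shows one way to build the dfs:// URIs from a single value and to sanity-check the WordCount output format. The function names and the MOUNT_TARGET variable are hypothetical, and port 10290 is taken from the examples in this topic.

```shell
# Sketch: helpers for Step 4. Names are illustrative, not part of Hadoop or Flink.

dfs_uri() {
    # $1 = mount target host, $2 = absolute path on the file system
    printf 'dfs://%s:10290%s\n' "$1" "$2"
}

is_wordcount_output() {
    # Succeeds only if stdin is non-empty and every line is "<word> <integer>".
    awk 'NF != 2 || $2 !~ /^[0-9]+$/ { bad = 1 } END { exit (bad || NR == 0) }'
}

# Example usage (requires a configured cluster, so it is left commented out):
#   MOUNT_TARGET=f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com
#   hadoop fs -cat "$(dfs_uri "$MOUNT_TARGET" /flink-test/output)" | is_wordcount_output
```

The format check only confirms that each line has a word followed by a count, as in the sample output above. It does not validate the counts themselves.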