This topic describes how to install and use Apache Flink on a Hadoop cluster with an Apsara File Storage for HDFS file system.
Prerequisites
- You have activated Apsara File Storage for HDFS and created a file system instance and a mount target. For more information, see Quick Start for Apsara File Storage for HDFS.
- You have installed JDK 1.8 or later on all nodes of the Hadoop cluster.
- You have downloaded the Apache Hadoop package. We recommend Hadoop 2.7.2 or later. This topic uses Apache Hadoop 2.7.2.
- You have downloaded the Apache Flink archive. This topic uses the official precompiled release, Apache Flink 1.12.5.
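The JDK prerequisite can be checked on each node before you continue. The following is a minimal sketch, not part of the official procedure: the function name `java_version_ok` is hypothetical, and it assumes a plain numeric version string such as `1.8.0_292` or `11.0.2` (suffixes such as `-ea` are not handled).

```shell
#!/usr/bin/env bash
# Sketch: succeed if a Java version string is 1.8 or later.
# Accepts the legacy "1.8.0_292" scheme and the modern "11.0.2" / "17" scheme.
java_version_ok() {
    local version="$1" major minor
    major="${version%%.*}"          # text before the first dot
    if [ "$major" = "1" ]; then
        # Legacy scheme: 1.<minor>.x, so 1.8 means Java 8.
        minor="${version#1.}"
        minor="${minor%%.*}"
        [ "$minor" -ge 8 ]
    else
        # Modern scheme: the first field is the major version (9, 11, 17, ...).
        [ "$major" -ge 9 ]
    fi
}

# Example (run on each node):
#   v="$(java -version 2>&1 | awk -F'"' '/version/ {print $2; exit}')"
#   java_version_ok "$v" && echo "JDK $v is supported"
```

`java -version` writes to standard error, which is why the example redirects it with `2>&1` before extracting the quoted version string.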
Step 1: Configure Hadoop
- Extract the Hadoop archive to a specified directory:
      tar -zxf hadoop-2.7.2.tar.gz -C /usr/local/
- Modify the hadoop-env.sh configuration file.
  - Open the hadoop-env.sh configuration file:
        vim /usr/local/hadoop-2.7.2/etc/hadoop/hadoop-env.sh
  - Set the JAVA_HOME environment variable:
        export JAVA_HOME=/usr/java/default
- Modify the core-site.xml configuration file.
  - Open the core-site.xml configuration file:
        vim /usr/local/hadoop-2.7.2/etc/hadoop/core-site.xml
  - Add the following properties to the core-site.xml configuration file. For more information, see Mount an Apsara File Storage HDFS file system.
        <configuration>
          <property>
            <name>fs.defaultFS</name>
            <value>dfs://x-xxxxxxxx.cn-xxxxx.dfs.aliyuncs.com:10290</value>
            <!-- Replace this value with your mount target address. -->
          </property>
          <property>
            <name>fs.dfs.impl</name>
            <value>com.alibaba.dfs.DistributedFileSystem</value>
          </property>
          <property>
            <name>fs.AbstractFileSystem.dfs.impl</name>
            <value>com.alibaba.dfs.DFS</value>
          </property>
        </configuration>
- Modify the yarn-site.xml configuration file.
  - Open the yarn-site.xml configuration file:
        vim /usr/local/hadoop-2.7.2/etc/hadoop/yarn-site.xml
  - Add the following properties to the yarn-site.xml configuration file. Set the memory and vcore values based on the capacity of your cluster.
        <configuration>
          <property>
            <name>yarn.resourcemanager.hostname</name>
            <value>xxxx</value>
            <!-- Replace this value with the hostname of the ResourceManager in your cluster. -->
          </property>
          <property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
          </property>
          <property>
            <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
            <value>org.apache.hadoop.mapred.ShuffleHandler</value>
          </property>
          <property>
            <name>yarn.nodemanager.resource.memory-mb</name>
            <value>16384</value>
            <!-- Configure this based on the capacity of your cluster. -->
          </property>
          <property>
            <name>yarn.nodemanager.resource.cpu-vcores</name>
            <value>4</value>
            <!-- Configure this based on the capacity of your cluster. -->
          </property>
          <property>
            <name>yarn.scheduler.maximum-allocation-vcores</name>
            <value>4</value>
            <!-- Configure this based on the capacity of your cluster. -->
          </property>
          <property>
            <name>yarn.scheduler.minimum-allocation-mb</name>
            <value>3584</value>
            <!-- Configure this based on the capacity of your cluster. -->
          </property>
          <property>
            <name>yarn.scheduler.maximum-allocation-mb</name>
            <value>14336</value>
            <!-- Configure this based on the capacity of your cluster. -->
          </property>
        </configuration>
- Modify the slaves configuration file.
  - Open the slaves configuration file:
        vim /usr/local/hadoop-2.7.2/etc/hadoop/slaves
  - Add the hostnames of the compute nodes to the slaves file, one hostname per line:
        cluster-header-1
        cluster-worker-1
- Configure environment variables.
  - Open the /etc/profile configuration file:
        vim /etc/profile
  - Add the following Hadoop environment variables to the /etc/profile file:
        export HADOOP_HOME=/usr/local/hadoop-2.7.2
        export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
        export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
        export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
  - Apply the changes to the current session:
        source /etc/profile
- Install the Java SDK for Apsara File Storage for HDFS.
  Download the latest Java SDK for Apsara File Storage for HDFS and deploy it to the CLASSPATH of your Hadoop ecosystem components. For more information, see Mount an Apsara File Storage HDFS file system. In the following command, x.y.z stands for the version of the SDK that you downloaded.
      cp aliyun-sdk-dfs-x.y.z.jar /usr/local/hadoop-2.7.2/share/hadoop/hdfs
- Synchronize the ${HADOOP_HOME} directory to the other cluster nodes. Then, on each of those nodes, configure the Hadoop environment variables as described in the "Configure environment variables" substep above.
      scp -r /usr/local/hadoop-2.7.2/ hadoop@cluster-worker-1:/usr/local/
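When the cluster has more than one worker, the copy step can be derived from the slaves file instead of being typed per host. The sketch below only prints the `scp` commands so that they can be reviewed first. The function name `print_sync_commands` is hypothetical, and the `hadoop` user and `/usr/local/` target directory are assumptions carried over from the single-host command in this topic.

```shell
#!/usr/bin/env bash
# Sketch: print one scp command per host listed in a slaves file,
# skipping blank lines, comment lines, and the local host.
print_sync_commands() {
    local slaves_file="$1"
    local local_host="$2"
    local hadoop_dir="${3:-/usr/local/hadoop-2.7.2}"
    local host
    while IFS= read -r host || [ -n "$host" ]; do
        # Remove stray whitespace (including a trailing carriage return).
        host="$(printf '%s' "$host" | tr -d '[:space:]')"
        case "$host" in
            ''|'#'*|"$local_host") continue ;;   # nothing to copy
        esac
        echo "scp -r ${hadoop_dir}/ hadoop@${host}:/usr/local/"
    done < "$slaves_file"
}

# Example: review the commands, then pipe them to sh to run them.
#   print_sync_commands /usr/local/hadoop-2.7.2/etc/hadoop/slaves "$(hostname)"
```

Printing the commands rather than running them keeps the sketch free of side effects. Each target node still needs the environment variables configured separately.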
Step 2: Verify Hadoop configuration
After you configure Hadoop, you do not need to format the NameNode or run the start-dfs.sh script to start HDFS services. To verify that Hadoop is configured correctly, see Verify the installation.
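Before you run the linked verification, a quick local check can confirm that core-site.xml points fs.defaultFS at a dfs:// mount target. The following sketch is illustrative only and does not replace that verification. The function names `get_default_fs` and `default_fs_is_dfs` are hypothetical, and the parsing assumes that each `<name>` and `<value>` element sits on its own line in the file.

```shell
#!/usr/bin/env bash
# Sketch: print the fs.defaultFS value from a core-site.xml file.
# Assumes the <value> line follows the matching <name> line.
get_default_fs() {
    awk '
        /<name>fs\.defaultFS<\/name>/ { found = 1 }
        found && /<value>/ {
            sub(/.*<value>/, "")      # strip everything up to the value
            sub(/<\/value>.*/, "")    # strip the closing tag and the rest
            print
            exit
        }
    ' "$1"
}

# Succeeds only if fs.defaultFS uses the dfs:// scheme.
default_fs_is_dfs() {
    case "$(get_default_fs "$1")" in
        dfs://*) return 0 ;;
        *)       return 1 ;;
    esac
}

# Example:
#   default_fs_is_dfs "$HADOOP_CONF_DIR/core-site.xml" || echo "fs.defaultFS is not a dfs:// address"
```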
Step 3: Configure Flink
- Extract the Flink archive to a specified directory, such as /usr/local/:
      tar -zxf flink-1.12.5-bin-scala_2.11.tgz -C /usr/local/
- Before you use Flink, make sure that the HADOOP_HOME, HADOOP_CLASSPATH, and HADOOP_CONF_DIR environment variables are set on your cluster. For more information, see Configure environment variables.
- For additional Flink configurations, see the Configuration guide.
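The three environment variables that Flink relies on can be checked in one pass. This is a minimal sketch under the assumption that the variables are exported in the current shell. The function name `missing_hadoop_vars` is hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: print the name of each required Hadoop variable that is unset or empty.
missing_hadoop_vars() {
    local name value
    for name in HADOOP_HOME HADOOP_CLASSPATH HADOOP_CONF_DIR; do
        # Look up the variable whose name is stored in $name.
        eval "value=\${$name:-}"
        [ -n "$value" ] || echo "$name"
    done
}

# Example:
#   missing="$(missing_hadoop_vars)"
#   [ -z "$missing" ] || echo "Set these variables before starting Flink: $missing"
```

The function prints nothing when all three variables are set, so an empty result means the shell is ready for the Flink commands.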
Step 4: Verify Flink configuration
Use the WordCount.jar example included with Flink to read data from Apsara File Storage for HDFS and write the computation results back to Apsara File Storage for HDFS.
- Generate test data in your Apsara File Storage for HDFS file system.
      ${HADOOP_HOME}/bin/hadoop jar ${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar \
          randomtextwriter \
          -D mapreduce.randomtextwriter.totalbytes=10240 \
          -D mapreduce.randomtextwriter.bytespermap=1024 \
          dfs://f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com:10290/flink-test/input
  In the command, replace f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com with the mount target address of your Apsara File Storage for HDFS file system.
- Check whether the HADOOP_CLASSPATH environment variable is set:
      echo $HADOOP_CLASSPATH
  If the command prints an empty line, HADOOP_CLASSPATH is not set. Run the following command to set it:
      export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
- Start a Flink session on YARN:
      /usr/local/flink-1.12.5/bin/yarn-session.sh --detached
- Run the WordCount.jar job:
      /usr/local/flink-1.12.5/bin/flink run \
          /usr/local/flink-1.12.5/examples/batch/WordCount.jar \
          --input dfs://f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com:10290/flink-test/input \
          --output dfs://f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com:10290/flink-test/output
  In the command, replace f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com with the mount target address of your Apsara File Storage for HDFS file system.
- View a sample of the output in your Apsara File Storage for HDFS file system.
      ${HADOOP_HOME}/bin/hadoop fs -cat dfs://f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com:10290/flink-test/output | tail -20
  If the command returns output similar to the following, Flink is configured correctly.
      visceral 4
      volcano 2
      wandoo 1
      warlike 4
      warriorwise 2
      weism 4
      wemless 2
      whilkut 1
      whitlowwort 3
      whittle 1
      wingable 1
      wvsportswomanship 1
      x 1
      y 2
      yawler 1
      yeat 1
      yeelaman 2
      yote 2
      z 3
      zoonitic 1
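Checking the result by eye works for a sample. For a scripted check, the output can be piped through a small format validator. The sketch below assumes that each output line has the form "word count" separated by whitespace, as in the sample shown in this topic. The function name `wordcount_output_ok` is hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: read WordCount output on stdin and fail if it is empty or if any
# non-empty line is not "<word> <count>" with an integer count.
wordcount_output_ok() {
    awk '
        NF == 0 { next }                                  # ignore blank lines
        NF != 2 || $2 !~ /^[0-9]+$/ { bad++; next }       # malformed line
        { seen++ }                                        # well-formed line
        END { exit (bad > 0 || seen == 0) }
    '
}

# Example (replace the mount target address with your own):
#   ${HADOOP_HOME}/bin/hadoop fs -cat dfs://f-xxxxxxx.cn-zhangjiakou.dfs.aliyuncs.com:10290/flink-test/output \
#       | wordcount_output_ok && echo "WordCount output looks valid"
```

The validator checks only the shape of the output, not the counts themselves, because the input is randomly generated text.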