本文介绍使用自建的Apache Flink访问LindormDFS。
准备工作
开通LindormDFS,详情请参见开通指南。
在计算节点上安装JDK,版本不能低于1.8。
在计算节点上安装Scala。
Scala下载地址为官方链接,其版本要与使用的Apache Spark版本相兼容。
下载Apache Hadoop压缩包。
Apache Hadoop下载地址为官方链接。建议您选用的Apache Hadoop版本不低于2.7.3,本文中使用的Apache Hadoop版本为Apache Hadoop 2.7.3。
下载Apache Flink压缩包。
在LindormDFS上使用的Flink的版本必须为1.9.0及以上,Apache Flink下载地址为官方链接。本文中使用的Flink版本为官方提供的预编译版本Flink 1.9.0。
本文档的操作步骤中涉及的安装包版本号、文件夹路径,请根据实际情况进行替换。
Apache Hadoop 配置
解压压缩包到指定目录。
tar -zxvf hadoop-2.7.3.tar.gz -C /usr/local/
修改
hadoop-env.sh
配置文件。执行如下命令打开
hadoop-env.sh
配置文件。vim /usr/local/hadoop-2.7.3/etc/hadoop/hadoop-env.sh
配置
JAVA_HOME
。export JAVA_HOME=${jdk安装目录}
修改
core-site.xml
。执行如下命令打开
core-site.xml
。vim /usr/local/hadoop-2.7.3/etc/hadoop/core-site.xml
修改
core-site.xml
文件,修改的内容如下所示,其中实例ID需要替换为实际实例ID。<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://${实例ID}</value> </property> </configuration>
修改
mapred-site.xml
配置文件。执行如下命令打开
mapred-site.xml
。vim /usr/local/hadoop-2.7.3/etc/hadoop/mapred-site.xml
修改
mapred-site.xml
,修改的内容如下所示。<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
修改
yarn-site.xml
配置文件。执行如下命令打开
yarn-site.xml
配置文件。vim /usr/local/hadoop-2.7.3/etc/hadoop/yarn-site.xml
修改
yarn-site.xml
,修改的内容如下所示。<configuration> <property> <name>yarn.resourcemanager.hostname</name> <value>xxxx</value> <!-- 该地址填写集群中yarn的resourcemanager的hostname --> </property> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name> <value>org.apache.hadoop.mapred.ShuffleHandler</value> </property> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>16384</value> <!-- 根据您当前的集群能力进行配置此项 --> </property> <property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>4</value> <!-- 根据您当前的集群能力进行配置此项 --> </property> <property> <name>yarn.scheduler.maximum-allocation-vcores</name> <value>4</value> <!-- 根据您当前的集群能力进行配置此项 --> </property> <property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>3584</value> <!-- 根据您当前的集群能力进行配置此项 --> </property> <property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>14336</value> <!-- 根据您当前的集群能力进行配置此项 --> </property> </configuration>
修改
slaves
配置文件。执行如下命令打开
slaves
配置文件。vim /usr/local/hadoop-2.7.3/etc/hadoop/slaves
修改
slaves
,修改的内容如下所示,假设Spark集群有两个节点node1,node2。node1 node2
说明node1和node2为Spark集群节点的机器名。
配置环境变量。
执行如下命令打开
/etc/profile
配置文件。vim /etc/profile
修改
/etc/profile
,在文件末尾添加如下内容。export HADOOP_HOME=/usr/local/hadoop-2.7.3 export HADOOP_CLASSPATH=/usr/local/hadoop-2.7.3/etc/hadoop:/usr/local/hadoop-2.7.3/share/hadoop/common/lib/*:/usr/local/hadoop-2.7.3/share/hadoop/common/*:/usr/local/hadoop-2.7.3/share/hadoop/hdfs:/usr/local/hadoop-2.7.3/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.7.3/share/hadoop/hdfs/*:/usr/local/hadoop-2.7.3/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.7.3/share/hadoop/yarn/*:/usr/local/hadoop-2.7.3/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.7.3/share/hadoop/mapreduce/*:/usr/local/hadoop-2.7.3/contrib/capacity-scheduler/*.jar export HADOOP_CONF_DIR=/usr/local/hadoop-2.7.3/etc/hadoop
执行如下命令以生效配置信息。
执行如下命令将
${HADOOP_HOME}
文件夹同步到集群的其他节点。scp -r hadoop-2.7.3/ testuser@node2:/usr/local/
验证Apache Hadoop配置
完成Hadoop配置后,不需要格式化namenode,也不需要使用start-dfs.sh来启动HDFS相关服务。如需使用yarn服务,只需在resourcemanager节点启动yarn服务,具体验证Hadoop是否配置成功的方法请参见使用开源HDFS客户端访问。
配置Apache Flink
执行如下命令解压Flink压缩包到指定目录。
tar -zxvf flink-1.9.0-bin-scala_2.11.tgz -C /usr/local/
在使用Apache Flink之前必须在您的集群环境变量中配置
HADOOP_HOME
,HADOOP_CLASSPATH
和HADOOP_CONF_DIR
,详情请参见Apache Hadoop 配置步骤7配置环境变量。如果您需要对Flink进行额外的配置,请参见官方文档配置操作指南。
验证Apache Flink配置
使用Flink自带的WordCount.jar
对LindormDFS上的数据进行读取,并将计算结果写入LindormDFS,在测试之前需要先启动yarn服务。
生成测试数据。
此处使用Apache Hadoop 2.7.3自带的JAR包
hadoop-mapreduce-examples-2.7.3.jar
中的randomtextwriter方法在LindormDFS上生成测试数据。/usr/local/hadoop-2.7.3/bin/hadoop jar /usr/local/hadoop-2.7.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar randomtextwriter -D mapreduce.randomtextwriter.totalbytes=10240 -D mapreduce.randomtextwriter.bytespermap=1024 -D mapreduce.job.maps=4 -D mapreduce.job.reduces=2 /flink-test/input
查看在LindormDFS上生成的测试数据。
/usr/local/hadoop-2.7.3/bin/hadoop fs -cat /flink-test/input/*
提交wordcount程序。
/usr/local/flink-1.9.0/bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 \ /usr/local/flink-1.9.0/examples/batch/WordCount.jar \ --input hdfs://${实例ID}/flink-test/input \ --output hdfs://${实例ID}/flink-test/output \
查看在LindormDFS上的结果文件。
/usr/local/hadoop-2.7.3/bin/hadoop fs -cat /flink-test/output