本节介绍如何在E-MapReduce上创建Hadoop集群,然后在Hadoop集群中运行Flink作业来消费OSS数据。

前提条件

  • 已注册阿里云账号,详情请参见注册云账号
  • 已开通E-MapReduce服务和OSS服务。
  • 已完成云账号的授权,详情请参见角色授权

背景信息

在开发过程中,通常会遇到需要消费存储在阿里云OSS中的数据的场景。在阿里云E-MapReduce中,您可通过运行Flink作业来消费OSS存储空间中的数据。本节将在E-MapReduce上创建一个Flink作业,然后在Hadoop集群上运行这个Flink作业来读取并打印OSS中指定文件的内容。

步骤一 准备环境

在创建Flink作业前,您需要在本地安装Maven和Java环境,以及在E-MapReduce上创建Hadoop集群。如果Maven是3.0以上版本,则建议Java选择2.0及以下版本,否则会造成不兼容情况。

  1. 在本地安装Maven和Java环境。
  2. 登录阿里云 E-MapReduce 控制台
  3. 创建Hadoop集群(可选服务中必须选中Flink服务),详情请参见创建集群

步骤二 准备测试数据

在创建Flink作业前,您需要在OSS上传测试数据。本例以上传一个test.txt文件为例,文件内容为:Nothing is impossible for a willing heart. While there is a life, there is a hope~。

  1. 登录OSS 管理控制台
  2. 创建存储空间并上传测试数据文件,详情请参见创建存储空间上传文件

    测试数据的上传路径在后续步骤的代码中会使用,本例的上传路径为oss://emr-logs2/hengwu/test.txt

    说明 上传文件后,请保留OSS的登录窗口,后续仍会使用。

步骤三 制作JAR包并上传到OSS或Hadoop集群

本例JAR包来源:下载E-MapReduce示例代码aliyun-emapreduce-demo,编译生成JAR包。JAR包可上传到Hadoop集群的header主机中,也可上传到OSS中,本例以上传到OSS为例。

  1. 下载E-MapReduce示例代码aliyun-emapreduce-demo到本地。
  2. 运行mvn clean package -DskipTests命令打包代码。

    打包好的JAR包为存储在../target/目录下,例如,target/examples-1.2.0.jar

  3. 返回到OSS 管理控制台
  4. 上传JAR包到OSS任一路径下。

    JAR包的上传路径在后续步骤的代码中会使用,本例的上传路径为oss://emr-logs2/hengwu/examples-1.2.0.jar

步骤四 创建并运行Flink作业

  1. 返回到阿里云 E-MapReduce 控制台
  2. 数据开发页面创建项目,详情请参见项目管理
  3. 进入新建的项目,在作业编辑页面新建Flink类型的作业。
  4. 新建Flink作业后,配置其作业内容

    Flink作业内容

    作业内容是一段代码,本节使用的示例代码如下:

    run -m yarn-cluster  -yjm 1024 -ytm 1024 -yn 4 -ys 4 -ynm flink-oss-sample -c com.aliyun.emr.example.flink.FlinkOSSSample  ossref://emr-logs2/hengwu/examples-1.2.0.jar --input oss://emr-logs2/hengwu/test.txt

    示例代码中的关键参数说明如下:

    • ossref://emr-logs2/hengwu/examples-1.2.0.jar:上传至OSS的JAR包。
    • oss://emr-logs2/hengwu/test.txt:上传到OSS的测试数据。
    说明 实际操作时,您需要根据步骤一 准备环境步骤三 制作JAR包并上传到OSS或Hadoop集群中的配置来替换这两个参数。
  5. 作业配置完成后,单击右上方的运行,在弹出的对话框中选择执行集群为新建的Hadoop集群。
  6. 单击确定,运行Flink作业。
    作业开始运行时,会自动弹出日志。作业成功运行后,会从OSS读取指定文件内容并打印在日志中。至此,我们成功实现了在E-MapReduce集群上运行Flink作业消费OSS数据。
    Flink作业结果

步骤五 查看作业提交日志和作业信息(可选)

如果需要定位作业失败的原因或了解作业的详细信息,则您可查看作业的日志和作业信息。

  1. 查看作业提交日志。
    当前提交日志支持在E-MapReduce控制台查看,也支持在SSH客户端查看。
    • 登录阿里云 E-MapReduce 控制台查看提交日志。
      在控制台提交作业后,可通过运行记录列表进入某次作业运行的详情页面,在详情页面可查看作业的日志。
      Flink作业执行记录
      Flink作业日志
    • 通过SSH客户端登录到Hadoop集群的header主机查看提交日志。

      默认情况下,根据Flink的log4j配置(详情请参见/etc/ecm/flink-conf/log4j-yarn-session.properties),提交日志会保存在/mnt/disk1/log/flink/flink-{user}-client-{hostname}.log

      其中,user为提交Flink作业的用户,hostname为提交作业所在的节点。以root用户在 emr-header-1节点提交Flink作业为例,日志的路径为/mnt/disk1/log/flink/flink-flink-historyserver-0-emr-header-1.cluster-126601.log

  2. 查看作业信息。

    通过Yarn UI可查看Flink作业的信息。访问Yarn UI有SSH隧道和Knox两种方式,SSH隧道方式请参见SSH 登录集群,Knox方式请参见Knox 使用说明访问链接与端口。下面以Knox方式为例进行介绍。

    1. 在Hadoop集群的访问链接与端口页面中,单击Yarn UI后的链接,进入Hadoop控制台。

      YARN UI链接
    2. 在Hadoop控制台,单击作业的ID,查看作业运行详情。

      Hadoop控制台>Flink作业列表
      Hadoop控制台>Flink作业详情
    3. 如果需要查看运行中的Flink作业,则可在作业详情页面单击Tracking URL后面的链接,进入Flink Dashboard查看。
    4. 作业运行结束后,通过访问http://emr-header-1:8082,可以查看所有已经完成的作业列表。