本文介绍如何在Dataflow集群中运行Flink作业来消费OSS数据。

背景信息

本文示例使用的是EMR数据开发功能,目前已不推荐使用,因为该功能在2022年2月21日21点停止更新。

如果您之前未使用过数据开发功能,推荐您通过EMR Studio进行数据开发,详情请参见EMR Studio概述

如果您之前使用过数据开发功能,建议您尽快迁移到EMR Studio进行数据开发,详情请参见EMR数据开发停止更新公告

前提条件

  • 已开通E-MapReduce服务和OSS服务。
  • 已完成云账号的授权,详情请参见角色授权

操作流程

  1. 步骤一:准备环境
  2. 步骤二:准备测试数据
  3. 步骤三:创建并运行Flink作业
  4. (可选)步骤四:查看作业提交日志和作业信息

步骤一:准备环境

在E-MapReduce上创建Flink模式的Dataflow集群,详情请参见创建集群
说明 本文以EMR-3.39.1版本的集群为例。

步骤二:准备测试数据

在创建Flink作业前,您需要在OSS上传测试数据。本示例上传一个test.txt文件,文件内容为Nothing is impossible for a willing heart. While there is a life, there is a hope~

  1. 登录 OSS管理控制台
  2. 创建存储空间并上传测试数据文件,详情请参见创建存储空间上传文件
    测试数据的上传路径在后续步骤的代码中会使用,本示例的上传路径为oss://vvr-test/test.txt

步骤三:创建并运行Flink作业

  1. 进入数据开发的项目列表页面。
    1. 通过阿里云账号登录阿里云E-MapReduce控制台
    2. 在顶部菜单栏处,根据实际情况选择地域和资源组
    3. 单击上方的数据开发页签。
  2. 数据开发页面,创建项目,详情请参见项目管理
  3. 新建Flink类型作业。
    1. 在页面左侧,在需要操作的文件夹上单击右键,选择新建作业
    2. 新建作业对话框中,输入作业名称作业描述,从作业类型下拉列表中选择Flink作业类型。
    3. 单击确定
  4. 编辑作业内容。
    作业内容示例如下。
    run -m yarn-cluster -yjm 1024 -ytm 1024 -ynm flink-oss-sample /usr/lib/flink-current/examples/batch/WordCount.jar --input oss://vvr-test/test.txt
    示例代码中的关键参数说明如下:
    • /usr/lib/flink-current/examples/batch/WordCount.jar:DataFlow集群内置的Flink WordCount作业,代码详细信息请参见官方代码仓库
    • oss://vvr-test/test.txt:上传到OSS的测试数据。
  5. 作业配置完成后,单击右上方的运行
    运行作业对话框中,选择执行集群为新建的Flink模式的Dataflow集群。
  6. 单击确定
    作业成功运行后,即成功实现了在E-MapReduce集群上运行Flink作业处理OSS数据。日志中会打印如下信息。
    (a,3)
    (for,1)
    (heart,1)
    (hope,1)
    (impossible,1)
    (is,3)
    (life,1)
    (nothing,1)
    (there,2)
    (while,1)
    (willing,1)
    =================JOB OUTPUT END=================

(可选)步骤四:查看作业提交日志和作业信息

如果需要定位作业失败的原因或了解作业的详细信息,则您可以查看作业的日志和作业信息。

  1. 查看作业提交日志。
    当前提交日志支持在E-MapReduce控制台查看,也支持在SSH客户端查看。
    • 提交作业后,您可以在E-MapReduce控制台的运行记录页签,单击待查看作业所在行的详情Flink作业执行记录

      单击提交日志页签,可以查看详细的日志信息。

      Flink作业日志
    • 通过SSH客户端登录到header节点,查看提交的日志信息。

      默认情况下,根据Flink的log4j配置(详情请参见/etc/ecm/flink-conf/log4j-yarn-session.properties),Flink客户端的提交日志会保存在/mnt/disk1/log/flink/flink-{user}-client-{hostname}.log

      其中,user为提交Flink作业的用户,hostname为提交作业所在的节点。以root用户在emr-header-1节点提交Flink作业为例,日志的路径为/mnt/disk1/log/flink/flink-flink-historyserver-0-emr-header-1.cluster-126601.log

  2. 查看作业信息。
    通过Yarn UI可以查看Flink作业的信息。访问Yarn UI有SSH隧道和Knox两种方式,SSH隧道方式请参见通过SSH隧道方式访问开源组件Web UI,Knox方式请参见Knox访问链接与端口。下面以Knox方式为例进行介绍。
    1. 在E-MapReduce控制台的访问链接与端口页面中,单击Yarn UI后的链接。
      YARN UI链接
    2. 在Hadoop控制台,单击作业的ID
      查看作业运行详情。Hadoop控制台>Flink作业列表
      详细信息如下。Hadoop控制台>Flink作业详情
    3. 如果您需要查看运行中的Flink作业,则可以在作业详情页面,单击Tracking URL后面的链接,进入Flink Dashboard查看。
    4. 作业运行结束后,您可以查看所有已经完成的作业列表和日志。