本文介绍如何在Dataflow集群中运行Flink作业来消费OSS数据。
背景信息
本文示例使用的是EMR数据开发功能,目前已不推荐使用,因为该功能在2022年2月21日21点停止更新。
如果您之前未使用过数据开发功能,推荐您通过EMR Studio进行数据开发,详情请参见EMR Studio概述。
如果您之前使用过数据开发功能,建议您尽快迁移到EMR Studio进行数据开发,详情请参见EMR数据开发停止更新公告。
前提条件
- 已开通E-MapReduce服务和OSS服务。
- 已完成云账号的授权,详情请参见角色授权。
步骤一:准备环境
在E-MapReduce上创建Flink模式的Dataflow集群,详情请参见
创建集群。
步骤二:准备测试数据
在创建Flink作业前,您需要在OSS上传测试数据。本示例上传一个test.txt文件,文件内容为Nothing is impossible for a willing heart. While there is a life, there is a hope~
。
- 登录 OSS管理控制台。
- 创建存储空间并上传测试数据文件,详情请参见创建存储空间和上传文件。
测试数据的上传路径在后续步骤的代码中会使用,本示例的上传路径为oss://vvr-test/test.txt。
步骤三:创建并运行Flink作业
- 进入数据开发的项目列表页面。
- 通过阿里云账号登录阿里云E-MapReduce控制台。
- 在顶部菜单栏处,根据实际情况选择地域和资源组。
- 单击上方的数据开发页签。
- 在数据开发页面,创建项目,详情请参见项目管理。
- 新建Flink类型作业。
- 在页面左侧,在需要操作的文件夹上单击右键,选择新建作业。
- 在新建作业对话框中,输入作业名称和作业描述,从作业类型下拉列表中选择Flink作业类型。
- 单击确定。
- 编辑作业内容。
作业内容示例如下。
run -m yarn-cluster -yjm 1024 -ytm 1024 -ynm flink-oss-sample /usr/lib/flink-current/examples/batch/WordCount.jar --input oss://vvr-test/test.txt
示例代码中的关键参数说明如下:
- /usr/lib/flink-current/examples/batch/WordCount.jar:DataFlow集群内置的Flink WordCount作业,代码详细信息请参见官方代码仓库。
- oss://vvr-test/test.txt:上传到OSS的测试数据。
- 作业配置完成后,单击右上方的运行。
在运行作业对话框中,选择执行集群为新建的Flink模式的Dataflow集群。
- 单击确定。
作业成功运行后,即成功实现了在E-MapReduce集群上运行Flink作业处理OSS数据。日志中会打印如下信息。
(a,3)
(for,1)
(heart,1)
(hope,1)
(impossible,1)
(is,3)
(life,1)
(nothing,1)
(there,2)
(while,1)
(willing,1)
=================JOB OUTPUT END=================
(可选)步骤四:查看作业提交日志和作业信息
如果需要定位作业失败的原因或了解作业的详细信息,则您可以查看作业的日志和作业信息。
- 查看作业提交日志。
当前提交日志支持在E-MapReduce控制台查看,也支持在SSH客户端查看。
- 提交作业后,您可以在E-MapReduce控制台的运行记录页签,单击待查看作业所在行的详情。

单击提交日志页签,可以查看详细的日志信息。

- 通过SSH客户端登录到header节点,查看提交的日志信息。
默认情况下,根据Flink的log4j配置(详情请参见/etc/ecm/flink-conf/log4j-yarn-session.properties),Flink客户端的提交日志会保存在/mnt/disk1/log/flink/flink-{user}-client-{hostname}.log。
其中,user为提交Flink作业的用户,hostname为提交作业所在的节点。以root用户在emr-header-1节点提交Flink作业为例,日志的路径为/mnt/disk1/log/flink/flink-flink-historyserver-0-emr-header-1.cluster-126601.log。
- 查看作业信息。
- 在E-MapReduce控制台的访问链接与端口页面中,单击Yarn UI后的链接。
- 在Hadoop控制台,单击作业的ID。
查看作业运行详情。

详细信息如下。

- 如果您需要查看运行中的Flink作业,则可以在作业详情页面,单击Tracking URL后面的链接,进入Flink Dashboard查看。
- 作业运行结束后,您可以查看所有已经完成的作业列表和日志。