EMR-3.27.x及之前版本使用Flink社区开源版本,EMR-3.27.x之后版本使用完全兼容开源Flink的企业版(VVR)。本文介绍如何配置Flink(VVR)类型的作业。
背景信息
Flink企业版由Apache Flink创始团队官方出品,拥有全球统一商业化品牌。
VVR提供企业版StateBackend,性能是开源版本的3~5倍。在EMR Hadoop集群中,您可使用VVR引擎和EMR数据开发功能提交作业。VVR支持开源Flink 1.10版本,默认使用商业GeminiStateBackend,具备以下特性:
采用创新的数据结构,提高随机查询、降低读磁盘I/O的性能。
优化Cache策略,内存充足情况下热数据不落盘,并且Compaction后Cache不会失效。
完全使用Java实现,消除RocksDB的JNI开销。
使用堆外内存,并基于GeminiDB的特点实现高效的内存分配器,消除JVM GC带来的影响。
支持异步增量Checkpoint,同步阶段只进行内存索引的拷贝,相较于RocksDB可以避免I/O带来的抖动。
支持Local Recovery和Timer落盘。
如果您想使用GeminiStateBackend,请不要在代码中指定StateBackend类型。使用GeminiStateBackend启动时,TM的内存不少于1728 MB。
Flink中Checkpoint和StateBackend的基础配置同样适用于GeminiStateBackend,具体请参见Configuration。
您可以根据具体需求配置参数,部分特殊参数设置如下。
参数 | 说明 |
state.backend.gemini.memory.managed | 默认值为true,表示将自动根据Managed Memory以及Task Slot数计算每个Backend的内存。取值如下:
|
state.backend.gemini.offheap.size | 默认值为2 GB,当state.backend.gemini.memory.managed为false时,设置每个Backend的内存。 |
state.backend.gemini.local.dir | 表示GeminiDB本地数据文件的存放目录。 |
state.backend.gemini.timer-service.factory | 默认值为HEAP,表示timer-service state的存储位置。取值如下:
|
前提条件
已创建Hadoop集群。
已创建项目。
已获取作业所需的资源,以及作业需要处理的数据文件,例如,JAR包、数据文件名称及其保存路径。
说明建议使用OSS维护要运行的JAR包。
如果使用本地路径,请使用文件的绝对路径。
操作步骤
- 进入数据开发的项目列表页面。
- 通过阿里云账号登录阿里云E-MapReduce控制台。
- 在顶部菜单栏处,根据实际情况选择地域和资源组。
- 单击上方的数据开发页签。
- 单击待编辑项目所在行的作业编辑。
新建Flink类型作业。
- 在页面左侧,在需要操作的文件夹上单击右键,选择新建作业。
在新建作业对话框中,输入作业名称和作业描述,从作业类型下拉列表中选择Flink作业类型。
- 单击确定。
编辑作业内容。
在作业内容中,填写提交该作业需要提供的命令行参数。
Flink类型作业支持JAR包形式的Flink Datastream、Table和SQL作业,示例如下。
run -m yarn-cluster -yjm 1024 -ytm 2048 ossref://path/to/oss/of/WordCount.jar --input oss://path/to/oss/to/data --output oss://path/to/oss/to/result
EMR-3.x版本自EMR-3.28.2版本开始,Flink类型作业同时支持PyFlink作业,示例如下。
run -m yarn-cluster -yjm 1024 -ytm 2048 -py ossref://path/to/oss/of/word_count.py
PyFlink作业其它可用参数,请参见Apache Flink官方文档。
单击保存。
说明您可以根据集群的版本来访问Flink的Web UI:
EMR-3.29.0之前版本
仅支持通过SSH隧道方式访问Web UI。
EMR-3.29.0及后续版本
(推荐)您可以通过EMR控制台的方式访问Web UI。
您可以通过SSH隧道方式访问Web UI。