本文汇总了DataFlow集群使用时的常见问题。
集群使用与运维:
作业问题:
报错Multiple factories for identifier '...' that implement '...' found in the classpath,该如何处理?
报错java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id timed out,该如何处理?
报错java.lang.OutOfMemoryError: GC overhead limit exceeded,该如何处理?
报错Exception in thread "main" java.lang.NoSuchFieldError: DEPLOYMENT_MODE,该如何处理?
集群中的日志在哪里?如何查看?
根据以下两种情况查看日志:
如果Flink集群的JobManager已退出,可以在集群机器中通过命令yarn logs -applicationId application_xxxx_yy拉取到本地进行查看,也可以在YARN的Web UI中访问结束的作业的日志链接在网页端进行查看。
如果Flink集群的JobManager仍在运行。
可以通过访问对应的Flink Web UI进行查看。
使用命令行命令,通过yarn logs -applicationId application_xxxx_yy -am ALL -logFiles jobmanager.log查看JobManager日志,通过yarn logs -applicationId application_xxxx_yy -containerId container_xxxx_yy_aa_bb -logFiles taskmanager.log来查看TaskManager日志。
作业JAR包和集群内Flink的JAR包存在冲突
该问题发生时,您一般可以在作业日志中看到类似报错NoSuchFieldError/NoSuchMethodError/ClassNotFoundException
等。您可以通过以下步骤排查和解决:
定位引起冲突的依赖类。根据报错中的异常类,您可以找到该类所在的依赖JAR,然后在作业JAR的pom.xml所在目录运行mvn dependency:tree查看依赖树,判断该类是如何被引入的。
排除引起冲突的依赖类。
如果是在pom.xml中错误设置了JAR包的Scope,则可以修改Scope为Provided来将对应JAR包排除。
如果确实需要使用该异常类所在的JAR,则可通过添加Exclude来排除特定类。
如果确实需要使用该异常类,无法更换为集群内对应版本的类,可以通过Maven Shade Plugin对该类进行Shade。
此外,如果Classpath中存在多个版本的JAR包,作业实际使用的Class版本和类的加载顺序有关,为了确认某个类具体是从哪个JAR加载而来,可以在flink-conf.yaml中设置JVM参数env.java.opts: -verbose:class或者通过指定动态参数-Denv.java.opts="-verbose:class"来打印加载的类及其来源。
说明对于JobManager或TaskManager来说,上述信息会打印到
jobmanager.out
或taskmanager.out
中。
DataFlow集群外的机器,如何提交作业到DataFlow集群?
您可以根据以下步骤,通过DataFlow集群外的机器,提交作业到DataFlow集群:
确保DataFlow集群和DataFlow集群外的机器网络互通。
配置提交Flink作业的客户端的Hadoop YARN环境。
DataFlow集群中的Hadoop YARN的软件安装目录是/opt/apps/YARN/yarn-current,配置文件的目录是/etc/taihao-apps/hadoop-conf/,您需要将yarn-current目录及hadoop-conf目录下载到提交Flink作业的客户端上。
然后,在提交Flink作业的客户端上,配置如下环境变量。
export HADOOP_HOME=/path/to/yarn-current && \ export PATH=${HADOOP_HOME}/bin/:$PATH && \ export HADOOP_CLASSPATH=$(hadoop classpath) && \ export HADOOP_CONF_DIR=/path/to/hadoop-conf
重要Hadoop的配置文件中(例如yarn-site.xml等)配置的服务地址(例如ResourceManager等),使用的是全域名(FQDN,Fully Qualified Domain Name)。例如,master-1-1.c-xxxxxxxxxx.cn-hangzhou.emr.aliyuncs.com。因此,如果您通过集群外的机器提交作业,需要能够解析这些FQDN或者将配置文件中的FQDN修改成对应的IP地址。
完成以上配置后,您在集群外的机器上启动Flink作业(例如,运行命令
flink run -d -t yarn-per-job -ynm flink-test $FLINK_HOME/examples/streaming/TopSpeedWindowing.jar
)后,应当能在DataFlow集群的YARN Web UI中看到相应的Flink作业。
在DataFlow集群外机器上,如何解析DataFlow集群中的hostname?
您可以通过以下方式,在DataFlow集群外的机器上,解析DataFlow集群中的hostname:
修改提交Flink作业的客户端上的/etc/hosts文件,添加相应的hostname到IP的映射。
通过PrivateZone提供的DNS解析服务。
如果您有自己的域名解析服务,也可以通过如下方式,配置JVM的运行参数,使用自己的域名解析服务。
env.java.opts.client: "-Dsun.net.spi.nameservice.nameservers=xxx -Dsun.net.spi.nameservice.provider.1=dns,sun -Dsun.net.spi.nameservice.domain=yyy"
如何查看Flink作业的运行状态?
通过EMR控制台查看。
EMR支持Knox,可以通过公网方式访问YARN、Flink等的Web UI界面,Flink的Web UI可以通过YARN进行查看,详细信息请参见通过Web UI查看作业状态。
通过SSH隧道的方式查看,详情信息请参见通过SSH隧道方式访问开源组件Web UI。
直接访问YARN REST接口。
curl --compressed -v -H "Accept: application/json" -X GET "http://master-1-1:8088/ws/v1/cluster/apps?states=RUNNING&queue=default&user.name=***"
说明需确保安全组开放了8443和8088端口,可以访问到YARN的REST接口或者DataFlow集群和访问的节点处于同一内网中。
如何访问Flink作业的日志?
对于运行中的作业,可以通过Flink Web UI,访问Flink作业的日志。
对于已经运行结束的作业,可以通过Flink History Server查看作业的统计信息或者通过命令
yarn logs -applicationId application_xxxx_yyyy
访问作业的日志,已经运行结束的作业的日志默认保存在HDFS集群的hdfs:///tmp/logs/$USERNAME/logs/目录下。
如何访问DataFlow集群中的Flink HistoryServer?
DataFlow集群会默认在master-1-1节点(即header机器组的第一台机器)的18082端口启动Flink HistoryServer,用于收集已运行结束的作业的统计信息 ,具体访问方式如下:
配置安全组规则,开放master-1-1节点的18082端口的访问权限。
直接访问http://$master-1-1-ip:18082。
Flink HistoryServer目前不存储已完成作业的具体日志,如需查看日志请通过YARN API或者YARN的WebUI查询。
如何使用DataFlow集群中所支持的商业化Connector?
DataFlow集群提供了很多商业化Connector,例如Hologres、SLS、MaxCompute、DataHub、Elasticsearch和ClickHouse等,您在Flink作业中除了可以使用开源的Connector之外,还可以使用这些商业化Connector。下面以Hologres Connector为例,介绍如何在Flink作业中使用DataFlow集群所携带的商业化Connector。
作业开发
下载DataFlow集群所携带的商业化Connector的JAR包(位于DataFlow集群的/opt/apps/FLINK/flink-current/opt/connectors目录下),并通过如下方式将商业化Connector安装在本地Maven环境中。
mvn install:install-file -Dfile=/path/to/ververica-connector-hologres-1.13-vvr-4.0.7.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.13-vvr-4.0.7 -Dpackaging=jar
在项目的pom.xml文件中添加以下依赖。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-hologres</artifactId> <version>1.13-vvr-4.0.7</version> <scope>provided</scope> </dependency>
运行作业
方式一:
拷贝Hologres Connector到一个独立的目录。
hdfs mkdir hdfs:///flink-current/opt/connectors/hologres/ hdfs cp hdfs:///flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar hdfs:///flink-current/opt/connectors/hologres/ververica-connector-hologres-1.13-vvr-4.0.7.jar
提交作业时,命令中添加以下参数。
-D yarn.provided.lib.dirs=hdfs:///flink-current/opt/connectors/hologres/
方式二:
拷贝Hologres Connector到提交Flink作业的客户端的/opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar目录下,与DataFlow集群中的目录结构保持一致。
提交作业时,命令中添加以下参数。
-C file:///opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar
方式三:将Hologres Connector打包到作业的JAR包中。
如何使用GeminiStateBackend?
DataFlow集群提供了企业版状态后端(即GeminiStateBackend),性能是开源版本的3~5倍。DataFlow集群在配置文件中默认使用GeminiStateBackend,关于GeminiStateBackend的更多高级配置,详情请参见企业级状态后端存储配置。
如何使用开源的StateBackend?
DataFlow集群在配置文件中默认使用企业版状态后端(即GeminiStateBackend),您如果想针对单个作业使用开源的状态后端(例如rocksdb),可以通过-D指定,例如:
flink run-application -t yarn-application -D state.backend=rocksdb /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar
或者如果您想让上述修改对后续作业生效,在EMR控制台,修改state.backend参数的值为您想使用的状态的后端(例如rocksdb)。单击保存,然后单击部署客户端配置:
客户端日志在哪里?如何查看?
在EMR的集群环境中我们配置了FLINK_LOG_DIR环境变量来指明Flink客户端的日志存放位置。它的默认值是/var/log/taihao-apps/flink(在3.43.0之前的版本中默认是/mnt/disk1/log/flink)。您如果需要查看客户端的完整日志(如SQL-Client的日志)可以在该目录下查看对应文件。
通过flink run命令运行作业时,作业的参数没有生效
在通过命令行命令运行Flink作业时,Flink作业的参数需要放在Flink作业JAR包的后面,例如flink run -d -t yarn-per-job test.jar arg1 arg2。
报错Multiple factories for identifier '...' that implement '...' found in the classpath,该如何处理?
报错原因
表明在Classpath中找到了某个Connector的多个实现。原因通常为在作业JAR添加了相关Connector依赖,同时又手动在$FLINK_HOME/ib目录下放入了相同的Connector依赖,导致了依赖冲突。
解决方案
解决思路为去除重复的依赖,详细步骤可以参考作业JAR包和集群内Flink的JAR包存在冲突问题排查。
如何开启Flink作业JobManager的HA?
DataFlow集群基于YARN模式部署并运行Flink作业,您可以按照社区中的Configuration开启JobManager的HA,从而使Flink作业可以更稳定的运行。配置示例如下所示。
high-availability: zookeeper
high-availability.zookeeper.quorum: 192.168.**.**:2181,192.168.**.**:2181,192.168.**.**:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery
开启HA后,默认情况下,JobManager在失败后最多重启一次。如果您想让JobManager重启多次,还需要设置YARN的yarn.resourcemanager.am.max-attempts参数和Flink的yarn.application-attempts参数,详情请参见Flink官方文档。除此之外,根据经验,通常还需要调整yarn.application-attempt-failures-validity-interval参数的值,将其从默认的10000毫秒(10秒)调整到一个比较大的值,例如调大为300000毫秒(5分钟),防止JobManager不停的重启。
如何查看Flink作业的监控指标?
您可以在EMR控制台目标集群的集群监控页面,单击指标监控。
在Dashboard下拉框中选择FLINK。
选择待查看作业对应的Application ID和Job ID,即可展示Flink作业的各项监控指标。
监控指标详情,请参见Flink指标。
说明仅当集群中已有运行的Flink作业时,才会有可供选择的Application ID和Job ID。
部分指标只有配置了相应上下游的Source和Sink才会有输出信息。例如,sourceIdleTime。
如何处理上下游存储(Connector)问题?
关于上下游存储方面的常见问题,请参见上下游存储。
通过DataFlow集群运行Flink作业免密读写OSS时报错,该如何处理?
您需要根据具体的报错信息,进行相应的处理:
报错提示
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
。问题原因:DataFlow集群目前是通过内置的JindoSDK来支持免密读写OSS,并支持StreamingFileSink等API的,不需要再按照社区文档进行额外的配置,否则会由于依赖冲突导致此报错。
处理方法:检查您集群内提交作业的机器的$FLINK_HOME/plugins目录,查看是否放置了oss-fs-hadoop目录。如果放置了该目录,请删除该目录后重新提交作业。
报错提示
Could not find a file system implementation for scheme 'oss'. The scheme is directly supported by Flink through the following plugin: flink-oss-fs-hadoop. ....
。问题原因:EMR-3.40及之前的版本的EMR集群中,master机器组内非master-1-1机器上可能缺少Jindo相关的JAR包。
处理方法:
EMR-3.40及之前的版本:检查您的集群内提交作业的机器的$FLINK_HOME/lib目录下是否有Jindo相关的JAR包,例如jindo-flink-4.0.0.jar。如果没有Jindo相关的JAR包,您可以在集群中运行以下命令将Jindo相关的JAR包,拷贝到$FLINK_HOME/lib目录后重新提交作业。
cp /opt/apps/extra-jars/flink/jindo-flink-*-full.jar $FLINK_HOME/lib
EMR-3.40之后版本:优化了支持的方式,即使$FLINK_HOME/lib目录下没有Jindo相关的JAR包,读写OSS的作业也是可以正常运行的。请使用Flink on YARN的相关命令来启动作业。
报错java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id timed out,该如何处理?
报错原因
直接原因是TaskManager心跳超时,具体原因可以在TaskManager日志中查看报错信息进行定位。除此之外,还存在TaskManager堆内存大小有限或者作业代码存在内存泄露导致的内存溢出错误,例如报错java.lang.OutOfMemoryError: GC overhead limit exceeded,该如何处理?。
解决方案
上述报错遇到该类报错时需要您调大内存或者分析作业内存使用情况来进一步定位原因。
报错java.lang.OutOfMemoryError: GC overhead limit exceeded,该如何处理?
报错原因
该报错代表为作业设定的内存不够,导致GC超时。常见原因为代码(如UDF)发生内存泄露或者内存大小确实不能满足业务需求。
解决方案
您可在重新运行问题作业前通过-D方式指定JVM参数,保存OutOfMemoryError发生时的现场
-D env.java.opts="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof"
。在flink-conf.yaml中添加参数
env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof
来配置OutOfMemoryError发生时进行heap dump。
待作业再次报错之后,您可针对HeapDumpPath指定的heap dump文件进行分析(例如使用MAT工具或者jvisualvm工具),确定问题根因。
Flink UI上作业只有一个Operator,并且显示Records Received为0,该如何处理?
这是正常现象,Flink的Records Received相关指标用于描述不同Operator之间的数据通信,当作业被优化为一个Operator时,该指标值恒为0。
Flink作业如何开启火焰图?
火焰图(Flame Graph)用于可视化进程中各个方法的CPU消耗,协助用户解决性能瓶颈。Flink 1.13版本开始支持火焰图功能,但为了避免火焰图对生产环境中作业的影响,默认关闭该功能。如果您需要借助火焰图功能对作业性能进行分析,可以在EMR控制台Flink服务配置页签的flink-conf.yaml中新增参数为rest.flamegraph.enabled,参数值为true的配置项。新增配置项的具体操作,请参见管理配置项。
关于火焰图的更多介绍,请参见Flame Graphs。
报错Exception in thread "main" java.lang.NoSuchFieldError: DEPLOYMENT_MODE,该如何处理?
报错原因
您的作业的JAR中直接或者间接引入了与集群中Flink版本不兼容的flink-core依赖,造成了依赖冲突。
解决方案
在pom.xml中添加以下信息,将flink-core依赖的
scope
设置为provided
来修复该问题。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <!-- Change to your own flink version --> <version>1.16.1</version> <scope>provided</scope> </dependency>
说明version
需要修改为您实际使用的Flink版本。如果您想进一步定位引入该依赖的原因,可以参见作业JAR包和集群内Flink的JAR包存在冲突。