本文汇总了DataFlow集群使用时的常见问题。

DataFlow集群外的机器,如何提交作业到DataFlow集群?

您可以根据以下步骤,通过DataFlow集群外的机器,提交作业到DataFlow集群:
  1. 确保DataFlow集群和DataFlow集群外的机器网络互通。
  2. 配置提交Flink作业的客户端的Hadoop环境。

    DataFlow集群中的Hadoop的软件安装目录是/usr/lib/hadoop-current,配置文件的目录是/etc/ecm/hadoop-conf,您需要将hadoop-current目录及hadoop-conf目录下载到提交Flink作业的客户端上。

    然后,在提交Flink作业的客户端上,配置如下环境变量。

    export HADOOP_HOME=/path/to/hadoop-current && \
    export PATH=${HADOOP_HOME}/bin/:$PATH && \
    export HADOOP_CLASSPATH=$(hadoop classpath) && \
    export HADOOP_CONF_DIR=/path/to/hadoop-conf
    注意 Hadoop的配置文件中(例如yarn-site.xml等)配置的服务地址(例如ResourceManager等),使用的是hostname,例如emr-header-1.cluster*****等。因此,如果您通过集群外的机器提交作业,需要能够解析这些hostname或者将配置文件中的hostname修改成对应的IP地址。

在DataFlow集群外机器上,如何解析DataFlow集群中的hostname?

您可以通过以下方式,在DataFlow集群外的机器上,解析DataFlow集群中的hostname:
  • 修改提交Flink作业的客户端上的/etc/hosts文件,添加相应的hostname到IP的映射。
  • 通过什么是PrivateZone提供的DNS解析服务。
    如果您有自己的域名解析服务,也可以通过如下方式,配置JVM的运行参数,使用自己的域名解析服务。
    env.java.opts.client: "-Dsun.net.spi.nameservice.nameservers=xxx -Dsun.net.spi.nameservice.provider.1=dns,sun -Dsun.net.spi.nameservice.domain=yyy"

如何查看Flink作业的运行状态?

  • 通过EMR控制台查看。

    EMR支持Knox,可以通过公网方式访问YARN、Flink等的Web UI界面,Flink的Web UI可以通过YARN进行查看,详细信息请参见通过Web UI查看作业状态

  • 通过SSH隧道的方式查看,详情信息请参见通过SSH隧道方式访问开源组件Web UI
  • 直接访问YARN REST接口。
    curl --compressed -v  -H "Accept: application/json" -X GET "http://emr-header-1:8088/ws/v1/cluster/apps?states=RUNNING&queue=default&user.name=***"
    说明 需确保安全组开放了8443和8088端口,可以访问到YARN的REST接口或者DataFlow集群和访问的节点处于同一内网中。

如何访问Flink作业的日志?

  • 对于运行中的作业,可以通过Flink Web UI,访问Flink作业的日志。
  • 对于已经运行结束的作业,可以通过Flink History Server查看作业的统计信息或者通过命令yarn logs -applicationId application_xxxx_yyyy访问作业的日志,已经运行结束的作业的日志默认保存在HDFS集群的hdfs:///tmp/logs/$USERNAME/logs/目录下。

如何访问DataFlow集群中的Flink HistoryServer?

DataFlow集群会默认在emr-header-1节点的8082端口启动Flink HistoryServer,用于收集已运行结束的作业的统计信息,具体访问方式如下:
  1. 配置安全组规则,开放emr-header-1节点的8082端口的访问权限。
  2. 直接访问http://$emr-header-1-ip:8082

如何使用DataFlow集群中所支持的商业化Connector?

DataFlow集群提供了很多商业化Connector,例如Hologres、SLS、MaxCompute、DataHub、Elasticsearch、ClickHouse、Hudi和Hive等,您在Flink作业中除了可以使用开源的Connector之外,还可以使用这些商业化Connector。下面以Hologres Connector为例,介绍如何在Flink作业中使用DataFlow集群所携带的商业化Connector。

  • 作业开发
    1. 下载DataFlow集群所携带的商业化Connector的JAR包(位于DataFlow集群的/usr/lib/flink-current/opt/connectors目录下),并通过如下方式将商业化Connector安装在本地Maven环境中。
      mvn install:install-file -Dfile=/path/to/ververica-connector-hologres-1.13-vvr-4.0.7.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.13-vvr-4.0.7 -Dpackaging=jar
    2. 在项目的pom.xml文件中添加以下依赖。
      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-hologres</artifactId>
          <version>1.13-vvr-4.0.7</version>
          <scope>provided</scope>
      </dependency>
  • 运行作业
    • 方式一:
      1. 拷贝Hologres Connector到一个独立的目录。
        hdfs mkdir hdfs:///flink-current/opt/connectors/hologres/
        hdfs cp hdfs:///flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar  hdfs:///flink-current/opt/connectors/hologres/ververica-connector-hologres-1.13-vvr-4.0.7.jar
      2. 提交作业时,命令中添加以下参数。
        -D yarn.provided.lib.dirs=hdfs:///flink-current/opt/connectors/hologres/
    • 方式二:
      1. 拷贝Hologres Connector到提交Flink作业的客户端的/usr/lib/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar目录下,与DataFlow集群中的目录结构保持一致。
      2. 提交作业时,命令中添加以下参数。
        -C file:///usr/lib/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar
    • 方式三:将Hologres Connector打包到作业的JAR包中。

如何使用GeminiStateBackend?

DataFlow集群提供了企业版StateBackend(即GeminiStateBackend),性能是开源版本的3~5倍。关于如何使用GeminiStateBackend以及更详细的信息,请参见Flink(VVR)作业配置

如何开启Flink作业JobManager的HA?

DataFlow集群基于YARN模式部署并运行Flink作业,您可以按照社区中的Configuration开启JobManager的HA,从而使Flink作业可以更稳定的运行。配置示例如下所示。
high-availability: zookeeper
high-availability.zookeeper.quorum: 192.168.**.**:2181,192.168.**.**:2181,192.168.**.**:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery
注意 开启HA后,默认情况下,JobManager在失败后最多重启一次。如果您想让JobManager重启多次,还需要设置YARN的yarn.resourcemanager.am.max-attempts参数和Flink的yarn.application-attempts参数,详情请参见Flink官方文档。除此之外,根据经验,通常还需要调整yarn.application-attempt-failures-validity-interval参数的值,将其从默认的10000毫秒(10秒)调整到一个比较大的值,例如调大为300000毫秒(5分钟),防止JobManager不停的重启。

如何处理上下游存储(Connector)问题?

关于上下游存储方面的常见问题,请参见上下游存储