本文为您介绍实时计算Flink版上下游存储方面的常见问题。

Flink如何获取JSON数据?

  • 如果您需要获取普通JSON数据,方法详情请参见JSON Format
  • 如果您需要获取嵌套的JSON数据,则源表DDL中使用ROW格式定义JSON Object,结果表DDL中定义好要获取的JSON数据对应的Key,在DML语句中设置好Key获取的方式,就可以获取到对应的嵌套Key的Value值。代码示例如下:
    • 测试数据
      {
          "a":"abc",
          "b":1,
          "c":{
              "e":["1","2","3","4"],
              "f":{"m":"567"}
          }
      }
    • 源表DDL定义
      CREATE TEMPORARY TABLE `kafka_table` (
        `a` VARCHAR,
         b int,
        `c` ROW<e ARRAY<VARCHAR>,f ROW<m VARCHAR>>  --c是一个JSON Object,对应Flink里面是ROW;e是json list,对应ARRAY。
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'xxx',
        'properties.bootstrap.servers' = 'xxx',
        'properties.group.id' = 'xxx',
        'format' = 'json',
        'scan.startup.mode' = 'xxx'
      );
    • 结果表DDL定义
      CREATE TEMPORARY TABLE `sink` (
       `a` VARCHAR,
        b INT,
        e VARCHAR,
        `m` varchar
      ) WITH (
        'connector' = 'print',
        'logger' = 'true'
      );
    • DML语句
      INSERT INTO `sink`
        SELECT 
        `a`,
        b,
        c.e[1], --Flink从1开始遍历数组,本示例为获取数组中的元素1。如果获取整个数组,则去掉[1]。
        c.f.m
      FROM `kafka_table`;
    • 测试结果测试结果

Flink和Kafka网络连通,但Flink无法消费或者写入数据?

  • 问题原因

    如果Flink与Kafka之间存在代理或端口映射等转发机制,则Kafka客户端拉取Kafka服务端的网络地址为Kafka服务器本身的地址而非代理的地址。此时,虽然Flink与Kafka之间网络连通,但是Flink无法消费或者写入数据。

    Kafka客户端(Flink Kafka Connector)与Kafka服务端(Kafka Broker)之间建立连接分为两个步骤:
    1. Kafka客户端拉取Kafka服务端元信息,包括Kafka服务端所有Broker的网络地址。
    2. 使用第一步拉取下来的网络地址来消费或者写入数据。
  • 排查方法
    通过以下步骤来确认Flink与Kafka之间是否存在代理或端口映射等转发机制:
    1. 使用ZooKeeper命令行工具(zkCli.sh或zookeeper-shell.sh)登录您Kafka使用的ZooKeeper集群。
    2. 根据您的集群实际情况执行正确的命令,来获取您的Kafka Broker元信息。
      通常可以使用get /brokers/ids/0命令来获取Kafka Broker元信息。Kafka的连接地址位于endpoints字段中。endpoint
    3. 使用pingtelnet等命令来测试Endpoint中显示的地址与Flink的连通性。

      如果无法连通,则代表Flink与Kafka之间存在代理或端口映射等转发机制。

  • 解决方案
    • 不使用代理或端口映射等转发机制,直接打通Flink与Kafka之间的网络,使Flink能够直接连通Kafka元信息中显示的Endpoint。
    • 联系Kafka运维人员,将转发地址作为Kafka Broker端的advertised.listeners,以使Kafka客户端拉取的Kafka服务端元信息包含转发地址。
      说明 仅Kafka 0.10.2.0及以上版本支持将代理地址添加到Kafka Broker的Listener中。

    如果您想了解更多的关于该问题的原理和解释,请参见KIP-103:区分内部与外部网络流量Kafka网络连接问题详解

为什么Kafka源表数据基于Event Time的窗口后,不能输出数据?

  • 问题详情

    Kafka作为源表,基于Event Time的窗口后,不能输出数据。

  • 问题原因

    Kafka某个分区没有数据,会影响Watermark的产生,从而导致Kafka源表数据基于Event Time的窗口后,不能输出数据。

  • 解决方案
    1. 确保所有分区都存在数据。
    2. 开启源数据空闲监测功能。在目标作业详情页面右上角,单击编辑后,在页面右侧高级配置面板的更多Flink配置中,添加如下代码后保存生效。
      table.exec.source.idle-timeout: 5
      table.exec.source.idle-timeout参数详情,请参见Configuration

Flink中的Commit Offset有什么作用?

Flink在每次Checkpoint成功时,才会向Kafka提交当前读取Offset。如果未开启Checkpoint,或者Checkpoint设置的间隔过大,在Kafka端可能会查询不到当前读取的Offset。

为什么Flink和Kafka之间的网络是连通的,但是依然会有timeout expired while fetching topic metadata的报错?

Flink和Kafka之间的网络连通并不意味着能读取数据,只有Kafka Broker在bootstrap过程中返回的集群metadata中描述的Endpoint, 才可以连通Flink和Kafka,并读取到Kafka的数据,详情请参见Flink-cannot-connect-to-Kafka。检查办法为:
  1. 使用zkCli.sh或者zookeeper-shell.sh工具登录Kafka使用的Zookeeper。
  2. 执行ls /brokers/ids命令列出所有的Kafka Broker ID。
  3. 使用get /brokers/ids/{your_broker_id}命令查看Broker metadata信息。

    Endpoint信息显示在listener_security_protocol_map中。

  4. 确认Flink是否可以连通该Endpoint。

    如果该Endpoint中使用了域名,请为Flink配置对应的域名解析服务。

分裂或者缩容DataHub Topic后导致Flink作业失败,如何恢复?

如果分裂或者缩容了Flink正在读取的某个Topic,则会导致任务持续出错,无法自行恢复。该情况下需要重新启动(先停止再启动)来使任务恢复正常。

可以删除正在消费的DataHub Topic吗?

不支持删除或重建正在消费的DataHub Topic。

endPoint和tunnelEndpoint是指什么?如果配置错误会产生什么结果?

endPoint和tunnelEndpoint参数说明参见Endpoint。VPC环境中这两个参数如果配置错误可能会导致任务异常,异常情况详情如下:
  • 如果endPoint配置错误,则任务上线停滞在91%的进度。
  • 如果tunnelEndpoint配置错误,则任务运行失败。

启动作业时出现Akka超时报错,但是全量MaxCompute源表和增量MaxCompute获取Metadata速率正常,应该如何处理?

请合并小文件,具体步骤请参见文档MaxCompute什么情况下会产生小文件?如何解决小文件问题?

全量MaxCompute和增量MaxCompute是如何读取MaxCompute数据的?

全量MaxCompute和增量MaxCompute是通过Tunnel读取MaxCompute数据的,读取速度受MaxCompute Tunnel带宽限制。

引用MaxCompute作为数据源,在作业启动后,向已有的分区或者表里追加数据,这些新数据是否能被全量MaxCompute或增量MaxCompute源表读取?

启动Flink作业后,如果正在被Source读取或已经被Source读取完成的表或分区有新的数据追加,则这部分数据不会被读取,而且可能导致作业Failover。

全量MaxCompute和增量MaxCompute源表均使用ODPS DOWNLOAD SESSION读取表数据或者分区数据。新建DOWNLOAD SESSION,服务端会创建一个Index文件,相当于创建DOWNLOAD SESSION一瞬间数据的映射,后续的数据读取以这个映射为基础。因此在新建 DOWNLOAD SESSION 后,追加到MaxCompute表或者分区里的数据,正常流程下是不会被读取的。但如果MaxCompute源表中写入新数据,则会出现两种异常:
  • 产生报错:如果Tunnel在读取数据的过程中写入新数据,则会产生报错ErrorCode=TableModified,ErrorMessage=The specified table has been modified since the download initiated.
  • 数据正确性无法保证:如果在Tunnel已经关闭后写入新数据,则这些数据不会被读取。但当作业发生Failover或者暂停后恢复作业,已经读过的数据可能会被重读,新写入的数据可能被读不全。

全量MaxCompute和增量MaxCompute源表作业是否支持暂停作业后修改并发数,再恢复作业?

MaxCompute源表暂时不支持暂停作业后修改并发数,再恢复作业。系统根据并发数,计算每个并发读取哪些分区的哪些数据。此外,每个并发会把消费情况记录至State,以便暂停恢复后或者Failover后,系统可以从上次读取的位置继续读取数据。

如果暂停作业后,修改了全量MaxCompute和增量MaxCompute源表的并发数后再恢复作业,会对作业产生无法预估的影响。因为已经读取的数据可能会被再次读取,没有读的数据可能会被遗漏。

作业启动位点设置了2019-10-11 00:00:00, 为什么启动位点前的分区也会被全量MaxCompute源表读取?

设置启动位点只对消息队列(例如DataHub)类型的数据源有效,对MaxCompute源表无效。Flink作业启动后数据读取的范围如下:
  • 分区表:读取当前所有分区。
  • 非分区表:读取当前存在的数据。

增量MaxCompute源表监听到新分区时,如果该分区还有数据没有写完,如何处理?

没有机制可以标志一个分区的数据是否已经完整。目前只要监听到新分区,就会读入。用增量MaxCompute源表读取一个MaxCompute分区表T,分区列是ds,推荐的写入方法为:不创建分区,先执行Insert overwrite table T partition (ds='20191010') ...语句,作业结束且成功后,分区和数据一起可见。
注意 不允许的写入方法为:先创建好分区,例如ds=20191010,再往分区里写数据。增量MaxCompute源表监听到新分区ds=20191010,立刻读入,如果此时该分区还有数据没有写完,就会漏读数据。

报错:ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'

  • 报错详情
    作业运行过程中会在Failover页面或TaskManager.log页面报错,报错信息如下。
    ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'
  • 报错原因

    MaxCompute DDL定义中填写的用户身份信息无法访问MaxCompute。

  • 解决方案

    通过阿里云账号、RAM用户账号或RAM角色认证用户身份,详情请参见用户认证。如果您有其他疑问,请提交工单咨询,产品名称选择为MaxCompute。

不支持定义Watermark,那如何进行窗口聚合?

如果您需要在MySQL CDC源表上进行窗口聚合,可以考虑采用非窗口聚合的方式,即将时间字段转换成窗口值,然后根据窗口值进行GROUP BY聚合。例如,统计每个店铺每分钟的订单数和销量,代码示例如下。
SELECT shop_id, DATE_FORMAT(order_ts, 'yyyy-MM-dd HH:mm'), COUNT(*), SUM(price) FROM order_mysql_cdc GROUP BY shop_id, DATE_FORMAT(order_ts, 'yyyy-MM-dd HH:mm')

如何跳过Snapshot阶段,只从变更数据开始读取?

可以通过WITH参数debezium.snapshot.mode来控制,您可以设置为:
  • never:在启动时,不读取数据库的快照,而是直接从变更的最开始位置读取。但需要注意MySQL的变更旧数据可能会被自动清理,因此不能保证变更数据中包含了全量的数据,读取的数据不完整。
  • schema_only:如果你不需要保证数据的一致性,只关心作业启动后数据库的新增变更,则可以设置为schema_only,仅快照schema,不快照数据,从变更的最新数据开始读取。

如何读取一个分库分表的MySQL数据库?

如果MySQL是一个分库分表的数据库,分成了user_00、user_02和user_99等多个表,且所有表的schema一致。则可以通过table-name选项,指定一个正则表达式来匹配读取多张表,例如设置table-name为user_.*,监控所有user_前缀的表。database-name选项也支持该功能,但需要所有的表schema一致。

全表读取阶段效率慢、存在反压,应该如何解决?

可能是下游节点处理太慢导致反压了。因此您需要先排查下游节点是否存在反压。如果存在,则需要先解决下游节点的反压问题。您可以通过以下方式处理:
  • 增加并发数。
  • 开启minibatch等聚合优化参数(下游聚合节点)。

报错:com.github.shyiko.mysql.binlog.network.ServerException

  • 报错详情
    Caused by: com.github.shyiko.mysql.binlog.network.ServerException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.
  • 报错原因

    MySQL CDC在启动全量读取之前记录Binlog位点,等全量读取结束后再从Binlog位点读取增量数据。该报错一般是因为全量读取耗时太长,超过了MySQL Binlog的淘汰周期,导致MySQL Binlog位点的数据已经被 MySQL清理掉了。

  • 解决方案
    查看MySQL Binlog的清理规则,例如时间、存储空间和文件个数等,建议保留Binlog一天以上,RDS Binlog详情请参见删除本地日志(Binlog)
    说明 :VVR 4.0.8及以上版本,MySQL CDC支持并发读取全量数据,可以提高全量数据的读取速度,针对该问题会起到缓解作用。

报错:The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled'

  • 报错详情
    MySQL CDC源表在Flink 1.13版本语法检查时报错详情如下。
    Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186)
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
        ... 30 more
  • 报错原因
    在MySQL CDC DDL WITH参数中,您未设置主键(Primary Key)信息。因为Flink 1.13版本,新增支持按PK分片,进行多并发读取数据的功能。
    说明 Flink 1.12及以下版本,MySQL CDC源表仅支持单并发读取数据。
  • 解决方案
    • 如果在Flink 1.13版本,您需要多并发读取MySQL数据,则在DDL中添加PK信息。
    • 如果在Flink 1.13版本,您不需要多并发读取MySQL数据,则在DDL中添加scan.incremental.snapshot.enabled 参数,且把该参数值设置为false,无需设置PK信息。

Flink的结果数据写入RDS表,是按主键更新的,还是生成1条新的记录?

如果在DDL中定义了主键,会采用INSERT INTO tablename(field1,field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1,field2=value2, field3=value3, ...;的方式更新记录,即对于不存在的主键字段会直接插入,存在的主键字段则更新相应的值。如果DDL中没有声明PRIMARY KEY,则会用insert into方式插入记录,追加数据。

使用RDS表中的唯一索引进行GROUP BY时需要注意什么?

  • 需要在作业中的GROUP BY中声明该唯一索引。
  • RDS中只有一个自增主键,Flink作业中不能声明为PRIMARY KEY。

ClickHouse结果表是否支持回撤更新数据?

当Flink结果表的DDL上指定了Primary Key时,则支持回撤更新数据。因为ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统,对UPDATE和DELETE的支持不够完善。如果在Flink DDL上指定了Primary Key,则尝试使用ALTER TABLE UPDATE和DELETE来更新和删除数据,因此会造成性能显著下降。

如何在控制台查看print数据结果?

查看print数据结果的步骤,有以下两种方式:
  • 在Flink开发控制台查看:
    1. 在目标作业Task Manager页签,单击Path, ID
    2. logs页签,查看print数据结果。
  • 跳转到Flink UI界面查看:
    1. 在目标作业作业总览页签,单击Flink UI
    2. 单击Task Managers
    3. 单击Path, ID
    4. logs页签,查看print数据结果。

维表进行JOIN时,如果查询不到数据,应该如何处理?

检查DDL语句和物理表中的Schema类型和名称是否一致。

max_pt()和max_pt_with_done()的区别是什么?

max_pt()选取的是所有分区中字典序最大的分区。max_pt_with_done()选取的是所有分区中字典序最大,且伴随有.done分区的分区。如果分区列表示例如下:
  • ds=20190101
  • ds=20190101.done
  • ds=20190102
  • ds=20190102.done
  • ds=20190103
max_pt()max_pt_with_done()的区别如下:
  • `partition`='max_pt_with_done()'匹配的分区是ds=20190102
  • `partition`='max_pt()',匹配的分区是ds=20190103