本文为您介绍实时计算Flink版上下游存储方面的常见问题。
- Kafka源表
- DataHub源表
- 全量和增量MaxCompute源表
- endPoint和tunnelEndpoint是指什么?如果配置错误会产生什么结果?
- 启动作业时出现Akka超时报错,但是全量MaxCompute源表和增量MaxCompute获取Metadata速率正常,应该如何处理?
- 全量MaxCompute和增量MaxCompute是如何读取MaxCompute数据的?
- 引用MaxCompute作为数据源,在作业启动后,向已有的分区或者表里追加数据,这些新数据是否能被全量MaxCompute或增量MaxCompute源表读取?
- 全量MaxCompute和增量MaxCompute源表作业是否支持暂停作业后修改并发数,再恢复作业?
- 作业启动位点设置了2019-10-11 00:00:00, 为什么启动位点前的分区也会被全量MaxCompute源表读取?
- 增量MaxCompute源表监听到新分区时,如果该分区还有数据没有写完,如何处理?
- 报错:ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'
- MySQL CDC源表
- RDS MySQL结果表
- ClickHouse结果表
- Print结果表
- Tablestore维表
- MaxCompute维表
Flink如何获取JSON数据?
- 如果您需要获取普通JSON数据,方法详情请参见JSON Format。
- 如果您需要获取嵌套的JSON数据,则源表DDL中使用ROW格式定义JSON Object,结果表DDL中定义好要获取的JSON数据对应的Key,在DML语句中设置好Key获取的方式,就可以获取到对应的嵌套Key的Value值。代码示例如下:
- 测试数据
{ "a":"abc", "b":1, "c":{ "e":["1","2","3","4"], "f":{"m":"567"} } }
- 源表DDL定义
CREATE TEMPORARY TABLE `kafka_table` ( `a` VARCHAR, b int, `c` ROW<e ARRAY<VARCHAR>,f ROW<m VARCHAR>> --c是一个JSON Object,对应Flink里面是ROW;e是json list,对应ARRAY。 ) WITH ( 'connector' = 'kafka', 'topic' = 'xxx', 'properties.bootstrap.servers' = 'xxx', 'properties.group.id' = 'xxx', 'format' = 'json', 'scan.startup.mode' = 'xxx' );
- 结果表DDL定义
CREATE TEMPORARY TABLE `sink` ( `a` VARCHAR, b INT, e VARCHAR, `m` varchar ) WITH ( 'connector' = 'print', 'logger' = 'true' );
- DML语句
INSERT INTO `sink` SELECT `a`, b, c.e[1], --Flink从1开始遍历数组,本示例为获取数组中的元素1。如果获取整个数组,则去掉[1]。 c.f.m FROM `kafka_table`;
- 测试结果
- 测试数据
Flink和Kafka网络连通,但Flink无法消费或者写入数据?
- 问题原因
如果Flink与Kafka之间存在代理或端口映射等转发机制,则Kafka客户端拉取Kafka服务端的网络地址为Kafka服务器本身的地址而非代理的地址。此时,虽然Flink与Kafka之间网络连通,但是Flink无法消费或者写入数据。
Flink和Kafka客户端(Flink Kafka Connector)之间建立连接分为两个步骤:- Kafka客户端拉取Kafka服务端(Kafka Broker)元信息,包括Kafka服务端所有Broker的网络地址。
- Flink使用Kafka客户端拉取下来的Kafka服务端网络地址来消费或者写入数据。
- 排查方法
通过以下步骤来确认Flink与Kafka之间是否存在代理或端口映射等转发机制:
- 使用ZooKeeper命令行工具(zkCli.sh或zookeeper-shell.sh)登录您Kafka使用的ZooKeeper集群。
- 根据您的集群实际情况执行正确的命令,来获取您的Kafka Broker元信息。
通常可以使用
get /brokers/ids/0
命令来获取Kafka Broker元信息。Kafka的连接地址位于endpoints
字段中。 - 使用
ping
或telnet
等命令来测试Endpoint中显示的地址与Flink的连通性。如果无法连通,则代表Flink与Kafka之间存在代理或端口映射等转发机制。
- 解决方案
- 不使用代理或端口映射等转发机制,直接打通Flink与Kafka之间的网络,使Flink能够直接连通Kafka元信息中显示的Endpoint。
- 联系Kafka运维人员,将转发地址作为Kafka Broker端的advertised.listeners,以使Kafka客户端拉取的Kafka服务端元信息包含转发地址。
说明 仅Kafka 0.10.2.0及以上版本支持将代理地址添加到Kafka Broker的Listener中。
如果您想了解更多的关于该问题的原理和解释,请参见KIP-103:区分内部与外部网络流量和Kafka网络连接问题详解。
为什么Kafka源表数据基于Event Time的窗口后,不能输出数据?
- 问题详情
Kafka作为源表,基于Event Time的窗口后,不能输出数据。
- 问题原因
Kafka某个分区没有数据,会影响Watermark的产生,从而导致Kafka源表数据基于Event Time的窗口后,不能输出数据。
- 解决方案
- 确保所有分区都存在数据。
- 开启源数据空闲监测功能。在目标作业详情页面右上角,单击编辑后,在页面右侧高级配置面板的更多Flink配置中,添加如下代码后保存生效。
table.exec.source.idle-timeout参数详情,请参见Configuration。table.exec.source.idle-timeout: 5
Flink中的Commit Offset有什么作用?
Flink在每次Checkpoint成功时,才会向Kafka提交当前读取Offset。如果未开启Checkpoint,或者Checkpoint设置的间隔过大,在Kafka端可能会查询不到当前读取的Offset。
为什么Flink和Kafka之间的网络是连通的,但是依然会有timeout expired while fetching topic metadata的报错?
- 使用zkCli.sh或者zookeeper-shell.sh工具登录Kafka使用的Zookeeper。
- 执行
ls /brokers/ids
命令列出所有的Kafka Broker ID。 - 使用
get /brokers/ids/{your_broker_id}
命令查看Broker metadata信息。Endpoint信息显示在listener_security_protocol_map中。
- 确认Flink是否可以连通该Endpoint。
如果该Endpoint中使用了域名,请为Flink配置对应的域名解析服务。
分裂或者缩容DataHub Topic后导致Flink作业失败,如何恢复?
如果分裂或者缩容了Flink正在读取的某个Topic,则会导致任务持续出错,无法自行恢复。该情况下需要重新启动(先停止再启动)来使任务恢复正常。
可以删除正在消费的DataHub Topic吗?
不支持删除或重建正在消费的DataHub Topic。
endPoint和tunnelEndpoint是指什么?如果配置错误会产生什么结果?
- 如果endPoint配置错误,则任务上线停滞在91%的进度。
- 如果tunnelEndpoint配置错误,则任务运行失败。
启动作业时出现Akka超时报错,但是全量MaxCompute源表和增量MaxCompute获取Metadata速率正常,应该如何处理?
请合并小文件,具体步骤请参见文档小文件优化及作业诊断常见问题。
全量MaxCompute和增量MaxCompute是如何读取MaxCompute数据的?
全量MaxCompute和增量MaxCompute是通过Tunnel读取MaxCompute数据的,读取速度受MaxCompute Tunnel带宽限制。
引用MaxCompute作为数据源,在作业启动后,向已有的分区或者表里追加数据,这些新数据是否能被全量MaxCompute或增量MaxCompute源表读取?
启动Flink作业后,如果正在被Source读取或已经被Source读取完成的表或分区有新的数据追加,则这部分数据不会被读取,而且可能导致作业Failover。
ODPS DOWNLOAD SESSION
读取表数据或者分区数据。新建DOWNLOAD SESSION
,服务端会创建一个Index文件,相当于创建DOWNLOAD SESSION
一瞬间数据的映射,后续的数据读取以这个映射为基础。因此在新建 DOWNLOAD SESSION
后,追加到MaxCompute表或者分区里的数据,正常流程下是不会被读取的。但如果MaxCompute源表中写入新数据,则会出现两种异常:
- 产生报错:如果Tunnel在读取数据的过程中写入新数据,则会产生报错
ErrorCode=TableModified,ErrorMessage=The specified table has been modified since the download initiated.
。 - 数据正确性无法保证:如果在Tunnel已经关闭后写入新数据,则这些数据不会被读取。但当作业发生Failover或者暂停后恢复作业,已经读过的数据可能会被重读,新写入的数据可能被读不全。
全量MaxCompute和增量MaxCompute源表作业是否支持暂停作业后修改并发数,再恢复作业?
MaxCompute源表暂时不支持暂停作业后修改并发数,再恢复作业。系统根据并发数,计算每个并发读取哪些分区的哪些数据。此外,每个并发会把消费情况记录至State,以便暂停恢复后或者Failover后,系统可以从上次读取的位置继续读取数据。
如果暂停作业后,修改了全量MaxCompute和增量MaxCompute源表的并发数后再恢复作业,会对作业产生无法预估的影响。因为已经读取的数据可能会被再次读取,没有读的数据可能会被遗漏。
作业启动位点设置了2019-10-11 00:00:00
, 为什么启动位点前的分区也会被全量MaxCompute源表读取?
- 分区表:读取当前所有分区。
- 非分区表:读取当前存在的数据。
增量MaxCompute源表监听到新分区时,如果该分区还有数据没有写完,如何处理?
报错:ErrorMessage=Authorization Failed [4019], You have NO privilege'ODPS:***'
不支持定义Watermark,那如何进行窗口聚合?
SELECT shop_id, DATE_FORMAT(order_ts, 'yyyy-MM-dd HH:mm'), COUNT(*), SUM(price) FROM order_mysql_cdc GROUP BY shop_id, DATE_FORMAT(order_ts, 'yyyy-MM-dd HH:mm')
如何跳过Snapshot阶段,只从变更数据开始读取?
- never:在启动时,不读取数据库的快照,而是直接从变更的最开始位置读取。但需要注意MySQL的变更旧数据可能会被自动清理,因此不能保证变更数据中包含了全量的数据,读取的数据不完整。
- schema_only:如果你不需要保证数据的一致性,只关心作业启动后数据库的新增变更,则可以设置为schema_only,仅快照schema,不快照数据,从变更的最新数据开始读取。
如何读取一个分库分表的MySQL数据库?
如果MySQL是一个分库分表的数据库,分成了user_00、user_02和user_99等多个表,且所有表的schema一致。则可以通过table-name选项,指定一个正则表达式来匹配读取多张表,例如设置table-name为user_.*,监控所有user_前缀的表。database-name选项也支持该功能,但需要所有的表schema一致。
全表读取阶段效率慢、存在反压,应该如何解决?
- 增加并发数。
- 开启minibatch等聚合优化参数(下游聚合节点)。
如何判断CDC作业是否已完成全量数据同步?

- 当该指标为0时,则代表作业还在全量数据同步阶段。
- 当该指标大于0时,则代表作业完成了全量数据同步,进入了Binlog读取阶段。
报错:com.github.shyiko.mysql.binlog.network.ServerException
- 报错详情
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.
- 报错原因
MySQL CDC在启动全量读取之前记录Binlog位点,等全量读取结束后再从Binlog位点读取增量数据。该报错一般是因为全量读取耗时太长,超过了MySQL Binlog的淘汰周期,导致MySQL Binlog位点的数据已经被 MySQL清理掉了。
- 解决方案
查看MySQL Binlog的清理规则,例如时间、存储空间和文件个数等,建议保留Binlog一天以上,RDS Binlog详情请参见删除本地日志(Binlog)。说明 :VVR 4.0.8及以上版本,MySQL CDC支持并发读取全量数据,可以提高全量数据的读取速度,针对该问题会起到缓解作用。
报错:The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled'
- 报错详情
MySQL CDC源表在Flink 1.13版本语法检查时报错详情如下。
Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true' at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186) at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134) ... 30 more
- 报错原因
在MySQL CDC DDL WITH参数中,您未设置主键(Primary Key)信息。因为Flink 1.13版本,新增支持按PK分片,进行多并发读取数据的功能。说明 Flink 1.12及以下版本,MySQL CDC源表仅支持单并发读取数据。
- 解决方案
- 如果在Flink 1.13版本,您需要多并发读取MySQL数据,则在DDL中添加PK信息。
- 如果在Flink 1.13版本,您不需要多并发读取MySQL数据,则在DDL中添加
scan.incremental.snapshot.enabled
参数,且把该参数值设置为false,无需设置PK信息。
Flink的结果数据写入RDS表,是按主键更新的,还是生成1条新的记录?
如果在DDL中定义了主键,会采用INSERT INTO tablename(field1,field2, field3, ...) VALUES(value1, value2, value3, ...)
ON DUPLICATE KEY UPDATE field1=value1,field2=value2, field3=value3, ...;
的方式更新记录,即对于不存在的主键字段会直接插入,存在的主键字段则更新相应的值。如果DDL中没有声明PRIMARY KEY,则会用insert into
方式插入记录,追加数据。
使用RDS表中的唯一索引进行GROUP BY时需要注意什么?
- 需要在作业中的GROUP BY中声明该唯一索引。
- RDS中只有一个自增主键,Flink作业中不能声明为PRIMARY KEY。
为什么MySQL物理表(包含RDS MySQL和ADB)的INT UNSIGNED字段类型,在Flink SQL中要被声明为其他类型?
因为MySQL的JDBC Driver在获取数据时,由于精度问题,会采用不同的数据类型进行承接。具体说来,对于MySQL的INT UNSIGNED类型会使用LONG类型来承接数据,而对于MySQL的BIGINT UNSIGNED类型会使用BIGINTEGER类型来承接数据。
报错:Incorrect string value: '\xF0\x9F\x98\x80\xF0\x9F...' for column 'test' at row 1
- 报错详情
Caused by: java.sql.BatchUpdateException: Incorrect string value: '\xF0\x9F\x98\x80\xF0\x9F...' for column 'test' at row 1 at sun.reflect.GeneratedConstructorAccessor59.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at com.mysql.cj.util.Util.handleNewInstance(Util.java:192) at com.mysql.cj.util.Util.getInstance(Util.java:167) at com.mysql.cj.util.Util.getInstance(Util.java:174) at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224) at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755) at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426) at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796) at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeBatch(DruidPooledPreparedStatement.java:565) at com.alibaba.ververica.connectors.rds.sink.RdsOutputFormat.executeSql(RdsOutputFormat.java:488) ... 15 more
- 报错原因
数据中存在Emoji表情,导致数据库编码不能被正常解析。
- 解决方案
在使用JDBC连接MySQL数据库时,URL地址后面需要加上UTF-8,例如
jdbc:mysql://<内网地址>/<databaseName>?characterEncoding=UTF-8
。详情请参见Using Character Sets and Unicode。
ClickHouse结果表是否支持回撤更新数据?
当Flink结果表的DDL上指定了Primary Key时,则支持回撤更新数据。因为ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统,对UPDATE和DELETE的支持不够完善。如果在Flink DDL上指定了Primary Key,则尝试使用ALTER TABLE UPDATE和DELETE来更新和删除数据,因此会造成性能显著下降。
如何在控制台查看print数据结果?
- 在Flink开发控制台查看:
- 在Flink全托管开发控制台左侧导航栏,选择 。
- 单击目标作业名称。
- 单击作业探查。
- 在左侧运行日志页签,选择正在运行的作业实例。
- 在Task Managers页签,单击Path, ID。
- 单击日志,查看print数据结果。
- 跳转到Flink UI界面查看:
- 在在Flink全托管开发控制台左侧导航栏,选择 。
- 单击目标作业名称。
- 在目标作业作业总览页签,单击Flink UI。
- 单击Task Managers。
- 单击Path, ID。
- 在logs页签,查看print数据结果。
维表进行JOIN时,如果查询不到数据,应该如何处理?
检查DDL语句和物理表中的Schema类型和名称是否一致。
max_pt()和max_pt_with_done()的区别是什么?
.done
分区的分区。如果分区列表示例如下:
- ds=20190101
- ds=20190101.done
- ds=20190102
- ds=20190102.done
- ds=20190103
max_pt()
和max_pt_with_done()
的区别如下:
`partition`='max_pt_with_done()'
匹配的分区是ds=20190102
。`partition`='max_pt()'
,匹配的分区是ds=20190103
。