本文为您介绍开发及调试有关的常见问题。
在DDL和DML同在一个文本中提交运行时,DDL需要怎么声明?
在DDL和DML同在一个文本中提交运行时,DDL需要声明为CREATE TEMPORARY TABLE,而不是声明为CREATE TABLE。否则单击深度检查后,出现报错详情如下。
CREATE TABLE datagen_source (a bigint, b int, c varchar)
WITH ('connector' = 'datagen');
CREATE TABLE print_sink(C bigint, var1 int)
WITH ('connector' = 'print','logger' = 'true');
INSERT INTO print_sink SELECT a,8 FROM datagen_source;
报错信息:
org.apache.flink.table.gateway.api.vvr.utils.SqlValidationException: A sequence of multiple statements to execute is supported if the last statement is a 'SELECT' statement or 'INSERT INTO' statement or 'CREATE TABLE IF NOT EXISTS ... AS TABLE' statement ...BASE IF NOT EXISTS ... AS DATABASE' statement or 'AUTO OPTIMIZE TABLE|DATABASE' statements or multiple 'INSERT INTO' or 'CREATE TABLE IF NOT EXISTS ... AS TAB ...ATABASE IF NOT EXISTS ... AS DATABASE' statements wrapped in a 'BEGIN STATEMENT SET' block and all other statements are CREATE TEMPORARY TABLE|VIEW|[SYSTEM] FUNCTION, 'SHOW', DESCRIBE, 'USE' statements.
at org.apache.flink.table.gateway.vvr.service.utils.SqlValidateUtils.validateDraft(SqlValidateUtils.java:107)
at org.apache.flink.table.gateway.vvr.service.command.DraftCommand.getDraftType(DraftCommand.java:120)
at org.apache.flink.table.gateway.vvr.service.command.DraftCommand.executeInternal(DraftCommand.java:71)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
多个INSERT INTO语句需要怎么写?
将多个INSERT INTO语句写在BEGIN STATEMENT SET;和END;之间组成一个逻辑单元。详情请参见INSERT INTO语句。否则单击深度检查后,验证报错详情如下。
CREATE TEMPORARY TABLE datagen_source (a bigint, b int, c varchar)
WITH ('connector' = 'datagen');
CREATE TEMPORARY TABLE print_sink(C bigint, var1 int)
WITH ('connector' = 'print','logger' = 'true');
CREATE TEMPORARY TABLE print_sink2(C bigint, var2 int)
WITH ('connector' = 'print','logger' = 'true');
INSERT INTO print_sink SELECT a,B FROM datagen_source;
INSERT INTO print_sink2 SELECT a,B FROM datagen_source;
解析: xxx test2
深度检查
SQL 语法正确性以及网络连通性检查未通过
org.apache.flink.table.gateway.api.vvr.utils.SqlValidationException: A sequence of multiple statements to execute is supported if the last statement is a 'SELECT' statement or 'INSERT INTO' statement or 'CREATE TABLE IF NOT EXISTS ... AS TABLE' statement or 'CREATE DATABASE IF NOT EXISTS ... AS DATABASE' statement or 'AUTO OPTIMIZE TABLE|DATABASE' statements or multiple 'INSERT INTO' or 'CREATE TABLE IF NOT EXISTS ... AS TABLE' or 'CREATE DATABASE IF NOT EXISTS ... AS DATABASE' statements wrapped in a 'BEGIN STATEMENT SET' block and all other statements are CREATE TEMPORARY TABLE|VIEW|[SYSTEM] FUNCTION, 'SHOW', DESCRIBE, 'USE' statements.
at org.apache.flink.table.gateway.vvr.service.utils.SqlValidateUtils.validateDraft(SqlValidateUtils.java:79)
at org.apache.flink.table.gateway.vvr.service.command.DraftCommand.getDraftType(DraftCommand.java:120)
at org.apache.flink.table.gateway.vvr.service.command.DraftCommand.executeInternal(DraftCommand.java:71)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
at org.apache.flink.table.gateway.service.context.SqlGatewaySecurityContext.runSecured(SqlGatewaySecurityContext.java:73)
at org.apache.flink.table.gateway.vvr.service.command.AbstractCommand.wrapClassLoader(AbstractCommand.java:171)
at org.apache.flink.table.gateway.vvr.service.command.AbstractCommand.execute(AbstractCommand.java:163)
at org.apache.flink.table.gateway.vvr.service.command.CommandManager.lambda$execute$0(CommandManager.java:71)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
使用Entry point Main Arguments传参数时,需要传特殊字符,应该如何处理?
-
问题原因
使用Entry Point Main Arguments传参时,需要传特殊字符时,例如#$,使用反斜线(\)转义也无法识别,特殊字符出现会被丢弃。
-
解决方案
在作业运维页面单击目标作业名称,在运行参数配置区域其他配置中添加参数
env.java.opts: -Dconfig.disable-inline-comment=true,具体操作请参见如何配置自定义的作业运行参数?。
为什么相同UDF JAR包在经过多次修改后上传失败?
-
问题原因
因为UDF里面限制了JAR包之间的类名不能重复。
-
解决方案
-
删除后重新上传。
-
在附加依赖文件中上传JAR包,并在代码中使用临时函数。临时函数使用方式详情请参见注册UDF。示例如下。
CREATE TEMPORARY FUNCTION `cp_record_reduce` AS 'com.taobao.test.udf.blink.CPRecordReduceUDF';附加依赖文件位于右侧更多配置面板中,需上传 UDF 对应 JAR 包的 OSS 地址。
-
为什么使用POJO类作为UDTF返回类型时字段会出现“错位”?
-
问题详情
当使用POJO类作为UDTF返回类型,并在SQL中显式声明了UDTF返回列的别名列表(Alias Name)时,可能会出现字段错位(即使类型一致,但实际使用的字段可能与预期不符)问题。
例如,如果使用如下POJO类作为UDTF的返回类型,并根据自定义函数开发的要求进行打包并完成函数注册(这里使用作业级自定义函数注册方式)后,SQL校验会失败。
package com.aliyun.example; public class TestPojoWithoutConstructor { public int c; public String d; public boolean a; public String b; }package com.aliyun.example; import org.apache.flink.table.functions.TableFunction; public class MyTableFuncPojoWithoutConstructor extends TableFunction<TestPojoWithoutConstructor> { private static final long serialVersionUID = 1L; public void eval(String str1, Integer i2) { TestPojoWithoutConstructor p = new TestPojoWithoutConstructor(); p.d = str1 + "_d"; p.c = i2 + 2; p.b = str1 + "_b"; collect(p); } }CREATE TEMPORARY FUNCTION MyTableFuncPojoWithoutConstructor as 'com.aliyun.example.MyTableFuncPojoWithoutConstructor'; CREATE TEMPORARY TABLE src ( id STRING, cnt INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE sink ( f1 INT, f2 STRING, f3 BOOLEAN, f4 STRING ) WITH ( 'connector' = 'print' ); INSERT INTO sink SELECT T.* FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b);SQL校验报错信息如下:
org.apache.flink.table.api.ValidationException: SQL validation failed. Column types of query result and sink for 'vvp.default.sink' do not match. Cause: Sink column 'f1' at position 0 is of type INT but expression in the query is of type BOOLEAN NOT NULL. Hint: You will need to rewrite or cast the expression. Query schema: [c: BOOLEAN NOT NULL, d: STRING, a: INT NOT NULL, b: STRING] Sink schema: [f1: INT, f2: STRING, f3: BOOLEAN, f4: STRING] at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)看起来从UDTF返回的字段和POJO类中的字段可能错位了,SQL中字段c最终是BOOLEAN,而字段a是INT类型,和POJO类的定义恰好相反。
-
问题原因
根据POJO类的类型规则:
-
如果POJO类实现了有参构造函数,推导的返回类型会按构造函数的参数列表顺序。
-
如果POJO类缺少有参构造函数,就会按字段名的字典序重新排列。
在上述示例中,由于UDTF返回类型缺少有参构造函数,因此对应的返回类型为
BOOLEAN a, VARCHAR(2147483647) b, INTEGER c, VARCHAR(2147483647) d)。虽然这一步并没有产生错误,但因为SQL中对返回字段加了重命名列表LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b),这导致对推导出的类型显式进行了重命名(基于字段位置进行映射),进而引发与POJO类中的字段错位问题,出现校验异常或非预期的数据错位问题。 -
-
解决方案
-
POJO类缺少有参构造函数时,去掉对UDTF返回字段的显式重命名,如将上述SQL的INSERT语句改为:
-- POJO类无有参构造函数时,推荐显式选择需要的字段名,使用 T.* 时需要明确知晓实际返回的字段顺序。 SELECT T.c, T.d, T.a, T.b FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T; -
POJO类实现有参构造函数,以确定返回类型的字段顺序。这种情况下UDTF返回类型的字段顺序就是有参构造函数的参数顺序。
package com.aliyun.example; public class TestPojoWithConstructor { public int c; public String d; public boolean a; public String b; // Using specific fields order instead of alphabetical order public TestPojoWithConstructor(int c, String d, boolean a, String b) { this.c = c; this.d = d; this.a = a; this.b = b; } }
-
如何解决Flink依赖冲突问题?
-
问题现象
-
有明显报错,且引发错误的为Flink或Hadoop相关类。
java.lang.AbstractMethodError java.lang.ClassNotFoundException java.lang.IllegalAccessError java.lang.IllegalAccessException java.lang.InstantiationError java.lang.InstantiationException java.lang.InvocationTargetException java.lang.NoClassDefFoundError java.lang.NoSuchFieldError java.lang.NoSuchFieldException java.lang.NoSuchMethodError java.lang.NoSuchMethodException -
无明显报错,但会引起一些不符合预期的现象,例如:
-
日志不输出或log4j配置不生效。
该类问题通常是由于依赖中携带了log4j相关配置导致的。需要排查作业JAR包中是否引入了log4j配置的依赖,可以通过在dependency中配置exclusions的方式去掉log4j配置。
说明如果必须要使用不同版本的log4j,需要使用maven-shade-plugin将log4j相关的类进行relocation。
-
RPC调用异常。
Flink的Akka RPC调用出现依赖冲突可能导致的异常,默认不会显示在日志中,需要开启Debug日志进行确认。
例如,Debug日志中出现
Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx},但是JM日志在Registering TaskManager with ResourceID xxx后,没有下文,直到资源请求超时报错NoResourceAvailableException。此外TM持续报错Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx}。原因:开启Debug日志后,发现RPC调用报错
InvocationTargetException,该报错导致TM Slot分配到一半失败出现状态不一致,RM持续尝试分配Slot失败无法恢复。
-
-
-
问题原因
-
作业JAR包中包含了不必要的依赖(例如基本配置、Flink、Hadoop和log4j依赖),造成依赖冲突从而引发各种问题。
-
作业需要的Connector对应的依赖未被打入JAR包中。
-
-
排查方法
-
查看作业pom.xml文件,判断是否存在不必要的依赖。
-
通过
jar tf foo.jar命令查看作业JAR包内容,判断是否存在引发依赖冲突的内容。 -
通过
mvn dependency:tree命令查看作业的依赖关系,判断是否存在冲突的依赖。
-
-
解决方案
-
基本配置建议将scope全部设置为provided,即不打入作业JAR包。
-
DataStream Java
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> -
DataStream Scala
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> -
DataSet Java
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> -
DataSet Scala
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
-
添加作业需要的Connector对应的依赖,并将scope全部设置为compile(默认的scope是compile),即打入作业JAR包中,以Kafka Connector为例,代码如下。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency> -
其他Flink、Hadoop和log4j依赖不建议添加。但是:
-
如果作业本身存在基本配置或Connector相关的直接依赖,建议将scope设置为provided,示例如下。
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <scope>provided</scope> </dependency> -
如果作业存在基本配置或Connector相关的间接依赖,建议通过exclusion将依赖去掉,示例如下。
<dependency> <groupId>foo</groupId> <artifactId>bar</artifactId> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </exclusion> </exclusions> </dependency>
-
-
报错:Could not parse type at position 50: expected but was . Input type string: ROW
-
报错详情
在SQL编辑器中编写SQL时,使用UDTF出现语法检查错误(红色波浪线)。
Caused by: org.apache.flink.table.api.ValidationException: Could not parse type at position 50: <IDENTIFIER> expected but was <KEYWORD>. Input type string: ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>代码如下:
@FunctionHint( //input = @DataTypeHint("BYTES"), output = @DataTypeHint("ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>")) public class PointChangeMetaQPaser1 extends TableFunction<Row> { Logger logger = LoggerFactory.getLogger(this.getClass().getName()); public void eval(byte[] bytes) { try { String messageBody = new String(bytes, "UTF-8"); Map<String, String> resultDO = JSON.parseObject(messageBody, Map.class); logger.info("PointChangeMetaQPaser1 logger:" + JSON.toJSONString(resultDO)); collect(Row.of( getString(resultDO.get("resultId")), getString(resultDO.get("pointRange")), getString(resultDO.get("from")), getString(resultDO.get("to")), getString(resultDO.get("type")), getString(resultDO.get("pointScope")), getString(resultDO.get("userId")), getString(resultDO.get("point")), getString(resultDO.getOrDefault("triggerSource", "NULL")), getString(resultDO.getOrDefault("time", String.valueOf(System.currentTimeMillis()))), getString(resultDO.getOrDefault("uuid", String.valueOf(UUID.randomUUID()))) )); } catch (Exception e) { logger.error("PointChangeMetaQPaser1 error", e); } } private String getString(Object o) { if (o == null) { return null; } return String.valueOf(o); } } -
报错原因
当使用DataTypeHint定义函数的数据类型时,系统保留的关键字被直接作为了字段名称。
-
解决方案
-
将变量名换成非关键字的名称,例如to换成fto,from换成ffrom等。
-
将已经用关键字取名的变量名加上反撇号(``)。
-
写入表时报错:“Invalid primary key. Column 'xxx' is nullable.”
-
报错原因
这是 Flink 对主键语义的强制校验。Flink 要求:所有主键列必须显式声明为
NOT NULL。即使数据中没有 NULL 值,只要建表语句中主键列允许 NULL(如INT NULL),Flink 就会在写入前拒绝操作。这不是运行时错误,而是 DDL 解析阶段的语义检查。 -
解决方案
将报错中涉及的主键列声明为
NOT NULL后重新建表。
JSON 文件下载时在浏览器中打开而非直接下载
-
问题现象
在文件管理界面点击下载 JSON 文件时,浏览器未触发下载,而是新开标签页直接展示 JSON 内容。
-
问题原因
OSS 中该 JSON 文件缺少
Content-Disposition: attachmentHTTP 响应头,导致浏览器按默认行为将其作为可渲染内容直接显示。 -
解决方案
-
方案一:重新上传文件
该问题已在平台 4.5.0 版本修复,但仅对 2025 年 5 月之后上传的文件生效。此前上传的文件仍需手动处理。
-
方案二:修改 OSS 对象元数据
手动为对应 Object 添加如下 HTTP 标准属性:
-
Header 名称:Content-Disposition
-
Header 值:attachment
详情请参见管理文件元数据。
-
-