SQL常见问题

本文为您介绍实时计算Flink版的SQL常见问题,包括作业常见问题、开发报错、运维报错。

为什么使用POJO类作为UDTF返回类型时字段会出现“错位”?

  • 问题描述

    当使用POJO类作为UDTF返回类型,并在SQL中显式声明了UDTF返回列的别名列表(Alias Name)时,可能会出现字段错位(即使类型一致,但实际使用的字段可能与预期不符)问题。

    例如,如果使用如下POJO类作为UDTF的返回类型,并根据自定义函数开发的要求进行打包并完成函数注册(这里使用作业级自定义函数注册方式)后,SQL校验会失败。

    package com.aliyun.example;
    
    public class TestPojoWithoutConstructor {
    	public int c;
    	public String d;
    	public boolean a;
    	public String b;
    }
    package com.aliyun.example;
    
    import org.apache.flink.table.functions.TableFunction;
    
    public class MyTableFuncPojoWithoutConstructor extends TableFunction<TestPojoWithoutConstructor> {
    	private static final long serialVersionUID = 1L;
    
    	public void eval(String str1, Integer i2) {
    		TestPojoWithoutConstructor p = new TestPojoWithoutConstructor();
    		p.d = str1 + "_d";
    		p.c = i2 + 2;
    		p.b = str1 + "_b";
    		collect(p);
    	}
    }
    CREATE TEMPORARY FUNCTION MyTableFuncPojoWithoutConstructor as 'com.aliyun.example.MyTableFuncPojoWithoutConstructor';
    
    CREATE TEMPORARY TABLE src ( 
      id STRING,
      cnt INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE sink ( 
      f1 INT,
      f2 STRING,
      f3 BOOLEAN,
      f4 STRING
    ) WITH (
     'connector' = 'print'
    );
    
    INSERT INTO sink
    SELECT T.* FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b);

    SQL校验报错信息如下:

    org.apache.flink.table.api.ValidationException: SQL validation failed. Column types of query result and sink for 'vvp.default.sink' do not match.
    Cause: Sink column 'f1' at position 0 is of type INT but expression in the query is of type BOOLEAN NOT NULL.
    Hint: You will need to rewrite or cast the expression.
    
    Query schema: [c: BOOLEAN NOT NULL, d: STRING, a: INT NOT NULL, b: STRING]
    Sink schema:  [f1: INT, f2: STRING, f3: BOOLEAN, f4: STRING]
    	at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)

    看起来从UDTF返回的字段和POJO类中的字段可能错位了,SQL中字段c最终是BOOLEAN,而字段a是INT类型,和POJO类的定义恰好相反。

  • 问题原因

    根据POJO类的类型规则:

    • 如果POJO类实现了有参构造函数,推导的返回类型会按构造函数的参数列表顺序。

    • 如果POJO类缺少有参构造函数,就会按字段名的字典序重排列。

    在上述示例中,由于UDTF返回类型缺少有参构造函数,因此对应的返回类型为BOOLEAN a, VARCHAR(2147483647) b, INTEGER c, VARCHAR(2147483647) d)。虽然这一步并没有产生错误,但因为SQL中对返回字段加了重命名列表LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b),这导致对推导出的类型显式进行了重命名(基于字段位置进行映射),进而引发与POJO类中的字段错位问题,出现校验异常或非预期的数据错位问题。

  • 解决方案

    • POJO类缺少有参构造函数时,去掉对UDTF返回字段的显式重命名,如将上述SQL的INSERT语句改为:

      -- POJO类无有参构造函数时,推荐显式选择需要的字段名,使用 T.* 时需要明确知晓实际返回的字段顺序。
      SELECT T.c, T.d, T.a, T.b FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T;
    • POJO类实现有参构造函数,以确定返回类型的字段顺序。这种情况下UDTF返回类型的字段顺序就是有参构造函数的参数顺序。

      package com.aliyun.example;
      
      public class TestPojoWithConstructor {
      	public int c;
      	public String d;
      	public boolean a;
      	public String b;
      
      	// Using specific fields order instead of alphabetical order
      	public TestPojoWithConstructor(int c, String d, boolean a, String b) {
      		this.c = c;
      		this.d = d;
      		this.a = a;
      		this.b = b;
      	}
      }

为什么数据在LocalGroupAggregate节点中长时间卡住,无输出?

  • 问题描述

    如果作业未设置table.exec.mini-batch.size或设置table.exec.mini-batch.size为负,并且作业还包含WindowAggregate和GroupAggregate,且WindowAggregate的时间列为事件时间(proctime),在这种情况下启动作业,在作业拓扑图中会看到包含LocalGroupAggregate节点,但缺失MiniBatchAssigner节点。

    image

    作业中同时包含WindowAggregate和GroupAggregate,且WindowAggregate的时间列为事件时间(proctime)的代码示例如下。

    CREATE TEMPORARY TABLE s1 (
      a INT,
      b INT,
      ts as PROCTIME(),
      PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
      'connector'='datagen',
      'rows-per-second'='1',
      'fields.b.kind'='random',
      'fields.b.min'='0',
      'fields.b.max'='10'
    );
    
    CREATE TEMPORARY TABLE sink (
      a BIGINT,
      b BIGINT
    ) WITH (
      'connector'='print'
    );
    
    CREATE TEMPORARY VIEW window_view AS
    SELECT window_start, window_end, a, sum(b) as b_sum FROM TABLE(TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '2' SECONDS)) GROUP BY window_start, window_end, a;
    
    INSERT INTO sink SELECT count(distinct a), b_sum FROM window_view GROUP BY b_sum;
  • 问题原因

    在table.exec.mini-batch.size未配置或者为负的情况下,MiniBatch处理模式会使用Managed Memory缓存数据,但是此时会错误地无法生成MiniBatchAssigner节点,因此计算节点无法收到MinibatchAssigner节点发送的Watermark消息后触发计算和输出,只能在三种条件(Managed Memory已满、进行Checkpoint前和作业停止时)之一下触发计算和输出,详情请参见table.exec.mini-batch.size。如果此时Checkpoint间隔设置过大,就会导致数据积攒在LocalGroupAggregate节点中,长时间无法输出。

  • 解决方案

    • 调小Checkpoint间隔,让LocalGroupAggregate节点在执行Checkpoint前自动触发输出。

    • 通过Heap Memory来缓存数据,让LocalGroupAggregate节点内缓存数据达到N条时自动触发输出。即在运维中心 > 作业运维页面的部署详情运行参数配置区域其他配置中,设置table.exec.mini-batch.size参数为正值N。

运行拓扑图中显示的Low Watermark、Watermark以及Task InputWatermark指标显示的时间和当前时间有时差?

  • 原因1:声明源表Watermark时使用了TIMESTAMP_LTZ(TIMESTAMP(p) WITH LOCAL TIME ZONE)类型,导致Watermark和当前时间有时差。

    下文以具体的示例为您展示使用TIMESTAMP_LTZ类型和TIMESTAMP类型对应的Watermark指标差异。

    • 源表中Watermark声明使用的字段是TIMESTAMP_LTZ类型。

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        ts as CURRENT_TIMESTAMP,--使用CURRENT_TIMESTAMP内置函数生成TIMESTAMP_LTZ类型。
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND 
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE t1 (
        k INT,
        ts_ltz timestamp_ltz(3),
        cnt BIGINT
      ) WITH ('connector' = 'print');
      
      -- 输出计算结果。
      INSERT INTO t1
      SELECT b, window_start, COUNT(*) FROM
      TABLE(
          TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND))
      GROUP BY b, window_start, window_end;
      说明

      Legacy Window对应的老语法和TVF Window(Table-Valued Function)产生的结果是一致的。以下为Legacy Window对应的老语法的示例代码。

      SELECT b, TUMBLE_END(ts, INTERVAL '5' SECOND), COUNT(*) FROM s1 GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), b;

      在Flink开发控制台将作业部署上线运行后,以北京时间为例,可以观察到作业运行拓扑图及监控告警上显示的Watermark和当前时间存在8小时时差。

      • Watermark&Low Watermark

        image

      • Task InputWatermark

        image

    • 源表中Watermark声明使用的字段是TIMESTAMP(TIMESTAMP(p) WITHOUT TIME ZONE)类型。

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        -- 模拟数据源中的TIMESTAMP无时区信息,从2024-01-31 01:00:00开始逐秒累加。
        ts as TIMESTAMPADD(SECOND, a, TIMESTAMP '2024-01-31 01:00:00'),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND 
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.a.kind'='sequence','fields.a.start'='0','fields.a.end'='100000',
        'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE t1 (
        k INT,
        ts_ltz timestamp_ltz(3),
        cnt BIGINT
      ) WITH ('connector' = 'print');
      
      -- 输出计算结果。
      INSERT INTO t1
      SELECT b, window_start, COUNT(*) FROM
      TABLE(
          TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND))
      GROUP BY b, window_start, window_end;

      在Flink开发控制台上将作业部署上线运行后,可以观察到作业运行拓扑图及监控告警上显示的Watermark和当前时间是同步的(本示例是与模拟数据的时间同步的),不存在时差现象。

      • Watermark&Low Watermark

        image

      • Task InputWatermark

        image

  • 原因2:Flink开发控制台和Apache Flink UI的展示时间存在时区差异。

    Flink开发控制台UI界面是以UTC+0显示时间,而Apache Flink UI是通过浏览器获取本地时区并进行相应的时间转换后的本地时间。以北京时间为例,为您展示二者显示区别,您会观察到在Flink开发控制台显示的时间比Apache Flink UI时间慢8小时。

    • Flink开发控制台

      image

    • Apache Flink UI

      image

报错:undefined

报错:Object '****' not found

  • 报错详情

    单击深度检查后,出现报错详情如下。报错详情

  • 报错原因

    在DDL和DML同在一个文本中提交运行时,DDL没有声明为CREATE TEMPORARY TABLE。

  • 解决方案

    在DDL和DML同在一个文本中提交运行时,DDL需要声明为CREATE TEMPORARY TABLE,而不是声明为CREATE TABLE。

报错:Only a single 'INSERT INTO' is supported

  • 报错详情

    单击深度检查后,验证报错详情如下。报错详情

  • 报错原因

    多个DML语句没有写在关键语句BEGIN STATEMENT SET;END;之间。

  • 解决方案

    将多个DML语句写在BEGIN STATEMENT SET;END;之间。详情请参见INSERT INTO语句

报错:The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'

  • 报错详情

    Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186)
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
        ... 30 more
  • 报错原因

    实时计算引擎vvr-3.0.7-flink-1.12及以前的版本,CDC Source只能单并发运行。但在实时计算引擎vvr-4.0.8-flink-1.13版本后增加了按PK分片进行多并发读取数据的功能并默认打开该功能(scan.incremental.snapshot.enabled默认设置为true),在该功能下必须要配置主键。

  • 解决方案

    如果您使用实时计算引擎vvr-4.0.8-flink-1.13及以后的版本,则可以根据需求来选择解决方案:

    • 如果您需要多并发读取MySQL CDC的数据,则在DDL中必须配置主键(PK)。

    • 如果您不需要多并发读取MySQL CDC的数据,需要将scan.incremental.snapshot.enabled设置为false,参数配置详情请参见WITH参数

报错:exceeded quota: resourcequota

  • 报错详情

    作业启动过程中报错。报错

  • 报错原因

    当前项目空间资源不足导致作业启动失败。

  • 解决方案

    您需要对项目资源进行资源变配,详情请参见资源调整

报错:Exceeded checkpoint tolerable failure threshold

  • 报错详情

    作业运行过程中报错。

    org.apache.flink.util.FlinkRuntimeException:Exceeded checkpoint tolerable failure threshold.
      at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
  • 报错原因

    未设置任务允许Checkpoint失败的次数,系统默认Checkpoint失败一次就触发一次Failover。

  • 解决方案

    1. 运维中心 > 作业运维页面,单击目标作业名称。

    2. 部署详情页签,单击运行参数配置区域右侧的编辑

    3. 其他配置文本框,输入如下参数。

      execution.checkpointing.tolerable-failed-checkpoints: num

      您需要设置num值来调整任务允许Checkpoint失败的次数。num需要为0或正整数。如果num为0时,则表示不允许存在任何Checkpoint异常或者失败。

报错:Flink version null is not configured for sql

  • 报错详情

    StatusRuntimeException: INTERNAL: Flink version null is not configured for sql.
  • 报错原因

    系统升级至VVR 4.0.8,导致作业的Flink计算引擎版本信息没有了。

  • 解决方案

    在作业开发页面右侧更多配置页签中,配置正确的Flink计算引擎版本。引擎版本

    说明

    如果您需要使用调试功能,则还需要检查Session集群页面的引擎版本是否选择正确。

INFO:org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss

  • 报错详情报错详情

  • 报错原因

    OSS每次创建新目录时,会先检查是否存在该目录,如果不存在,就会报这个INFO信息,但该INFO信息不影响Flink作业运行。

  • 解决方案

    在日志模板中添加<Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>。详情请参见配置作业日志输出

报错:DateTimeParseException: Text 'xxx' could not be parsed

  • 报错详情

    作业运行过程中,会出现报错DateTimeParseException: Text 'xxx' could not be parsed

  • 报错原因

    VVR 4.0.13以下版本,您在DDL中声明的日期格式和实际数据的格式不一致,Flink系统会直接报错。

  • 解决方案

    VVR 4.0.13及以上版本对JSON格式(Json、Canal Json、Debezium Json、Maxwell Json和Ogg Json)中TIMESTAMP类型的数据解析进行了增强,提升数据解析的能力。数据解析增强详情如下:

    • 支持声明的TIMESTAMP类型解析DATE格式数据。

    • 支持声明的TIMESTAMP_LTZ类型解析DATE或TIMESTAMP格式数据。

      Flink系统根据您设置的table.local-time-zone的时区信息来转换TIMESTAMP数据至TIMESTAMP_LTZ。例如,在DDL中声明如下信息。

      CREATE TABLE source (
        date_field TIMESTAMP,
        timestamp_field TIMESTAMP_LTZ(3)
      ) WITH (
        'format' = 'json',
        ...
      );

      当解析数据 {"date_field": "2020-09-12", "timestamp_field": "2020-09-12T12:00:00"} ,且在当前时区为东八区的情况下,会得到如下结果:"+I(2020-09-12T00:00:00, 2020-09-12T04:00:00.000Z)"。

    • 支持自动解析TIMESTAMP或TIMESTAMP_LTZ格式。

      增强前,JSON Format在解析TIMESTAMP数据时,需要您正确设置timestamp-format.standard为SQL或ISO-8601,数据才能被正确解析。增强后,Flink系统会自动推导TIMESTAMP的格式并解析,如果无法正确解析,则会报告错误。您手动设置的timestamp-format.standard的值会作为提示供解析器使用。

报错:DELETE command denied to user 'userName'@'*.*.*.*' for table 'table_name'

  • 报错详情

    Cause by:java.sql.SQLSyntaxErrorException:DELETE command denied to user 'userName'@'*.*.*.*' for table 'table_name'
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
        ...
  • 报错原因

    MySQL的CDC流结合where条件过滤使用时,update类型的数据会发送update_before和update_after两条数据到下游,update_before数据到下游会被识别为DELETE操作,需要用户具有DELETE权限。

  • 解决方案

    检查SQL逻辑是否存在retract相关操作,如果存在相关操作,给结果表的操作用户赋予DELETE权限。

报错:java.io.EOFException: SSL peer shut down incorrectly

  • 报错详情

    Caused by: java.io.EOFException: SSL peer shut down incorrectly
        at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:239) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190) ~[?:1.8.0_302]
        at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1392) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1300) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:435) ~[?:1.8.0_302]
        at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347) ~[?:?]
        at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:194) ~[?:?]
        at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:308) ~[?:?]
        at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:204) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1369) ~[?:?]
        at com.mysql.cj.NativeSession.connect(NativeSession.java:133) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:949) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:819) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) ~[?:?]
        at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:128) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54) ~[?:?]
        ... 14 more
  • 报错原因

    在MySQL driver版本为8.0.27时,MySQL数据库开启了SSL协议,但默认访问方式不通过SSL连接数据库,导致报错。

  • 解决方案

    建议WITH参数中connector设置为rds,且MySQL维表URL参数中追加characterEncoding=utf-8&useSSL=false,例如:

    'url'='jdbc:mysql://***.***.***.***:3306/test?characterEncoding=utf-8&useSSL=false'

报错:binlog probably contains events generated with statement or mixed based replication format

  • 报错详情

    Caused by: io.debezium.DebeziumException: Received DML 'insert into table_name (...) values (...)' for processing, 
    binlog probably contains events generated with statement or mixed based replication format
  • 报错原因

    MySQL CDC Binlog不可以为mixed格式,只能为ROW格式。

  • 解决方案

    1. 在MySQL产品侧,通过show variables like "binlog_format"命令查看当前模式的Binlog格式。

      说明

      您可以使用show global variables like "binlog_format"查看全局模式的Binlog格式。

    2. 在MySQL产品侧,将Binlog格式设置为ROW格式。

    3. 重启作业生效。

报错:java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory

  • 报错详情

    Causedby:java.lang.ClassCastException:org.codehaus.janino.CompilerFactorycannotbecasttoorg.codehaus.commons.compiler.ICompilerFactory
        atorg.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
        atorg.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
        atorg.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:426)
        ...66more
  • 报错原因

    • JAR包中引入了会发生冲突的janino依赖。

    • UDF JAR或连接器JAR中,误打入Flink中的某些依赖(例如flink-table-planner和flink-table-runtime)。

  • 解决方案

    分析JAR包里面是否含有org.codehaus.janino.CompilerFactory。因为在不同机器上的Class加载顺序不一样,所以有时候出现类冲突。该问题的解决步骤如下:

    1. 运维中心 > 作业运维页面,单击目标作业名称。

    2. 部署详情页签,单击运行参数配置区域右侧的编辑

    3. 其他配置文本框,输入如下参数。

      classloader.parent-first-patterns.additional: org.codehaus.janino

      其中,参数的value值需要替换为冲突的类。