文档

Dataphin Flink SQL词法提效

更新时间:

Dataphin在Flink SQL基础上遵循ANSI-SQL标准进行了部分语法改进,以帮助您提升开发效率。本文将为说明Dataphin Flink SQL支持的词法新特性及使用方法。

支持跨项目空间引用

DDL与DML表名语法增强

通常一个标准的Flink SQL任务需要完整DDL与DML语句。示例如下:

--DDL
CREATE TABLE input_table (
  	field1 VARCHAR,
    field2 BIGINT
  ) WITH (
		type = 'datagen'
  );
CREATE TABLE output_table (
  	field1 VARCHAR,
    field2 BIGINT
  ) WITH (
		type = 'print'
  );
--DML
INSERT INTO output_table
SELECT field1, field2 FROM input_table;

以上语句的DDL在多个任务中需要重复定义。为避免重复声明,提高开发效率。Dataphin对FlinkSQL的表名做了语法扩展增强。在Dataphin中定义元表(schema)后, 可以在任何任务中通过以下语法直接引用, 同时在多个任务中无需再次声明, 并且支持跨项目空间引用。

<compoundTableName> ::= [projectName <DOT>] tableName

示例说明

以上一段SQL为例,定义好元表后,研发任务中只需要输入以下语句。

INSERT INTO output_table -- 项目前缀缺省时默认是当前所在项目。
SELECT field1, field2 FROM ${project_name}.input_table;

内置函数与UDX语法增强

通常一个标准的Flink SQL任务使用UDX时,需要进行声明。示例如下:

CREATE FUNCTION MY_UDX AS 'package.class';
...
CREATE VIEW my_view AS 
SELECT MY_UDX(args)
FROM ...

以上自定义UDX语句在多个任务中需要重复定义。为避免重复声明,提高开发效率。Dataphin引入资源概念,您可以在Dataphin注册函数后,在任何任务中通过以下语法直接引用,同时在多个任务中无需再次声明,并且支持跨项目空间引用。

<compoundFunctionName> ::= [projectName <COLON>] functionName

示例说明

以上一段SQL为例,在Dataphin注册函数后,研发任务中只需要输入以下语句。

CREATE VIEW my_view AS SELECT ${project_name}:MY_UDX(args) -- 项目前缀缺省时默认是当前所在项目FROM。
FROM ...

特殊说明

Dataphin支持overload或override内置函数,且优先使用overload或override的内置函数。示例如下:

  • 示例一: 有项目前缀。

SELECT ${project_name}:SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin将默认使用已overload了的内置函数`SUBSTRING`, 若未在`${project_name}`下找到该自定义函数, 在预编译时将无法通过。
  • 示例二: 无项目前缀。

SELECT SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin将优先在当前项目内寻找自定义的`SUBSTRING`函数, 若找不到, 则认为是内置函数。

使用UpdateStatement及TableHints设置任务级别DDL参数

在Dataphin中支持对已经创建的元表进行DDL的WITH参数配置,同时Dataphin支持配置任务粒度的参数,包括Watermark、计算列、主键、Header等,  且增加了SetOption语法支持。语法规范如下:

SET {
	[ projectName <DOT>] tableName <DOT> 
		{ 
			propertyName
			| WATERMARK
			| <computedColumn> 
			| <primaryKey> 
			| <procTime> 
			| {fieldName <DOT> <isHeader>} 
		}
} 	<EQ> {
		@variable = identifier
		| @variable = literal
		| @variable = expression
}

设置WITH参数时,建议将表DDL的公用WITH参数在元表中进行定义, 任务粒度的个性化参数可以通过SET语句或TableHints在代码中定义, 多个SET语句间以;为分隔符。

重要

使用Flink SQL保留字时需要加(``)符号示例如下:

  • Watermark:SET语句使用SET [project.]table.`watermark` = ...

  • Partition:SET语句使用SET [project.]table.`partition` = ...

DDL参数优先级

任务级别的参数覆盖

任务级别的参数设置会覆盖元表中配置的参数设置(仅针对可覆盖的参数)。例如,元表中设置表Tprimary keyfield1 , 任务中使用表T后, 如果存在任何primary key设置,都以任务中的设置为准 (即任务中设置表T的primary keyfield2, 则field1在该任务中不再是表T的主键)。

重要

基于Dataphin的安全和管理策略,元表上的部分参数不允许在SQL任务中通过SET覆盖。例如MaxCompute元表上定义的表名,不能在SQL任务中通过SET语句覆盖。不同类型的元表有不同的可覆盖参数,具体以实际操作为准。

任务内SET参数

任务内SET参数具有全局优先级, TableHints的作用范围是Context-specific, SET参数会补充到TableHints中不存在的参数项,相同参数项会被Shade。

例如Blink在2.2.5版本后, 增加了对维表设置INDEXUNIQUE INDEX的支持, 所以目前声明JOIN KEY的关键字时存在PIRMARY KEYINDEXUNIQUE INDEX三种,故Dataphin会联合做判断。示例如下:

SET my_dim_table.primarykey = 'field1';

CREATE VIEW tmp1 AS 
SELECT my_source_table1.*
FROM my_source_table1 JOIN my_dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;

CREATE VIEW tmp2 AS 
SELECT my_source_table2.*
FROM my_source_table2 JOIN my_dim_table WITH (primarykey = 'field3') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;

以上代码Dataphin将会理解为在生成tmp1时设置my_dim_tableprimary keyfield1(参数填充),indexfield2;在生成tmp2时设置my_dim_tableprimary keyfield3(参数Shade)。

更多示例说明:

  • 示例一:Flink SQL Source表参数。

    以创建kafka源表为例。

    -- vvr语法
    CREATE TEMPORARY TABLE kafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `category_id` BIGINT,
        `behavior` STRING,
        `topic` STRING METADATA VIRTUAL,
        `partition` BIGINT METADATA VIRTUAL
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_excellent_topic',
        'properties.bootstrap.servers' = 'mykafka:9092',
        'properties.group.id' = 'my_excellent_group'
        'format' = 'csv',
        'scan.startup.mode' = 'earliest-offset'
    )

    个性化参数转换成Dataphin Flink SQL的SET表达式。

    -- 如果表属性的名字是sql关键字,或者出现了其他字符,可以用反引号转义。
    SET kafkaTable.`properties.group.id` = 'new_group_id';
    SET kafkaTable.`scan.startup.mode` = 'latest-offset';
  • 示例二:Flink SQL Sink表参数。

    以创建aliHBase结果表为例。

    create table hbase_output(
      rk  varchar,
      rk1 varchar,
      rk2 varchar,
      f bigint,
      PRIMARY KEY(rk)
    ) with (
      type='alihbase',
      diamondKey='xxxxxxx',
      diamondGroup='yyyyyyy',
      columnFamily='cf',
      tableName='blink_hbase_test',
      bufferSize=500;
    );

    若需要在任务粒度中Overwrite掉元表中的配置, 可做如下设置。

    SET hbase_output.bufferSize = 1000;
  • 示例三:Flink SQL Dim表参数。

    以创建MaxCompute维表为例。

     CREATE TABLE white_list (
      id varchar,
      name varchar,
      age int,
      PRIMARY KEY (id)
      --PERIOD FOR SYSTEM_TIME -- 备注: 从blink3.x开始维表DDL不需要PERIOD FOR SYSTEM_TIME标识。
    ) with (
      type = 'odps',
      endPoint = 'your_end_point_name',
      project = 'your_project_name',
      tableName = 'your_table_name',
      accessId = 'your_access_id',
      accessKey = 'your_access_key',
      `partition` = 'ds=20180905',
      cache = 'ALL'
    );

    转换成Dataphin Flink SQL的SET表达式。

    SET white_list.cache='ALL';
    SET white_list.cacheTTLMs=86400000;
    --若任务粒度想特别设置该表的cache更新时间, 可在此增加SET。
  • 示例四:WATERMARK、计算列、HEADER、PROCTIME、PRIMARY KEY/(UNIQUE)INDEX设置。

    目前元表中只支持主键(Primary Key)和Header设置, 若需要使用WATERMARK、PROCTIME、计算列等,需要在任务粒度进行设置。语法规范如下:

    • 设置WATERMARK。

      语法如下:

      SET {
      	[projectName <DOT>] tableName <DOT> WATERMARK
      } 	<EQ> {
      		WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
      }

      下面以标准的Flink SQL任务DDL WATERMAKR为例。

      CREATE TABLE sls_stream(
      		a INT,
      		b BIGINT,
      		c VARCHAR,
      		ts TIMESTAMP,
      		WATERMARK FOR ts AS withOffset(ts, 1000)
      	  ) with (
      		type = 'sls',
        ...);

      转换为Dataphin Flink SQL后WATERMARK设置方式。

      3.7 及以上版本,需要带上单引号。
      SET sls_stream.`watermark`= 'WATERMARK FOR ts AS withOffset(ts, 1000)';
      3.6 版本
      SET sls_stream.`watermark`= WATERMARK FOR ts AS withOffset(ts, 1000);
    • 设置计算列。

      语法如下:

      SET {
      	[projectName <DOT>] tableName <DOT> <computedColumn>
      } 	<EQ> {
      		column_name AS computed_column_expression
          |
          {
          	<LPAREN>
          		column_name AS computed_column_expression
          		(<COMMA> column_name AS computed_column_expression)*
            <RPAREN>  
          }
      }

      下面以标准的Flink SQL任务DDL计算列为例。

      CREATE TABLE sls_stream(
      		a INT,
      		b BIGINT,
      		c VARCHAR,
      		ts AS to_timestamp(c, 'yyyyMMddHHmmss')
      	  ) with (
      		type = 'sls',
        ...);

      转换为Dataphin Flink SQL后计算列设置方式。

      • 单个计算列方式。

        SET sls_stream.computedColumn= ts AS to_timestamp(c, 'yyyyMMddHHmmss');
      • 多个计算列方式。

        SET sls_stream.computedColumn= ( ts1 AS to_timestamp(c, 'yyyyMMddHHmmss'), ts2 AS to_timestamp(d, 'yyyy-MM-dd HH:mm:ss') );
    • 设置PROCTIME。

      语法如下:

      SET {
      	[projectName <DOT>] tableName <DOT> <procTime>
      } 	<EQ> {
      		columnName AS PROCTIME()
      }

      转换为Dataphin Flink SQL后PROCTIME设置方式。

      SET sls_stream.procTime= d AS PROCTIME();
    • 设置PRIMARYKEY、INDEX、UNIQUE INDEX。

      重要

      Blink 3.6.0及以上版本, TableHints采用社区的Hints方式。Ververica Flink和开源Flink不支持INDEX和UNIQUE INDEX。

      语法如下:

      query :
            select /*+ hint_content */
            ...
            from
                table_name1 /*+ hint_content */
                join 
                table_name2 /*+ hint_content */
            ...
      hint_content :
             hint_item[, hint_item]*
      hint_item :
             hint_name
         |   hint_name(k1=v1 [ , k2=v2 ]*)
         |   hint_name(hint_opt [ ,hint_opt ]*)
      k :
            simple_identifier
      v :
            string_literal
      hint_opt :
            simple_identifier
         |  numeric_literal
         |  string_literal

      下面以标准的Flink SQL任务TableHints为例。

      INSERT INTO table
      SELECT source_table.*
      FROM source_table JOIN dim_table /*+ primarykey(field1) */ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;

      流表JOIN时Flink允许用户根据自身情况声明JOIN KEY的关键字,SET具有全局优先级,TableHints为context-specific

      SET dim_table.index = 'field1'; -- set pk和 set unique index 以此类推
      
      -- 等价于dim_table的DDL 声明INDEX (field1)
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table JOIN dim_table FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- 等价于dim_table的DDL 声明INDEX (field2)。
      INSERT INTO table2
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      
      -- 等价于dim_table的DDL 声明PRIMARY KEY (field1), INDEX (field2)。
      INSERT INTO table3
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (primarykey = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- 等价于dim_table的DDL 声明PRIMARY KEY (field1), INDEX (field1)。
      INSERT INTO table4
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (primarykey = 'field1') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- 等价于dim_table的DDL 声明PRIMARY KEY (field1), UNIQUE INDEX (field2, field3)。
      INSERT INTO table5
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (UNIQUEINDEX = 'field2, field3') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      重要

      以上的join with语法,为Blink引擎早期使用方式,新的Blink、Ververica Flink、开源Flink请使用TableHint。

      Tablehint示例如下:

      -- 指定dim_table的主键是id字段 (blink、vvr、apache-flink都支持)。
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table 
      JOIN dim_table/*+primarykey(id)*/ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- 指定dim_table的维表主键是id(仅blink支持)。
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table 
      JOIN dim_table/*+index(id)*/ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- 指定dim_table的维表唯一主键是id(仅blink支持)。
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table 
      JOIN dim_table/*+uniqueindex(id)*/ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
    • 设置HEADER。

      语法如下:

      SET {
      	[projectName <DOT>] tableName <DOT> <isHeader>
      } 	<EQ> {
      		TRUE
      }

      示例如下:

      -- blink demo
      create table kafka_table (
          `message` VARBINARY,
          topic varchar HEADER
      ) WITH (
          type = 'kafka010',
          topic = 'test_kafka_topic',
          `group.id` = 'test_kafka_consumer_group',
          bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3'
      );
      
      -- vvr demo
      create table kafka_table (
          `message` VARBINARY,
          topic varchar METADATA VIRTUAL
      ) WITH (
          'connector' = 'kafka',
          'topic' = 'my_excellent_topic',
          'properties.bootstrap.servers' = 'mykafka:9092',
          'properties.group.id' = 'my_excellent_group'
          'format' = 'csv',
          'scan.startup.mode' = 'earliest-offset'
      );

      转换为Dataphin SET支持以下两种语法。

      SET kafka_table.topic.isHeader = true; --不加单引号。
      
      SET kafka_table.topic.isHeader = 'true'; --加单引号。
  • 示例五:TimeTunnel自动订阅功能(TimeTunnel作为输入)。

    示例代码如下:

    set xx_project.xx_table.accessId='xxxxxx';
    set xx_project.xx_table.accessKey='xxxxxx';
    • 超级管理员账号及无权限账号不可自动订阅,必须在SQL中配置。

    • 其他个人账号可配置,也可以不配置如上SET信息。通常情况下,在无法获取到JobName时会用默认的JobName名default去记录存档订阅的Topic信息,因此对于不同的Job,同一Topic在单独的预编译功能中可能不会重复订阅。提交和发布功能在当前Job和Topic下第一次使用时会自动订阅,有存档记录后不会重复订阅。

  • 示例六:表名冲突兼容功能。

    项目如果绑定了离线计算源,则有可能存在离线表和实时表名冲突的情况。针对此类情况,兼容逻辑如下:

    1. 当任务的SQL中使用了存在于实时和离线的同名表,则在运行预编译相关功能时,会报出错误提示信息:Please set only one type for this table adi_ae_matrix_rt_slr_prim_cate like SET XXX.XXX.tableType = 'XX' style! The type list as follows: [CLOUD_HBASE, MAX_COMPUTE]

    2. 此时需要明确SQL中需要用到的表的数据类型,因为同一个project下同一种数据类型不会出现表名重复。可以根据报错提示信息选择CLOUD_HBASE或者 MAX_COMPUTE。示例代码如下:

      -- 强制指定使用MaxCompute物理表。
      SET xx_project.xx_table.tableType = 'MAX_COMPUTE';
  • 示例七:MaxCompute(odps)数据源表支持。

    如果要使用MaxCompute数据源表,需要设置tableType='odps';否则会报参数缺失。

  • 示例八:维度表列裁剪。

    列裁剪可以通过以下SET语句手动开启。

    SET {project}.{dimtable}.dataphinColumnPruning='true';
    说明

    裁剪只会限于维表,且只支持直接join表,不支持直接join子查询。

  • 示例九:流批一体参数自定义。

    流批一体任务中会广泛使用镜像表,而镜像表在最终使用时会翻译为对应的流表或批表。

    为了适应流表/批表的多样性(流表/批表的数据源可能不一样,带来with参数中key可能不一样;流表/批表的某些设置可能不一样,比如batchSize等),所以需要将使用TableHints进行流表/批表的对应。语法如下:

    set project.table.${mode}.${key}

    设置批任务的起停时间。示例如下:

    set project.table.batch.startTime='2020-11-11 00:00:00';
    set project.table.batch.endTime='2020-11-12 00:00:00';

    设置流的AccessKey。示例如下:

    set dwd_lux_trd_ord_subpay_mirror.`stream`.accessId='xxxxxx';
    set dwd_lux_trd_ord_subpay_mirror.`stream`.accessKey='xxxxxx';
  • 示例十:结果表列裁剪。

    通过hint方式,可以指定结果表的列裁剪。

    重要

    该特性只对HBase的Sink表生效。

    -- 假设hbase ddl的语句如下
    CREATE TABLE hbase_sink(
      rowkey INT,
      family1 ROW<q1 INT>,
      family2 ROW<q2 STRING, q3 BIGINT>
    ) with (
      'connector'='cloudhbase',
      'table-name'='<yourTableName>',
      'zookeeper.quorum'='<yourZookeeperQuorum>'
    );
    
    -- 报错:因为表字段不匹配
    insert into hbase_sink select key, ROW(f1q1) from ...
    
    -- 通过:这里通过hint方式指定裁剪sink表,指定写入rowkey和q1列
    insert into hbase_sink/*+dataphincolumns(rowkey,family1.q1)*/
    select key, ROW(f1q1) from ...
    
    -- 通过:这里通过hint方式指定裁剪sink表,指定写入rowkey和q2、q3列
    insert into hbase_sink/*+dataphincolumns(rowkey,family2.q2,family2.q3)*/
    select key, ROW(f2q2,f2q3) from ...