Dataphin在Flink SQL基础上遵循ANSI-SQL标准进行了部分语法改进,以帮助您提升开发效率。本文将为说明Dataphin Flink SQL支持的词法新特性及使用方法。
支持跨项目空间引用
DDL与DML表名语法增强
通常一个标准的Flink SQL任务需要完整DDL与DML语句。示例如下:
--DDL
CREATE TABLE input_table (
field1 VARCHAR,
field2 BIGINT
) WITH (
type = 'datagen'
);
CREATE TABLE output_table (
field1 VARCHAR,
field2 BIGINT
) WITH (
type = 'print'
);
--DML
INSERT INTO output_table
SELECT field1, field2 FROM input_table;
以上语句的DDL在多个任务中需要重复定义。为避免重复声明,提高开发效率。Dataphin对FlinkSQL的表名做了语法扩展增强。在Dataphin中定义元表(schema)后, 可以在任何任务中通过以下语法直接引用, 同时在多个任务中无需再次声明, 并且支持跨项目空间引用。
<compoundTableName> ::= [projectName <DOT>] tableName
示例说明
以上一段SQL为例,定义好元表后,研发任务中只需要输入以下语句。
INSERT INTO output_table -- 项目前缀缺省时默认是当前所在项目。
SELECT field1, field2 FROM ${project_name}.input_table;
内置函数与UDX语法增强
通常一个标准的Flink SQL任务使用UDX时,需要进行声明。示例如下:
CREATE FUNCTION MY_UDX AS 'package.class';
...
CREATE VIEW my_view AS
SELECT MY_UDX(args)
FROM ...
以上自定义UDX语句在多个任务中需要重复定义。为避免重复声明,提高开发效率。Dataphin引入资源概念,您可以在Dataphin注册函数后,在任何任务中通过以下语法直接引用,同时在多个任务中无需再次声明,并且支持跨项目空间引用。
<compoundFunctionName> ::= [projectName <COLON>] functionName
示例说明
以上一段SQL为例,在Dataphin注册函数后,研发任务中只需要输入以下语句。
CREATE VIEW my_view AS SELECT ${project_name}:MY_UDX(args) -- 项目前缀缺省时默认是当前所在项目FROM。
FROM ...
特殊说明
Dataphin支持overload或override内置函数,且优先使用overload或override的内置函数。示例如下:
示例一: 有项目前缀。
SELECT ${project_name}:SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin将默认使用已overload了的内置函数`SUBSTRING`, 若未在`${project_name}`下找到该自定义函数, 在预编译时将无法通过。
示例二: 无项目前缀。
SELECT SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin将优先在当前项目内寻找自定义的`SUBSTRING`函数, 若找不到, 则认为是内置函数。
使用UpdateStatement及TableHints设置任务级别DDL参数
在Dataphin中支持对已经创建的元表进行DDL的WITH参数配置,同时Dataphin支持配置任务粒度的参数,包括Watermark、计算列、主键、Header等, 且增加了SetOption
语法支持。语法规范如下:
SET {
[ projectName <DOT>] tableName <DOT>
{
propertyName
| WATERMARK
| <computedColumn>
| <primaryKey>
| <procTime>
| {fieldName <DOT> <isHeader>}
}
} <EQ> {
@variable = identifier
| @variable = literal
| @variable = expression
}
设置WITH参数时,建议将表DDL的公用WITH参数在元表中进行定义, 任务粒度的个性化参数可以通过SET
语句或TableHints
在代码中定义, 多个SET语句间以;
为分隔符。
使用Flink SQL保留字时需要加(``)符号。示例如下:
Watermark:SET语句使用
SET [project.]table.`watermark` = ...
。Partition:SET语句使用
SET [project.]table.`partition` = ...
。
DDL参数优先级
任务级别的参数覆盖
任务级别的参数设置会覆盖元表中配置的参数设置(仅针对可覆盖的参数)。例如,元表中设置表T
的primary key
为field1
, 任务中使用表T
后, 如果存在任何primary key
设置,都以任务中的设置为准 (即任务中设置表T的primary key
为field2
, 则field1
在该任务中不再是表T
的主键)。
基于Dataphin的安全和管理策略,元表上的部分参数不允许在SQL任务中通过SET覆盖。例如MaxCompute元表上定义的表名,不能在SQL任务中通过SET语句覆盖。不同类型的元表有不同的可覆盖参数,具体以实际操作为准。
任务内SET参数
任务内SET参数具有全局优先级, TableHints的作用范围是Context-specific
, SET参数会补充到TableHints中不存在的参数项,相同参数项会被Shade。
例如Blink在2.2.5版本后, 增加了对维表设置INDEX
和UNIQUE INDEX
的支持, 所以目前声明JOIN KEY
的关键字时存在PIRMARY KEY
、INDEX
、UNIQUE INDEX
三种,故Dataphin会联合做判断。示例如下:
SET my_dim_table.primarykey = 'field1';
CREATE VIEW tmp1 AS
SELECT my_source_table1.*
FROM my_source_table1 JOIN my_dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;
CREATE VIEW tmp2 AS
SELECT my_source_table2.*
FROM my_source_table2 JOIN my_dim_table WITH (primarykey = 'field3') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;
以上代码Dataphin将会理解为在生成tmp1
时设置my_dim_table
的primary key
是field1
(参数填充),index
是field2
;在生成tmp2
时设置my_dim_table
的primary key
是field3
(参数Shade)。
更多示例说明:
示例一:Flink SQL Source表参数。
以创建kafka源表为例。
-- vvr语法 CREATE TEMPORARY TABLE kafkaTable ( `user_id` BIGINT, `item_id` BIGINT, `category_id` BIGINT, `behavior` STRING, `topic` STRING METADATA VIRTUAL, `partition` BIGINT METADATA VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'my_excellent_topic', 'properties.bootstrap.servers' = 'mykafka:9092', 'properties.group.id' = 'my_excellent_group' 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' )
个性化参数转换成Dataphin Flink SQL的SET表达式。
-- 如果表属性的名字是sql关键字,或者出现了其他字符,可以用反引号转义。 SET kafkaTable.`properties.group.id` = 'new_group_id'; SET kafkaTable.`scan.startup.mode` = 'latest-offset';
示例二:Flink SQL Sink表参数。
以创建aliHBase结果表为例。
create table hbase_output( rk varchar, rk1 varchar, rk2 varchar, f bigint, PRIMARY KEY(rk) ) with ( type='alihbase', diamondKey='xxxxxxx', diamondGroup='yyyyyyy', columnFamily='cf', tableName='blink_hbase_test', bufferSize=500; );
若需要在任务粒度中Overwrite掉元表中的配置, 可做如下设置。
SET hbase_output.bufferSize = 1000;
示例三:Flink SQL Dim表参数。
以创建MaxCompute维表为例。
CREATE TABLE white_list ( id varchar, name varchar, age int, PRIMARY KEY (id) --PERIOD FOR SYSTEM_TIME -- 备注: 从blink3.x开始维表DDL不需要PERIOD FOR SYSTEM_TIME标识。 ) with ( type = 'odps', endPoint = 'your_end_point_name', project = 'your_project_name', tableName = 'your_table_name', accessId = 'your_access_id', accessKey = 'your_access_key', `partition` = 'ds=20180905', cache = 'ALL' );
转换成Dataphin Flink SQL的SET表达式。
SET white_list.cache='ALL'; SET white_list.cacheTTLMs=86400000; --若任务粒度想特别设置该表的cache更新时间, 可在此增加SET。
示例四:WATERMARK、计算列、HEADER、PROCTIME、PRIMARY KEY/(UNIQUE)INDEX设置。
目前元表中只支持主键(Primary Key)和Header设置, 若需要使用WATERMARK、PROCTIME、计算列等,需要在任务粒度进行设置。语法规范如下:
设置WATERMARK。
语法如下:
SET { [projectName <DOT>] tableName <DOT> WATERMARK } <EQ> { WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset) }
下面以标准的Flink SQL任务DDL WATERMAKR为例。
CREATE TABLE sls_stream( a INT, b BIGINT, c VARCHAR, ts TIMESTAMP, WATERMARK FOR ts AS withOffset(ts, 1000) ) with ( type = 'sls', ...);
转换为Dataphin Flink SQL后WATERMARK设置方式。
3.7 及以上版本,需要带上单引号。 SET sls_stream.`watermark`= 'WATERMARK FOR ts AS withOffset(ts, 1000)'; 3.6 版本 SET sls_stream.`watermark`= WATERMARK FOR ts AS withOffset(ts, 1000);
设置计算列。
语法如下:
SET { [projectName <DOT>] tableName <DOT> <computedColumn> } <EQ> { column_name AS computed_column_expression | { <LPAREN> column_name AS computed_column_expression (<COMMA> column_name AS computed_column_expression)* <RPAREN> } }
下面以标准的Flink SQL任务DDL计算列为例。
CREATE TABLE sls_stream( a INT, b BIGINT, c VARCHAR, ts AS to_timestamp(c, 'yyyyMMddHHmmss') ) with ( type = 'sls', ...);
转换为Dataphin Flink SQL后计算列设置方式。
单个计算列方式。
SET sls_stream.computedColumn= ts AS to_timestamp(c, 'yyyyMMddHHmmss');
多个计算列方式。
SET sls_stream.computedColumn= ( ts1 AS to_timestamp(c, 'yyyyMMddHHmmss'), ts2 AS to_timestamp(d, 'yyyy-MM-dd HH:mm:ss') );
设置PROCTIME。
语法如下:
SET { [projectName <DOT>] tableName <DOT> <procTime> } <EQ> { columnName AS PROCTIME() }
转换为Dataphin Flink SQL后PROCTIME设置方式。
SET sls_stream.procTime= d AS PROCTIME();
设置PRIMARYKEY、INDEX、UNIQUE INDEX。
重要Blink 3.6.0及以上版本, TableHints采用社区的Hints方式。Ververica Flink和开源Flink不支持INDEX和UNIQUE INDEX。
语法如下:
query : select /*+ hint_content */ ... from table_name1 /*+ hint_content */ join table_name2 /*+ hint_content */ ... hint_content : hint_item[, hint_item]* hint_item : hint_name | hint_name(k1=v1 [ , k2=v2 ]*) | hint_name(hint_opt [ ,hint_opt ]*) k : simple_identifier v : string_literal hint_opt : simple_identifier | numeric_literal | string_literal
下面以标准的Flink SQL任务TableHints为例。
INSERT INTO table SELECT source_table.* FROM source_table JOIN dim_table /*+ primarykey(field1) */ FOR SYSTEM_TIME AS OF PROCTIME() ON ...;
流表JOIN时Flink允许用户根据自身情况声明
JOIN KEY
的关键字,SET具有全局优先级,TableHints为context-specific
。SET dim_table.index = 'field1'; -- set pk和 set unique index 以此类推 -- 等价于dim_table的DDL 声明INDEX (field1) INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- 等价于dim_table的DDL 声明INDEX (field2)。 INSERT INTO table2 SELECT source_table.* FROM source_table JOIN dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- 等价于dim_table的DDL 声明PRIMARY KEY (field1), INDEX (field2)。 INSERT INTO table3 SELECT source_table.* FROM source_table JOIN dim_table WITH (primarykey = 'field2') FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- 等价于dim_table的DDL 声明PRIMARY KEY (field1), INDEX (field1)。 INSERT INTO table4 SELECT source_table.* FROM source_table JOIN dim_table WITH (primarykey = 'field1') FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- 等价于dim_table的DDL 声明PRIMARY KEY (field1), UNIQUE INDEX (field2, field3)。 INSERT INTO table5 SELECT source_table.* FROM source_table JOIN dim_table WITH (UNIQUEINDEX = 'field2, field3') FOR SYSTEM_TIME AS OF PROCTIME() ON ...;
重要以上的
join with
语法,为Blink引擎早期使用方式,新的Blink、Ververica Flink、开源Flink请使用TableHint。Tablehint示例如下:
-- 指定dim_table的主键是id字段 (blink、vvr、apache-flink都支持)。 INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table/*+primarykey(id)*/ FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- 指定dim_table的维表主键是id(仅blink支持)。 INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table/*+index(id)*/ FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- 指定dim_table的维表唯一主键是id(仅blink支持)。 INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table/*+uniqueindex(id)*/ FOR SYSTEM_TIME AS OF PROCTIME() ON ...;
设置HEADER。
语法如下:
SET { [projectName <DOT>] tableName <DOT> <isHeader> } <EQ> { TRUE }
示例如下:
-- blink demo create table kafka_table ( `message` VARBINARY, topic varchar HEADER ) WITH ( type = 'kafka010', topic = 'test_kafka_topic', `group.id` = 'test_kafka_consumer_group', bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3' ); -- vvr demo create table kafka_table ( `message` VARBINARY, topic varchar METADATA VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'my_excellent_topic', 'properties.bootstrap.servers' = 'mykafka:9092', 'properties.group.id' = 'my_excellent_group' 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' );
转换为Dataphin SET支持以下两种语法。
SET kafka_table.topic.isHeader = true; --不加单引号。 SET kafka_table.topic.isHeader = 'true'; --加单引号。
示例五:TimeTunnel自动订阅功能(TimeTunnel作为输入)。
示例代码如下:
set xx_project.xx_table.accessId='xxxxxx'; set xx_project.xx_table.accessKey='xxxxxx';
超级管理员账号及无权限账号不可自动订阅,必须在SQL中配置。
其他个人账号可配置,也可以不配置如上SET信息。通常情况下,在无法获取到JobName时会用默认的JobName名
default
去记录存档订阅的Topic信息,因此对于不同的Job,同一Topic在单独的预编译功能中可能不会重复订阅。提交和发布功能在当前Job和Topic下第一次使用时会自动订阅,有存档记录后不会重复订阅。
示例六:表名冲突兼容功能。
项目如果绑定了离线计算源,则有可能存在离线表和实时表名冲突的情况。针对此类情况,兼容逻辑如下:
当任务的SQL中使用了存在于实时和离线的同名表,则在运行预编译相关功能时,会报出错误提示信息:Please set only one type for this table adi_ae_matrix_rt_slr_prim_cate like SET XXX.XXX.tableType = 'XX' style! The type list as follows: [CLOUD_HBASE, MAX_COMPUTE]。
此时需要明确SQL中需要用到的表的数据类型,因为同一个
project
下同一种数据类型不会出现表名重复。可以根据报错提示信息选择CLOUD_HBASE或者 MAX_COMPUTE。示例代码如下:-- 强制指定使用MaxCompute物理表。 SET xx_project.xx_table.tableType = 'MAX_COMPUTE';
示例七:MaxCompute(odps)数据源表支持。
如果要使用MaxCompute数据源表,需要设置
tableType='odps';
否则会报参数缺失。示例八:维度表列裁剪。
列裁剪可以通过以下SET语句手动开启。
SET {project}.{dimtable}.dataphinColumnPruning='true';
说明裁剪只会限于维表,且只支持直接join表,不支持直接join子查询。
示例九:流批一体参数自定义。
流批一体任务中会广泛使用镜像表,而镜像表在最终使用时会翻译为对应的流表或批表。
为了适应流表/批表的多样性(流表/批表的数据源可能不一样,带来
with
参数中key
可能不一样;流表/批表的某些设置可能不一样,比如batchSize
等),所以需要将使用TableHints进行流表/批表的对应。语法如下:set project.table.${mode}.${key}
设置批任务的起停时间。示例如下:
set project.table.batch.startTime='2020-11-11 00:00:00'; set project.table.batch.endTime='2020-11-12 00:00:00';
设置流的AccessKey。示例如下:
set dwd_lux_trd_ord_subpay_mirror.`stream`.accessId='xxxxxx'; set dwd_lux_trd_ord_subpay_mirror.`stream`.accessKey='xxxxxx';
示例十:结果表列裁剪。
通过hint方式,可以指定结果表的列裁剪。
重要该特性只对HBase的Sink表生效。
-- 假设hbase ddl的语句如下 CREATE TABLE hbase_sink( rowkey INT, family1 ROW<q1 INT>, family2 ROW<q2 STRING, q3 BIGINT> ) with ( 'connector'='cloudhbase', 'table-name'='<yourTableName>', 'zookeeper.quorum'='<yourZookeeperQuorum>' ); -- 报错:因为表字段不匹配 insert into hbase_sink select key, ROW(f1q1) from ... -- 通过:这里通过hint方式指定裁剪sink表,指定写入rowkey和q1列 insert into hbase_sink/*+dataphincolumns(rowkey,family1.q1)*/ select key, ROW(f1q1) from ... -- 通过:这里通过hint方式指定裁剪sink表,指定写入rowkey和q2、q3列 insert into hbase_sink/*+dataphincolumns(rowkey,family2.q2,family2.q3)*/ select key, ROW(f2q2,f2q3) from ...