CREATE ETL

CREATE ETL语句用于在流引擎中创建ETL任务。

引擎与版本

CREATE ETL仅适用于流引擎。要求3.1.8及以上版本。

说明

您可以通过控制台查看并升级小版本

语法

create_etl_statement ::= CREATE ETL [IF NOT EXISTS] etl_name
                        [WITH etl_properties]
                        AS INSERT INTO [[catalog_name.]db_name.]table_name column_list 
                        select_statement

etl_properties       ::= '(' property_definition (',' property_definition)* ')'
property_definition  ::= property_name '=' property_value  
column_list          ::= '(' column_name (',' column_name)* ')'

使用说明

ETL名称(etl_name

必填参数。ETL名称的设置需遵循以下规则:

  • 可包含数字、大写英文字符、小写英文字符、半角句号(.)、中划线(-)和下划线(_)。

  • 不能以半角句号(.)或中划线(-)开头。

  • 长度为1~255字符。

ETL属性(etl_properties)

您可以通过WITH关键字添加以下ETL属性:

重要

设置时,属性名前后需添加反引号(`),属性值前后需添加单引号(')。例如`parallelism` = '2'

属性

数据类型

说明

默认值

parallelism

INTEGER

任务并行度。

1

sink.ignore-update-before

BOOLEAN

Sink时是否忽略-U

false

sink.ignore-delete

BOOLEAN

Sink时是否忽略-D

false

sink.null-mode

STRING

Sink时是否写入Null值,取值如下:

  • NO_OP(保留原数据Null值,直接写入)

  • SKIP(跳过Null值,不写入)

NO_OP

udf.xxxx

STRING

配置UDF,需先上传UDF jar。参数格式如下:udf.<udfFunction> = <jarName>#<className>,其中udfFunction是使用udf的函数名,jarName是该udfjar包名,className是具体的类名。

stream.xxx

ANY

流引擎作业参数,例如:execution.checkpointing.interval

指定结果表

参数

是否必填

说明

catalog_name

结果表的Catalog。

db_name

结果表所在数据库。

table_name

结果表的名称。

column_name

结果表的列名。

SQL查询语句(select_statement)

用于筛选数据的SQL语句,例如 SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;

示例

假设宽表引擎中的源表source和结果表sink的结构如下:

-- 源表source
CREATE TABLE source(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
-- 结果表1:sink
CREATE TABLE sink(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
  • 示例一:创建ETL filter1,将源表source中符合条件的数据插入到结果表sink中。

    CREATE ETL IF NOT EXISTS filter1
    AS
      INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
      SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
  • 示例二:创建ETL filter2,将源表source中符合条件的数据插入到结果表sink中,同时添加属性。

    CREATE ETL IF NOT EXISTS filter2
    WITH (
    `parallelism` = '2',
    `stream.execution.checkpointing.interval` = '30000'
    )
    AS
      INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
      SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;