Flink_SQL任务开发方式

Dataphin的Flink_SQL任务支持多种开发方式,包括原生DDL+DML开发、基于Catalog开发、使用Dataphin元表进行开发、使用Dataphin数据源表进行开发、以及使用计算源物理表进行开发,且不同开发方式所创建的表支持任意混用,包括镜像表。不同开发方式的使用方法、使用场景以及优缺点不同。本文将为您介绍各开发方式帮助您更好的完成Flink_SQL任务开发。

Dataphin计算源物理表开发方式

Dataphin计算源物理表开发方式是指您在开发Flink_SQL任务时,您可以直接通过写项目名.表名称的方式直接访问计算源中的物理表。并且支持跨项目访问,访问其他项目绑定的物理表。

重要
  • 目前支持访问HologresHadoop、StarRocks计算源的物理表数据

  • 访问的物理表所在的项目已绑定支持的计算源。

使用示例

若您需要将example项目计算源中的test物理表数据插入到test_demo物理表中。您可以参考以下示例代码:

insert into test_demo select id,name from example.test;

Dataphin数据源表开发方式

Dataphin数据源表开发方式是指您在开发Flink_SQL任务时,可以直接访问在Dataphin中所创建的数据源中的表进行任务开发。如果您希望使用此种方式,需要先在数据源上配置数据源编码。具体操作,请参见数据源管理概述

配置数据源编码后,可在Flink SQL任务中通过数据源编码.table数据源编码.schema.table的格式引用数据源中的表;如果需要根据所处环境自动访问对应环境的数据源,请通过${数据源编码}.table或${数据源编码}.schema.table的格式访问;下面以MySQL和Hologres数据源为例说明访问方式:

重要

目前仅支持MySQLHologresMaxCompute、HiveOracleStarRocks数据源。

  • MySQL、HiveOracleStarRocks数据源表:支持通过数据源编码.表名称的形式访问数据源中的物理表。

  • Hologres数据源表:支持通过数据源编码.schema名称.表名称的形式访问Hologres数据源中的物理表。

使用示例

若您需要将MySQL数据源(数据源编码为ds_demo_mysql)的demo_mysql物理表的数据插入至test_demo物理表中。您可以参考以下代码完成开发。

insert into test_demo select id,name from ds_demo_mysql.demo_mysql;

若您需要将Hologres数据源(数据源编码为ds_demo_hologres、schema的名称为hologres)的demo_hologres物理表的数据插入至test_demo物理表中。您可以参考以下代码完成开发。

insert into test_demo select id,name from ds_demo_hologres.hologres.demo_hologres;

Dataphin元表开发方式

在Dataphin中,元表是在原生DDL+DML开发上更高一层的逻辑概念,元表是通过数据管理的跨存储类型表。开发过程中所用到的输入表、输出表、维表可以通过创建元表方式来进行创建和管理,以支持您通过引用元表的方式来创建其他数据表。这种方式可以使您一次建表,可多次引用。您无需重复编写DDL语句,无需进行繁杂的输入、输出、维表映射,从而简化开发,提升效率和体验。同时通过元表可以有效避免直接编写原生Flink DDL语句导致的敏感信息透出等问题。

使用示例

若您需要创建demo01demo02数据表,并将demo01的数据插入至demo02。您可以参考以下步骤完成开发。

  1. 通过Dataphin元表功能,创建demo01demo02数据表,具体操作,请参见新建元表

  2. 在Flink_SQL任务中编写插入语句,示例代码如下:

INSERT into demo02 select * from demo01;

基于Catalog开发

基于Catalog的开发方式是指在Flink_SQL任务中通过创建Catalog连接数据库,并使用Catalog中的表。通过该方式能够避免编写表的DDL语句,以简化Flink SQL编码工作。例如,在Flink_SQL任务中创建Catalog01并建表t1后,在新的Flink_SQL任务中再次创建Catalog01,可以直接访问表t1

重要
  • 仅支持开源Flink实时计算引擎。

  • 不支持在Catalog中创建物理表(仅支持创建内存临时表)。

  • 不支持USE CATALOG/USE DATABASE语句。

  • ALTER TABLE语句仅支持Flink 1.17版本。

  • 不支持以catalog.database.'schema.table'的格式访问表,仅支持以catalog.database.table格式访问表。

  • 目前支持的Catalog类型包括JDBCMySQLOracle)和Paimon

使用示例

CREATE CATALOG my_catalog WITH (
    'type' = 'jdbc',
    'base-url' = 'jdbc:mysql://rm-uf*******7o.mysql.rds.aliyuncs.com:3306',
    'default-database' = 'dataphin_01',
    'username' = '*******',
    'password' = '*******'
);
CREATE TEMPORARY TABLE t2 (
    id bigint,
    name STRING
) WITH (
    'connector' = 'print'
);

-- write streaming data to dynamic table
INSERT INTO t2 SELECT id,name FROM my_catalog.dataphin_01.pf_id_name;

原生DDL+DML开发方式

原生DDL开发是指在Flink_SQL任务使用Flink SQL语句直接创建和管理数据表的开发方式。如使用CREATE TABLE/CREATE TEMPORARY TABLE创建表。这种开发方式通过代码定义表结构并通过SQL语句来创建和管理表。

重要

原生DDL+DML开发方式因为需要在代码中编写明文的用户名和密码,导致数据不安全,可能造成数据泄露,请谨慎使用

使用示例

若您需要在Flink_SQL任务中使用原生DDL+DML开发方式,您可以参考以下示例代码进行创建。以下示例代码实现了模拟数据的输入输出(从t1表中读数据写到t2表中)。

说明

使用原生DDL+DML开发语句创建表,您需关闭Dataphin编码规范中的禁止使用Flink原生DDL语句设置。具体操作,请参见编码规范

create temporary table t1 (
id bigint,
name varchar
) with (
'connector' = 'datagen',
'rows-per-second' = '1'
);
create temporary table t2 (
id bigint,
name varchar
) with (
'connector' = 'print'
);

-- begin statement set;
insert into t2 select id,replace(name, '\"', '"') as name from t1;
-- set;