Dataphin的Flink_SQL任务支持多种开发方式,包括原生DDL+DML开发、基于Catalog开发、使用Dataphin元表进行开发、使用Dataphin数据源表进行开发、以及使用计算源物理表进行开发,且不同开发方式所创建的表支持任意混用,包括镜像表。不同开发方式的使用方法、使用场景以及优缺点不同。本文将为您介绍各开发方式帮助您更好的完成Flink_SQL任务开发。
Dataphin计算源物理表开发方式
Dataphin计算源物理表开发方式是指您在开发Flink_SQL任务时,您可以直接通过写项目名.表名称
的方式直接访问计算源中的物理表。并且支持跨项目访问,访问其他项目绑定的物理表。
目前支持访问Hologres、Hadoop、StarRocks计算源的物理表数据。
访问的物理表所在的项目已绑定支持的计算源。
使用示例
若您需要将example
项目计算源中的test
物理表数据插入到test_demo
物理表中。您可以参考以下示例代码:
insert into test_demo select id,name from example.test;
Dataphin数据源表开发方式
Dataphin数据源表开发方式是指您在开发Flink_SQL任务时,可以直接访问在Dataphin中所创建的数据源中的表进行任务开发。如果您希望使用此种方式,需要先在数据源上配置数据源编码。具体操作,请参见数据源管理概述。
配置数据源编码后,可在Flink SQL任务中通过数据源编码.table
或数据源编码.schema.table
的格式引用数据源中的表;如果需要根据所处环境自动访问对应环境的数据源,请通过${数据源编码}.table或${数据源编码}.schema.table
的格式访问;下面以MySQL和Hologres数据源为例说明访问方式:
目前仅支持MySQL、Hologres、MaxCompute、Hive、Oracle、StarRocks数据源。
MySQL、Hive、Oracle、StarRocks数据源表:支持通过
数据源编码.表名称
的形式访问数据源中的物理表。Hologres数据源表:支持通过
数据源编码.schema名称.表名称
的形式访问Hologres数据源中的物理表。
使用示例
若您需要将MySQL数据源(数据源编码为ds_demo_mysql
)的demo_mysql
物理表的数据插入至test_demo
物理表中。您可以参考以下代码完成开发。
insert into test_demo select id,name from ds_demo_mysql.demo_mysql;
若您需要将Hologres数据源(数据源编码为ds_demo_hologres
、schema的名称为hologres
)的demo_hologres
物理表的数据插入至test_demo
物理表中。您可以参考以下代码完成开发。
insert into test_demo select id,name from ds_demo_hologres.hologres.demo_hologres;
Dataphin元表开发方式
在Dataphin中,元表是在原生DDL+DML开发上更高一层的逻辑概念,元表是通过数据管理的跨存储类型表。开发过程中所用到的输入表、输出表、维表可以通过创建元表方式来进行创建和管理,以支持您通过引用元表的方式来创建其他数据表。这种方式可以使您一次建表,可多次引用。您无需重复编写DDL语句,无需进行繁杂的输入、输出、维表映射,从而简化开发,提升效率和体验。同时通过元表可以有效避免直接编写原生Flink DDL语句导致的敏感信息透出等问题。
使用示例
若您需要创建demo01
和demo02
数据表,并将demo01
的数据插入至demo02
。您可以参考以下步骤完成开发。
通过Dataphin元表功能,创建
demo01
和demo02
数据表,具体操作,请参见新建元表。在Flink_SQL任务中编写插入语句,示例代码如下:
INSERT into demo02 select * from demo01;
基于Catalog开发
基于Catalog的开发方式是指在Flink_SQL任务中通过创建Catalog连接数据库,并使用Catalog中的表。通过该方式能够避免编写表的DDL语句,以简化Flink SQL编码工作。例如,在Flink_SQL任务中创建Catalog01
并建表t1
后,在新的Flink_SQL任务中再次创建Catalog01
,可以直接访问表t1
。
仅支持开源Flink实时计算引擎。
不支持在Catalog中创建物理表(仅支持创建内存临时表)。
不支持
USE CATALOG/USE DATABASE
语句。ALTER TABLE
语句仅支持Flink 1.17版本。不支持以
catalog.database.'schema.table'
的格式访问表,仅支持以catalog.database.table
格式访问表。目前支持的Catalog类型包括JDBC(MySQL、Oracle)和Paimon。
使用示例
CREATE CATALOG my_catalog WITH (
'type' = 'jdbc',
'base-url' = 'jdbc:mysql://rm-uf*******7o.mysql.rds.aliyuncs.com:3306',
'default-database' = 'dataphin_01',
'username' = '*******',
'password' = '*******'
);
CREATE TEMPORARY TABLE t2 (
id bigint,
name STRING
) WITH (
'connector' = 'print'
);
-- write streaming data to dynamic table
INSERT INTO t2 SELECT id,name FROM my_catalog.dataphin_01.pf_id_name;
原生DDL+DML开发方式
原生DDL开发是指在Flink_SQL任务使用Flink SQL语句直接创建和管理数据表的开发方式。如使用CREATE TABLE/CREATE TEMPORARY TABLE
创建表。这种开发方式通过代码定义表结构并通过SQL语句来创建和管理表。
原生DDL+DML开发方式因为需要在代码中编写明文的用户名和密码,导致数据不安全,可能造成数据泄露,请谨慎使用。
使用示例
若您需要在Flink_SQL任务中使用原生DDL+DML开发方式,您可以参考以下示例代码进行创建。以下示例代码实现了模拟数据的输入输出(从t1表中读数据写到t2表中)。
使用原生DDL+DML开发语句创建表,您需关闭Dataphin编码规范中的禁止使用Flink原生DDL语句设置。具体操作,请参见编码规范。
create temporary table t1 (
id bigint,
name varchar
) with (
'connector' = 'datagen',
'rows-per-second' = '1'
);
create temporary table t2 (
id bigint,
name varchar
) with (
'connector' = 'print'
);
-- begin statement set;
insert into t2 select id,replace(name, '\"', '"') as name from t1;
-- set;