本文通过简单的示例,在实时计算开发控制台带您快速体验Paimon的基本功能,包括创建和删除Paimon Catalog,创建和删除Paimon Catalog表,向Paimon表写入、更新以及消费数据。
前提条件
如果您使用RAM用户或RAM角色等身份访问,需要确认已具有实时计算开发控制台相关权限,详情请参见权限管理。
已创建Flink工作空间,详情请参见开通实时计算Flink版。
已开通对象存储OSS且Bucket的存储类型为标准存储,详情请参见OSS控制台快速入门。OSS用于存储Paimon表的相关文件,包括数据文件与元数据文件等。
仅实时计算引擎VVR 8.0.5及以上版本支持Paimon表。
步骤一:创建Paimon Catalog
进入查询脚本页面。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击SQL开发,在查询脚本页签,新建查询脚本。
在文本编辑区域,输入如下代码创建Paimon Catalog。
-- my-Catalog为自定义的Catalag名称 CREATE Catalog `my-catalog` WITH ( 'type' = 'paimon', 'metastore' = 'filesystem', 'warehouse' = '<warehouse>', 'fs.oss.endpoint' = '<fs.oss.endpoint>', 'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>', 'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>' );
参数配置项如下:
配置项
说明
是否必填
备注
type
Catalog类型。
是
固定值为Paimon。
metastore
元数据存储类型。
是
本文示例填写filesystem,其他类型详情请参考管理Paimon Catalog。
warehouse
OSS服务中所指定的数仓目录。
是
格式为oss://<bucket>/<object>。其中:
bucket:表示您创建的OSS Bucket名称。
object:表示您存放数据的路径。
请在OSS管理控制台上查看您的Bucket和Object名称。
fs.oss.endpoint
OSS服务的连接地址。
否
当warehouse指定的OSS Bucket与Flink工作空间不在同一地域,或使用其它账号下的OSS bucket时需要填写。
请参见访问域名和数据中心。
说明如果需要将Paimon表存储在OSS-HDFS中,则fs.oss.endpoint、fs.oss.accessKeyId与fs.oss.accessKeySecret均需要填写,其中fs.oss.endpoint的值为cn-<region>.oss-dls.aliyuncs.com,例如cn-hangzhou.oss-dls.aliyuncs.com。
fs.oss.accessKeyId
拥有读写OSS权限的阿里云账号或RAM账号的AccessKey。
否
当warehouse指定的OSS Bucket与Flink工作空间不在同一地域,或使用其它账号下的OSS Bucket时需要填写。获取方法请参见创建AccessKey。
fs.oss.accessKeySecret
拥有读写OSS权限的阿里云账号或RAM账号的AccessKey Secret。
否
选中创建Paimon Catalog代码,单击左侧的运行。
返回
The following statement has been executed successfully!
信息表示Catalog创建成功。
步骤二:创建Paimon Catalog表
在查询脚本页面,输入如下代码创建名为my_db的Paimon数据库以及名为my_tbl的Paimon表。
CREATE DATABASE `my-catalog`.`my_db`; CREATE TABLE `my-catalog`.`my_db`.`my_tbl` ( dt STRING, id BIGINT, content STRING, PRIMARY KEY (dt, id) NOT ENFORCED ) PARTITIONED BY (dt) WITH ( 'changelog-producer' = 'lookup' );
说明为了后续能够流式消费Paimon表,本示例在WITH中指定了
changelog-producer
参数为lookup
,表示采用查找策略生成变更日志,详情请参见变更数据产生机制。选中创建Paimon数据库以及Paimon表的代码,单击左侧的运行。
返回
The following statement has been executed successfully!
信息表示名为my_db的Paimon数据库以及名为my_tbl的Paimon表创建成功。
步骤三:向Paimon表写入数据
在SQL开发页面的作业草稿页签,新建空白流作业草稿,详情请参见SQL作业开发。将如下插入语句复制到SQL编辑器。
-- Paimon结果表在每次检查点完成之后才会正式提交数据。 -- 此处将检查点间隔缩短为10s,是为了更快地提交数据。 -- 在生产环境下,系统检查点的间隔与两次系统检查点之间的最短时间间隔根据业务对延时要求的不同,一般设置为1分钟到10分钟。 SET 'execution.checkpointing.interval'='10s'; INSERT INTO `my-catalog`.`my_db`.`my_tbl` VALUES ('20240108',1,'apple'), ('20240108',2,'banana'), ('20240109',1,'cat'), ('20240109',2,'dog');
在作业开发页面顶部,单击部署,在部署新版本对话框中,根据需要填写或选中相关内容,单击确定。
在作业运维页面,单击目标作业名称操作列下的启动,选择无状态启动,单击启动。
当作业状态变为已完成,表示已完成数据的写入。
步骤四:从Paimon表中流式消费数据
新建空白流作业草稿,将如下SQL代码复制到SQL编辑器,使用print连接器,输出my_tbl表中所有数据到日志中。
CREATE TEMPORARY TABLE Print ( dt STRING, id BIGINT, content STRING ) WITH ( 'connector' = 'print' ); INSERT INTO Print SELECT * FROM `my-catalog`.`my_db`.`my_tbl`;
在作业开发页面顶部,单击部署,在部署新版本对话框中,根据需要填写或选中相关内容,单击确定。
在作业运维页面,单击目标作业操作列下的启动,选择无状态启动,单击启动。
在作业运维详情页面,查看Flink计算结果。
在作业运维页面,单击目标作业名称。
在作业日志页签下的运行日志页面,单击运行Task Managers页签下的Path, ID。
单击Stdout后,查看消费的Paimon数据。
步骤五:向Paimon表更新数据
新建空白流作业草稿,将如下SQL代码复制到SQL编辑器。
SET 'execution.checkpointing.interval' = '10s'; INSERT INTO `my-catalog`.`my_db`.`my_tbl` VALUES ('20240108', 1, 'hello'), ('20240109', 2, 'world');
在作业开发页面顶部,单击部署,在部署新版本对话框中,根据需要填写或选中相关内容,单击确定。
在作业运维页面,单击目标作业操作列下的启动,选择无状态启动,单击启动。
当作业状态变为已完成,表示已完成数据的写入。
在步骤四作业的作业运维页面的Stdout页签下,查看向Paimon表中更新的数据。
步骤六(可选):停止流式消费作业并清理资源
测试完毕后,如果您需要停止流式消费作业并清理资源,可参考以下步骤:
在作业运维页面,单击目标作业操作列的停止,停止作业运行。
在查询脚本页签的文本编辑区域,输入如下代码,删除Paimon数据文件以及Paimon Catalog。
DROP DATABASE `my-catalog`.`my_db` CASCADE; --将删除存储在OSS上的整个Paimon数据库的数据文件。 DROP CATALOG `my-catalog`; --将Paimon Catalog从实时计算开发控制台元数据中删除,但对OSS上的数据文件没有影响。
返回
The following statement has been executed successfully!
信息表示Paimon数据文件以及Paimon Catalog删除成功。
相关文档
向Paimon表写入数据或从Paimon表消费数据,详情请参见Paimon表数据写入和消费。
修改Paimon表结构(例如增加列、修改列类型),以及临时修改表参数,详情请参见修改表结构。
不同场景下Paimon主键表和Append Scalable表的常用优化,详情请参见Paimon性能优化。
关于Paimon的常见问题,详情请参见上下游存储。