本文介绍如何通过阿里云实时计算Flink版实时读写云原生数据仓库AnalyticDB PostgreSQL版数据。
背景信息
云原生数据仓库AnalyticDB PostgreSQL版是一种大规模并行处理(MPP)数据仓库服务,可提供海量数据在线分析服务。实时计算Flink版是基于Apache Flink构建的⼀站式实时大数据分析平台,内置丰富上下游连接器,满足不同业务场景的需求,提供高效、灵活的实时计算服务。通过实时计算Flink版读取AnalyticDB PostgreSQL版数据,可以充分发挥云原生数据仓库的优势,提高数据分析的效率和精度。
使用限制
该功能暂不支持AnalyticDB PostgreSQL版Serverless模式。
仅Flink实时计算引擎VVR 6.0.0及以上版本支持云原生数据仓库AnalyticDB PostgreSQL版连接器。
仅Flink实时计算引擎VVR 8.0.1及以上版本支持云原生数据仓库AnalyticDB PostgreSQL版7.0版本。
说明如果您使用了自定义连接器,具体操作请参见管理自定义连接器。
前提条件
已创建Flink全托管工作空间。具体操作,请参见开通Flink全托管。
已创建AnalyticDB PostgreSQL版实例。具体操作,请参见创建实例。
AnalyticDB PostgreSQL版实例和Flink全托管工作空间需要位于同一VPC下。
步骤一:配置AnalyticDB PostgreSQL版实例
- 登录云原生数据仓库AnalyticDB PostgreSQL版控制台。
将Flink工作空间所属的网段加入AnalyticDB PostgreSQL版的白名单。如何添加白名单,请参见设置白名单。
单击登录数据库,连接数据库的更多方式,请参见客户端连接。
在AnalyticDB PostgreSQL版实例上创建一张名为adbpg_dim_table的维表并插入50条测试数据。
建表SQL和插入数据SQL的示例如下:
--创建名称为adbpg_dim_table的表。 CREATE TABLE adbpg_dim_table( id int, username text, PRIMARY KEY(id) ); --向adbpg_dim_table的表中插入50行数据,其中id字段的值为从1到50的整数,而username字段的值为username字符串后面跟随当前行数的文本表示。 INSERT INTO adbpg_dim_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);
创建一张名为adbpg_sink_table的结果表,用于Flink写入结果数据。
CREATE TABLE adbpg_sink_table( id int, username text, score int );
步骤二:创建Flink作业
步骤三:编写作业代码并部署
将以下作业代码拷贝到作业文本编辑区。
---创建一个datagen源表。 CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='50', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); --创建adbpg维表。 CREATE TEMPORARY TABLE dim_adbpg( id int, username varchar, PRIMARY KEY(id) not ENFORCED ) WITH( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_dim_table', 'username' = 'flink****test', 'password' = '*******', 'maxJoinRows'='100', 'maxRetryTimes'='1', 'cache'='lru', 'cacheSize'='1000' ); --创建adbpg结果表。 CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_sink_table', 'username' = 'flink****test', 'password' = '******', 'maxRetryTimes' = '2', 'batchsize' = '5000', 'conflictMode' = 'ignore', 'writeMode' = 'insert', 'retryWaitTime' = '200' ); --维表和源表join后的结果插入adbpg结果表。 INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds join dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts on ds.id = ts.id;
根据实际情况修改以下参数,参数说明如下。
参数
是否必填
说明
URL
是
AnalyticDB PostgreSQL版的JDBC连接地址。格式为
jdbc:postgresql://<内网地址>:<端口>/<连接的数据库名称>
,示例如下jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres
。tablename
是
AnalyticDB PostgreSQL版的表名。
username
是
AnalyticDB PostgreSQL版的数据库账号。
password
是
AnalyticDB PostgreSQL版的数据库账号密码。
说明更多参数和类型映射,详情请参见连接器云原生数据仓库AnalyticDB PostgreSQL版(ADB PG)。
在作业开发页面顶部,单击深度检查,进行语法检查。
单击部署。
在作业运维页面,单击启动。
步骤四:查看Flink写入数据
- 登录云原生数据仓库AnalyticDB PostgreSQL版控制台。
单击登录数据库,连接数据库的更多方式,请参见客户端连接。
执行如下查询语句,查看Flink写入数据。
SELECT * FROM adbpg_sink_table;