本文介绍AnalyticDB PostgreSQL版如何通过DMS的作业调度功能,实现定时调度RDS PostgreSQL数据库的数据。
功能介绍
本次作业调度使用OSS作为中间态的存储,调度任务会将数据从RDS PostgreSQL数据库加载到OSS上,再使用AnalyticDB PostgreSQL版Serverless模式对该数据进行分析。整个ETL(Extract-Transform-Load)链路调度均通过DMS实现。作业调度示意图如下:
优势
数据存储在OSS上,实现低成本存储归档,且数据不会被删除。
数据从RDS数据库以“T+1”的形式加载到AnalyticDB PostgreSQL版Serverless模式中进行高性能数据分析。
DMS作业调度支持配置自动调度框架,低代码、白屏化操作,上手容易。
注意事项
RDS数据库中的数据,需要可以指定条件来增量归档。例如通过表中时间列按天归档。
RDS PostgreSQL实例、AnalyticDB PostgreSQL版实例和OSS Bucket需在同一地域内。
准备工作
AnalyticDB PostgreSQL版
RDS PostgreSQL
创建RDS PostgreSQL实例。如何创建实例,请参见快速创建RDS PostgreSQL实例。
说明RDS PostgreSQL实例需要为PostgreSQL 9.4至PostgreSQL 13.0版本。
创建高权限账号。如何创建高权限账号,请参见创建账号。
OSS
创建OSS Bucket。如何创建OSS Bucket,请参见控制台创建存储空间。
获取OSS Bucket的Bucket名称和Endpoint(地域节点)信息,获取方式如下:
登录OSS管理控制台。
在左侧导航栏中,单击目标Bucket列表。
在Bucket列表,单击目标Bucket。
在Bucket列表页面,您可以获取Bucket名称。
单击左侧导航栏中的概览。
在概览页面的访问端口区域,您可以获取Endpoint(地域节点)。
建议使用ECS的VPC网络访问(内网)的访问域名进行访问。
获取AccessKey ID和AccessKey Secret
获取AccessKey ID和AccessKey Secret的具体操作,请参见创建AccessKey。
准备服务和数据
RDS PostgreSQL
连接RDS PostgreSQL数据库。如何连接数据库,请参见连接PostgreSQL实例。
本文示例中所有操作均使用DMS连接并执行。
创建测试表t_src,并插入测试数据。语句如下:
CREATE TABLE t_src (a int, b int, c date); INSERT INTO t_src SELECT generate_series(1, 1000), 1, now();
安装OSS外表插件。语句如下:
CREATE EXTENSION IF NOT EXISTS oss_fdw;
创建一个OSS外表。语句如下:
CREATE SERVER ossserver FOREIGN DATA WRAPPER oss_fdw OPTIONS (host '<bucket_host>' , id '<access_key>', key '<secret_key>',bucket '<bucket_name>');
参数说明如下:
参数
说明
host
准备工作中获取的OSS的Endpoint(地域节点)。
id
准备工作中获取的AccessKey ID。
key
准备工作中获取的AccessKey Secret。
bucket
准备工作中获取的OSS的Bucket名称。
AnalyticDB PostgreSQL版
连接AnalyticDB PostgreSQL版数据库。如何连接数据库,请参见客户端连接。
本文示例中所有操作均使用DMS连接并执行。
创建一张与RDS PostgreSQL侧表结构一致的表t_target。语句如下:
CREATE TABLE t_target (a int, b int, c date);
说明AnalyticDB PostgreSQL版Serverless模式暂不支持主键。
安装OSS外表插件。语句如下:
CREATE EXTENSION IF NOT EXISTS oss_fdw;
创建OSS Server和User Mapping。语句如下:
CREATE SERVER oss_serv FOREIGN DATA WRAPPER oss_fdw OPTIONS ( endpoint '<bucket_host>', bucket '<bucket_name>' ); CREATE USER MAPPING FOR PUBLIC SERVER oss_serv OPTIONS ( id '<access_key>', key '<secret_key>' );
参数说明如下:
参数
说明
endpoint
准备工作中获取的OSS的Endpoint(地域节点)。
id
准备工作中获取的AccessKey ID。
key
准备工作中获取的AccessKey Secret。
bucket
准备工作中获取的OSS的Bucket名称。
配置ETL任务
登录数据管理服务DMS控制台。
单击顶部菜单栏中的集成与开发(DTS),然后在左侧导航栏中,选择 。
在任务流区域中,单击新增任务流。
在新建任务流对话框中,输入任务流名称后,单击确认。
本次示例中,任务流名称为RDSPG数据导入OSS。
配置RDS PostgreSQL数据归档任务流。具体步骤如下:
在RDSPG数据导入OSS页签中,将左侧数据加工分类中的单实例SQL拖动到中间画布中。
可选:单击画布中该任务的图标,重命名该任务。
重命名任务是为了方便后续维护整个ETL链路,您可以根据自身需求设置任务名。本次示例中的任务名设置为RDS数据抽取。
单击画布中新建任务的图标。
选择需要绑定的RDS PostgreSQL数据库。
您可以切换至RDS PostgreSQL数据库的SQL编辑页签查看数据库。
在下方编辑框中,粘贴以下SQL:
DROP FOREIGN TABLE IF EXISTS oss_${mydate}; CREATE FOREIGN TABLE IF NOT EXISTS oss_${mydate} (a int, b int, c date) SERVER ossserver OPTIONS ( dir 'rds/t3/${mydate}/', DELIMITER '|' , format 'csv', encoding 'utf8'); INSERT INTO oss_${mydate} SELECT * FROM t_src WHERE c >= '${mydate}';
单击右侧的变量设置,选择节点变量,将变量名设置为
mydata
,时间格式设置为yyyyMMdd
。
返回RDSPG数据导入OSS任务流,配置AnalyticDB PostgreSQL版的加载任务。具体步骤如下:
在RDSPG数据导入OSS页签中,将左侧数据加工分类中的单实例SQL拖动到中间画布中。
可选:单击画布中该任务的图标,重命名该任务。
重命名任务是为了方便后续维护整个ETL链路,您可以根据自身需求设置任务名。本次示例中的任务名设置为ADBPG数据加载。
单击画布中新建任务的图标。
选择需要绑定的AnalyticDB PostgreSQL版数据库。
AnalyticDB PostgreSQL版数据库获取方式与步骤5中获取RDS PostgreSQL数据库方式一致。
在下方编辑框中,粘贴以下SQL:
CREATE FOREIGN TABLE IF NOT EXISTS oss_${mydate}( a int , b int , c date ) SERVER oss_serv OPTIONS ( dir 'rds/t3/${mydate}/', format 'csv', delimiter '|', encoding 'utf8'); INSERT INTO t_target SELECT * FROM oss_${mydate};
单击右侧的变量设置,选择节点变量,将变量名设置为
mydata
,时间格式设置为yyyyMMdd
。
配置调度任务,需要先运行RDS PostgreSQL数据抽取任务,再运行AnalyticDB PostgreSQL版数据加载任务。配置方法如下:
选中RDS数据抽取任务右侧的圆点,拖动到ADBPG数据加载任务上,完成拖动后显示效果如下:
单击页面下方的任务流信息,打开调度配置下开启调度的开关。
选择需要的作业调度周期,每个周期调度任务都会进行RDS侧数据的抽取和AnalyticDB PostgreSQL版侧数据的加载。
完成配置后,单击左上方试运行。
任务流测试无误后单击发布。