企业级数仓从Teradata迁移到AnalyticDB最佳实践

罗成对
  • 收获赞:73
  • 擅长领域:十几年云计算/数据库从业经验,涉及OLTP/OLAP/NoSQL/DBaaS等。擅长领域:云计算 数据库

前言

Teradata数仓迁移容易么?

2021年4月,云原生数仓AnalyticDB PostgreSQL(以下简称ADB PG)产研团队全程走完宏源证券的Teradata数仓升级改造项目,躬身入局才深有感触,认知也发生转变(确实不容易),过程中与ISV、GTS人员沟通发现他们也有同感。因此,有必要把项目实施过程抽象总结出来形成文案,以服务后来者,如果后续实施类似项目能少走弯路且能提高一点效率也不枉编写此书。

编写目的

旧屋改造比建新房要复杂得多,借用金融行业实施过数仓迁移前辈的话“迁移难度不亚于飞机在空中更换发动机,任何风险都可能导致飞机坠毁。”。数仓迁移是一项系统工程,不仅涉及技术性工作如方案设计/方案实施(模型、数据、ETL作业任务迁移等),而且非技术性工作如项目管理与协调也非常重要(阿里内部各团队之间,客户多部门之间,阿里/ISV/客户三方之间)。既然复杂,就得梳理,编写此书目的:

  • 提升认知,达成共识

  • 为后续类似项目提供参考,以期降本提效

  • 成为可复制方案的重要成果之一

预期读者

推荐预期读者为ADB PG产研、SA、PDSA、GTS、ISV。

对于ADB PG产研,有助于更多产研同学投入到前线指导GTS/ISV做好迁移服务。

对于SA/PDSA,有助于统一思路,做可复制案例宣贯。

对于GTS交付,有助于少走弯路,提高交付效率。

对于ISV,有助于做好交付,践行ADB PG最佳实践。

术语

英文

英文简

解释

Independent Software Vendors

ISV

独立软件开发商

Enterprise Data Warehouse

EDW

企业级数据仓库

Subject

Subject

主题,是在较高层次上将企业信息系统中的数据进行综合、归类和分析利用的一个抽象概念,每一个主题基本对应一个宏观的分析领域,例如“销售分析”就是一个分析领域

Extract-Transform-Load

ETL

抽取、转换、加载

Data Marts

DM

数据集市,部门级数据仓库

Stage

Stage

又称贴源层,Stage层作为一个临时缓冲区,Stage层中的表结构和数据定义一般与业务系统保持一致

Operational Data Store

ODS

操作数据存储,是最接近数据源中数据的一层,数据源中的数据经过ETL之后,装入本层

Data Warehouse Detail

DWD

数据明细层,属于分析的公共资源,部分数据直接来自消息系统如Kafka/Flink,部分数据为接口层数据与历史数据合成

Data WareHouse Middle

DWM

数据中间层,又称轻度汇总层,对DWD层的生产数据进行轻度综合和汇总统计

Data WareHouse Service

DWS

数据服务层,又称主题层/数据集市/宽表,由DWM轻度汇总层和DWD明细层数据计算生成

Application Data Service

ADS

应用数据层,该层主要是提供数据产品和数据分析使用的数据,一般会存储在ES、mysql等系统中供线上系统使用

Dimension

DIM

维表层,高基数维度数据(用户资料表、商品资料表等),低基数维度数据(配置表等)

什么是数据仓库?数据仓库之父Bill Inmon在1991年出版的“Building the Data Warehouse”一书中所提出的定义被广泛接受——数据仓库(Data Warehouse)是一个面向主题的(Subject Oriented)、集成的(Integrated)、相对稳定的(Non-Volatile)、反映历史变化(Time Variant)的数据集合,用于支持管理决策(Decision Making Support)。

  • 面向主题:各个源系统之间在物理上往往是分离的,数据也是按照源系统服务的业务/流程进行组织,而数据仓库中的数据是按照一定主题域进行组织。

  • 集成性:数据仓库中的数据是在对原有分散的数据库数据进行抽取、清理的基础上,再通过系统加工、汇总和整理后得到,必须消除源数据中的不一致性,以保证数据仓库内的信息在企业级的全局一致性。

  • 相对稳定:数据仓库的数据主要供企业决策分析之用,主要用来查询,很少涉及修改和删除,通常情况下数据也不会轻易的被刷新。

  • 反映历史变化:数据仓库中的数据通常包含历史信息,记录了企业从过去某一时点到目前的各个阶段的信息,通过这些信息,可以对企业的发展历程和未来趋势做出定量分析和预测。

数据仓库概述

数仓发展史

非互联网时代

基本上可以分为五个时代、四种架构:

  • 约在1991年前的全企业集成

  • 1991年后的企业数据集成EDW时代

  • 1994年-1996年的数据集市

  • 1996-1997年左右的两个架构吵架

  • 1998年-2001年左右的合并年代

互联网时代

按角色划分三阶段:

  • 阶段一:约在2008年-2011年初的互联网数据平台,与传统行业数仓建设相似度高,主要体现在角色划分和技术选型。

  • 阶段二:约在2011年到2014 年左右,随着数据平台建设逐渐进入快速迭代期,数据产品、数据产品经理这两个词逐渐升温以及被广泛得到认可。各类数据产品经理(偏业务数据产品、偏工具平台数据产品)推进数据平台的建设,分析师参与数据平台直接建设比重增加。

  • 阶段三:2014年之后,使用数据的一些角色(分析师、运营或产品)会自己参与到从数据整理、加工、分析阶段,原有ETL、数据模型角色转为给用户提供平台、产品、数据培训与使用咨询,数据分析师直接参与到数据平台过程、数据产品的建设中去。

数据特点对比

非互联网

互联网

移动互联网

数据来源

各类业务数据库结构化数据

系统日志,各类业务数据库结构化数据,网页,多媒体信息

含互联网数据,IoT数据

数据特性

结构化数据

非结构化数据居多

结构化/非结构化数据都多

数据量

GB到TB级别

TB到PB级别

TB级,PB级,可能EB级

产生周期

天/周为单位

秒为单位

秒或更小单位

数据价值

长期有效

随时间衰减

随时间快速衰减

数据聚合度

高度聚合

聚合度低

聚合度很低

数仓模型

Kimball的维度模型和Inmon的ER模型是两种主流的数仓建模方法。

关系建模(Bill Inmon)

关系建模站在企业角度进行面向主题的抽象,用实体加关系描述的数据模型描述企业业务架构,在范式理论上符合3NF,不针对某个具体业务流程。关系模型要求数据以最细粒度存在。

Inmon都是以数据源头为导向。Inmon 模式从流程上看是自顶向下的,即从数据源到数据仓库再到数据集市的(先有数据仓库再有数据集市)一种瀑布流开发方法。

维度建模(Ralph Kimball)

维度建模以分析决策的需求为出发点构建模型,一般有较好的大规模复杂查询的响应性能,更直接面向业务,典型代表是我们比较熟知的星形模型,以及在一些特殊场景下适用的雪花模型。多维模型则以轻粒度汇总数据存在。

Kimball都是以最终任务为导向。Kimball模式从流程上看是自底向上的,即从数据集市到数据仓库再到数据源(先有数据集市再有数据仓库)的一种敏捷开发方法。Kimball往往意味着快速交付、敏捷迭代,不会对数据仓库架构做过多复杂的设计,在变换莫测的互联网行业,这种架构方式逐渐成为一种主流范式。

Inmon与Kimball对比

Inmon关系模型

Kimball维度模型

成本

成本高;

构建周期长,需深入了解源系统业务;

成本低;

构建周期短,可快速响应分析需求;

性能

ETL算力耗费较大;

不适合做大数据量的分析查询;

大数据量的分析查询性能较好;

扩展性

高度抽象,模型扩展能力灵活;

对源系统适配能力较弱;

易用性

比较好理解源系统的数据;

比较好理解分析需求;

相关概念对比

数据集市

数据集市(Data Mart),也叫数据市场,就是满足特定的部门或者用户的需求,按照多维的方式进行存储,包括定义维度、需要计算的指标、维度的层次等,生成面向决策分析需求的数据立方体。

数据仓库

数据集市

数据来源

OLTP系统、外部结构化数据

数据仓库

范围

企业级

部门级或工作组级

主题

企业主题

部门或特殊分析主题

数据粒度

最细

较粗

历史数据

大量历史数据

适度历史数据

目的

处理海量数据,数据探索

从某个维度快速访问和分析数据

数据湖

Pentaho的CTO James Dixon 在2011年提出了“Data Lake”的概念。在面对大数据挑战时,他声称:不要想着数据的“仓库”概念,想想数据的“湖”概念。数据仓库概念和数据湖概念的重大区别是:数据仓库中数据在进入仓库之前需要是事先归类,以便于未来的分析。这在OLAP时代很常见,但是对于离线分析却没有任何意义,不如把大量的原始数据保存下来,而现在廉价的存储提供了这个可能。

数据仓库

数据湖

数据

OLTP系统、外部结构化数据

来之IoT、网站、APP、社交媒体等采集而来的结构化和非结构化数据

Schema

写入型Schema,数据存储之前需要定义Schema,数据集成之前需要完成大量ETL工作,数据价值需要提前明确

读取型Schema,数据存储之后才需定义Schema,以便提供数据集成,数据价值尚未明确

扩展性

中等开销获得较大的容量扩展

低成本开销获得极大的容量扩展

性价比

较高成本支持快查询

较低成本支持快查询

数据质量

高质量,可作为重要事实依据

原始数据,参差不齐

用户

业务分析师

数据科学家、数据开发人员和业务分析师

分析

批处理、BI报表

机器学习、数据挖掘

优势

高并发、快速响应、高质量数据

无限扩展性、数据存储成本低、支持编程框架

数据中台

数据仓库

数据中台

计算存储

OLAP型数据库构建的存储体系

混合架构,随需搭配,满足各类数据计算

计算体系

传统ETL开发和报表开发为主

数仓建设、数据开发IDE、任务调度、数据集成、数据治理、统一数据服务、数据资产管理、元数据管理、流批计算、敏捷BI报表开发等

应用场景

报表为主

除传统报表,还支持商品推荐、精准推送等非确定性场景,数据服务业务、业务与数据互补形成闭环

价值体现

面向管理层和业务人员的辅助决策

除完成传统业务人员辅助决策,还能面向业务系统推动优化升级、数据变现等,把数据资产变成数据服务能力

典型数仓产品

重点介绍AnalyticDB和Teradata,其他商业/开源产品如Snowflake、Redshift、Greenplum、TiDB等可Google一下。

AnalyticDB PostgreSQL

云原生数据仓库AnalyticDB PostgreSQL版提供PB级数据实时交互式分析,ETL/ELT,和BI报表展示功能,支持数据高吞吐实时写入与批量导入,提供ACID保证和标准事务隔离级别,采用MPP全并行架构,是一款具有高性价比的云原生数仓产品,提供基于阿里云生态的公共云和混合云服务。

AnalyticDB PostgreSQL支持JDBC/ODBC连接,支持SQL 2003语法标准,兼容PostgreSQL,Greenplum,和部分Oracle语法。同时提供PL/pgSQL存储过程,以及Java/Python UDF。另外在SQL基础上,支持Apache MADLib机器学习,PostGIS地理位置分析,以及JSON/JSONB半结构化数据,图片音频等非结构化数据与结构化数据融合分析功能。

在部署形态层面,AnalyticDB PostgreSQL提供阿里云公共云服务,按量付费,支持垂直升降配和水平扩容,另外支持存储容量独立在线扩容;提供阿里云企业版和敏捷版DBStack混合云部署形态,同时支持x86和Arm平台。

在第三方认证层面,AnalyticDB PostgreSQL先后通过了“国际数据库TPC官方TPC-H 30TB认证”(性价比综合排名第一),信通院“分布式事务型数据库基础能力评测”(TPC-C)和“分布式分析型数据库大规模性能认证”(640节点 TPC-DS 100TB)。

详细见参考资料【3】。

Teradata

Teradata天睿公司(纽交所代码:TDC),是美国前十大上市软件公司之一。经过逾30 年的发展,Teradata天睿公司已经成为全球最大的专注于大数据分析、数据仓库和整合营销管理解决方案的供应商。

Teradata天睿公司的主要产品和解决方案包括:企业级数据仓库;动态数据仓库和动态企业智能™。主要的软硬件产品包括:Teradata 数据库软件;Teradata 专用平台系列;Teradata逻辑数据模型和Teradata 分析应用程序和服务。Teradata数据仓库配备性能最高、最可靠的大规模并行处理 (MPP) 平台,能够高速处理海量数据。它使得企业可以专注于业务,无需花费大量精力管理技术,因而可以更加快速地做出明智的决策,实现 ROI 最大化。

自成立伊始,Teradata 天睿公司迅速发展为全球领先的大数据分析和数据仓库解决方案厂商,赢得了超过2,000家客户的信任,在多个行业表现卓越,居于领导者地位:

  • 全球领先商业和储蓄银行 85%

  • 全球领先通信运营商 90%

  • 全球领先航空公司100%

  • 全球领先旅游及交通公司 65%

  • 全球领先零售商 75%

  • 全球领先医疗保健供应商55%

  • 全球领先制造业 50%

参考资料

【1】https://blog.csdn.net/weixin_39032019/article/details/107386931

【2】https://blog.csdn.net/mnbvxiaoxin/article/details/105659391

【3】https://help.aliyun.com/document_detail/211072.html

【4】https://www.teradata.com/

【5】https://baike.baidu.com/item/Teradata/1792590?fr=aladdin

【6】https://www.toutiao.com/i6998120156975186443

传统数仓局限性及迁移挑战

数据仓库是数据管理市场上一个方兴未艾的领域,有着良好的发展前景,数据仓库的应用随着现代社会商业模式的变革而进一步普及和深入。近年来,一场革命正在悄悄地改变着产品制造和提供服务的方式,它就是数字化定制经济模式,而定制化的需求则依托于大量数据的采集和分析,根据每个行业、每个个体的特性来针对性的提供服务。未来,数据爆炸将会是常态,数据如何有效存储,如何快速挖掘数据价值,如何确保数据安全等等,都是数据仓库从业者(业务侧和技术侧)需要解决的问题。

传统数仓应用的局限性

商业数据库起步于二十世纪八十年代,主要代表为Oracle,SQL Server,DB2等结构化数据在线处理的关系型数据库,而以MySQL,PostgreSQL为代表的开源关系型数据库也在二十世纪九十年代得到了发展。

目前国内的企业级数仓环境,主要以Teradata和Oracle为主,已经过十几年的磨练,虽然这两款产品都非常稳定,但是同样存在着一些致命的缺陷,无法满足市场的需求。

Teradata数仓

以XX证券企业为例,当前在用Teradata数据仓库平台,自上线以来,在多条业务线发挥着重要支撑作用。然而,历经近10年的服务器,该平台已积累了诸多问题:

  • 平台面临硬件老化、资源耗尽、难以升级诸多亟待解决的风险隐患。

  • 业务发展进入关键期,Teradata数据仓库已难以满足业务需求。

  • 金融科技发展潮流与金融机构数字化转型的内在要求,促使金融机构必须搭建新的数据基础设施--数据中台,以数字化赋能业务发展。

  • 中美关系与国际形式变化,迫使国内IT基础设施,特别是金融行业重要IT系统,尽快实现国产自主可控。

Oracle数仓

以XX银行为例,从初建数据仓库的Oracle 10g,到后期的Oracle 11g及12C版本。始终是在Oracle的数据库上支撑着每日的报表需求。但是近几年用户频繁提出以下几个问题:

  • 当运算的数据量非常大时,Oracle上运行的SQL语句执行异常缓慢,往往需要经过几个小时的等待才能跑出结果。每日凌晨的跑批任务还必须要有值守人员进行看护,一旦跑批出错,则直接影响第二天的业务需求。

  • Oracle环境支持的数据类型有限,很多类型的数据在Oracle数据库中没法存储,需要手工转化。

  • Oracle产品的成本较高,不仅需要付出高额的费用来购买产品,在日后的维护费用也是非常巨大。

  • Oracle数据库与其他数据库环境在进行数据交互的过程中,显得非常的无力。

数仓应用发展趋势

经过近30多年的发展,企业级数仓都有了不同程度的发展,积淀了大量的业务数据。同时随着多维度的业务发展转变,数仓应用将面临如下的发展趋势。

  • 数据层面:数据规模不断突破,非结构化信息持续增长

这些年,每个行业的业务数据均呈指数增长,小则TB大则EB级的数据存储量普遍存在,业务数据如字符串、数字、文本等基本类型逐渐扩展,智能感知设备生产的数据如图像、声音、视频等非结构化数据成为分析重要源,导致我们使用传统的数据存储和处理技术已经很难满足现实需要。我们需要采用更为先进的数据库产品、更多的技术手段来存储和管理这些数据。

  • 业务层面:离在线快速响应,实时交互成为常态

数据规模的增长导致传统T+1的处理模式已经很难在规定时间内得到响应,而对T+0、ad-hoc、离在线ETL等多维度交互式分析的诉求越来越强烈。我们需要采用新型的数据存储计算架构、更高效的计算与服务化技术来对数据进行处理。

  • 架构层面:数据库与大数据加速融合,云原生将成为必然

在过去的几十年里,企业的业务数据都是放置在运营商的数据中心或自建机房,场地诸多因素在一定程度上制约着业务系统后期的迭代优化和扩容。近几年企业内部基于数据库与大数据的烟囱式架构解决业务问题是基本事实,业务复杂性催生架构复杂性问题期待梳理和解决。基于云原生的数据库与大数据一体化技术能很好解决存储计算弹性问题,烟囱式架构问题。

数仓架构升级路径

从业内主流方案来看,传统数仓迁移的技术选型方向通常有两种:一是从数据库到数据库(简称 DW on MPP),二是从数据库到大数据平台(简称 DW on Hadoop)。

  • 数据库到数据库(DW on MPP)

DW on MPP 方案因为两套系统都是数据库(并且基本上都是MPP数据库),所以虽然产品不同,但技术路线差异不大,源平台和目标平台的兼容性问题相对较少,技术上的风险也就相对较小。而且很多时候,为了技术可控的目标,大多数都是选择从成熟MPP产品迁移到新兴 MPP 产品,当系统体量上去之后,仍然存在未知的技术风险。此方案在金融证券行业比较受欢迎,风险可控。

  • 数据库到大数据(DW on Hadoop)

DW on Hadoop 方案因为涉及到源平台和目标平台的差异较大,兼容性问题会比较多,技术风险也是相对较大的。但是 DW on Hadoop 方案本身在互联网行业,无论是数据规模还是技术架构,都已经被证明是经得起考验并且非常成熟的架构,技术的前瞻性、可扩展性和灵活度都要优于 DW on MPP 方案。只是金融企业的业务复杂性和历史包袱要远高于互联网行业,而金融行业在大数据技术能力方面的储备又要弱于互联网企业,所以即使知道目标在哪里,但是出于对整个迁移过程的风险没有掌控力和信心,所以此方案在金融企业一直未有成功落地的实践。

数仓架构升级挑战

技术方面

  • 功能兼容性。市面上众多的OLAP产品,无法准确清晰的筛选出哪一款产品可以完美地替代当前的数仓环境。

  • 改造方案全面性。由旧老的数仓环境,改造为新型的数仓环境,涉及的环节非常多,稍有遗漏可能就会面临改造失败。

  • 迁移实施复杂度。历史沉淀数据太过庞大,同时老旧数仓环境中存储了各种类型数据,在迁移过程中需要由多个厂家共同合作,整个迁移过程会变得非常复杂。

成本方面

  • 评估改造成本。由于摈弃老旧的集中部署技术,采用新型的分布式数据库技术,改造评估成本无法准确估算。

  • 评估应用改造周期。老旧的数仓环境,已经经过了30多年的数据沉淀,数据迁移速度、应用改造难度目前无法有效评估,势必造成数仓环境实际改造周期与评估周期存在的差距性。

运维方面

  • 数据安全监管。未来数据监管会变得空前严格,多场景的运维需求也会日益突出。

  • 开发人员技能。为了无缝衔接新型分布式数仓环境,应用开发厂商必须对现有的应用进行必要调整,开发人员在这方面的能力是否能够胜任,也决定着数仓改造的成败。

  • DB管理技能。新型分布式数仓环境的DB运维能力的提升,则是对当前运维人员的又一技能挑战。

  • 云资源管理能力。新型的分布式数仓技术依附于先进的云计算平台,日常的运维工作不仅要有业务系统、数据库方面的能力,同时也要有云计算方面的技能。

参考资料

【1】http://www.cnki.com.cn/Article/CJFDTotal-WDNJ199909010.htm

【2】https://blog.51cto.com/u_15127572/2723902

【3】https://blog.csdn.net/weixin_39074599/article/details/103403826

【4】https://baijiahao.baidu.com/s?id=1672733779170088633&wfr=spider&for=pc

【5】https://www.sohu.com/a/431749281_315839

【6】https://wenku.baidu.com/view/c752d435f8b069dc5022aaea998fcc22bdd1435a.html

数仓迁移规划与实施方案

通常企业级数据仓库尤其金融证券行业数仓历经10多年的建设,已经形成了一个具有数百TB级以上或PB级数据量、数十万张库表、数万脚本的巨大复杂系统。迁移如此系统,难度不亚于飞机在空中更换发动机。本章节将全面讲述数仓迁移规划和实施方案。本章以宏源证券去Teradata迁移至ADB PG为真实实践案例。

数仓迁移总体规划

在技术方案上,需要对每个技术关键点都能考虑周全,深入探索每个技术细节并进行充分的论证和测试;在迁移方法上,需要科学完善的实施方法论,充分考虑迁移项目的工程特点和平滑过渡目标,把迁移风险做到可识别、可分析、可预测、可防范;在实施资源上,不仅需要团队对于新技术具有前瞻性认知和把控能力,更需要对原有数仓体系的盘根错节有深入了解,能够在风险发生时从技术、方案、业务等不同层面提出应对方案,及时化解风险。

Teradata数仓迁移方法总结归纳为“五步十阶”法,迁移项目为保证平滑过渡、风险可控等目标,总体会倾向平迁策略,即不改架构,不动流程,尽力兼容。“五步十阶”内容如下图示:

image.png

迁移规划和实施方案基于此图展开描述。

业务调研

业务调研阶段需对原系统上下游做详实调研,调研内容包括但不限于:

  • 原数仓系统架构

  • 原数仓网络架构

  • 原数仓数据交互流程

  • 原系统资源盘点

  • 原数仓库表统计

最后迭代输出调研分析报告,并与业务方做深入讨论与修正。

原数仓系统架构示意

image.png

图示为比较典型的证券企业数仓架构:上游数据依赖采集程序生成数据文件,通过Teradata的FSLoad加载入库;下游系统不直接访问Teradata数仓,通过前置环境来过渡;数仓内部分层建模,ETL任务通过Automation调度工具集成。这种架构主要好处是管控能力强,体现在安全可控、性能可控、并发可控。

原数仓网络架构示意

image.png

原数仓数据交互流程示意

图片 1.png

原系统资源盘点

类别

内容

数仓体量

生产库多少TB,其中压缩数据多少TB(压缩比多少)

多少张表,其中分区表多少张,多少索引

每天新增数据量,涉及多少张表更新

ETL脚本/作业

多少个作业,多少个脚本

每层脚本数多少,每层脚本代码量

脚本代码有什么显著特征,比如update/delete/insert,多表join等

脚本涉及的UDF等

调度

调度工具,版本/功能/性能,优缺点,新诉求等

周边系统

上游源系统情况

下游业务系统情况

第三方ETL工具等

定制化的程序/接口等

...

...

原数仓库表统计

TD库名

表名

表类型

大小(G)

更新频率

是否迁移

ODS_DATA

ODS_EMS_PUB

拉链

1100.5

ODS_DATA

ODS_MMS_BBS_T

切片

253.0

...

...

...

...

...

...

原数仓ETL任务/脚本统计

ETL_SYS

作业名称

步骤

执行命令

DSQL脚本

是否迁移

ADM

COLLECTTABLESTATE_HY

1

Dsql -c /etc/LOGON_HYDBA.env -f

/APP/ADM/CollectTableState_HY.dsql

ADM

REPLACE_DESEN_VIEW

0

Dsql -c /etc/LOGON_DBA.env -f

/APP/ADM/adm_replace_desen_view0010.dsql

...

...

...

...

...

...

原数仓用户/权限统计

用户名

库名

通用权限

个性化表/视图

权限

是否迁移

etl_user1

etl_data

etl_ddl

sdata_ddl

sdata

读/写

pdm_ddl

pdm_data

table1

禁读

etl_user2

etl_log

etl_data

etl_ddl

读/写

...

...

...

...

方案设计

该阶段需多方参与共创,设计并编制可落地的执行方案,提请业务方审议。内容包括但不限于:

  • 系统架构

  • 规划设计

  • 迁移方案

  • 并行方案

  • 容灾方案

  • 运维保障

  • 组织保障

  • 实施计划

系统架构

描述内容包括:

  • 新系统架构

  • 网络架构与配置

  • 硬件配置

  • ADB PG组件配置

新系统架构示意图:

image.png

ADB PG组件部署逻辑示意图:

image.png

规划设计

规划类别

规划内容

交付规划

测试环境交付

生产环境交付

场景设计

归纳总结应用场景,应用场景对应的业务算法,如:

  1. 全删全插算法,每次表的数据全部删除再从上游获取数据全部重新插入,不需要保留历史记录;

  2. 拉链算法,表上增加开始时间和结束时间,按天记录数据的变化情况,变化的时候才会有开链和关链;

  3. 切片算法,每隔一段时间增加一个分区切片的数据;

  4. 更新算法,永远保留最新的全量,即使上游系统中数据可能会删除。

ADB实例规格设计

Master/Segment规格

实例节点数

Master/Segment部署方式(独立/混合)

库表设计

生产库/历史库表命名规则

表存储方式(行存/列存),压缩方式,压缩比

分布/分区表设计,分布键选择规则,分区键划分依据等

用户与权限设计

ADB用户与权限与原Teradata映射关系

数仓数据交互设计

新增数据采集入库,流程/技术实现路径

数仓接口集市层数据共享输出,流程/技术实现路径

上下游系统交互设计

上下游系统盘点

上下游系统实时/离线数据交互逻辑等

迁移方案

类别

内容

数据集成

通过定制程序或成熟平台从源系统抽取数据,通过ADB数据装载程序或其他,将数据批量转入ADB。这里涉及到程序改造与适配,脚本改造与适配等

模型迁移

DDL迁移及验证

视图迁移及验证

DSQL迁移及验证

UDF适配及验证

作业迁移

老数仓作业任务中有效任务梳理

新、老环境作业任务的迁移和验证

新调度配置管理和监控功能验证

数据迁移

静态历史数据迁移

动态数据迁移

数据一致性验证

业务验证

辅底基础数据迁移完成后,进入新、老并行业务验证环节,每天对新、老数仓处理结果进行验证,需要制定验证方案

外围适配

在源数据经过集成、处理加工后,开始向下游各类业务提供数据,需要适配哪些外围工具,如何适配等

需求变更

新数仓迁移至完成期间,原数仓TD会持续不间断的承接新需求或进行需求变更的操作,为实现新、老数仓在迁移过程中的对新增或变更需求的统一管控,设计需求变更方案和实现路径

并行方案

新老数仓并行运行期间,梳理清楚新老系统是否存在交互,变更同步如何执行,业务如何切换,性能如何保障,稳定性如何保证等等。

容灾方案

类别

内容

容灾机制

常见故障场景的定义(如分为实例级、服务器故障、机房级故障),故障对应的服务等级指标要求

故障应对策略与流程

容灾切换

不同级别的故障,如何触发自动主备切换

如果产生不一致,如何通过手工修复完成一致性追平

应急机制

涉及技术、业务、组织、及迁移过程中需求变更等的风险应对方案

运维保障

描述清楚运维期和运维期满后的运维模式,运维内容,故障处理机制和流程等。

如日常运维采用“一线+二线”联合运维方式,即由一线保障人员负责监控,第一时间响应故障,然后将故障信息发给二线运维人员(相关系统负责人)处置。

image.png

组织保障

角色

参与方

职责

项目领导组

甲方

负责项目整体方向把控和重大事项决策,负责确认项目总体目标,对项目实施中的重大问题做出决策,协调各单位之间的关系

阿里

项目管理组

甲方

负责日常项目沟通、风险、时间管理等工作

阿里

ISV

技术经理

阿里

负责项目产品技术需求管理和技术支持

数据库专家

阿里

负责数据迁移工具、模型迁移工具、调度迁移工具的调优、产品使用过程中问题处理、技术培训等工作

数据架构师

阿里

负责迁移方案的制定,包括数据和模型迁移、调度工具对接、数据校验、容灾架构设计、上下游应用切换方案等工作

业务架构师

甲方

负责业务输入,协助数据架构师制定迁移方案

研发工程师

ISV

负责具体迁移工作的实施

质量工程师

ISV

负责测试,数据质量检验工作

实施计划

描述清楚关键任务及关键里程碑,里程碑计划如下图示:

图片 1.png

以1年为单位计,推荐时间分配比例1:2:3:3:2:1。

更为详细的实施计划依据里程碑计划来导出,实施过程中做适当调整。

数仓迁移实施

前面方案设计主要讲原则、策略、方法、实现路径,实施方案基本会按照以上的方案设计内容去执行,会更聚焦执行细节,更注重变化和灵活性,更强调项目管理和协调。迁移实施主流程为调研 -> 设计 -> 典型场景验证 -> 测试环境迁移 -> 生产环境迁移 -> 系统并行 -> 项目验收。

案例:宏源证券数仓迁移

案例背景

宏源证券目前在用的Teradata数据仓库平台,自上线以来,在多个业务条线发挥着重要支撑作用。然而,历经近10年的服务期,该平台已积累了诸多问题,一方面,面临着硬件老化、资源耗尽、难以升级等诸多亟待解决的风险隐患;另一方面,随着公司整合逐步完成,业务发展也进入了关键期,TD数据仓库已难以满足业务需求。同时,金融科技发展潮流与金融机构数字化转型的内在要求,促使金融机构必须搭建新的数据基础设施——数据中台,以数字化赋能业务发展。再者,中美关系与国际形势变化,迫使国内IT基础设施,特别是金融行业重要IT系统,尽快实现国产自主可控。

业务价值

1.替换京沪两地Teradata数据仓库,实现100多套上游业务源系统、约30套下游系统、25000多个任务、800多个服务接口、100多TB数据的平稳迁移,保障现有业务平稳有序运转,且确保整体性能提升30%以上。

2.逐步实现数据中台底座国产化,满足信创要求。

3.以新数据仓库作为数据中台底座的重要组成部分,支持未来数据中台规划和建设需要。

image.png

数仓迁移配套工具及使用说明

模型迁移工具

ADAM是一款将IT系统轻松的从原有的运行环境迁移上云的产品,在把传统IT架构改造成互联网架构方面(比如把Oracle数据库迁移到云数据库PolarDB)积累了多年的成功经验。目前ADAM支持的源库有Oracle 10g / 11g / 12c版本,Teradata 13 / 14 / 15版本,DB2_LUW版本。

ADAM推出数据库平滑迁云解决方案,覆盖数据库迁移的全生命周期,包括数据库与应用评估(兼容性、关联关系、性能、风险点、应用改造点)、转换(转换不兼容点、引擎特征优化转换)、结构迁移、数据迁移、一致性校验、SQL仿真回放、割接、优化。ADAM可以将源数据库的迁移成本和周期缩短到原来的1/10甚至更低。同时支持改造迁移方案,结合云端丰富的数据平台产品特性以及用户业务特性给出数据库和应用改造方案,助力企业的源数据库迁移上云。

ADAM功能说明可访问参考资料【3】,ADAM具体使用说明可访问参考资料【4】。

数据迁移工具

除DTS数据迁移工具外,ADB PG对外提供黑屏化的数据迁移工具,工具经过Teradata数仓升级改造项目实践验证,成熟度颇高。

ADB PG数据迁移工具会远程连接Teradata源端与ADB PG目的端,从Teradata源端导出数据到管道,再将管道中的数据写入到外部表,然后通过gpfdist以外部表的方式导入到ADB PG中,主体流程:

  1. 在ADB PG创建目标表的空表;

  2. 在ETL服务器中启动GPFDIST服务;

  3. Teradata源数据库通过BTEQ或者FASTEXPORT将数据导出到ETL服务器的管道文件中;

  4. 为ADB PG中的目标表创建对应的可读外部表,外部表使用GPFDIST协议,并指定ECS中的管道文件作为填充外部表的URI,然后通过 insert into table select * from 外部表启动ADB PG的同步;

  5. GPFDIST将管道文件的数据同步到外部表。

迁移工具主要功能:

  • 支持基本数据类型;

  • 支持单表、多表数据迁移;

  • 可选并发度;

  • 全/增量数据迁移;

  • 可选数据量校验;

  • 可选覆盖目标表。

安装与卸载

可以从参考资料【1】下载最新RPM进行安装,默认安装路径为/data/adbpg-migrant

# install
rpm -ivh t-adbpg-td-<...>.rpm

# uninstall
rpm -e t-adbpg-td

工具依赖包安装

# 安装perl依赖包
cd /data/adbpg-migrant/dependency
sh install.sh

# 安装psql用以脚本远程登录,安装gpfdist用于通过外表导入
# 依赖包 t-olap-adbpg-***.rpm 通过参考资料【7】下载
yum install t-olap-adbpg-***.rpm

配置数据库连接

# 1、生成gpconf文件(格式:host:port:dbname:user:password)
192.168.x.xx:5432:postgres:adbpgadmin:123

# 2、生成tdconf文件(格式:host:port:user:password)
39.105.xxx.xx:1025:dbc:dbc

# 3、对原始文件进行加密转换,通过命令:
echo cebgpdb | gpg --batch --yes --passphrase-fd 0 -o gplogon_encrypt -c gpconf
echo cebgpdb | gpg --batch --yes --passphrase-fd 0 -o tdlogon_encrypt -c tdconf

# 4、将加密文件gplogon_ecrypt和tdlogon_ecrypt拷贝至/data/adbpg-migrant/etc目录

设置环境变量

编写source.td文件:

export T2G_AUTO_HOME=/data/adbpg-migrant
export GPFDIST_IP=192.168.0.1
export ISTRUNCATE=true
export ISCHECKNUM=true
export USE_TD_TEMP=true
export BATCH_NUM=10
export ISDOWNLOAD=false
# DOWNLOADMODE = { 0: export && import, 1: only export, 2: only import }
export DOWNLOADMODE=0

环境变量说明:

字段名

说明

T2G_AUTO_HOME

迁移工具目录

ISTRUNCATE

迁移前是否清空目标表

ISCHECKNUM

是否校验数量

USE_TD_TEMP

是否使用TD TEMP创建LOGTABLE

BATCH_NUM

并发度

ISDOWNLOAD

是否支持数据下盘存储

DOWNLOADMODE

ISDOWNLOAD=true有效

0: 先导出再导入,落盘数据文件最后会删除, 1: 只导出, 2: 只导入

编辑迁移配置文件

编写copy.conf文件(单表串行模式):

SOURCE_DB=DBTD1
SOURCE_TABLE=test1,test2
SOURCE_WHERE= 1=1
TARGET_DB=swdb
TARGET_SCHEMA=public
TARGET_TABLE=test1,test2
DELIMITER=|@|
TABLE_COUNT=2
GP_LOGONFILE=gplogon_encrypt
TD_LOGONFILE=tdlogon_encrypt
REJECT_LIMIT=1
GP_EXTERNAL_MAX_SEGS=88
SpecialForNull=???
ENCODE=GBK
ENCODE_ERROR_IGNORE=f

编写copy.conf文件(多表并行模式):

SOURCE_DB=DBTD1
TARGET_DB=swdb
TARGET_SCHEMA=public
DELIMITER=|@|
GP_LOGONFILE=gplogon_encrypt
TD_LOGONFILE=tdlogon_encrypt
REJECT_LIMIT=1
GP_EXTERNAL_MAX_SEGS=88
SpecialForNull=???
ENCODE=GBK
ENCODE_ERROR_IGNORE=f

参数说明:

字段名

说明

SOURCE_DB

源端DB

SOURCE_TABLE

源端同一SOURCE_DB下的多张表名,用 ',' 作为分隔符

TARGET_DB

目的端DB

TARGET_SCHEMA

目的端SCHEMA

TARGET_TABLE

目的端同一TARGET_SCHEMA下的多张表名,用 ',' 作为分隔符

SOURCE_WHERE

where过滤条件

TABLE_COUNT

表数量

DELIMITER

分隔符

GP_LOGONFILE

GP连接加密文件

TD_LOGONFILE

TD连接加密文件

REJECT_LIMIT

错误数据的条数,超过设置值会报错。最小值是2。用来确保数据的完整性

GP_EXTERNAL_MAX_SEGS

配置数等同与实例segment节点数

SpecialForNull

转换NULL时的特殊字符,需要写在数据中不会出现的特殊字符串

ENCODE

编码方式

ENCODE_ERROR_IGNORE

是否支持乱码入库(t=可以,f=不可以)

ADB库创建UDF

CREATE OR REPLACE FUNCTION get_tuple_count
(
  p_schema character varying, 
  p_table character varying
)
RETURNS bigint AS
$BODY$
DECLARE
  v_rowcount bigint;
  v_full_tablename varchar := $1 ||'.'||$2 ;
  v_get_cnt_sql varchar;
BEGIN
--Partitioned tables
IF EXISTS (select 1 from pg_class a,pg_namespace b  where b.nspname=lower($1) and a.relname=lower($2) and a.relnamespace = b.oid and a.relkind='r' and a.relstorage='h' ) THEN
    v_get_cnt_sql := 'SELECT count(*) from '||v_full_tablename;
ELSE
    v_get_cnt_sql := 'select sum(total_tupcount)-sum(hidden_tupcount) from gp_toolkit.__gp_aovisimap_compaction_info('||quote_literal(v_full_tablename)||'::regclass)';
END IF;
  raise info '%',v_get_cnt_sql;
  EXECUTE v_get_cnt_sql INTO v_rowcount;
  RETURN v_rowcount;
END;
$BODY$
LANGUAGE 'plpgsql';

UDF用于count校验。

执行迁移命令

  • 单表或少量表迁移命令:

source /u01/adbpg/greenplum_path.sh
source ./source.td
perl /data/adbpg-migrant/bin/copy_main.pl -copy_batch_conf ./copy.conf  -automation_conf ./data/adbpg-migrant/conf/automation.conf
  • 多表并行迁移命令:

source /u01/adbpg/greenplum_path.sh
source ./source.td
perl /data/adbpg-migrant/bin/start_batch.pl -dump_conf ./tables.conf -file_conf ./copy.conf

这里的tables.conf是迁移多表的配置文件(格式:表名=where过滤条件):

test1= 1=1
test2= 1=1
test2= 1=1

更详细操作说明可参考/data/adbpg-migrant/dependency/README.md

数据比对工具

数据比对工具可用于全量和增量数据比对,该工具伴生于数据迁移工具,默认安装路径/data/adbpg-migrant/validator,安装依赖JDK8。

支持功能:

  • 单表抽样比对(count/sum/抽样全字段)

  • 多表批量比对(count/sum)

单表抽样全字段比对

执行命令:

## table validator
cd /data/adbpg-migrant/validator
java -jar -Xmx4g ./adbpg-td-validator.jar ./adbpg-td.properties >./td2gp.log 2>&1 &

adbpg-td.properties配置文件样本:

# teradata
tera.logonPath=../etc/tdlogon_encrypt
tera.db=DBTD1
tera.tableName=testt
tera.whereClause=1=1

# gp
gp.logonPath=../etc/gplogon_encrypt
gp.db=swdb
gp.schema=public
gp.tableName=testt
gp.whereClause=1=1

# table validate
# unsupported teradata types: GRAPHIC/VARGRAPHIC
# 1.count(); 2.sum(); 3.sample validate (sampleCount <= 1000000, orderBy columns <= 3)
common.sampleCount=10000
common.orderBy=id00 asc
common.notInFields=('ctid')
common.decimalPrecision=7
common.varcharLength=1024

多表COUNT/SUM比对

执行命令:

# batch table validator
cd /data/adbpg-migrant/validator
java -jar -Xmx4g ./adbpg-td-validator.jar ./adbpg-td-batch.properties >./td2gp_batch.log 2>&1 &

adbpg-td-batch.properties配置文件样本:

# teradata
tera.logonPath=../etc/tdlogon_encrypt
tera.db=DBTD1

# gp
gp.logonPath=../etc/gplogon_encrypt
gp.db=swdb
gp.schema=public

# batchMode
# 1.count(); 2.sum()
common.notInFields=('ctid')
# tables.conf与数据迁移工具多表并行迁移配置一致
common.dumpConf=../conf/tables.conf
common.decimalPrecision=7
common.varcharLength=1024

周边支撑工具

DBeaver

DBeaver 是一个基于 Java 开发,免费开源的通用数据库管理和开发工具,可以通过官方网站下载。

DBeaver 通过 JDBC 连接到数据库,可以支持几乎所有的数据库产品,包括:MySQL、PostgreSQL、MariaDB、SQLite、Oracle、Db2、SQL Server、Sybase、MS Access、Teradata、Firebird、Derby、Greenplum 等等。

DBeaver与ADB PG交互可通过配置Greenplum连接实现,参考下图示:

image.png
  1. 运行DBeaver,在上方菜单栏中单击数据库 > 新建连接。

  2. 在创建新建连接窗口,选择Greenplum数据库类型,单击下一步(N)。​如果是首次连接则需要下载对应数据库驱动文件。

  3. 在连接设置窗口的常规页签,输入以下参数:

说明

示例

主机

AnalyticDB PostgreSQL版的连接地址和端口。

gp-bp1g*************-master.gpdbmaster.rds.aliyuncs.com

端口

5432

数据库

需要管理的数据库。

postgres

用户名

AnalyticDB PostgreSQL版的数据库账号。

testuser

密码

AnalyticDB PostgreSQL版数据库账号对应的密码。

PassW0rd

  1. 单击测试链接(T),在返回的窗口中提示已连接即可。如果测试链接时遇到了org.postgresql.Driver报错,可以尝试下载/更新Greenplum驱动的库。

  2. 单击完成。

Sqoop

Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL,Oracle,PostgreSQL等)中的数据导入到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

下图展示Sqoop与AanlyticDB PostgreSQL之间的工作流:

image.png
  • Sqoop Import

import(导入)工具将各个表从ADB PG导入到HDFS。在HDFS中,表中的每一行都被视为一条记录。所有记录都存储为文本文件中的文本数据或Avro和Sequence文件中的二进制数据。语法参考:

$ sqoop import (generic-args) (import-args) 
$ sqoop-import (generic-args) (import-args)
  • Sqoop Export

export(导出)工具将一组数据文件从HDFS导出回ADB PG。作为Sqoop输入的文件,它们含有记录,这些记录在表中称为行。这些记录被读取并解析为一组记录,并用用户指定的分隔符进行分隔。

$ sqoop export (generic-args) (export-args)  
$ sqoop-export (generic-args) (export-args)

Sqoop与ADB PG进行交互依赖PostgreSQL JDBC驱动,相关驱动(版本推荐42.2.XX)可访问参考资料【5】,下载的PostgreSQL JDBC驱动包放在sqoop/lib目录下。

  • 显示ADB PG所有表,示例命令:

$ sqoop list-tables \
--connect jdbc:postgresql://<pi>:<port>/<database> \
--username <user> \
--password <password>
  • ADB PG到HDFS,示例命令:

#全库导入
$ sqoop import-all-tables \
--connect jdbc:postgresql://<pi>:<port>/<database> \
--username <user> \
--password <password>


# 全表导入
$ sqoop import \
--connect jdbc:postgresql://<pi>:<port>/<database> \
--username <user> \
--password <password> \
--table <table name> \
--where "1=1" \
--target-dir <hdfs path>


# 增量导入
$ sqoop import \
--connect jdbc:postgresql://<pi>:<port>/<database> \
--username <user> \
--password <password> \
--table <table name> \
--incremental append \
--check-column <id> \
--last-value <value>
  • HDFS到ADB PG,示例命令:

# 数据导出
$ sqoop export \
--connect jdbc:postgresql://<pi>:<port>/<database> \
--username <user> \
--password <password> \
--table <table name> \
--export-dir <hdfs path>

Kettle

Kettle(现称为Pentaho Data Integration,简称PDI)是一款非常受欢迎的开源ETL工具,主要用于数据整合、转换和迁移。Kettle除了支持各种关系型数据库、HBase、MongoDB这样的NoSQL数据源外,它还支持Excel、Access这类小型的数据源,并且通过这些插件扩展。

下图显示了Kettle和ADB PG之间的关系,数据源通过Kettle进行ETL或数据集成操作以后可以和ADB PG进行交互:

2bddd1b99e5117aea6c014c2773c0d7e04768aa8.png

目前,Kettle支持的数据导入到ADB PG的方式有:

导入方式

说明

表输出(INSERT方式)

采用JDBC作为导入方式

支持批量插入,批量插入使用JDBC的batch insert方法

批量加载(COPY方式)

采用COPY作为导入方式

对于大表,COPY方式性能达到批量插入性能的10倍左右

表输出(INSERT方式)导入会流过Master节点并做解析之后分布到对应的Segment节点上,这种方式相对较慢并且不适合导入大量数据。批量加载(COPY方式)导入方式比INSERT语句插入多行的效率更高。

具体使用说明可访问参考资料【2】。

参考资料

【1】http://rpm.alibaba-inc.com/find.php?q=t-adbpg-td

【2】https://developer.aliyun.com/article/701248

【3】https://help.aliyun.com/document_detail/53705.html

【4】https://help.aliyun.com/document_detail/159729.html

【5】https://mvnrepository.com/artifact/org.postgresql/postgresql

【6】https://dbeaver.io/

【7】http://rpm.alibaba-inc.com/find.php?q=t-olap-adbpg

常见问题

本节以问答的方式对数仓迁移相关的疑惑或痛点做必要解答。

第一问:典型场景验证是否为必要步骤?

典型场景验证是选择有代表性的业务场景(比如证券行业的交易流水),对涉及的ETL作业任务、表/视图结构、模型、数据、用户权限等进行迁移的论证工作,相当于缩小体量和范围的迁移论证。

典型场景验证,推荐实施,理由如下:

  1. 业务方尤其是业务一号位的身家性命都押在阿里云及ADB上,典型场景验证这种短平快的打法周期短容易出成绩,可提振士气增强业务方信心;

  2. 前期调研和方案设计,获取真实信息难免遗漏和不全,可以通过典型场景验证,反向验证调研详实度,论证迁移步骤合理性;

  3. 通过典型场景验证,可以暴露更多技术性问题(如SQL兼容性、SQL复杂度、作业复杂度等),用于更为准确评估迁移工作量;

  4. 测试环境交付通常会延后,进场人员可不用等待测试环境,提前搭建纯软环境开展工作,而存软环境支撑能力有限,可适配典型场景验证负荷但应付全量迁移验证难免捉襟见肘;

  5. 典型场景验证成果,可在测试环境迁移流程复用。

第二问:迁移的具体内容和技术手段?

迁移工作内容可参考上一节“方案设计”之“迁移方案”内容,包含数据集成、模型迁移、作业迁移、数据迁移、业务验证、外围适配、需求变更、用户权限迁移等。

数据集成、业务验证、外围适配偏业务侧,流程、技术等因项目而异,这里略过。

模型迁移、作业迁移、数据迁移等流程、技术参考下图示:

image.png

模型迁移和作业迁移依赖ADAM工具,ADAM工具语法转换覆盖率约80%,转换后的SQL脚本需要人工再修正方可在ADB正常执行。

数据迁移依赖ADB自研的数据迁移工具,支持两种迁移方式,一种不落地通过管道直接迁移,另一种落地成文件再导入到ADB中,数据迁移性能约300GB/h。

用户权限体系在Teradata与ADB PG之间存在差异性,必须做好映射关系梳理和用户权限设计,如下图示:

image.png

基于设计推导出权限相关SQL并在ADB PG执行即可。

第三问:需求变更对进度的影响?

数仓迁移不是静态的迁移,是持续动态的过程。

业务方会不定期变更(新增、修改、删除等)ETL任务/表结构/模型等,而且也会面临紧急变更需求。较大规模的需求变更会涉及上百脚本/模型/表结构调整,这些变更如何识别和同步到已迁移的ADB脚本/模型中且通过一致性验证是迁移项目实施过程中比较头疼的工作(目前主要靠人工);更为要命的是,受业务方非规范变更操作影响,一些“紧急变更”(如直接在线上环境调整脚本/作业任务/调度的动作)没有同步信息,势必会造成每天跑批结果差异性,排查这类问题会非常耗时耗力,如果问题定位不出来还会影响实施团队士气和团队和谐性。

所以,数仓迁移实施过程需要业务方参与进来,业务方参与进来就会有体感,双方共同商讨解决此问题才有基础,总结十六字措施,“同步信息,规范变更,参与适配,技术提效”:

  • 其一,确保变更信息同步;

  • 其二,实施方建议业务方管控/规范需求变更(但业务方不一定采纳或采纳但不一定立即推行),加强项目管理,力争节奏/队形不走样;

  • 其三,针对新增类需求变更(如新增表/模型/脚本/作业任务),业务方可以参与做部分ADB的适配工作;

  • 其四,对于非新增类需求变更,实施方需要开发一类工具来识别差异性,以提高工作效率。

保守估计,需求变更会增加1倍工作量,建议实施团队重点评估这一环节。

第四问:项目管理的挑战?

项目管理很重要,做得好能保障项目成功落地且节省成本,做得不好最坏就是烂尾。

数仓迁移涉及三(或二)方协调及各自内部管理,主要在人。项目规划阶段制定好管理架构、管理机制、协调机制等框架,配置好各方的项目经理(或技术经理),杜绝虚配,确保关键岗位人员全程跟踪项目。

组织架构落实后,执行层面或多或少还会出现各种问题,总结下来如下:

  • 对于阿里方:内部协同是第一挑战,涉及采购、GTS交付、GTS运维、ADB产研等,项目经理要处理好各小集体利益诉求,确保内部各环节顺利推进;延迟交付是第二挑战,采购延期造成GTS交付延期时有发生,产研发版延期大概率存在,项目经理要做好进度调整管理,同步协调ISV及业务方做适当调整;

  • 对于ISV:人员投入不足是第一大问题(成本控制),交付质量不佳是第二大问题(人员素质),建议ISV在利润可控情况下加大投入;

  • 对于业务方:实施方(ISV&ADB产研)把问题抛给业务方就是最大的问题,能协同解决的问题(如实施环境问题)给出专业调整方案,说服业务方来调整,短时不能解决的问题或雷区(业务方已存在的体制机制问题)需要绕道而行。

迁移实施过程难免会发生人员交替,做好人员交接/备份也很重要。

第五问:ADB数仓稳定性如何保证?

稳定性是业务方重点关切,ADB稳定性是评估迁移项目能否上线的第一考核指标,如何保障ADB稳定性?总结一句话“产品质量是基石,内核调配做加固,驻场运维保防线,最佳实践为促进”。

产品质量是基石。ADB产研/测试共同努力,提升内核/管控能力,降低“recovery mode”等概率。

内核调配做加固。内核参数分为Linux内核参数和数据库内核参数,这两部分需在实例生产前/后做相应调优,Linux内核参数涉及sysctl.conf、limits.conf修改,如sysctl.conf的net.ipv4.ip_local_ port_rangenet.ipv4.tcp_tw_reusenet.ipv4.tcp_tw_timeout,limits.conf的* hard nofile* soft nofile* soft nproc* hard nproc,数据库内核参数可通过杜康实例界面来调整,如何调整可参考阿里公共云AnalyticDB PG产品手册相关参数介绍。

驻场运维保防线。产品质量有瑕疵,驻场运维来保障,运维工作是关键一环,一方面要提升自动化运维能力建设,另一方面自动化运维不能覆盖的环节(如任务中断、日常巡检异常)需要人工介入,驻场人员此时就能发挥作用,做到及时止损。

最佳实践为促进。模型脚本难免会发生在Teradata数仓运行顺畅而在ADB数仓运行“慢”的现象,实施方需参考《ADB PG最佳实践指南》定位“慢”原因(数据倾斜、统计信息不准、数据膨胀、执行计划不佳等),对症下药,通过调整分布键、改写“Bad SQL”等等达成性能目标。

附录一:AnalyticDB与Teradata差异与兼容

重点描述ADB PG与Teradata在数据类型、DDL/DML等语法、函数、特殊关键字等差异性及ADB PG在兼容性方面的工作。包含:

  • 数据类型

  • DDL/DML

  • 特殊关键字

  • 系统函数

  • 其他

Teradata语法对大小写不敏感,ADB PG如无特殊处理(如字段加双引号),也对大小写不敏感。

数据类型

Teradata

ADB PG

说明

BIGINT

BIGINT

8byte -> 8byte

INTEGER

INTEGER

4byte -> 4byte

SMALLINT

SMALLINT

2byte -> 2byte

DECIMAL

NUMERIC

TD最大16bytes,precision of 38 -> no limit

NUMERIC

NUMERIC

TD最大16bytes,precision of 38 -> no limit

NUMBER

NUMERIC

TD最大16bytes,precision of 38 -> no limit

BYTEINT

SMALLINT

1byte -> 2byte

DOUBLE PRECISION

DOUBLE PRECISION

8byte -> 8byte

FLOAT

DOUBLE PRECISION

8byte -> 8byte

REAL

DOUBLE PRECISION

8byte -> 8byte

CHAR

CHARACTER

若过长可转成 CHARACTER VARYING/TEXT,若要使用CHARACTER,指定的长度不能小于TD

VARCHAR

CHARACTER VARYING/TEXT

TD最大长度为64000

LONG VARCAHR

CHARACTER VARYING/TEXT

TD最大长度为64000

DATE

DATE

4byte -> 4byte,

需注意TD默认格式为YY/MM/DD,ADB PG默认为YYYY-MM-DD

TIME

TIME WITHOUT TIME ZONE

TIME WITH TIME ZONE

TIME WITH TIME ZONE

TIMESTAMP

TIMESTAMP WITHOUT TIME ZONE

TIMESTAMP WITH TIME ZONE

TIMESTAMP WITH TIME ZONE

INTERVAL Day/Month/Year/Hour/Minute/Second ...

INTERVAL Day/Month/Year/Hour/Minute/Second ...

PERIOD(DATE)

DATERANGE

PERIOD(TIME)

TSRANGE

PERIOD(TIME WITH TIME ZONE)

TSTZRANGE

PERIOD(TIMESTAMP)

TSRANGE

PERIOD(TIMESTAMP WITH TIME ZONE)

TSTZRANGE

BYTE

BYTEA

VARBYTE

BYTEA

BLOB

BYTEA

TD最大长度(2097088000)

CLOB

CHARACTER VARYING/TEXT

TD最大长度(2097088000)

GRAPHIC

CHARACTER

CHAR(1) CHARACTER SET GRAPHIC CASESPECIFIC

VARGRAPHIC

CHARACTER VARYING/TEXT

CHAR(n) CHARACTER SET GRAPHIC CASESPECIFIC

JSON

JSON

DDL兼容性

CREATE TABLE

将Teradata表定义转换为ADB PG表定义,需要注意:

  • 数据类型及约束条件

  • 在ADB PG数据库,主键(PK)可以考虑作为分布键

  • 非标准化的Teradata SQL命令,即Teradata特有的SQL命令

转换语法对照如下表:

Teradata

ADB PG

CREATE <SET/MULTISET> TABLE <Table>

<Create Table Options>

<Column Definitions>

<PI Definitions>

<Partition Definitions>

<Table-level Constraints>

<Index Definitions>;

CREATE TABLE <Table>

<Column Definitions>

<DK Definitions>

<Partition Definitions>

<Table-level Constraints>;

<Index Definitions>;

定义

说明

定义

说明

SET/MULTISET

定义记录是否可重复

无,转换时直接删除

Create Table Options

表选项

定义表的物理属性

Fallback

Journaling

Freespace

无,转换时直接删除

Column Definitions字段定义

定义表的各个字段,可以通过format函数定义数据格式

Column Definitions

(字段定义)

ADB PG不支持通过to_char/to_date函数定义数据格式,默认日期格式为YYYY-MM-DD,即如果字段默认格式为YYYY-MM-DD,那么转换时直接删除即可

Table-level Constraints

表级约束

定义约束

Primary key

Unique

CHECK条件

Foreign key

Table-level Constraints

(表级约束)

ADB PG支持定义主键,但是不推荐;不支持CHECK条件及创建外键

Index Definitions索引定义

定义表索引

Index Definitions

(索引定义)

ADB PG支持,但是和TD类似,需要根据业务要求再次评估是否创建索引

PI Definitions索引定义

主索引

DK Definitions

(分布键定义)

根据Teradata主索引直接修改为分布键即可

Partition Definitions分区

表分区

Partition Definitions

(分区)

TD与ADB PG分区表语法不同,根据ADB PG的语法转换

举个例子,Teradata创建表DDL:

-- 这个Teradata的DDL有三处特殊地方:
-- 1)CHARACTER SET LATIN CASESPECIFIC
-- 2)DATE FORMAT 'YYYYMMDD'
-- 3)INTEGER FORMAT '99:99:99'
CREATE TABLE ON_BOARD_MATCH_EVT
(
      Evt_Id VARCHAR(200) CHARACTER SET LATIN CASESPECIFIC TITLE '编号' NOT NULL,
      Match_Dt DATE FORMAT 'YYYYMMDD' TITLE '成交日期' NOT NULL,
      Match_Tm INTEGER FORMAT '99:99:99' TITLE '成交时间' NOT NULL,
      Order_Dt TIMESTAMP(6) TITLE '委托日期' NOT NULL,
      Cust_Cd VARCHAR(80) CHARACTER SET LATIN CASESPECIFIC TITLE 'A代码' NOT NULL,
      Cust_No VARCHAR(80) CHARACTER SET LATIN CASESPECIFIC TITLE 'A编号' NOT NULL
)
PRIMARY INDEX ( Evt_Id )
PARTITION BY ( 
                        RANGE_N(Match_Dt  BETWEEN 
      DATE '2000-01-01' AND DATE '2013-12-31' EACH INTERVAL '1' YEAR ,
                        DATE '2014-01-01' AND DATE '2015-12-31' EACH INTERVAL '1' MONTH ,
                        DATE '2016-01-01' AND DATE '2030-12-31' EACH INTERVAL '1' DAY , 
      NO RANGE OR UNKNOWN) 
);

转换成ADB PG相关DDL(三处特殊地方无法兼容,需要评估对业务的影响):

CREATE TABLE ON_BOARD_MATCH_EVT
(
    Evt_Id VARCHAR(200) NOT NULL,
    Match_Dt DATE NOT NULL,
    Match_Tm INTEGER NOT NULL,
    Order_Dt TIMESTAMP(6) NOT NULL,
    Cust_Cd VARCHAR(80) NOT NULL,
    Cust_No VARCHAR(80) NOT NULL
) 
DISTRIBUTED BY(Evt_Id)
PARTITION BY RANGE(Match_Dt)
(
    START(DATE '2000-01-01') END(DATE '2013-12-31') INCLUSIVE EVERY(INTERVAL '1' YEAR),
    START(DATE '2014-01-01') END(DATE '2015-12-31') INCLUSIVE EVERY(INTERVAL '1' MONTH),
    START(DATE '2016-01-01') END(DATE '2030-12-31') INCLUSIVE EVERY(INTERVAL '1' DAY),
    DEFAULT PARTITION def__par
);
COMMENT ON COLUMN ON_BOARD_MATCH_EVT.Evt_Id IS '编号';
COMMENT ON COLUMN ON_BOARD_MATCH_EVT.Match_Dt IS '成交日期';
COMMENT ON COLUMN ON_BOARD_MATCH_EVT.Match_Tm IS '成交时间';
COMMENT ON COLUMN ON_BOARD_MATCH_EVT.Order_Dt IS '委托日期';
COMMENT ON COLUMN ON_BOARD_MATCH_EVT.Cust_Cd IS 'A代码';
COMMENT ON COLUMN ON_BOARD_MATCH_EVT.Cust_No IS 'A编号';

CREATE TABLE AS

Teradata

ADB PG

CREATE TABLE <new_table>

AS <old_table> WITH [NO] DATA;

CREATE TABLE <new_table>

(LIKE <old_table>);

DROP TABLE

Teradata与ADB PG删除表的语法一致。

DROP TABLE [IF EXISTS] name [, ...] [CASCADE | RESTRICT]

TEMP TABLE

Teradata支持全局临时表(Global Temporay Table)、可变临时表(Volatile Temporay Table)两种类型,而ADB PG只支持可变临时表(TEMP/TEMPORARY),转换对照规则如下:

  • Teradata的VOLATILE/GLOBAL TEMPORARY 直接转换为 ADB PG 的 TEMP/TEMPORARY;

  • 删除Teradata的NO LOG关键字;

  • ON COMMIT PRESERVE ROWS/ON COMMIT DELETE ROWS 在Teradata 和 ADB PG都适用。

转换示例参考:

Teradata

ADB PG

CREATE VOLATILE TABLE Emp_Vol, NO LOG

(

Emp_Id INTEGER,

Name CHAR(50),

Salary DECIMAL(10,2)

)

ON COMMIT PRESERVE ROWS;

CREATE TEMP TABLE Emp_Vol

(

Emp_Id INTEGER,

Name CHAR(50),

Salary DECIMAL(10,2)

)

ON COMMIT PRESERVE ROWS;

DML兼容性

SELECT

  • 在Teradata中,SELECT关键字可以简写为SEL;而在ADB PG中不支持这种简写;

  • 在Teradata中,字段的别名除了可以通过AS子句指定外,还可以通过NAMED子句指定;而在ADB PG中,不支持NAMED子句;

  • 在Teradata中,子查询不需要指定别名;在ADB PG中子查询需要给予别名;

  • 在Teradata中,字段引用别名后,直接可以引用别名进行其他操作;ADB PG不支持在定义字段别名后,直接通过别名对字段进行其他操作,可以通过嵌套子查询方式进行转换;

Teradata

ADB PG

SEL <COLUMN..>

SELECT <COLUMN..>

EXPRESSION/COLUMN NAMED ALIAS_1

EXPRESSION/COLUMN AS ALIAS_1

SELECT column1

FROM table1

WHERE column2 IN

(SELECT column1

FROM table2 );

SELECT column1

FROM table1

WHERE column2 IN

(SELECT column1

FROM table2 ) as alias_t;

SELECT

sum(column1) as alias1,

alias1/12 as alias2

FROM table1;

SELECT

sum(column1) as alias1,

sum(column1)/12 as alias2

FROM table1;

INSERT

Teradata与ADB PG在INSERT操作语法上没有任何区别,其中Teradata的INSERT-SELECT方式在ADB PG中也支持,无须进行转换。

DELETE

  • Teradata删除所有数据可使用DELETE FORM TABLE,该方式在ADB PG中不推荐使用;ADB PG采用TRUNCATE TABLE的方式来完成对整表的删除;

  • 在不关联其他表,Teradata删除与ADB PG的语法相同,无须转换;

  • 需要关联其他表删除符合条件的记录时,如果使用子查询的方式,则语法相同;如果通过直接连接的方式,则语法不同,需要进行相应的转换,转换语法对照如下:

Teradata

ADB PG

DELETE FROM table_1;

TRUNCATE table_1;

DELETE FROM table_1

WHERE table_1.id IN

(SELECT table_2.id

FROM table_2

WHERE table_2.name = 'Hannah');

DELETE FROM table_1

WHERE table_1.id IN

(SELECT table_2.id

FROM table_2

WHERE table_2.name = 'Hannah');

DELETE FROM table_1

WHERE table_1.id = table_2.id

ADN table_2.name = 'Hannah';

DELETE FROM table_1

USING table_2

WHERE table_1.id = table_2.id

ADN table_2.name = 'Hannah';

UPDATE

  • 在不关联其他表,Teradata与ADB PG的语法相同,无须转换;

  • 关联表进行更新时,Teradata的FROM子句在前,而SET子句在后, 而ADB PG刚好相反;Teradata需要在FROM子句声明更新表及被关联表,ADB PG在UPDATE后面申明更新表,在FROM子句后面申明被关联表,转换语法对照如下:

Teradata

ADB PG

UPDATE t1

[FROM table t1, table t2]

SET column = expression,

,...

,column = expression

[WHERE condition];

UPDATE [ONLY] table [[AS] alias]

SET {column={expression | DEFAULT} |

(column [,...]) = ({expression | DEFAULT} [,...])}[,...]

[FROM fromlist]

[WHERE condition];

示例参考

UPDATE a

FROM wanto_update a , source_data b

SET a.c1 = b.c1, a.c2 = b.c2

WHERE a.bsm = b.bsm;

UPDATE wanto_update a

SET a.c1 = b.c1, a.c2 = b.c2

FROM source_data b

WHERE a.bsm = b.bsm;

特殊关键字

ADB PG的保留关键字和表隐藏字段如果出现在Teradata DDL字段定义中,转换时需要特殊处理。

Key Word

Type

Key Word

Type

ALL

reserved

LATERAL

reserved

ANALYSE

reserved

LEADING

reserved

ANALYZE

reserved

LEFT

reserved

AND

reserved

LIMIT

reserved

ANY

reserved

LOCALTIME

reserved

ARRAY

reserved

LOCALTIMESTAMP

reserved

AS

reserved

LOG

reserved

ASC

reserved

NATURAL

reserved

ASYMMETRIC

reserved

NOT

reserved

BOTH

reserved

NOTNULL

reserved

CASE

reserved

NULL

reserved

CAST

reserved

OFFSET

reserved

CHECK

reserved

ON

reserved

COLLATE

reserved

ONLY

reserved

COLLATION

reserved

OR

reserved

COLUMN

reserved

ORDER

reserved

CONCURRENTLY

reserved

OUTER

reserved

CONSTRAINT

reserved

OVERLAPS

reserved

CREATE

reserved

PARTITION

reserved

CROSS

reserved

PLACING

reserved

CURRENT_CATALOG

reserved

PRECEDING

reserved

CURRENT_DATE

reserved

PRIMARY

reserved

CURRENT_ROLE

reserved

REFERENCES

reserved

CURRENT_SCHEMA

reserved

RETURNING

reserved

CURRENT_TIME

reserved

RIGHT

reserved

CURRENT_TIMESTAMP

reserved

SCATTER

reserved

CURRENT_USER

reserved

SELECT

reserved

DECODE

reserved

SESSION_USER

reserved

DEFAULT

reserved

SIMILAR

reserved

DEFERRABLE

reserved

SOME

reserved

DESC

reserved

SYMMETRIC

reserved

DISTINCT

reserved

TABLE

reserved

DISTRIBUTED

reserved

THEN

reserved

DO

reserved

TO

reserved

ELSE

reserved

TRAILING

reserved

END

reserved

TRUE

reserved

EXCEPT

reserved

UNBOUNDED

reserved

EXCLUDE

reserved

UNION

reserved

FALSE

reserved

UNIQUE

reserved

FETCH

reserved

USER

reserved

FOLLOWING

reserved

USING

reserved

FOR

reserved

VARIADIC

reserved

FOREIGN

reserved

VERBOSE

reserved

FROM

reserved

WHEN

reserved

GRANT

reserved

WHERE

reserved

GROUP

reserved

WINDOW

reserved

HAVING

reserved

WITH

reserved

ILIKE

reserved

GP_SEGMENT_ID

hidden

IN

reserved

TABLEOID

hidden

INITIALLY

reserved

CTID

hidden

INTERSECT

reserved

CMIN

hidden

INTO

reserved

CMAX

hidden

IS

reserved

XMIN

hidden

JOIN

reserved

XMAX

hidden

系统函数

系统函数Teradata与ADB PG差异性较大,ADB PG通过迭代开发tdfunc扩展模块来适配Teradata系统函数。SUPERUSER连接到ADB PG数据库执行SQL:

create extension tdfunc;

下面列举一些高频的差异性函数。

通用函数

Teradata

ADB PG

说明

zeroifnull

tdfunc扩展已兼容

对数据作累计处理时,将空值作零处理

nullifzero

tdfunc扩展已兼容

对数据作累计处理时,忽略零值,tdfunc扩展已兼容

nvl

tdfunc扩展已兼容

字符串或数值为空时用其他值替换,tdfunc扩展已兼容

index

tdfunc扩展已兼容

字符串定位函数,从1开始

char/characters

length

字符串长度

type

pg_typeof

数据类型

add_months

tdfunc扩展已兼容

从某日期增加或减少指定月份的日期

months_between

tdfunc扩展已兼容

计算两个日期之间的月份差(天换算成月)

oreplace

tdfunc扩展已兼容

字符替换

instr

tdfunc扩展已兼容

指定字符串在输入字符串中位置,支持正序逆序

translate_chk

tdfunc扩展已兼容

判断是否中文乱码(0不乱码,1乱码)

trim(numeric)

tdfunc扩展已兼容

字符串去除头尾空字符

random(integer, integer)

tdfunc扩展已兼容

随机函数

regexp_substr

tdfunc扩展已兼容

基于正则表达式匹配字符串

regexp_similar

tdfunc扩展已兼容

基于正则表达式判断字符串是否一致

tdfunc扩展会持续迭代以支持更多TD系统函数。

窗口函数

Teradata

ADB PG

说明

csum

通过子查询方式实现

计算一列的连续的累计的值

mavg

通过子查询方式实现

基于预定的行数(查询宽度)计算一列的移动平均值

msum

通过子查询方式实现

基于预定的查询宽度计算一列的移动汇总值

mdiff

通过子查询方式实现

基于预定的查询宽度计算一列的移动差分值

rank

通过子查询方式实现

排队函数对一列进行排队,可以按照生序或降序排队

qualify

通过其它方式实现

QUALIFY限制排队输出的最终结果

以QUALIFY为例,下面列举几种转换对照关系:

场景

Teradata

ADB PG

使用QUALIFY

SELECT c1, c2 FROM tab1

WHERE …

QUALIFY <Window Function> = ?

SELECT c1, c2 FROM (

SELECT c1, c2 , <Window Function> AS win

FROM tab1 WHERE … ) tab_alias

WHERE win = ?

使用QUALIFY,并指定GROUP BY

SELECT C1, C2 FROM tab1

WHERE ...

GROUP BY C3, C4

QUALIFY RANK(C5 ASC) <= ?

SELECT C1, C2 FROM (

SELECT C1, C2 , RANK() OVER ( PARTITION BY C3, C4 ORDER BY C5 ASC ) AS alias1

FROM tab1 WHERE … ) tab_alias

WHERE alias1 <= ?

使用QUALIFY,并指定窗口函数的ALIAS

SELECT c1, c2 , <Window Function> AS win

FROM tab1

WHERE ...

QUALIFY win > ?

SELECT * FROM (

SELECT c1, c2 , <Window Function> AS win

FROM tab1 WHERE ... ) tab_alias

WHERE win > ?

在QUALIFY表达式中指定ALIAS

SELECT expression1 AS alias1, c2

FROM tab1

WHERE ...

QUALIFY ROW_NUMBER ( )

OVER ( PARTITION BY c3 ORDER BY alias1 DESC ) = ?

SELECT alias1, c2 FROM (

SELECT expression1 AS alias1, c2 , ROW_NUMBER ( ) OVER ( PARTITION BY c3 ORDER BY expression1 DESC ) AS alias2

FROM tab1 WHERE … ) tab_alias

WHERE alias2 = ?

其他

语法转换

Teradata

ADB PG

说明

COLLECT STATS ON table1 COLUMN (col);

COLLECT STATS table1;

ANALYZE table1 (col);

ANALYZE table1;

收集统计信息转换。

CREATE TABLE table (

dt DATE FORMAT 'YYYYMMDD' NOT NULL,

tm INTEGER FORMAT '99:99:99',

nm VARCHAR(10) FORMAT 'X(8)'

)

CREATE TABLE table (

dt DATE NOT NULL,

tm INTEGER,

nm VARCHAR(10)

)

DDL中的FORMAT转换。

CREATE INDEX index1 (col1) ON tab1;

CREATE INDEX index1 ON tab1 (col1);

创建索引转换。

CAST (100068 AS INTEGER FORMAT '99:99:9999')

TO_CHAR (100068, '00:00:0000')

表达式中的FORMAT转换。

SELECT * FROM table1

WHERE name LIKE ANY ('1%','2%');

SELECT * FROM table1

WHERE name LIKE '1%' OR name LIKE '2%';

LIKE ANY转换,ADB PG用关键字OR。

SELECT * FROM table1

WHERE name LIKE ALL ('1%','2%');

SELECT * FROM table1

WHERE name LIKE '1%' AND name LIKE '2%';

LIKE ALL转换,ADB PG用关键字AND。

逻辑转换

类型

Teradata

ADB PG

.QUIT

.IF ERRORLEVEL <> 0 THEN .QUIT

psql -c "select * from tab;" -v ON_ERROR_STOP=1

IF ... THEN

.IF ACTIVITYCOUNT <> 0 THEN ...

SELECT :ROW_COUNT <> 0 AS "ISVALID";

\gset

\IF :ISVALID

...

\ENDIF

.GOTO

.IF INIT_FLAG=1 THEN .GOTO InitLoad;

...

.LABEL InitLoad

...

\IF ...

...

\ENDIF

..FOR

...

..DO

....

..END-FOR

..FOR

select TRADE_DATE

from temp_y;

..DO

....

..END-FOR

封装成存储过程供调用

LOCKING

LOCKING TABLE tab1 FOR ACCESS

删除

SESSION

SELECT SESSION;

删除

SQL改写建议

类型

Teradata

ADB PG

DISTINCT

SELECT DISTINCT ON (id, name) id, name FROM tab1;

建议改写:

SELECT id, name FROM tab1 GROUP BY 1, 2;

NOT IN / IN

SELECT * FROM tab1 WHERE (id,name) NOT IN (SELECT id,name FROM tab2) ;

如果ID与name非空,建议改写:

SELECT * FROM tab1 WHERE NOT EXISTS ( SELECT 1 FROM tab2 where (tab1.id, tab1.name) = (tab2.id, tab2.name)) ;

UNION

SELECT id,name FROM tab1

UNION

SELECT id,name FROM tab2;

建议改写:

SELECT id,name FROM (

SELECT id,name FROM tab1

UNION ALL

SELECT id,name FROM tab2

) tab GROUP BY 1, 2;

附录二:可复用迁移相关代码

提供有代表性、通用性的UDF/Shell等,包括多用户跨schema赋权、跨库查询、gpfdist服务管控、vacuum空间回收等可复用代码。

通用UDF

获取表字段:get_table_columns()

-- 获取表字段
CREATE OR REPLACE FUNCTION get_table_columns
(
  p_schema varchar,                      -- 表schema
  p_table varchar,                       -- 表名
  type_in boolean default false,         -- 是否输出字段类型
  col_names varchar[] default null,      -- 指定表字段
  col_in boolean default false,          -- 指定表字段是否输出
  tab_alias varchar default null         -- 指定表别名
)
    RETURNS varchar AS $$
DECLARE
    clause varchar;
    coaln varchar;
    result varchar;
BEGIN
    clause :=
            CASE
                WHEN $4 IS NULL THEN '1=1'
                ELSE CASE
                         WHEN $5 THEN 'a.attname IN (''' || lower(array_to_string($4, ''',''')) || ''')'
                         ELSE 'a.attname NOT IN (''' || lower(array_to_string($4, ''',''')) || ''')'
                    END
                END;
    coaln :=
            CASE
                WHEN $3 THEN 'attname||'' ''||atttype'
                ELSE
                    CASE WHEN $6 IS NULL THEN 'attname'
                    ELSE ''''||$6||'.''||attname'
                    END
                END;
    EXECUTE 'SELECT string_agg('||coaln||', '', '''||') FROM (
        SELECT a.attname::text, format_type(a.atttypid, a.atttypmod) atttype FROM pg_attribute a
        WHERE a.attnum > 0 AND not a.attisdropped AND '||clause||' AND a.attrelid = (
            select c.oid from pg_class c join pg_namespace n on n.oid = c.relnamespace
            where n.nspname = '''||lower($1)||''' AND c.relname = '''||lower($2)||''' limit 1)
        ORDER BY a.attnum ASC) aa' INTO result;
    RETURN result;
END;
$$ LANGUAGE plpgsql IMMUTABLE;

-- select get_table_columns('public','test',false,array[''],false,'ab');

获取表拼接的coalesce()字段:get_table_columns_coalesce()

-- 获取表拼接的coalesce()字段
CREATE OR REPLACE FUNCTION get_table_columns_coalesce
(
  p_schema varchar,                      -- 表schema
  p_table varchar,                       -- 表名
  col_in boolean default false,          -- 指定表字段是否输出
  col_names varchar[] default null,      -- 指定表字段
  tab_alias varchar default null         -- 指定表别名
)
    RETURNS varchar AS $$
DECLARE
    clause varchar;
    coaln varchar;
    result varchar;
BEGIN
    clause :=
            CASE
                WHEN $4 IS NULL THEN '1=1'
                ELSE CASE
                         WHEN $3 THEN 'a.attname IN (''' || lower(array_to_string($4, ''',''')) || ''')'
                         ELSE 'a.attname NOT IN (''' || lower(array_to_string($4, ''',''')) || ''')'
                    END
                END;
    coaln := CASE WHEN $5 IS NULL THEN
                '
                WHEN atttypid IN(18,25,1042,1043) THEN ''COALESCE('' || attname || '','''''''')''
                WHEN atttypid IN(20,21,23) THEN ''COALESCE('' || attname || '',0)''
                WHEN atttypid IN(700,701,1700) THEN ''COALESCE('' || attname || '',0.0)''
                WHEN atttypid IN(1082) THEN ''COALESCE('' || attname || '',DATE''''00040229'''')''
                WHEN atttypid IN(1114,1184) THEN ''COALESCE('' || attname || '',TIMESTAMP''''00040229'''')''
                WHEN atttypid IN(1083,1266) THEN ''COALESCE('' || attname || '',TIME''''00:00:00'''')''
                ELSE attname
                '
             ELSE
                '
                WHEN atttypid IN(18,25,1042,1043) THEN ''COALESCE(' || $5 ||'.''||attname||'','''''''')''
                WHEN atttypid IN(20,21,23) THEN ''COALESCE(' || $5 ||'.''||attname||'',0)''
                WHEN atttypid IN(700,701,1700) THEN ''COALESCE(' || $5 ||'.''||attname||'',0.0)''
                WHEN atttypid IN(1082) THEN ''COALESCE(' || $5 ||'.''||attname||'',DATE''''00040229'''')''
                WHEN atttypid IN(1114,1184) THEN ''COALESCE(' || $5 ||'.''||attname||'',TIMESTAMP''''00040229'''')''
                WHEN atttypid IN(1083,1266) THEN ''COALESCE(' || $5 ||'.''||attname||'',TIME''''00:00:00'''')''
                ELSE attname
                '
             END;
    EXECUTE 'SELECT string_agg(
    CASE
        '||coaln||'
    END, '', '') FROM (
        SELECT a.attname::text, a.atttypid FROM pg_attribute a
        WHERE a.attnum > 0 AND not a.attisdropped AND '||clause||' AND a.attrelid = (
            select c.oid from pg_class c join pg_namespace n on n.oid = c.relnamespace
            where n.nspname = '''||lower($1)||''' AND c.relname = '''||lower($2)||''' limit 1)
        ORDER BY a.attnum ASC) aa' INTO result;
    RETURN result;
END;
$$ LANGUAGE plpgsql IMMUTABLE;

-- select get_table_columns_coalesce('public','t1',false,array['id'],'a');

获取表行数:get_tuple_count()

-- 获取表count()
CREATE OR REPLACE FUNCTION get_tuple_count
(
  p_schema varchar, 
  p_table varchar
)
    RETURNS bigint AS $BODY$
DECLARE
    v_rowcount bigint;
    v_full_tablename varchar := $1 ||'.'||$2 ;
    v_get_cnt_sql varchar;
BEGIN
    --Partitioned tables
    IF EXISTS (select 1 from pg_class a,pg_namespace b  where b.nspname=lower($1) and a.relname=lower($2) and a.relnamespace = b.oid and a.relkind='r' and a.relstorage='h' ) THEN
        v_get_cnt_sql := 'SELECT count(*) from '||v_full_tablename;
    ELSE
        v_get_cnt_sql := 'select sum(total_tupcount)-sum(hidden_tupcount) from gp_toolkit.__gp_aovisimap_compaction_info('||quote_literal(v_full_tablename)||'::regclass)';
    END IF;
    RAISE INFO '%',v_get_cnt_sql;
    EXECUTE v_get_cnt_sql INTO v_rowcount;
    RETURN v_rowcount;
END;
$BODY$ LANGUAGE plpgsql IMMUTABLE;

用户赋权

主要解决问题:

  1. 修改表owner。

  2. 给A用户和B用户赋SCHEMA权限。

  3. A用户和B用户都有SCHEMA s1的权限,A用户新创建的表,B用户能够做DML操作。

更新宿主:pg_catalog.update_owner_func()

create or replace FUNCTION pg_catalog.update_owner_func
(
    in schema_name text,
    in owner_name text,
    in type_name text
)
  RETURNS void   as   $BODY$
DECLARE
    table_name       text;
    view_name        text;
    sql_table_string text;
    sql_view_string  text;
BEGIN

    if type_name = 'r'
    then
        for table_name in
            select pt.tablename
            from pg_tables pt
            where pt.schemaname = schema_name
              and pt.tablename not in (select pp.partitiontablename from pg_partitions pp)
            LOOP
                sql_table_string := 'alter  table ' || $1 || '.' || table_name || ' owner to ' || $2 || ' ;';
                EXECUTE sql_table_string;
            end loop;

    elsif type_name = 'v'
    then
        for view_name in
            select pv.viewname
            from pg_views pv
            where pv.schemaname = schema_name
            LOOP
                sql_view_string := 'alter  view ' || $1 || '.' || view_name || ' owner to ' || $2 || ' ;';
                EXECUTE sql_view_string;
            end loop;
    end if;
end
$BODY$
    LANGUAGE plpgsql;

对象(表/视图)赋权:pg_catalog.grant_privs_do()

create or replace FUNCTION pg_catalog.do_on_objects_in_schema
(
    IN action text,                -- 输入 'grant' 或 'revoke' 表示赋予或回收
    IN schema_name name,           -- 指定schema name,同时只能指定一个
    IN owner name,                 -- 指定对象的owner,同时只能指定一个
    IN target_user text,           -- 指定 'grant' 或 'revoke' 的目标用户,多个用户使用逗号分隔,如: 'u1,u2,u3'
    IN objtype text,               -- 对象类别: 堆表('r'), 物化视图('m'), 视图('v')
    IN exclude_objects text[],     -- 排除哪些对象, 只需要object name,不需要schema name,用数组表示: array['t1', 't2', ...]
    IN privileges text,            -- 权限列表, 用逗号分隔: 'select,insert,update,delete'
    IN debug_mode boolean          -- 是否调试模式,调试模式只NOTICE不执行
)
  RETURNS void AS $BODY$
DECLARE
    objname name;
    sql_string text;
    grant_nsp boolean := false;
BEGIN
    IF lower(action) = 'grant' OR lower(action) = 'revoke' THEN
        FOR objname IN SELECT relname
                       FROM pg_class a,
                            pg_namespace b
                       WHERE pg_catalog.pg_get_userbyid(relowner) = owner
                         AND a.relkind = objtype
                         AND b.nspname = schema_name
                         AND a.relnamespace = b.oid
            LOOP
                sql_string := '';
                IF NOT grant_nsp AND lower(action) = 'grant' THEN
                    -- auto grant schema to target_user user
                    sql_string := 'GRANT USAGE ON SCHEMA "' || schema_name || '" to ' || target_user || ';';
                    RAISE NOTICE '%', sql_string;
                    IF NOT debug_mode THEN
                        EXECUTE sql_string;
                    END IF;
                    grant_nsp := true;
                END IF;


                IF (exclude_objects IS NOT NULL AND objname = ANY (exclude_objects)) THEN
                    RAISE NOTICE '% excluded % .', action, '"' || schema_name || '"."' || objname || '"';
                ELSE
                    IF lower(action) = 'grant' THEN
                        sql_string := action || ' ' || privileges || ' on "' || schema_name || '"."' || objname ||
                                      '" to ' || target_user || ';';
                    ELSIF lower(action) = 'revoke' THEN
                        sql_string := action || ' ' || privileges || ' on "' || schema_name || '"."' || objname ||
                                      '" from ' || target_user || ';';
                    END IF;
                    RAISE NOTICE '%', sql_string;
                    IF NOT debug_mode THEN
                        EXECUTE sql_string;
                    END IF;
                END IF;
            END LOOP;
    ELSE
        RAISE NOTICE 'Invalid action: %', action;
    END IF;
END;
$BODY$
    LANGUAGE plpgsql;


create or replace FUNCTION pg_catalog.grant_future_privs
(
    in schema_name text,
    in owner_name text,
    in target_name text,
    in privileges text
)
  RETURNS void  as  $BODY$
DECLARE
    sql_string text;
BEGIN
    sql_string := 'GRANT USAGE ON SCHEMA "'||$1||'" to '||$3||';';  
    EXECUTE sql_string; 
    sql_string := 'alter default privileges for  user ' || $2 || ' in schema ' || $1 || ' grant ' || $4 ||
                  ' on tables to ' || $3 || ' ;';
    EXECUTE sql_string;
end
$BODY$
    LANGUAGE plpgsql;


create or replace  function pg_catalog.grant_privs_do
(
    in schema_name text,
    in owner_name text,
    in target_name text,
    in privileges text,
    in type_name text
)
    RETURNS void  as $BODY$
begin
    if type_name = 'r'
    then
        perform do_on_objects_in_schema('grant', $1, $2, $3, 'r', null, $4, false);
    elsif type_name = 'v'
    then
        perform do_on_objects_in_schema('grant', $1, $2, $3, 'v', null, $4, false);
    end if;
    perform grant_future_privs($1, $2, $3, $4);
end
$BODY$
    LANGUAGE plpgsql;

UDF使用示例:

1.修改schema的对象的属主
select  update_owner_func('schema_name','owner_name','type_name');
schema_name    为schema的名称
owner_name     为将schema改为的属主
type_name      为对象类型,表或视图 (表为r,视图为v)

使用示例:
-- #把schema1下的表的属主改为user1
select update_owner_func('schema1','user1','r');

-- #将schema2下的视图的属主改为user2
select update_owner_func('schema2','user2','v');


2.赋权使用方法:
select grant_privs_do('schema_name','owner_name','target_name','privileges','type_name');
schema_name    为schema的名称
owner_name     为schema的属主
target_name    为需要拥有权限的用户
privileges     为赋予的权限(select,insert,update,delete)
type_name      为对象类型,表或视图 (表为r,视图为v)

使用示例:
--#把user1用户的schema1下的表的读权限赋予给用户user2
select grant_privs_do('schema1','user1','user2','select','r');

--#把user1用户的schema1下的视图的读权限赋予给用户user2
select grant_privs_do('schema1','user1','user2','select','r');

--#把user1用户的schema1下的表的select,insert,delete,update权限赋予给用户user2
select grant_privs_do('schema1','user1','user2','select,insert,delete,update','r');

跨库查询

创建外表:do_external_table_func()

create or replace function do_external_table_func
(
    in server_name text ,    --为Foreign server
    in schema_name text,     --为远端数据库中表所属的schema名称
    in table_name text,      --为远端库的表名称
    in current_schema text,  --为当前数据库的schema
    in action text           --为对外部表的操作,add为创建,drop为删除
)
    returns void as
$BODY$
declare
    sql_table_string text;
    value            int;
begin
    value = (
        select 1
        from (
                 select replace(split_part(array_to_string(ftoptions, ','), '=', 2), ',table_name',
                                '')                                         as schema,
                        split_part(array_to_string(ftoptions, ','), '=', 3) as table
                 from pg_foreign_table) as result
        where result.schema = $2
          and result.table = $3);

    if value = 1 then
        if 'add' = $5 then
            raise notice 'Foreign table already existed !!!';
        else
            sql_table_string := 'DROP FOREIGN TABLE ' || $3 || ';';
            execute sql_table_string;
            raise notice 'drop the Foreign tables finish !!!';
        end if;
    else
        if 'add' = $5 then
            sql_table_string := 'import  foreign schema  ' || $2 || ' limit to (' || $3 || ') from server ' || $1 ||
                                '   into ' || $4 || ';';
            execute sql_table_string;
            raise notice 'create the Foreign tables finish !!!';
        else
            raise notice 'the Foreign tables not exists !!!';
        end if;
    end if;
end
$BODY$
    language PLPGSQL;

跨库查询示例:

-- # 跨库查询创建外部表使用方法,do_external_table_func脚本为方便跨库查询而创建的函数。
select  do_external_table_func('server_name','foreign_schema','table_name','current_schema','action');
server_name     为Foreign server
foreign_schema  为远端数据库中表所属的schema名称
table_name      为远端库的表名称
current_schema  为当前数据库的schema
action          为对外部表的操作,add为创建,drop为删除

-- # 在使用do_external_table_func这个函数前,需要在生产库(A)和历史库(B)中创建一些必要的服务,具体如下:

1.使用前的操作:

-- # 生产库(A):  -------以超级用户执行
create extension postgres_fdw;
grant usage on FOREIGN DATA WRAPPER  postgres_fdw to public;
create server b_server foreign data wrapper postgres_fdw options(host '192.168.0.1',port '5432',dbname 'B');
CREATE USER MAPPING FOR public SERVER b_server  OPTIONS (user 'user', password 'password');

-- # 历史库(B):  -------以超级用户执行
create extension postgres_fdw;
grant usage on FOREIGN DATA WRAPPER  postgres_fdw to public;
create server a_server foreign data wrapper postgres_fdw options(host '192.168.0.1',port '5432',dbname 'A');
CREATE USER MAPPING FOR public SERVER a_server  OPTIONS (user 'user', password 'password');


2.使用示例:

-- 生产库要创建历史库中pdm_data.t00_agmt_chan_rela_h_1_prt_102表的查询链接,并存放在当前生产库的schema为pdm_view中
select  do_external_table_func('b_server','pdm_data','t00_agmt_chan_rela_h','pdm_view','add');
-- 删除生产库对于历史库中表pdm_data.t00_agmt_chan_rela_h表的查询链接
select  do_external_table_func('b_server','pdm_data','t00_agmt_chan_rela_h','pdm_view','drop');

-- 历史库要创建生产库中的nwf_data.int_n_nwf_acct_profit表的查询链接,并存放在历史库的schema为ndw_data中
select  do_external_table_func('a_server','nwf_data','int_n_nwf_acct_profit','nwf_data','add');
-- 删除历史库对于生产库中表nwf_data.int_n_nwf_acct_profit表的查询链接
select  do_external_table_func('a_server','nwf_data','int_n_nwf_acct_profit','nwf_data','drop');


3.跨库关联查询举例:在生产库A下的pdm_data.tab1表和历史库B下的pdm_data.tab1进行join关联查询

-- #在生产库A中执行:
select  do_external_table_func('b_server','pdm_data','pdm_tab1','pdm_his','add');

-- #则在pdm_his下创建了历史库中pdm_tab1的查询链接。关联查询:
select data.name,data.age,his.created_time from  pdm_data.tab1 as data inner join  pdm_his.tab1 as his on data.id=his.id;

GPFDIST

批量拉起GPFDIST进程Shell脚本:

#/bin/bash

p1=$1
p2=$2
p3=$3
p4=$4

start_gpfdist()
{
for ((seq =1;seq <= $p2;seq++))
do
  port=$(expr $p1 + $seq)
  sn=`netstat -lntp 2>/dev/null | grep $port |awk '{print $4}' | sed -n '1,3p' | awk -F ":" '{print $2}'`
  if [ $sn ] ; then
     echo "port $sn is already used! please shutdown the process first!"
     continue
  fi
  echo " port $port process is shutdown,It will startup ......"
  sleep 1
  gpfdist -p $port -d $p3 -m 1048576 -t 300 -l  /tmp/gpfdist.$port.log >> /tmp/gpfdist.$port.log1 2>&1 & 
done
}


kill_gpfdist()
{
ps -ef | grep -v grep | grep gpfdist |awk '{print $2}' |  xargs kill -9 
}

check_gpfdist()
{
for ((seq =1;seq <= $p2;seq++))
do
  port=$(expr $p1 + $seq)
  sn=`netstat -lntp 2>/dev/null | grep $port |awk '{print $4}' | sed -n '1,3p' | awk -F ":" '{print $2}'`
if [ $sn ] ; then
  echo "port $sn is already used! please shutdown the process first!"
fi
done
}


if [[ $# -lt 4 ]]; then
    echo "Usage: $0 <start_port> <process_num> <datadir> <operation check|start|stop>"
    exit 1
fi

if   [ "$p4"  ==  check ] ;  then
   check_gpfdist
elif [ "$p4"  == start ] ;  then
   start_gpfdist
elif [ "$p4"  ==  stop ] ;  then
   kill_gpfdist
else
  echo "input not correct !"
fi

监控GPFDIST进程Shell脚本(定时任务调起,注意脚本里的p1/p2参数需根据拉起进程脚本定义而修改):

#/bin/bash
dir=/home/etl/DATA/process/
p1=8000
p2=30

for ((seq =1;seq <= $p2;seq++))
do
port=$(expr $p1 + $seq)
flag=`ps -ef|grep -v 'grep'|grep gpfdist|grep $port|awk -F " " '{print $2}'`
sn=`netstat -lntp 2>/dev/null | grep $port |awk '{print $4}' | sed -n '1,3p' | awk -F ":" '{print $4}'`

if [ *$sn* ];  then
  echo "port $sn already used ! please shutdown the process first!"
  continue

elif [ -z *$flag* ];  then
  echo " port $port process is shutdown,It will startup ......"
  sleep 1
  gpfdist -p $port -d $dir -m 1048576 -t 300 -l  /tmp/gpfdist.$port.log >> /tmp/gpfdist.$port.log 2>&1 &

fi
done

监控GPFDIST进程日志Shell脚本(定时任务调起,注意脚本里的p1/p2参数需根据拉起进程脚本定义而修改):

#/bin/bash
dd1=`date +%Y-%m-%d`
dd3=`date -d "3 days ago" +%Y-%m-%d`  
dd6=`date -d "6 days ago" +%Y-%m-%d`  
p1=8000
p2=30
dir=/tmp

for ((seq =1;seq <= $p2;seq++))
do
port=$(expr $p1 + $seq)
cp ${dir}/gpfdist.${port}.log  ${dir}/gpfdist.${port}_${dd1}.log
echo ' ' > ${dir}/gpfdist.${port}.log 
if [ -f ${dir}/gpfdist.${port}_${dd6}.log ]
then
   rm -rf  ${dir}/gpfdist.${port}_${dd6}.log  2> /dev/null
else
   continue
fi
done

空间回收

编写Shell文件,命名为vacuum_sys.sh,修改文件头logoninfo/PGPASSWORD/AUTO_HOME等信息。

#!/bin/bash

export PGPASSWORD=...
logoninfo='-h <host> -p <port> -U <user> -d <database> -v ON_ERROR_STOP=1'
AUTO_HOME=/home/etl

mkdir -p ${AUTO_HOME}/log
cp /dev/null  ${AUTO_HOME}/log/tables_sys.sql
cat > $AUTO_HOME/log/tables_sys.sql << EOF
VACUUM  FULL   ANALYZE   pg_rewrite     ;
VACUUM  FULL   ANALYZE   pg_class       ; 
VACUUM  FULL   ANALYZE   pg_constraint  ;
VACUUM  FULL   ANALYZE   pg_type        ;
VACUUM  FULL   ANALYZE   pg_index       ;
VACUUM  FULL   ANALYZE   pg_statistic   ;
VACUUM  FULL   ANALYZE   pg_inherits    ;
VACUUM  FULL   ANALYZE   pg_partition   ;
VACUUM  FULL   ANALYZE   pg_partition_rule  ;
VACUUM  FULL   ANALYZE   pg_attribute   ;
REINDEX   TABLE          pg_rewrite     ;
REINDEX   TABLE          pg_class       ; 
REINDEX   TABLE          pg_constraint  ;
REINDEX   TABLE          pg_type        ;
REINDEX   TABLE          pg_index       ;
REINDEX   TABLE          pg_statistic   ;
REINDEX   TABLE          pg_inherits    ;
REINDEX   TABLE          pg_partition   ;
REINDEX   TABLE          pg_partition_rule  ;
REINDEX   TABLE          pg_attribute   ;
EOF

kill_vacuum_job()
 {
  ps -ef | grep psql | grep vacuum_sys | grep -v grep |awk '{print $2}'| xargs kill -9 > /dev/null 2>&1 & 
 }


schedule_vacuum_job()
 {   
     ###kill idle session
     psql $logoninfo -c "SELECT  pg_terminate_backend(pid) from pg_catalog.pg_stat_activity where state='idle' and now()-query_start > interval '7 day'"

     ###echo 'collecting sys tables from gp_bloat_diag...'
     psql $logoninfo -c "copy (select 'vacuum full analyze '||bdinspname||'.'||bdirelname||';' from gp_toolkit.gp_bloat_diag) to stdout;" >> $AUTO_HOME/log/tables_sys.sql
     psql $logoninfo -c "copy (select 'reindex table '||bdinspname||'.'||bdirelname||';' from gp_toolkit.gp_bloat_diag) to stdout;" >> $AUTO_HOME/log/tables_sys.sql

     ###echo 'vacuuming sys tables...'
     nohup psql $logoninfo -f $AUTO_HOME/log/tables_sys.sql -L $AUTO_HOME/log/vacuum_sys.log > /dev/null 2>&1 &

  for (( j=0; j<5; j++ ))
    do
     sleep 1
     echo 'VACUUM JOB  starting ......'${j}s  
    done

  num=`ps -ef|grep psql|grep vacuum_sys|grep -v grep|wc -l`

  if [ $num -eq 1 ]
    then
        echo 'VACUUM SYS JOB ALL schedule success !!!'
  else
        echo 'VACUUM SYS JOB ALL schedule failed !!!'
  fi

}

job_num=`ps -ef|grep psql|grep tables_sys|grep -v grep|wc -l`

if [ $job_num -gt 0 ]
  then
    kill_vacuum_job
    schedule_vacuum_job
else
    schedule_vacuum_job
fi