本文为您介绍如何基于MaxCompute创建面向Hologres的元数据映射和数据同步。
背景信息
传统数仓架构将上游实时或批量数据写入数仓,并使用OLAP引擎进行分析(图示上半部分)。但部分场景需要在MaxCompute中读取Hologres的数据(图示下半部分),例如:
实时数据透出与归档:数据需要从实时数据源快速透出到业务,完成实时数仓业务需求后,数据归档到企业级数仓对应分层和主题域。
业务先行与数据回流:没有经过数仓统一处理,先满足业务需求,但是数据集市稳定后,仍然需要回流到企业级数仓,和DWD、DWS等融合。
这两种场景的数据访问方式包括:
数仓模型迭代完善过程中浏览实时数仓中的数据。
实时数仓或数据集市的数据定期归档入企业级数仓。
企业级数仓加工后的数据写入Hologres ADS层,供业务消费。
功能介绍
本教程构建了从MaxCompute到Hologres的元数据映射链路,具体功能如下:
MaxCompute和Hologres之间的数据类型映射存在差异,Hologres部分数据类型不支持同步至MaxCompute。
Schema级别元数据映射:可基于RAMRole权限认证方式,通过外部Schema实时读取Hologres元数据和数据,进行Schema级别的数据访问。
单表级元数据映射:可在Hologres数据目录中选择目标表,一键自动创建映射Hologres表的MaxCompute外部表。
数据同步:可将需要周期同步的表一键配置为数据同步任务,满足周期性地向企业级数仓同步的需求。
前提条件
已创建RAM角色并配置信任策略。
业务场景操作流程
本教程基于使用Flink+Hologres搭建的实时数仓,在Hologres实时数仓的DWD层,增加了MaxCompute映射Hologres Schema、Table,并从DWD层表中同步数据的流程。具体如下:
您可通过External Schema映射Hologres的Schema,通过External Table映射Hologres的Table,还可指定Hologres的表进行一次性或周期数据同步。
上游数据准备
准备工作
创建RDS MySQL实例并准备数据源。本教程使用DMS登录RDS MySQL实例,其中重点参数配置如下:
配置项
描述
实例名称
test_dbs_rds
MySQL数据库名称
test_dbs_mysqldb
数据库账号
test_dbs_mysqluser
账号类型
选择高权限账号。
创建Hologres实例和计算组。本教程中,重点参数配置如下:
配置项
描述
购买Hologres
商品类型
独享实例(按量付费)
实例类型
计算组型
计算组预留计算资源
64 CU
实例名称
test_dbs_holo
创建数据库
数据库名称
order_dw
默认Schema
public
新增计算组
read_warehouse_1
预留计算资源:配置为32 CU。
创建Flink工作空间和Catalog。本教程中创建的Flink工作空间名称为
test-dbs-flink
。
在Flink中创建RDS MySQL整库同步任务(ODS层表)
在Flink中创建RDS MySQL整库同步任务,将RDS MySQL中的数据同步至Hologres数据库order_dw的public
Schema下,并使用Hologres的read_warehouse_1计算组查询ODS数据。
MySQL需要提前开启Binlog,您可在RDS MySQL的数据库中执行show variables like "log_bin";
命令,查看Binlog是否开启。详情请参见MySQL服务器配置要求。
创建CDAS同步作业ODS。
在
页面,新建名为ODS的SQL流作业,并将如下代码拷贝到SQL编辑器。CREATE DATABASE IF NOT EXISTS dw.order_dw -- 创建catalog时设置了table_property.binlog.level参数,因此通过CDAS创建的所有表都开启了binlog。 AS DATABASE mysqlcatalog.test_dbs_mysqldb INCLUDING all tables -- 可以根据需要选择上游数据库需要入仓的表。 /*+ OPTIONS('server-id'='8001-8004') */ ; -- 指定mysql-cdc实例server-id范围。
说明本示例默认将数据同步到数据库order_dw的public Schema下。您也可以将数据同步到Hologres目标库的指定Schema中,详情请参见作为CDAS的目标端Catalog,指定后使用Catalog时的表名格式也会发生变化,详情请参见使用Hologres Catalog。
如果源表的数据结构发生变化,则需要等待源表的数据出现变更(删除、插入、更新),结果表的数据结构才会看到变化。
单击右上方的部署,进行作业部署。
单击左侧导航栏的
,单击刚刚部署的ODS作业操作列的启动,选择无状态启动后单击启动。
向计算组加载数据。
Table Group是Hologres中数据的载体。使用read_warehouse_1查询order_dw数据库中Table Group(本示例为order_dw_tg_default)的数据时,为计算组read_warehouse_1加载order_dw_tg_default,以实现使用
init_warehouse
计算组写入数据,使用read_warehouse_1
计算组进行服务查询。在HoloWeb开发页单击SQL编辑器,确认实例名和数据库名称后,执行如下命令。更多详情请参见创建新计算组实例。加载后,可以查看到read_warehouse_1已经加载了order_dw_tg_default Table Group的数据。
--查看当前数据库有哪些Table Group SELECT tablegroup_name FROM hologres.hg_table_group_properties GROUP BY tablegroup_name; --为计算组加载Table Group CALL hg_table_group_load_to_warehouse ('order_dw.order_dw_tg_default', 'read_warehouse_1', 1); --查看计算组加载Table Group的情况 SELECT * FROM hologres.hg_warehouse_table_groups;
在右上角切换计算组为read_warehouse_1,后续使用read_warehouse_1进行查询分析。
在HoloWeb中执行如下命令,查看MySQL同步到Hologres的3张表数据。
---查orders中的数据。 SELECT * FROM orders; ---查orders_pay中的数据。 SELECT * FROM orders_pay; ---查product_catalog中的数据。 SELECT * FROM product_catalog;
在Flink中创建DWD层表
通过Flink Catalog功能在Hologres中建DWD层的宽表dwd_orders。
在
页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。-- 宽表字段要nullable,因为不同的流写入到同一张结果表,每一列都可能出现null的情况。 CREATE TABLE dw.order_dw.dwd_orders ( order_id bigint not null, order_user_id string, order_shop_id bigint, order_product_id bigint, order_product_catalog_name string, order_fee numeric(20,2), order_create_time timestamp, order_update_time timestamp, order_state int, pay_id bigint, pay_platform int comment 'platform 0: phone, 1: pc', pay_create_time timestamp, PRIMARY KEY(order_id) NOT ENFORCED ); -- 支持通过catalog修改Hologres物理表属性。 ALTER TABLE dw.order_dw.dwd_orders SET ( 'table_property.binlog.ttl' = '604800' --修改binlog的超时时间为一周。 );
实现实时消费ODS层orders、orders_pay表的binlog。
在
页面,新建名为DWD的SQL流作业,并将如下代码拷贝到SQL编辑器后,部署并启动作业。通过如下SQL作业,orders表会与product_catalog表进行维表关联,将最终结果写入dwd_orders表中,实现数据的实时打宽。BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dwd_orders ( order_id, order_user_id, order_shop_id, order_product_id, order_fee, order_create_time, order_update_time, order_state, order_product_catalog_name ) SELECT o.*, dim.catalog_name FROM dw.order_dw.orders as o LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim ON o.product_id = dim.product_id; INSERT INTO dw.order_dw.dwd_orders (pay_id, order_id, pay_platform, pay_create_time) SELECT * FROM dw.order_dw.orders_pay; END;
查看宽表dwd_orders数据。
在HoloWeb开发页面连接Hologres实例并登录目标数据库后,在SQL编辑器上执行如下命令。
SELECT * FROM dwd_orders;
在DataWorks中绑定MaxCompute和Hologres计算资源
您可在DataWorks工作空间中绑定MaxCompute和Hologres计算资源,并在数据目录中查看相应的Catalog。
创建新版DataWorks工作空间,详情请参见创建工作空间。本教程中设置为
DBS_DW_TEST
。说明创建工作空间时,需要将参加数据开发(Data Studio)(新版)公测设置为开启状态。
绑定计算资源。
绑定MaxCompute和Hologres计算资源,具体操作请参见进入计算资源绑定页。本教程中设置的计算资源名称如下:
计算资源类型
计算资源名称
MaxCompute
dbs_mc
Hologres
order_dw
在DataWorks的Data Studio页面,选择目标工作空间后,在数据目录中可查看已绑定的MaxCompute项目和Hologres实例。
在数据目录中添加Hologres Catalog时,不同的添加方式,对应的行为不同。具体如下:
添加Hologres Catalog方式
适用场景
说明
通过DataWorks 数据源模式添加
适用于数据集成场景。
例如:从Hologres中定时同步某张表的数据至MaxCompute。
DataWorks数据源指定到具体的Database,携带执行人的身份,不可越权查看其他Database。
说明若您在创建DataWorks新版工作空间后,绑定了Hologres计算资源,则数据目录中默认通过DataWorks数据源模式添加Hologres Catalog。
通过Hologres-实例模式添加
适用于跨Database浏览数据的场景。
可以使用计算资源携带身份查看有权限的Database,此种方式便于跨Database浏览其他Database下的数据。
可创建元数据映射任务,但无法创建同步任务。
将鼠标悬浮于MaxCompute项目名或Hologres实例名上,可查看数据源或实例的详细信息。
数据源模式
Hologres-实例模式
创建映射Hologres Schema的MaxCompute外部Schema
外部Schema映射的表,元数据和数据实时从Hologres中获取,无需在MaxCompute内创建含有DDL元信息的表,Hologres源端表结构变化或数据变化,都可以实时感知并查询获取。
通过该方式,您可浏览实时数仓中ODS层、DWD层的表,按需查询或为后续周期同步做准备。
在MaxCompute控制台的 页面,创建连接Hologres Database的外部数据源。具体操作请参见步骤一:创建Hologres外部数据源。
本教程中创建的外部数据源名称为
dbs_holo_external
。重要创建Hologres外部数据源时:
认证方式需选择RAMRole。
Host仅支持配置为经典网络类型的Hologres实例域名。
在Hologres的HoloWeb开发页面,将RAMRole添加为Hologres实例的用户,以确保其具备Hologres的表权限。具体操作请参见用户管理。
在Hologres的HoloWeb开发页面,为该RAM角色授予实例的Developer权限(仅支持SPM模式)。操作详情请参见DB管理。
映射Hologres实例的Schema到MaxCompute项目的Schema,并在MaxCompute中浏览Hologres中的表。
在DataWorks的Data Studio页面中单击左侧的
图标,进入数据目录页面。
展开Hologres数据目录,右键单击目标实例的Schema(本教程中为
public
),选择元数据映射至 MaxCompute。在元数据映射至 MaxCompute页面中配置Hologres源端和MaxCompute目标端的参数。
本教程中重点参数配置如下,其余参数保持默认即可。
参数名
描述
项目查找方式
选择来自 DataWorks 数据源。
数据源
选择已绑定至DataWorks的MaxCompute计算资源名称。
本教程中为
dbs_mc
。外部 Schema 名称
指定源端Hologres Schema下的元数据映射至目标MaxCompute的外部Schema名称。
本教程中配置为
public
。外部数据源
选择MaxCompute中已创建的Hologres联邦数据源名称。
本教程中为
dbs_holo_external
。单击Hologres 源端上方的运行。
运行成功后,可看到和Hologres Schema(public)同名的MaxCompute外部Schema。
您可直接浏览Hologres中的表,并在MaxCompute中使用如下SQL命令查询数据。
SET odps.namespace.schema=true; SELECT * FROM public.dwd_orders;
返回结果如下:
重要若Schema级映射运行成功,但在数据目录的MaxCompute目录下无法显示映射后的表名,显示查询失败,请确认您创建的RAM角色权限是否配置正确。详情请参见创建RAM角色。
创建映射Hologres Table的MaxCompute外部表
不同于外部Schema,外部表需要将Hologres表在MaxCompute内建为外部表。外部表支持RAMRole和双签名两种认证方式:
RAMRole:支持跨账号角色扮演。您需在Hologres侧,完成以下操作:
双签名:使用当前执行任务的用户身份鉴权。即当前用户在Hologres中拥有哪些表的权限,通过MaxCompute外部表,也可使用此身份访问Hologres数据。详情请参见Hologres外部表。
您可挑选部分或全部字段进行映射,映射规则请参见参数说明的tblproperties参数部分。
操作步骤如下:
在DataWorks的Data Studio页面中单击左侧的
图标,进入数据目录页面。
展开Hologres数据目录,右键单击目标实例public Schema下的dwd_orders表,选择元数据映射至 MaxCompute。
在元数据映射至 MaxCompute页面中配置Hologres源端和MaxCompute目标端的参数。
本教程中重点参数配置如下,其余参数保持默认即可。参数详情请参见单表级元数据映射。
参数名
描述
实例查找方式
选择来自 DataWorks 数据源。
数据源
选择已绑定至DataWorks的MaxCompute数据源名称。
本教程中为
dbc_mc
。Schema
指定源端Hologres Schema下的元数据映射至目标MaxCompute的外部Schema名称。
本教程中配置为
default
。External Table
指定MaxCompute中新创建的外表名称,源端表数据将会被映射至此表中,默认与Hologres中表名称保持一致。
说明创建外表为一次性动作,不会自动刷新元数据,如需刷新元数据,需要删除当前外表并重新手动创建元数据映射。
MaxCompute 外表权限
选择双签名。
说明选择RamRole方式时,需要在Hologres侧添加用户,并进行DB授权。
生命周期
设置表的生命周期。
单击Hologres 源端上方的运行。
运行成功后,即可在左侧MaxCompute Schema下显示新建的外部表。
您可使用如下语句在MaxCompute中查询该外部表的数据。
SET odps.namespace.schema=true; SELECT * FROM dwd_orders;
返回结果如下:
创建同步Hologres Table的周期任务
若需要周期性地将Hologres实时数仓中的DWD表数据归档到MaxCompute云数仓的一张内部表中,可以使用数据同步任务,并配置周期调度实现。
在DataWorks的Data Studio页面单击左侧的
图标,进入数据开发页面,并新建项目目录。
单击左侧的
图标,在数据目录页面展开Hologres数据目录,右键单击目标实例public Schema下的dwd_orders表,选择数据同步至 MaxCompute。
在选择创建节点所属的路径弹框中,为云数仓内的表命名
dwd_holo_orders
,并按下回车键。进入同步Hologres数据至MaxCompute的配置页面,配置Hologres源端和MaxCompute目标端的参数。本教程中重点参数配置如下,更多参数说明请参见配置同步节点。
参数名
描述
数据源
选择已绑定至DataWorks的MaxCompute数据源名称。
本教程中为
dbs_mc
。Schema
选择您想要存储的Schema。
Table
自定义MaxCompute内部表名称。
本教程中配置为
dwd_holo_orders
。生命周期
设置表的生命周期。
导入方式
选择数据写入MaxCompute内表的方式:
追加:当您需要删除原有数据,将新数据写入到目标表时,可以选择覆盖写入的方式。
覆写:当您需要保留原有数据,将新数据追加到目标表时,可以选择追加写入的方式。
访问 Hologres 权限
您可以根据实际情况选择以下方式访问Hologres实例:
双签名访问方式:通过当前身份进行Hologres权限校验。
您需确保在MaxCompute项目下,拥有MaxCompute表读取权限的同时,也需要有该MaxCompute表对应的Hologres源表权限。MaxCompute侧权限控制参见:湖仓一体2.0使用指南、Hologres侧权限控制参见:权限管理概述。
RamRole访问方式:通过指定RAM角色进行访问身份校验。
为RAM用户授权AliyunSTSAssumeRoleAccess权限策略。详情请参见RAM角色授权模式。授权完成后,在RamRole中配置您所指定的RAM角色。
单击页面右侧的调度配置,在调度配置页面中配置工作流和调度周期,操作详情请参见节点调度。
单击Hologres 源端上方的运行。
运行成功后,即可在左侧MaxCompute下显示新建的内部表,您可使用如下SQL语句在MaxCompute中查询该外部表的数据。
SET odps.namespace.schema=true; SELECT * FROM default.dwd_holo_orders;
返回结果如下: