Delta Lake和Hudi是数据湖方案中常用的存储机制,为数据湖提供流处理、批处理能力。MaxCompute基于阿里云DLF、RDS、OSS产品提供了支持Delta或Hudi存储机制的湖仓一体架构。您可以通过MaxCompute查询到实时数据,即时洞察业务数据变化。本文为您介绍如何构建基于Delta Lake或Hudi存储机制的湖仓一体方案。

背景信息

通常,企业构建和应用数据湖需要经历数据入湖、数据湖存储与管理、数据探索与分析过程。MaxCompute基于阿里云DLF(Data Lake Formation)、RDS(Relational Database Service)、OSS(Object Storage Service)产品提供了支持Delta Lake或Hudi存储机制的湖仓一体架构,架构图如下。

数据湖架构
模块 对应阿里云产品 说明
在线数据库 云数据库RDS 准备的入湖数据来源,即数据湖的数据来源。
数据入湖 数据湖构建DLF 对接在线数据库,管理入湖数据来源。
数据湖存储与管理 将在线数据库中的源数据引入数据湖时,OSS会作为数据湖的统一存储,存储机制包含Delta Lake和Hudi两种。同时,DLF采用元数据管理功能管理元数据库和元数据表。
数据湖探索与分析 对数据湖数据进行分析。

前提条件

已开通如下服务或已创建实例、项目:
  • 已开通OSS服务。

    更多开通OSS服务操作,请参见开通OSS服务

  • 已开通DTS服务。

    您可以单击此处一键授权,为后续操作MaxCompute项目的阿里云账号授予AliyunDTSDefaultrole角色,授权即表示开通了DTS服务。

  • 已创建RDS MySQL实例。
    更多创建RDS MySQL实例操作,请参见创建RDS MySQL实例。假设已创建的RDS MySQL实例如下,所属地域为华东2(上海)。RDS MySQL实例
  • 已开通DataWorks服务。

    更多开通DataWorks操作,请参见开通DataWorks

  • 已开通DLF
  • 已创建MaxCompute项目(非External Project)。
    更多创建MaxCompute项目信息,请参见创建MaxCompute项目。假设已创建的MaxCompute项目名称为doc_test_prod,所属地域为华东2(上海)。MaxCompute项目

使用限制

基于Delta Lake或Hudi存储机制的湖仓一体方案的使用限制如下:
  • 仅华东1(杭州)、华东2(上海)、华北2(北京)、华南1(深圳)和新加坡地域支持构建湖仓一体能力。
  • MaxCompute可以部署在与DLF、OSS和RDS不同的地域,但DLF、OSS和RDS必须部署在同一地域。

操作流程

  1. 步骤一:授予MaxCompute访问DLF和OSS的权限
    为操作MaxCompute项目的账号授予访问DLF和OSS的权限。
  2. 步骤二:准备入湖数据
    创建RDS数据库,准备入湖数据。
  3. 步骤三:在OSS中创建存储空间及目录
    创建OSS存储空间作为数据湖的统一存储路径。
  4. 步骤四:在DLF中添加入湖数据源并创建元数据库
    通过创建数据源对接RDS与DLF。
  5. 步骤五:在DLF上创建并启动入湖任务
    基于DLF创建数据入湖任务,将RDS数据库中的表数据实时同步并回放到数据湖中。
  6. 步骤六:基于MaxCompute实时分析数据湖数据
    在MaxCompute项目中创建External Project,对数据湖数据进行分析。

步骤一:授予MaxCompute访问DLF和OSS的权限

操作MaxCompute项目的账号未经授权无法访问DLF、OSS服务,您需要执行授权操作。授权方式包含如下两种:
  • 方式一(推荐):一键式授权。当创建MaxCompute项目的账号和部署DLF、OSS的账号相同时,推荐使用该方式。您可以单击此处完成一键式授权
  • 方式二:自定义授权。当需要自定义授权信息时,使用该方式。创建MaxCompute项目的账号和部署DLF、OSS的账号相同或不相同,都可以使用该方式。

自定义授权的操作步骤如下:

  1. 登录RAM控制台创建可信实体为阿里云账号的RAM角色。例如AliyunODPSRoleForDLF。
    更多创建RAM角色信息。请参见创建可信实体为阿里云账号的RAM角色RAM角色
  2. 通过RAM控制台修改新建RAM角色的信任策略。
    注意 如果创建MaxCompute项目的账号和部署DLF的账号不相同,此处需要使用部署DLF的账号进行操作。
    更多修改RAM角色信任策略信息,请参见修改RAM角色的信任策略
    修改信任策略为如下内容。
    --当创建MaxCompute项目的账号和部署DLF的账号是同一个账号时,设置如下。
    {
    "Statement": [
    {
     "Action": "sts:AssumeRole",
     "Effect": "Allow",
     "Principal": {
       "Service": [
         "odps.aliyuncs.com"
       ]
     }
    }
    ],
    "Version": "1"
    }
    --当创建MaxCompute项目的账号和部署DLF的账号不是同一个账号时,设置如下。
    {
    "Statement": [
    {
     "Action": "sts:AssumeRole",
     "Effect": "Allow",
     "Principal": {
       "Service": [
         "<MaxCompute项目的Owner云账号id>@odps.aliyuncs.com"  
       ]
     }
    }
    ],
    "Version": "1"
    }
    MaxCompute的Owner云账号id:可以在个人信息中获取。
  3. 通过RAM控制台为新建RAM角色自定义权限策略。配置模式脚本配置
    更多自定义权限策略信息,请参见创建自定义策略
    自定义权限策略内容如下。
    {
    "Version": "1",
    "Statement": [
    {
     "Action": [
       "oss:ListBuckets",
       "oss:GetObject",
       "oss:ListObjects",
       "oss:PutObject",
       "oss:DeleteObject",
       "oss:AbortMultipartUpload",
       "oss:ListParts"
     ],
     "Resource": "*",
     "Effect": "Allow"
    },
    {
     "Action": [
     "dlf:CreateFunction",
    "dlf:BatchGetPartitions",
    "dlf:ListDatabases",
    "dlf:CreateLock",
    "dlf:UpdateFunction",
    "dlf:BatchUpdateTables",
    "dlf:DeleteTableVersion",
    "dlf:UpdatePartitionColumnStatistics",
    "dlf:ListPartitions",
    "dlf:DeletePartitionColumnStatistics",
    "dlf:BatchUpdatePartitions",
    "dlf:GetPartition",
    "dlf:BatchDeleteTableVersions",
    "dlf:ListFunctions",
    "dlf:DeleteTable",
    "dlf:GetTableVersion",
    "dlf:AbortLock",
    "dlf:GetTable",
    "dlf:BatchDeleteTables",
    "dlf:RenameTable",
    "dlf:RefreshLock",
    "dlf:DeletePartition",
    "dlf:UnLock",
    "dlf:GetLock",
    "dlf:GetDatabase",
    "dlf:GetFunction",
    "dlf:BatchCreatePartitions",
    "dlf:ListPartitionNames",
    "dlf:RenamePartition",
    "dlf:CreateTable",
    "dlf:BatchCreateTables",
    "dlf:UpdateTableColumnStatistics",
    "dlf:ListTableNames",
    "dlf:UpdateDatabase",
    "dlf:GetTableColumnStatistics",
    "dlf:ListFunctionNames",
    "dlf:ListPartitionsByFilter",
    "dlf:GetPartitionColumnStatistics",
    "dlf:CreatePartition",
    "dlf:CreateDatabase",
    "dlf:DeleteTableColumnStatistics",
    "dlf:ListTableVersions",
    "dlf:BatchDeletePartitions",
    "dlf:ListCatalogs",
    "dlf:UpdateTable",
    "dlf:ListTables",
    "dlf:DeleteDatabase",
    "dlf:BatchGetTables",
    "dlf:DeleteFunction"
     ],
     "Resource": "*",
     "Effect": "Allow"
    }
    ]
    }
  4. 将自定义的权限策略授权给新建的RAM角色。
    更多为RAM角色授权信息,请参见为RAM角色授权
  5. RAM角色管理页面,单击新建的RAM角色名称,获取ARN信息。
    ARN

步骤二:准备入湖数据

基于阿里云RDS构造数据库,并创建表、准备入湖数据。

  1. 访问RDS实例列表,在上方选择地域,例如华东2(上海),然后单击目标实例ID。
  2. 创建RDS数据库账号。在目标实例信息页面的左侧导航栏单击账号管理后,在用户账号页签单击创建账号,并在创建账号面板配置下表所列参数信息,单击确定
    创建账号
    参数名称 说明 样例值
    数据库账号 访问RDS数据库的账号。创建RDS数据库时会绑定此信息。 datalake_mcdbuser
    账号类型 账号包含如下两种:
    • 普通账号:后续需要绑定至RDS数据库。
    • 高权限账号:无需选择要授权的数据库,拥有实例中所有数据库的权限。
    普通账号
    密码 账号对应的密码信息。
    确认密码 再次确认账号对应的密码信息。
    更多参数解释,请参见创建账号
  3. 创建RDS数据库。在目标实例信息页面的左侧导航栏单击数据库管理后,在右侧区域单击创建数据库,并在创建数据库对话框配置下表所列参数信息,单击创建
    创建数据库
    参数名称 说明 样例值
    数据库(DB)名称 新建RDS数据库的名称。 datalake_mcdb
    支持字符集 在下拉列表选择RDS数据库支持的字符集类型,由于MaxCompute采用的数据集类型为UTF-8,为便于后续连通,此处固定选择为utf8 utf8
    授权账号 在下拉列表选择可以访问RDS数据库的账号。 datalake_mcdbuser
    账号类型 在下拉列表选择要授予账号的权限。 读写
    更多参数解释,请参见创建数据库
  4. 通过DMS登录创建的RDS数据库。
    1. 数据库管理页面右上角,单击登录数据库
    2. 登录实例对话框,填写下表所列参数信息后,单击登录
      登录实例
      参数名称 说明 样例值
      数据库类型 RDS数据库的类型,固定选择MySQL datalake_mcdb
      实例地区 在下拉列表选择已创建的RDS数据库所属地域。 华东2(上海)
      实例ID 在下拉列表选择已创建的RDS实例。 rm-uf6912****
      数据库账号 在下拉列表选择可以访问RDS数据库的账号。后续可通过该账号登录RDS数据库。 datalake_mcdbuser
      数据库密码 数据库账号对应的密码。
    3. 登录实例对话框左下角单击测试连接,确保数据库连通性正常。
      连通正常状态如下。连通正常
      说明 如果单击测试连接后,提示白名单问题,按照界面指引,单击设置白名单即可解决该问题。
    4. 登录实例对话框,单击登录
  5. 进入DMS的SQL Console界面后,在命令执行区域,输入SQL语句并单击执行(F8)
    为RDS数据库创建表并插入少量测试数据。例如表名为anti_fraud_result。命令示例如下。
    CREATE TABLE `anti_fraud_result` (
      `transactionid` varchar(32) NOT NULL,
      `uid` varchar(32) DEFAULT NULL,
      `card` varchar(32) DEFAULT NULL,
      `longitude` double(12,8) DEFAULT '12.00000000',
      `latitude` double(12,8) DEFAULT '12.00000000',
      PRIMARY KEY (`transactionid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    ;
    
    INSERT INTO `anti_fraud_result` values ("12489571","82005","123123",3.14592040,101.12315432);
    INSERT INTO `anti_fraud_result` values ("12489572","82005","123123",3.14592040,101.12315432);
    INSERT INTO `anti_fraud_result` values ("12489573","82005","123123",3.14592040,101.12315432);
    INSERT INTO `anti_fraud_result` values ("12489574","82005","123123",3.14592040,101.12315432);
    建表并插入数据

步骤三:在OSS中创建存储空间及目录

创建OSS存储空间作为数据湖的统一存储路径。

  1. 登录OSS管理控制台
  2. 在左侧导航栏,单击Bucket列表,在Bucket列表页面,单击创建Bucket
  3. 创建Bucket面板填写Bucket名称,例如mc-dlf-oss,并选择地域,例如华东2(上海)后,单击确定
    创建Bucket
  4. 在左侧导航栏,单击文件管理后,在右侧区域单击新建目录并填写目录名,例如covid-19,单击确定
    创建目录

步骤四:在DLF中添加入湖数据源并创建元数据库

通过创建数据源对接RDS与DLF。

  1. 登录元数据库管理控制台,在左上角选择地域,例如华东2(上海)
  2. 新建RDS数据源。
    1. 在左侧导航栏,选择数据入湖 > 数据源管理。在数据源管理页面,单击新建数据源
    2. 新建数据源对话框,配置连接属性参数后,单击下一步
      链接属性
      参数 描述 示例
      连接名称 新建数据连接的名称,用于区分不同数据库的连接。此处添加的是RDS数据源。 dlf_rds
      连接类型 在下拉列表选择目标数据源类型。固定选择数据库 数据库
      数据库引擎 在下拉列表选择目标数据库的引擎类型。固定选择mysql mysql
      实例类型 在下拉列表选择目标数据库对应的实例类型。固定选择RDS实例 RDS实例
    3. 新建数据源对话框,配置访问设置参数。
      访问设置
      参数 描述 示例
      请选择实例类型 在下拉列表选择当前阿里云账号下的RDS 当前阿里云账号下的RDS
      RDS实例 在下拉列表选择待连接的目标RDS实例。 rm-uf6912****
      用户名 登录RDS实例下目标数据库的账号。即步骤二中创建的RDS数据库账号。 datalake_mcdbuser
      密码 数据库账号对应的密码。
      VPC 在下拉列表选择RDS实例所属VPC ID。

      您也可以登录RDS管理控制台,在实例列表页面目标RDS实例对应的网络类型处获取VPC ID。

      vpc-uf6arc****
      VSwitch 在下拉列表选择VPC ID绑定的交换机ID。

      您也可以登录专有网络管理控制台,在左侧导航栏单击交换机,即可在右侧通过VPC ID获取到交换机ID。

      vsw-uf68po****
      Security Group 在下拉列表选择RDS实例绑定的安全组信息。

      您也可以登录RDS管理控制台,在实例列表页面单击目标实例名称后,在左侧导航栏单击数据安全性,即可在安全组页签获取到绑定的安全组信息。

      sg-uf6b8m****
    4. 在左下角单击连接测试,确保数据库连通性正常后,单击下一步
      连通正常状态如下。连通正常
      说明 如果连通显示异常,请重新确认并修改填写信息无误后再测试连接。
    5. 单击确定,完成数据源添加。
      添加数据源成功
  3. 创建元数据库。例如covid_19。
    1. 在左侧导航栏,选择元数据管理 > 元数据库。在元数据库页面,单击新建元数据库
    2. 新建元数据库对话框,配置元数据库参数。
      新建元数据库
      参数名称 描述 示例
      元数据库名称 新建元数据库的名称。 covid_19
      元数据库描述 元数据库的备注信息。 湖仓一体
      选择路径 OSS数据路径。格式为oss://<Bucket名称>/<OSS目录名称>/ oss://mc-dlf-oss/covid-19/
    3. 单击确定,完成元数据库创建。
      完成元数据库创建

步骤五:在DLF上创建并启动入湖任务

基于DLF创建数据入湖任务,将RDS数据库中的表数据实时同步并回放到数据湖中。

  1. 在DLF控制台的左侧导航栏,选择数据入湖 > 入湖任务管理。在入湖任务管理页面,单击新建入湖任务
  2. 在配置向导页,选择任务类型为关系数据库实时入湖,单击下一步
  3. 在配置向导页,配置数据源和目标信息后,单击下一步
    配置数据源和目标信息
    类别 参数名称 描述 示例
    配置数据源 数据湖连接 在DLF中添加的数据连接名称。即步骤四中创建的RDS数据源连接名称。 dlf_rds
    表路径 目标RDS数据源中表的路径,格式为RDS数据库名称/表名称 datalake_mcdb/anti_fraud_result
    DTS数据订阅 DTS数据订阅名称。作业通过DTS订阅功能来消费表的binlog。 from_dlf
    配置目标数据湖信息 目标元数据库 目标数据湖的元数据库名称,即步骤四中创建的元数据库。 covid_19
    目标元数据表名称 目标数据湖的元数据表名称。 sh_rds
    存储格式 目标元数据表的存储格式。可以在下拉列表选择DeltaHudi Delta
    数据湖存储位置 目标元数据表数据在OSS中的存储位置。 oss://mc-dlf-oss/covid-19/sh_rds/
  4. 在配置向导页,配置任务信息后,单击下一步
    配置任务
    参数名称 描述 示例
    任务实例名称 新建数据入湖实例的名称。 rds_dlf_mc
    RAM角色 DLF服务通过扮演该角色来访问用户资源,可选择系统默认AliyunDLFWorkFlowDefaultRole角色。 AliyunDLFWorkFlowDefaultRole
    最大资源使用量 入湖任务使用的最大资源量,单位为CU。 10
  5. 确认配置的入湖任务信息无误后,单击确定
    入湖任务
  6. 入湖任务管理页面,在新建入湖任务的操作列单击运行后,单击确定,启动入湖任务。
    数据入湖任务属于全量及增量入湖,您需要等待大约3~5分钟后,数据会完成导入,随后入湖任务自动进入实时监听状态。如果有数据更新,则会自动更新至Delta Lake或Hudi数据格式中。运行入湖任务

步骤六:基于MaxCompute实时分析数据湖数据

基于已创建的MaxCompute项目、DLF元数据库、OSS存储空间,创建External Project,用于关联DLF和OSS,并映射至已创建的MaxCompute项目。后续可通过映射的MaxCompute项目对External Project进行数据分析操作。仅MaxCompute项目的所有者(Project Owner)或具备Admin、Super_Administrator角色的用户可以创建External Project。操作步骤如下:

  1. 进入临时查询界面,执行create externalproject命令创建External Project。
    create externalproject的语法如下。
    create externalproject -source dlf -name <external_project_name> 
        -ref <project_name> 
       [-comment <comment>]
        -region <dlf_region> 
        -db <dlf_database_name>
        -endpoint "<dlf_endpoint>" 
       [-ramRoleArn "<ram_role_arn>"]
        -ossEndpoint "<oss_endpoint>" 
       [-T <table_properties>];
    • external_project_name:必填。待创建External Project的名称。
    • project_name:必填。已创建的MaxCompute项目名称。
    • comment:可选。External Project的注释信息。
    • dlf_region:必填。DLF所在地域的RegionID。各地域RegionID信息,请参见获取RegionID及VPC ID
    • dlf_database_name:必填。DLF元数据库的名称。
    • dlf_endpoint:必填。DLF的Endpoint信息。当前支持的地域对应的Endpoint信息如下:
      • 华东1(杭州):dlf-share.cn-hangzhou.aliyuncs.com
      • 华东2(上海):dlf-share.cn-shanghai.aliyuncs.com
      • 华北2(北京):dlf-share.cn-beijing.aliyuncs.com
      • 华南1(深圳):dlf-share.cn-shenzhen.aliyuncs.com
      • 新加坡:dlf-share.ap-southeast-1.aliyuncs.com
    • ram_role_arn:可选。RAM角色的ARN信息。采用自定义授权方式时,需要配置该参数。该参数还可以通过-D odps.properties.rolearn=<ram_role_arn>方式进行配置。
      说明 如果创建MaxCompute项目的账号和部署DLF、OSS的账号是同一个账号,且使用了一键授权,则不需要指定该参数。
    • oss_endpoint:必填。OSS所属地域的Endpoint。更多OSS Endpoint信息,请参见访问域名和数据中心
    • table_properties:可选。指定External Project中表的相关属性。可根据OSS数据文件格式配置的属性如下:
      属性名称 描述 取值范围
      file_format 指定存储时所使用的文件格式。设置后会忽略input_formatoutput_format配置项。
      • SEQUENCEFILE
      • TEXTFILE
      • RCFILE
      • ORC
      • ORCFILE
      • PARQUET
      • AVRO
      input_format 指定文件存储时所使用的InputFormat对应的类名。
      • SEQUENCEFILE:org.apache.hadoop.mapred.SequenceFileInputFormat
      • TEXTFILE:org.apache.hadoop.mapred.TextInputFormat
      • RCFILE:org.apache.hadoop.hive.ql.io.RCFileInputFormat
      • ORC:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
      • ORCFILE:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
      • PARQUET:org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
      • AVRO:org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat
      • JSON:org.apache.hadoop.mapred.TextInputFormat
      • CSV:org.apache.hadoop.mapred.TextInputFormat
      output_format 指定文件存储时所使用的OutputFormat对应的类名。
      • SEQUENCEFILE:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
      • TEXTFILE:org.apache.hadoop.hive.serde2.OpenCSVSerdeorg.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      • RCFILE:org.apache.hadoop.hive.ql.io.RCFileOutputFormat
      • ORC:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
      • ORCFILE:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
      • PARQUET:org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
      • AVRO:org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat
      • JSON:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      • CSV:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      serialization_lib 指定数据在进行序列化及反序列化时所使用的类名。
      • SEQUENCEFILE:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      • TEXTFILE:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      • RCFILE:org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
      • ORC:org.apache.hadoop.hive.ql.io.orc.OrcSerde
      • ORCFILE:org.apache.hadoop.hive.ql.io.orc.OrcSerde
      • PARQUET:org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
      • AVRO:org.apache.hadoop.hive.serde2.avro.AvroSerDe
      • JSON:org.apache.hive.hcatalog.data.JsonSerDe
      • CSV:org.apache.hadoop.hive.serde2.OpenCSVSerde
      serde_properties 用于给serialization_lib指定相关属性。 无。
    命令示例如下。
    create externalproject
         -source dlf
         -name external_dlf
         -ref doc_test_prod
         -comment "DLF"
         -region "cn-shanghai"
         -db covid_19
         -endpoint "dlf-share.cn-shanghai.aliyuncs.com"
         -ossEndpoint "oss-cn-shanghai-internal.aliyuncs.com";
    说明 更多关于External Project的操作信息,例如更新、删除以及语法示例,请在MaxCompute客户端中执行help external命令查看详细信息。
  2. 执行show tables命令,查看External Project下的表。
    命令示例如下。
    show tables in external_dlf;
    返回结果如下。
    ALIYUN$***@aliyunid.com:sh_rds
  3. 在临时查询界面,查询External Project中的表数据。
    命令示例如下。
    set odps.sql.hive.compatible=true;
    set odps.sql.split.hive.bridge=true;
    select * from external_dlf.sh_rds;
    返回结果如下。运行结果
  4. 通过DMS登录创建的RDS数据库,进入DMS的SQL Console界面后,在命令执行区域,输入SQL语句更新RDS数据库中的数据并单击执行(F8)
    通过DMS登录创建的RDS数据库操作,请参见步骤二更新RDS数据库数据命令示例如下。
    UPDATE `anti_fraud_result` set card="999999"  where transactionid="12489571";
    UPDATE `anti_fraud_result` set card="999999"  where transactionid="12489572";
    UPDATE `anti_fraud_result` set card="999999"  where transactionid="12489573";
    UPDATE `anti_fraud_result` set card="999999"  where transactionid="12489574";
  5. 进入进入临时查询界面,查看数据更新结果。
    命令示例如下。
    set odps.sql.hive.compatible=true;
    set odps.sql.split.hive.bridge=true;
    select * from external_dlf.sh_rds where transactionid="12489541";
    返回结果如下。返回结果