Hologres基于Delta Lake实现湖仓一体

本文为您介绍Hologres基于Delta Lake实现湖仓一体的背景、架构、环境准备及使用说明等信息。

背景信息

  • Delta Lake是DataBricks公司推出的一种数据湖方案。Delta Lake以数据为中心,围绕数据流走向(数据从流入数据湖、数据组织管理和数据查询到流出数据湖)推出了一系列功能特性,协助您搭配第三方上下游工具,搭建快捷、易用和安全的数据湖。详情请参见Delta Lake概述

  • EMR是阿里云提供的云原生开源大数据平台,向您提供简单易集成的Hadoop、Hive、Spark、Flink等开源大数据计算和存储引擎,便于您使用Hadoop和Spark生态系统中的其他周边系统分析和处理数据。详情请参见什么是E-MapReduce

  • DLF是一款全托管的帮助您构建云上数据湖及Lakehouse的服务,为您提供了统一的元数据管理、统一的权限与安全管理、便捷的数据入湖能力以及一键式数据探索能力。详情请参见DLF产品简介

  • Hologres作为一站式实时数仓,与DLF、EMR无缝集成,打破数据湖与数据仓库割裂的体系,构建完整的湖仓一体解决方案,将数据湖的灵活性、生态丰富性与实时数仓的高性能在线复杂分析、企业级能力相结合,为您提供一站式实时湖仓解决方案。详情请参见OSS数据湖加速

整体架构

本解决方案通过EMR Spark来进行数据加工与处理,元数据存储在DLF中,数据存储在OSS上,Hologres可以利用DLF对OSS元数据的管理能力,对OSS多种格式的湖数据(Hudi、Delta、CSV、Parquet、ORC、SequenceFile)进行加速查询和湖仓融合分析,将数据提供给BI报表、可视化大屏和上层应用进一步消费使用。image

环境准备

数据源准备

说明

此步骤主要针对初次使用EMR或者OSS服务的用户。如您在实际业务中,已经有大量业务数据通过EMR服务写入OSS Bucket,可直接使用DLF元数据抽取功能自动生成元数据信息,供Hologres来查询访问。元数据抽取方式请参见元数据抽取

  1. 开通EMR数据湖集群,选择需要的服务和存储格式,选择DLF来管理元数据。本文以Spark+Hive+Delta为例,开通方式请参见E-MapReduce快速入门image

  2. 开通OSS服务,创建存储空间用于存储数据,详情请参见开通OSS服务

  3. 使用EMR Spark构建数据。

    1. 登录EMR集群,可选择SSH方式登录集群主节点或者免密登录集群Core节点,详情请参见登录集群

    2. 构建TPC-H 100GB测试数据,命令如下。

      说明

      本文的TPC-H的实现基于TPC-H的基准测试,并不能与已发布的TPC-H基准测试结果相比较,本文中的测试并不符合TPC-H基准测试的所有要求。

      # 执行yum update更新所有库
      
      yum update
      
      # 安装 git 和 gcc
      
      yum install git
      yum install gcc
      
      #下载TPC-H数据生成代码
      
      git clone https://github.com/gregrahn/tpch-kit.git
      
      #进入数据生成工具代码目录
      
      cd tpch-kit/dbgen
      
      # 编译数据生成工具代码
      
      make
      
      # 运行如下代码生成数据
      
      ./dbgen -vf -s 100
      
    3. 进入Hive交互界面,创建数据库和表,导入上述生成的数据。

      # 进入hive交互界面
      
      hive
      
      # 创建数据库
      
      CREATE DATABASE IF NOT EXISTS testdb_textfile location 'oss://oss-bucket-dlftest/testdb_textfile';
      
      # 切换至刚创建的数据库
      
      USE testdb_textfile;
      
      # 创建表
      
      CREATE TABLE IF NOT EXISTS nation_textfile (
          n_nationkey integer ,
          n_name char(25) ,
          n_regionkey integer ,
          n_comment varchar(152)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS region_textfile (
          r_regionkey integer ,
          r_name char(25) ,
          r_comment varchar(152)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS part_textfile (
          p_partkey integer  ,
          p_name varchar(55)  ,
          p_mfgr char(25)  ,
          p_brand char(10)  ,
          p_type varchar(25)  ,
          p_size integer  ,
          p_container char(10) ,
          p_retailprice decimal(15,2) ,
          p_comment varchar(23)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS supplier_textfile (
          s_suppkey integer  ,
          s_name char(25)  ,
          s_address varchar(40) ,
          s_nationkey integer  ,
          s_phone char(15) ,
          s_acctbal decimal(15,2) ,
          s_comment varchar(101)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS partsupp_textfile (
          ps_partkey integer ,
          ps_suppkey integer ,
          ps_availqty integer ,
          ps_supplycost decimal(15,2) ,
          ps_comment varchar(199)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS customer_textfile (
          c_custkey integer  ,
          c_name varchar(25)  ,
          c_address varchar(40) ,
          c_nationkey integer  ,
          c_phone char(15)  ,
          c_acctbal decimal(15,2)  ,
          c_mktsegment char(10)  ,
          c_comment varchar(117)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS orders_textfile (
          o_orderkey integer ,
          o_custkey integer ,
          o_orderstatus char(1) ,
          o_totalprice decimal(15,2) ,
          o_orderdate date  ,
          o_orderpriority char(15)  ,
          o_clerk char(15)  ,
          o_shippriority integer  ,
          o_comment varchar(79)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      CREATE TABLE IF NOT EXISTS lineitem_textfile (
          l_orderkey integer  ,
          l_partkey integer  ,
          l_suppkey integer  ,
          l_linenumber integer  ,
          l_quantity decimal(15,2)  ,
          l_extendedprice decimal(15,2)  ,
          l_discount decimal(15,2)  ,
          l_tax decimal(15,2)  ,
          l_returnflag char(1)  ,
          l_linestatus char(1)  ,
          l_shipdate date  ,
          l_commitdate date  ,
          l_receiptdate date  ,
          l_shipinstruct char(25)  ,
          l_shipmode char(10)  ,
          l_comment varchar(44)
      ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|';
      
      # 导入数据
      
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/nation.tbl*' OVERWRITE INTO TABLE nation_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/region.tbl*' OVERWRITE INTO TABLE region_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/supplier.tbl*' OVERWRITE INTO TABLE supplier_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/customer.tbl*' OVERWRITE INTO TABLE customer_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/part.tbl*' OVERWRITE INTO TABLE part_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/partsupp.tbl*' OVERWRITE INTO TABLE partsupp_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/orders.tbl*' OVERWRITE INTO TABLE orders_textfile;
      LOAD DATA LOCAL INPATH '${YOUR_PATH}/lineitem.tbl*' OVERWRITE INTO TABLE lineitem_textfile;
      
    4. 输入spark-sql命令进入交互界面,创建数据库和delta格式的表。

      # 进入spark-sql交互界面
      
      spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.delta.mergeSchema=true' --conf 'autoMerge.enable=true' --conf 'spark.sql.parquet.writeLegacyFormat=true'
      
      # 创建数据库
      
      CREATE DATABASE IF NOT EXISTS test_spark_delta LOCATION 'oss://oss-bucket-dlftest/test_spark_delta';
      
      # 切换至刚创建的数据库并创建表
      
      USE test_spark_delta;
      
      CREATE TABLE nation_delta
      USING delta
      AS SELECT * FROM ${SOURCE}.nation_textfile;
      
      CREATE TABLE region_delta
      USING delta
      AS SELECT * FROM ${SOURCE}.region_textfile;
      
      CREATE TABLE supplier_delta
      USING delta
      AS SELECT * FROM ${SOURCE}.supplier_textfile;
      
      CREATE TABLE customer_delta
      USING delta
      partitioned BY (c_mktsegment)
      AS SELECT * FROM ${SOURCE}.customer_textfile;
      
      CREATE TABLE part_delta
      USING delta
      partitioned BY (p_brand)
      AS SELECT * FROM ${SOURCE}.part_textfile;
      
      CREATE TABLE partsupp_delta
      USING delta
      AS SELECT * FROM ${SOURCE}.partsupp_textfile;
      
      CREATE TABLE orders_delta
      USING delta
      partitioned BY (o_orderdate)
      AS SELECT * FROM ${SOURCE}.orders_textfile;
      
      CREATE TABLE lineitem_delta
      USING delta
      partitioned BY (l_shipdate)
      AS SELECT * FROM ${SOURCE}.lineitem_textfile;

Hologres开启数据加速配置

前往Hologres管理控制台,在实例列表页单击对应实例操作列的数据湖加速即可开启。image

使用说明

Hologres的数据湖加速能力,可以满足实际业务中以下两种使用场景,您可以根据业务需要选择合适的场景。

场景一:使用Hologres直接加速查询OSS上的表数据

示例:

-- 创建DLF外部表插件

CREATE EXTENSION IF NOT EXISTS dlf_fdw;

-- 创建外部服务器

CREATE SERVER IF NOT EXISTS dlf_server FOREIGN data wrapper dlf_fdw options 
(
    dlf_region 'cn-beijing',
    dlf_endpoint 'dlf-share.cn-beijing.aliyuncs.com',
    oss_endpoint 'oss-cn-beijing-internal.aliyuncs.com'
);

-- 导入外部表定义

IMPORT FOREIGN SCHEMA "test_spark_delta" LIMIT TO 
(
		customer_delta,
		lineitem_delta,
		nation_delta,
		orders_delta,
		part_delta,
		partsupp_delta,
		region_delta,
		supplier_delta
)
FROM SERVER dlf_server INTO oss_ext_tables options (if_table_exist 'update');

-- 查询表数据,以Q22为例

SELECT
        cntrycode,
        count(*) AS numcust,
        sum(c_acctbal) AS totacctbal
FROM
        (
                SELECT
                        substring(c_phone FROM 1 FOR 2) AS cntrycode,
                        c_acctbal
                FROM
                        customer_delta
                WHERE
                        substring(c_phone FROM 1 FOR 2) IN
                                ('24', '32', '17', '18', '12', '14', '22')
                        AND c_acctbal > (
                                SELECT
                                        avg(c_acctbal)
                                FROM
                                        customer_delta
                                WHERE
                                        c_acctbal > 0.00
                                        AND substring(c_phone FROM 1 FOR 2) IN
                                                ('24', '32', '17', '18', '12', '14', '22')
                        )
                        AND NOT EXISTS (
                                SELECT
                                        *
                                FROM
                                        orders_delta
                                WHERE
                                        o_custkey = c_custkey
                        )
        ) AS custsale
GROUP BY
        cntrycode
ORDER BY
        cntrycode;

返回结果:

+------------+-------------+---------------+
| cntrycode  | numcust     | totacctbal    | 
+------------+-------------+---------------+
| 12         | 90805       | 681136537.68  |
| 14         | 91459       | 685826271.21  |
| 17         | 91313       | 685025263.11  |
| 18         | 91292       | 684588251.63  |
| 22         | 90399       | 677402363.79  |
| 24         | 90635       | 680033065.67  |
| 32         | 90668       | 680459221.16  |
+------------+-------------+---------------+

场景二:导入Hologres标准存储以获取更好的查询性能

Hologres标准存储采用SSD(NVME)硬盘,随机读写性能更好。将OSS外表导入Hologres内部标准存储,可通过创建索引、设置适合的Shard数、选择合适的分布列等手段优化查询性能,以Q2为例,可获得18倍以上的性能提升。详情请参见优化内部表性能化

  • Hologres中创建相同结构的内部表并导入数据。

    示例如下。

    --创建内部表
    
    BEGIN;
    CREATE TABLE region
    (
        R_REGIONKEY INT  NOT NULL PRIMARY KEY,
        R_NAME      TEXT NOT NULL,
        R_COMMENT   TEXT
    );
    CALL set_table_property('region', 'distribution_key', 'R_REGIONKEY');
    CALL set_table_property('region', 'bitmap_columns', 'R_REGIONKEY,R_NAME,R_COMMENT');
    CALL set_table_property('region', 'dictionary_encoding_columns', 'R_NAME,R_COMMENT');
    CALL set_table_property('region', 'time_to_live_in_seconds', '31536000');
    COMMIT;
    
    --导入数据
    
    INSERT INTO public.region SELECT * FROM region_delta ;
    
  • 内表查询结果。

    SELECT
            cntrycode,
            count(*) AS numcust,
            sum(c_acctbal) AS totacctbal
    FROM
            (
                    SELECT
                            substring(c_phone FROM 1 FOR 2) AS cntrycode,
                            c_acctbal
                    FROM
                            customer
                    WHERE
                            substring(c_phone FROM 1 FOR 2) IN
                                    ('24', '32', '17', '18', '12', '14', '22')
                            AND c_acctbal > (
                                    SELECT
                                            avg(c_acctbal)
                                    FROM
                                            customer
                                    WHERE
                                            c_acctbal > 0.00
                                            AND substring(c_phone FROM 1 FOR 2) IN
                                                    ('24', '32', '17', '18', '12', '14', '22')
                            )
                            AND NOT EXISTS (
                                    SELECT
                                            *
                                    FROM
                                            orders
                                    WHERE
                                            o_custkey = c_custkey
                            )
            ) AS custsale
    GROUP BY
            cntrycode
    ORDER BY
            cntrycode;

性能对比

以32 Core独享实例为例,可以看到Hologres内部表查询速度比OSS外部表会高出约100倍:

  • OSS外部表

    • 查询耗时:17.24s。

    • 执行计划:1676023339501-3397ef74-631b-4de2-9bfb-7072ecc4c6de

  • Hologres内部表

    • 查询耗时:106.67ms。

    • 执行计划:1676024468942-f9e1b7c6-9a51-466b-b775-104d234e1338