Accelerating Paimon Data Lake Analysis with Materialized Views

AnalyticDB for MySQL can use materialized views to accelerate analysis of Paimon data lake tables stored in OSS.

Background and challenges

AnalyticDB for MySQL supports analyzing Paimon data lake tables stored in OSS directly, without importing the data. In real-world workloads, however, frequently accessed, high-value datasets (such as ad clicks and user behavior logs) are often stored in the lake as Paimon tables, and scanning the raw Paimon files for every query cannot meet the performance requirements of interactive analysis. The traditional approach and its drawbacks:

  • Preprocess the data with Spark → generate Parquet files → import them into an MPP data warehouse

  • Complex architecture, high latency, and heavy operations and maintenance burden

This topic presents a more efficient approach: build an incremental materialized view in AnalyticDB for MySQL, which provides:

  • Zero data migration: the materialized view is created directly on the Paimon table.

  • Automatic incremental refresh: only new and changed data is processed, reducing compute overhead.

  • Query acceleration: the materialized view is stored in the columnar engine (XUANWU_V2), so query performance far exceeds direct access to Paimon.

Prerequisites

Build a Paimon data lake table

Step 1: Use Spark SQL to create a Paimon table

Note
  • Only Paimon append-only tables (tables without a primary key) can be used to build incremental materialized views.

  • If you use an existing Paimon table, its metadata must be managed by a filesystem or Hive catalog.
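
  For reference, the table created in the steps below qualifies because it declares no primary key. By contrast, a primary-key Paimon table cannot be used for incremental refresh; a hypothetical sketch (pk_demo is an illustrative name) of such a table in Spark SQL:

    -- Hypothetical: a primary-key Paimon table, which does NOT qualify
    CREATE TABLE pk_demo (
      id STRING,
      v INT
    ) USING paimon
    TBLPROPERTIES ('primary-key' = 'id');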

  1. Log on to the AnalyticDB for MySQL console and select the region where the cluster resides in the upper-left corner. In the left-side navigation pane, click Clusters, and then click the ID of the target cluster.

  2. In the left-side navigation pane, choose Cluster Management > Resource Management > Resource Group Management and create an Interactive resource group for the Spark engine.

    Add the following Spark configuration. Replace the value of spark.sql.catalog.paimon.warehouse with your actual OSS path; the path must match the external table configuration used later.

    spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog;
    spark.sql.catalog.paimon.warehouse oss://your-bucket/paimon-warehouse;
    spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions;
  3. After the resource group is created, choose Job Development > SQL Development in the left-side navigation pane. In SQLConsole, select Spark and the Interactive resource group created in the previous step.

    -- Create the database and table
    USE paimon.default;
    CREATE DATABASE IF NOT EXISTS paimon_db;
    USE paimon_db;
    
    DROP TABLE IF EXISTS user_event_log;
    CREATE TABLE user_event_log (
      day STRING,
      hour STRING,
      event_time STRING,
      city STRING,
      account_id STRING,
      ad_id STRING,
      event_count INT,
      event_name STRING
    )
    USING paimon
    PARTITIONED BY (day, hour)
    TBLPROPERTIES (
      'execution.checkpointing.timeout' = '60 min',
      'file.compression' = 'ZSTD',
      'file.format' = 'Parquet',
      'num-sorted-run.stop-trigger' = '2147483647',
      'snapshot.time-retained' = '2 h',
      'sort-spill-threshold' = '10',
      'spill-compression' = 'ZSTD',
      'write-buffer-size' = '1536 mb',
      'write-buffer-spillable' = 'true'
    );
    Important

    After the table is created, check the OSS bucket: a corresponding .db directory appears under the warehouse path.
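
    As a quick sanity check, you can also confirm the table from the same SQLConsole Spark session; the count should be 0 before any data is written:

    -- Verify the table is registered and currently empty
    SHOW TABLES IN paimon.paimon_db;
    SELECT COUNT(*) FROM paimon.paimon_db.user_event_log;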

Step 2: Simulate continuous data writes

  1. Create a .py script file.

    Modify the warehouse configuration to your actual path, and update the database and table names in the INSERT statement passed to spark.sql.

    # Write 10,000 rows per minute
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import (
        lit, rand, concat, array, current_timestamp,
        date_format, current_date, expr, floor
    )
    import time

    # Initialize the SparkSession
    spark = SparkSession.builder \
        .appName("Generate Mock Data for paimon_base_table") \
        .config("spark.adb.version", "3.5") \
        .config("spark.sql.extensions", "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") \
        .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog") \
        .config("spark.sql.catalog.paimon.warehouse", "oss://your-bucket/paimon-warehouse") \
        .getOrCreate()

    # 10 write batches of 10,000 rows each
    for i in range(10):
        print(f"Starting write batch {i + 1}")
        # Shift event_time one minute into the past
        time_offset_expr = expr("current_timestamp() - interval 1 minutes")
        # Build a DataFrame of random data
        # range(0, 10000) generates 10,000 rows
        mock_data = spark.range(0, 10000).select(
            date_format(current_date(), "yyyy-MM-dd").alias("day"),
            date_format(current_timestamp(), "HH").alias("hour"),
            time_offset_expr.cast("string").alias("event_time"),
            array(lit("HZ"), lit("SH"))[floor(rand() * 2)].alias("city"),
            concat(lit("acct_"), floor(rand() * 25)).alias("account_id"),
            concat(lit("ad_"), floor(rand() * 25)).alias("ad_id"),
            floor(rand() * 1000).cast("int").alias("event_count"),
            (400 + floor(rand() * 101)).cast("string").alias("event_name")
        )
        # Register a temporary view
        mock_data.createOrReplaceTempView("temp_mock_data")
        # Insert the batch into the Paimon table
        spark.sql("INSERT INTO paimon.paimon_db.user_event_log SELECT * FROM temp_mock_data")
        time.sleep(60)
    You can run the script above in a notebook. If you submit it through Spark JAR development instead, continue with the following steps.
  2. Upload the script to OSS. For details, see Simple upload.

  3. Download the paimon-spark JAR package from the Maven repository and upload it to OSS.

  4. In the left-side navigation pane, choose Job Development > Spark JAR Development, select a Job resource group for the Spark engine, and run the following job in Batch mode.

    Replace the paths of file and jars with your actual values.

    {
      "name": "Spark Python Test",
      "file": "oss://your-bucket/paimon-warehouse/write_auto.py",
      "jars": ["oss://your-bucket/paimon-warehouse/paimon-spark-3.5-1.1.0.jar"],
      "conf": {
        "spark.driver.resourceSpec": "large",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "large",
        "spark.adb.version": "3.5"
      }
    }
    Writing the data takes some time; you can proceed with the following steps in the meantime.
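
    While the job runs, you can watch the row count grow per partition from SQLConsole (Spark engine, Interactive resource group):

    -- Each completed batch adds 10,000 rows to the current day/hour partition
    SELECT `day`, `hour`, COUNT(*) AS rows_written
    FROM paimon.paimon_db.user_event_log
    GROUP BY `day`, `hour`
    ORDER BY `day`, `hour`;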

Map the Paimon table with external tables

AnalyticDB for MySQL must map the Paimon data through external tables before a materialized view can be created on it.

  1. Create an external database

    In the left-side navigation pane, choose Job Development > SQL Development. In SQLConsole, select an Interactive resource group for the XIHE engine.

    • The external database name must be the same as the database name created in Spark (for example, paimon_db).

    • The location path format is: oss://<bucket>/<warehouse>/<db_name>.db/

    CREATE EXTERNAL DATABASE IF NOT EXISTS paimon_db
    WITH DBPROPERTIES (
      catalog = 'paimon',
      adb.paimon.warehouse = 'oss://your-bucket/paimon-warehouse/',
      location = 'oss://your-bucket/paimon-warehouse/paimon_db.db/'
    );
  2. Create an external table

    USE paimon_db;
    
    -- Create the external table
    DROP TABLE IF EXISTS user_event_log;
    CREATE TABLE user_event_log (
      `day`        STRING,
      `hour`       STRING,
      event_time   STRING,
      city         STRING,
      account_id   STRING,
      ad_id        STRING,
      event_count  INT,
      event_name   STRING
    )
    STORED AS paimon
    LOCATION 'oss://your-bucket/paimon-warehouse/paimon_db.db/user_event_log/';
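
    A quick count on the XIHE engine confirms that the external table reads the data written by Spark (the number grows as the mock-data job progresses):

    -- Should match the number of rows written so far
    SELECT COUNT(*) FROM paimon_db.user_event_log;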
  3. Verify query consistency

    This topic provides three common query scenarios. For queries with identical semantics, you can verify that the results are consistent and that performance is better.

    Note that you may need to adjust the filter conditions: the dates in the generated data are the local date of the day the script runs.

    Single-table aggregation query

    SELECT
        coalesce(`day`, '') AS `day`,
        coalesce(account_id, '') AS account_id,
        coalesce(event_time, '') AS event_time,
        coalesce(city, '') AS city,
        coalesce(ad_id, '') AS ad_id,
        SUM(CASE WHEN event_name = '401' THEN event_count ELSE 0 END) AS count_401,
        SUM(CASE WHEN event_name = '402' THEN event_count ELSE 0 END) AS count_402,
        SUM(CASE WHEN event_name = '403' THEN event_count ELSE 0 END) AS count_403,
        SUM(CASE WHEN event_name = '404' THEN event_count ELSE 0 END) AS count_404,
        SUM(CASE WHEN event_name = '405' THEN event_count ELSE 0 END) AS count_405,
        SUM(CASE WHEN event_name = '406' THEN event_count ELSE 0 END) AS count_406,
        SUM(CASE WHEN event_name = '407' THEN event_count ELSE 0 END) AS count_407,
        SUM(CASE WHEN event_name = '408' THEN event_count ELSE 0 END) AS count_408,
        SUM(CASE WHEN event_name = '409' THEN event_count ELSE 0 END) AS count_409,
        SUM(CASE WHEN event_name = '410' THEN event_count ELSE 0 END) AS count_410
    FROM user_event_log
    WHERE `day` = '2025-10-30'
    GROUP BY 1,2,3,4,5
    LIMIT 1000;

    Top-N sorting query

    SELECT
        coalesce(`day`, '') AS `day`,
        coalesce(account_id, '') AS account_id,
        coalesce(event_time, '') AS event_time,
        coalesce(city, '') AS city,
        coalesce(ad_id, '') AS ad_id,
        SUM(CASE WHEN event_name = '401' THEN event_count ELSE 0 END) AS count_401,
        SUM(CASE WHEN event_name = '402' THEN event_count ELSE 0 END) AS count_402,
        SUM(CASE WHEN event_name = '403' THEN event_count ELSE 0 END) AS count_403,
        SUM(CASE WHEN event_name = '404' THEN event_count ELSE 0 END) AS count_404,
        SUM(CASE WHEN event_name = '405' THEN event_count ELSE 0 END) AS count_405,
        SUM(CASE WHEN event_name = '406' THEN event_count ELSE 0 END) AS count_406,
        SUM(CASE WHEN event_name = '407' THEN event_count ELSE 0 END) AS count_407,
        SUM(CASE WHEN event_name = '408' THEN event_count ELSE 0 END) AS count_408,
        SUM(CASE WHEN event_name = '409' THEN event_count ELSE 0 END) AS count_409,
        SUM(CASE WHEN event_name = '410' THEN event_count ELSE 0 END) AS count_410
    FROM user_event_log
    WHERE `day` = '2025-10-30'
    GROUP BY 1,2,3,4,5
    ORDER BY 1,2,3,4,5
    LIMIT 1000000;

    Filter-based selection

    SELECT
        coalesce(`day`, '') AS `day`,
        coalesce(account_id, '') AS account_id,
        coalesce(event_time, '') AS event_time,
        coalesce(city, '') AS city,
        coalesce(ad_id, '') AS ad_id,
        SUM(CASE WHEN event_name = '401' THEN event_count ELSE 0 END) AS count_401,
        SUM(CASE WHEN event_name = '402' THEN event_count ELSE 0 END) AS count_402,
        SUM(CASE WHEN event_name = '403' THEN event_count ELSE 0 END) AS count_403,
        SUM(CASE WHEN event_name = '404' THEN event_count ELSE 0 END) AS count_404,
        SUM(CASE WHEN event_name = '405' THEN event_count ELSE 0 END) AS count_405,
        SUM(CASE WHEN event_name = '406' THEN event_count ELSE 0 END) AS count_406,
        SUM(CASE WHEN event_name = '407' THEN event_count ELSE 0 END) AS count_407,
        SUM(CASE WHEN event_name = '408' THEN event_count ELSE 0 END) AS count_408,
        SUM(CASE WHEN event_name = '409' THEN event_count ELSE 0 END) AS count_409,
        SUM(CASE WHEN event_name = '410' THEN event_count ELSE 0 END) AS count_410
    FROM user_event_log
    WHERE `day` = '2025-10-30'
      AND account_id = 'acct_3'
      AND ad_id = 'ad_15'
    GROUP BY 1,2,3,4,5
    ORDER BY 1,2,3,4,5;

Build the incremental materialized view (core acceleration solution)

  1. Create a resource group.

    In the left-side navigation pane, choose Cluster Management > Resource Management > Resource Group Management and create a new resource group (the materialized view definition below assumes the name paimon_refresh).

    Note

    Select the XIHE engine and use an Interactive resource group with a specification of 256 ACUs; this resource group runs the refresh jobs.

  2. Enable incremental refresh.

    Return to SQL Development, select the newly created resource group, and enable incremental refresh:

    set adb_config O_MV_IVM_V2_ENABLED = true;
    set adb_config MV_FAST_REFRESH_PLAN_CACHE_ENABLED = false;
  3. Create the materialized view

    1. Create a native database

      CREATE DATABASE paimon_mv;
    2. Refresh the database list on the left and select the newly created paimon_mv database.

      CREATE MATERIALIZED VIEW paimon_test_view (
          PRIMARY KEY (
            `day`,
            account_id,
            event_time,
            city,
            ad_id
        )
      )
      INDEX_ALL='N' 
      ENGINE='XUANWU_V2' 
      COMPRESSION='lz4' 
      TABLE_PROPERTIES='{"format":"columnstore"}' 
      -- define your interactive resource group name in MV_PROPERTIES
      MV_PROPERTIES='{"mv_resource_group":"paimon_refresh"}'
      -- The materialized view is incrementally refreshed every five minutes after creation.
      REFRESH FAST NEXT now() + interval 5 minute
      AS SELECT
          coalesce(`day`, '') AS `day`,
          coalesce(account_id, '') AS account_id,
          coalesce(event_time, '') AS event_time,
          coalesce(city, '') AS city,
          coalesce(ad_id, '') AS ad_id,
          SUM(CASE WHEN event_name = '401' THEN event_count ELSE 0 END) AS count_401,
          SUM(CASE WHEN event_name = '402' THEN event_count ELSE 0 END) AS count_402,
          SUM(CASE WHEN event_name = '403' THEN event_count ELSE 0 END) AS count_403,
          SUM(CASE WHEN event_name = '404' THEN event_count ELSE 0 END) AS count_404,
          SUM(CASE WHEN event_name = '405' THEN event_count ELSE 0 END) AS count_405,
          SUM(CASE WHEN event_name = '406' THEN event_count ELSE 0 END) AS count_406,
          SUM(CASE WHEN event_name = '407' THEN event_count ELSE 0 END) AS count_407,
          SUM(CASE WHEN event_name = '408' THEN event_count ELSE 0 END) AS count_408,
          SUM(CASE WHEN event_name = '409' THEN event_count ELSE 0 END) AS count_409,
          SUM(CASE WHEN event_name = '410' THEN event_count ELSE 0 END) AS count_410
      FROM paimon_db.user_event_log
      GROUP BY 1,2,3,4,5;
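
      After the first refresh completes, queries can read the pre-aggregated results directly from the materialized view; for example (adjust the date to the day the mock data was generated):

      SELECT *
      FROM paimon_mv.paimon_test_view
      WHERE `day` = '2025-10-30'
        AND account_id = 'acct_3'
      LIMIT 100;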
  4. Monitor the refresh status of the materialized view.

    -- Get the process_id of the most recent refresh job
    SELECT process_id
    FROM INFORMATION_SCHEMA.mv_auto_refresh_jobs
    WHERE mv_name = 'paimon_test_view' AND mv_schema = 'paimon_mv'
    ORDER BY job_id DESC
    LIMIT 1;

    Enter the process_id on the SQL Diagnostics and Optimization page of the console to view the amount of data scanned and the number of rows written by the ETL job; these metrics indicate the refresh throughput of the incremental materialized view.
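
    To trigger an extra refresh without waiting for the next scheduled run, you can also refresh manually (a minimal sketch, assuming the standard REFRESH MATERIALIZED VIEW statement):

    REFRESH MATERIALIZED VIEW paimon_mv.paimon_test_view;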
