Open-Source Lakehouse

Overview

In the era of data-driven operations, game companies need fine-grained analysis of player behavior to improve retention, monetization, and engagement. Traditional data warehouse architectures suffer from high cost, poor scalability, and fragmentation across engines.

This solution is built on the Alibaba Cloud OpenLake open-source lakehouse stack and combines the following core components:

  • EMR Serverless Spark: a serverless Spark engine for efficient ETL.

  • Paimon: a unified streaming/batch lake storage format supporting ACID, schema evolution, and Time Travel.

  • DLF (Data Lake Formation): unified metadata management that connects Spark, StarRocks, Flink, and other engines.

  • StarRocks: a fast, unified analytics engine supporting high-concurrency point queries and complex OLAP.

With this solution you can:

  • Build layered DWD/ADS models from raw logs in OSS.

  • Run flexible, powerful batch ETL with Spark.

  • Write processed results to Paimon lake tables shared across multiple engines.

  • Query lake tables or internal tables directly from StarRocks to power BI reports and ad-hoc analysis.

Architecture Diagram


Prerequisites

Make sure the following preparations are complete:

| Product | Action |
| --- | --- |
| EMR Serverless | A Spark compute cluster has been created and granted DLF permissions. |
| DLF | The service has been activated; a Paimon Catalog has been created in the target region and its Catalog ID obtained. |
| StarRocks | An EMR Serverless StarRocks instance has been deployed (optional; not required if you only use Spark + Paimon). |
| DataWorks | Used to run the Spark ETL scripts. |

Procedure

Step 1: Configure environment variables (run in a Notebook)

%emr_serverless_spark

DLF_CATALOG_ID = "clg-paimon-e62c8d1e8fa04ee097be4870af155296"  # ← replace with your DLF Catalog ID
REGION = "cn-hangzhou"                                           # ← replace with your region

print(f"DLF Catalog ID: {DLF_CATALOG_ID}")
print(f"Region: {REGION}")

Step 2: Initialize the Spark session and verify the data sources

from pyspark.sql import SparkSession

OSS_PUBLIC_BUCKET = f"emr-starrocks-benchmark-resource-{REGION}"
PROFILE_SRC_GLOB = f"oss://{OSS_PUBLIC_BUCKET}/sr_game_demo_v2/user_profile/*.parquet"
EVENT_SRC_GLOB   = f"oss://{OSS_PUBLIC_BUCKET}/sr_game_demo_v2/user_event/*.parquet"

spark = (
    SparkSession.builder
    .appName("DLF-Paimon-Ingest-sr_game_demo_v2")
    .config("spark.dlf.catalog.id", DLF_CATALOG_ID)
    .config("spark.dlf.region", REGION)
    .config("spark.hadoop.fs.oss.endpoint", f"oss-{REGION}-internal.aliyuncs.com")
    .enableHiveSupport()
    .getOrCreate()
)

# Verify that the source files exist
def glob_count(path_glob: str) -> int:
    jvm = spark._jvm
    hconf = spark.sparkContext._jsc.hadoopConfiguration()
    Path = jvm.org.apache.hadoop.fs.Path
    p = Path(path_glob)
    fs = p.getFileSystem(hconf)
    stats = fs.globStatus(p)
    return 0 if stats is None else len(stats)

print("profile matched:", glob_count(PROFILE_SRC_GLOB))
print("event matched:  ", glob_count(EVENT_SRC_GLOB))

Step 3: Read the raw data and write it to the Paimon ODS layer

Advantage: Paimon automatically manages file compaction, indexing, and schema evolution, so no manual maintenance of Parquet partitions is needed.

df_profile = spark.read.parquet(PROFILE_SRC_GLOB)
df_event   = spark.read.parquet(EVENT_SRC_GLOB)

spark.sql("CREATE DATABASE IF NOT EXISTS game_db")

# Write to the Paimon tables (ODS layer)
(df_profile.write
    .format("paimon")
    .mode("overwrite")
    .saveAsTable("game_db.ods_user_profile"))

(df_event.write
    .format("paimon")
    .mode("overwrite")
    .saveAsTable("game_db.ods_user_event"))

Step 4: Build the DWD detail layer (Spark SQL ETL)

4.1 User detail table dwd_user_details

DROP TABLE IF EXISTS game_db.dwd_user_details;
CREATE TABLE game_db.dwd_user_details
USING paimon
AS
WITH p AS (
  SELECT
    user_id,
    decode(gender, 'UTF-8') AS gender,
    decode(os_version, 'UTF-8') AS os_version,
    CAST(decode(current_level, 'UTF-8') AS INT) AS current_level,
    decode(device_type, 'UTF-8') AS device_type,
    TO_DATE(decode(last_login_date, 'UTF-8')) AS last_login_date,
    decode(favorite_game_mode, 'UTF-8') AS favorite_game_mode,
    decode(language_preference, 'UTF-8') AS language_preference,
    decode(active_time, 'UTF-8') AS active_time,
    TO_DATE(decode(registration_date, 'UTF-8')) AS registration_date,
    CAST(decode(total_deaths, 'UTF-8') AS INT) AS total_deaths,
    CAST(decode(game_hours, 'UTF-8') AS DOUBLE) AS game_hours,
    decode(location, 'UTF-8') AS location,
    decode(play_frequency, 'UTF-8') AS play_frequency,
    ROW_NUMBER() OVER (
      PARTITION BY user_id
      ORDER BY TO_DATE(decode(last_login_date, 'UTF-8')) DESC NULLS LAST
    ) AS rn
  FROM game_db.ods_user_profile
)
SELECT * EXCEPT (rn)
FROM p
WHERE rn = 1;  -- Deduplicate: keep only the record with the latest last_login_date per user_id
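
The window-function dedup above keeps, for each user_id, the row with the latest last_login_date. The same logic can be sketched in plain Python on toy rows (illustrative only; the field names mirror the SQL, and the real job runs in Spark):

```python
from datetime import date

# Toy profile rows: user 1 appears twice with different login dates.
rows = [
    {"user_id": 1, "last_login_date": date(2024, 5, 1), "current_level": 10},
    {"user_id": 1, "last_login_date": date(2024, 5, 9), "current_level": 12},
    {"user_id": 2, "last_login_date": None, "current_level": 3},
]

def dedup_latest(rows):
    """Keep one row per user_id, preferring the latest last_login_date
    (rows with NULL dates lose, matching DESC NULLS LAST in the SQL)."""
    def rank(row):
        d = row["last_login_date"]
        return (d is not None, d or date.min)
    best = {}
    for r in rows:
        cur = best.get(r["user_id"])
        if cur is None or rank(r) > rank(cur):
            best[r["user_id"]] = r
    return list(best.values())

deduped = dedup_latest(rows)  # user 1 keeps the 2024-05-09 / level-12 row
```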

4.2 User event detail table dwd_user_event

DROP TABLE IF EXISTS game_db.dwd_user_event;
CREATE TABLE game_db.dwd_user_event
USING paimon
AS
WITH e AS (
  SELECT
    user_id,
    LOWER(decode(event_type, 'UTF-8')) AS event_type,
    TRIM(decode(timestamp, 'UTF-8')) AS ts_str,
    decode(event_details, 'UTF-8') AS event_details,
    decode(location, 'UTF-8') AS event_location
  FROM game_db.ods_user_event
),
e_ts AS (
  SELECT *,
    CASE
      WHEN ts_str RLIKE '^[0-9]{13}$' THEN TO_TIMESTAMP(FROM_UNIXTIME(CAST(ts_str AS BIGINT)/1000))
      WHEN ts_str RLIKE '^[0-9]{10}$' THEN TO_TIMESTAMP(FROM_UNIXTIME(CAST(ts_str AS BIGINT)))
      ELSE TO_TIMESTAMP(ts_str)
    END AS event_ts
  FROM e
)
SELECT
  e.user_id,
  e.event_ts,
  TO_DATE(e.event_ts) AS event_date,
  e.event_type,
  COALESCE(NULLIF(e.event_location,''), d.location) AS location,
  -- Extract the purchase amount from the JSON details
  COALESCE(CAST(get_json_object(event_details, '$.amount') AS DOUBLE), 0.0) AS amount,
  d.gender, d.device_type
FROM e_ts e
LEFT JOIN game_db.dwd_user_details d ON e.user_id = d.user_id
WHERE e.event_ts IS NOT NULL;
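
The CASE expression above normalizes three timestamp encodings (13-digit epoch milliseconds, 10-digit epoch seconds, and pre-formatted strings), and get_json_object pulls the purchase amount with a 0.0 default. Both pieces of logic, sketched in plain Python for illustration (UTC is assumed here, whereas Spark's FROM_UNIXTIME uses the session time zone):

```python
import json
import re
from datetime import datetime, timezone

def parse_event_ts(ts_str: str) -> datetime:
    """Mirror the SQL CASE: 13 digits = epoch milliseconds, 10 digits =
    epoch seconds, anything else a formatted timestamp string."""
    ts_str = ts_str.strip()
    if re.fullmatch(r"[0-9]{13}", ts_str):
        return datetime.fromtimestamp(int(ts_str) / 1000, tz=timezone.utc)
    if re.fullmatch(r"[0-9]{10}", ts_str):
        return datetime.fromtimestamp(int(ts_str), tz=timezone.utc)
    return datetime.fromisoformat(ts_str)  # e.g. "2024-06-01 12:00:00"

def extract_amount(event_details: str) -> float:
    """Like COALESCE(get_json_object(details, '$.amount'), 0.0)."""
    try:
        return float(json.loads(event_details).get("amount", 0.0))
    except (ValueError, TypeError, AttributeError):
        return 0.0

# Both epoch encodings of the same instant normalize identically.
assert parse_event_ts("1700000000000") == parse_event_ts("1700000000")
```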

Step 5: Build the ADS application layer (metric aggregation)

5.1 Daily retention table ads_retention_daily

DROP TABLE IF EXISTS game_db.ads_retention_daily;
CREATE TABLE game_db.ads_retention_daily
USING paimon
AS
WITH dau AS (
  SELECT event_date AS dt, user_id
  FROM game_db.dwd_user_event
  GROUP BY event_date, user_id
)
SELECT
  base.dt,
  base.dau,
  COALESCE(d1.d1_retained, 0) AS d1_retained,
  ROUND(COALESCE(d1.d1_retained,0) * 1.0 / base.dau, 4) AS d1_retention_rate,
  COALESCE(d7.d7_retained, 0) AS d7_retained,
  ROUND(COALESCE(d7.d7_retained,0) * 1.0 / base.dau, 4) AS d7_retention_rate
FROM (
  SELECT dt, COUNT(DISTINCT user_id) AS dau FROM dau GROUP BY dt
) base
LEFT JOIN (
  SELECT a.dt, COUNT(DISTINCT a.user_id) AS d1_retained
  FROM dau a JOIN dau b ON a.user_id = b.user_id AND b.dt = DATE_ADD(a.dt, 1)
  GROUP BY a.dt
) d1 ON base.dt = d1.dt
LEFT JOIN (
  SELECT a.dt, COUNT(DISTINCT a.user_id) AS d7_retained
  FROM dau a JOIN dau b ON a.user_id = b.user_id AND b.dt = DATE_ADD(a.dt, 7)
  GROUP BY a.dt
) d7 ON base.dt = d7.dt;
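
The self-join above counts, for each day, how many of that day's active users appear again exactly 1 and 7 days later. A small pure-Python illustration of the same metric on toy DAU pairs (not the production Spark job):

```python
from datetime import date, timedelta

# Toy deduplicated (dt, user_id) pairs, i.e. the `dau` CTE above.
dau = {
    (date(2024, 6, 1), 1), (date(2024, 6, 1), 2), (date(2024, 6, 1), 3),
    (date(2024, 6, 2), 1), (date(2024, 6, 2), 3),
    (date(2024, 6, 8), 3),
}

def retention(dau, dt, offset_days):
    """Share of users active on dt who are also active offset_days later."""
    base = {u for d, u in dau if d == dt}
    later = {u for d, u in dau if d == dt + timedelta(days=offset_days)}
    return len(base & later) / len(base) if base else 0.0

d1 = retention(dau, date(2024, 6, 1), 1)  # 2 of 3 users return on day +1
d7 = retention(dau, date(2024, 6, 1), 7)  # 1 of 3 users returns on day +7
```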

5.2 Other ADS tables (omitted; see the original code)

  • ads_purchase_trends_daily: daily GMV and number of paying users

  • ads_device_preference_daily: device distribution and share

  • ads_region_distribution_daily: DAU distribution by province/city

Step 6: Connect to StarRocks

Create a StarRocks node and query the lake tables directly.

-- Query the lake table directly
SELECT * FROM paimon_catalog.game_db.ads_retention_daily LIMIT 10;


Step 7: Visualization (Quick BI)

  1. In the Quick BI console, create a StarRocks data source.

  2. Create datasets with the following SQL:

    1. Dataset SQL 1: SELECT * FROM game_db.ADS_MV_USER_RETENTION;

    2. Dataset SQL 2: SELECT * FROM game_db.ADS_MV_USER_GEOGRAPHIC_DISTRIBUTION;

    3. Dataset SQL 3: SELECT * FROM game_db.ADS_MV_USER_DEVICE_PREFERENCE;

    4. Dataset SQL 4: SELECT * FROM game_db.ADS_MV_USER_PURCHASE_TRENDS;

  3. Build line charts, maps, dashboards, and so on to monitor core metrics.

Summary of Advantages

| Dimension | Traditional approach | This solution |
| --- | --- | --- |
| Storage cost | High HDFS cost | Low-cost OSS + automatic Paimon small-file compaction |
| Compute elasticity | Always-on clusters required | Pay-as-you-go EMR Serverless |
| Data consistency | Manual partition/version maintenance | Paimon ACID + Time Travel |
| Multi-engine collaboration | Data silos | Unified DLF metadata shared by Spark/Flink/StarRocks |
| Development efficiency | Complex scheduling dependencies | One-stop Notebook ETL + SQL modeling |