Open-Source Lakehouse
Overview
In the era of data-driven operations, game companies need fine-grained analysis of player behavior to improve retention, monetization, and engagement. Traditional data warehouse architectures suffer from high cost, poor scalability, and siloed engines.
This solution is built on the Alibaba Cloud OpenLake open-source lakehouse stack, combining the following core components:
EMR Serverless Spark: a serverless Spark engine for efficient ETL.
Paimon: a unified streaming/batch lake storage format supporting ACID transactions, schema evolution, and time travel.
DLF (Data Lake Formation): unified metadata management shared across Spark, StarRocks, Flink, and other engines.
StarRocks: a high-performance unified analytics engine supporting both high-concurrency point lookups and complex OLAP.
With this solution you can:
Build layered DWD/ADS models from raw logs in OSS.
Run flexible, powerful batch ETL with Spark.
Write processed results to Paimon lake tables for multi-engine sharing.
Query lake tables or native tables directly from StarRocks to power BI reports and ad-hoc analysis.
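The ODS → DWD → ADS layering can be sketched conceptually in plain Python (the sample events below are hypothetical; the real pipeline uses Spark and Paimon tables):

```python
# Hypothetical raw events (ODS layer): unparsed strings, as landed from logs.
ods_events = [
    {"user_id": "u1", "event_type": "LOGIN",    "amount": ""},
    {"user_id": "u1", "event_type": "PURCHASE", "amount": "6.0"},
    {"user_id": "u2", "event_type": "login",    "amount": ""},
]

# DWD layer: clean and normalize each record.
dwd_events = [
    {
        "user_id": e["user_id"],
        "event_type": e["event_type"].lower(),
        "amount": float(e["amount"]) if e["amount"] else 0.0,
    }
    for e in ods_events
]

# ADS layer: aggregate cleaned details into serving-ready metrics.
gmv = sum(e["amount"] for e in dwd_events if e["event_type"] == "purchase")
dau = len({e["user_id"] for e in dwd_events})
print(gmv, dau)  # → 6.0 2
```

Each layer only reads from the layer below it, which is the same discipline the Spark SQL in steps 3–5 follows.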
Architecture Diagram

Prerequisites
Make sure the following preparations are complete:

| Product | Action |
| --- | --- |
| EMR Serverless | A Spark compute cluster has been created and bound with DLF permissions |
| DLF | The service is activated; a Paimon Catalog has been created in the target region and its Catalog ID obtained |
| StarRocks | An EMR Serverless StarRocks instance has been deployed (optional; not needed if you only use Spark + Paimon) |
| DataWorks | Used to run the Spark ETL scripts |
Procedure
Step 1: Configure environment variables (run in a Notebook)
%emr_serverless_spark
DLF_CATALOG_ID = "clg-paimon-e62c8d1e8fa04ee097be4870af155296" # ← replace with your DLF Catalog ID
REGION = "cn-hangzhou" # ← replace with your region
print(f"DLF Catalog ID: {DLF_CATALOG_ID}")
print(f"Region: {REGION}")
Step 2: Initialize the Spark Session and verify the data sources
from pyspark.sql import SparkSession
OSS_PUBLIC_BUCKET = f"emr-starrocks-benchmark-resource-{REGION}"
PROFILE_SRC_GLOB = f"oss://{OSS_PUBLIC_BUCKET}/sr_game_demo_v2/user_profile/*.parquet"
EVENT_SRC_GLOB = f"oss://{OSS_PUBLIC_BUCKET}/sr_game_demo_v2/user_event/*.parquet"
spark = (
    SparkSession.builder
    .appName("DLF-Paimon-Ingest-sr_game_demo_v2")
    .config("spark.dlf.catalog.id", DLF_CATALOG_ID)
    .config("spark.dlf.region", REGION)
    .config("spark.hadoop.fs.oss.endpoint", f"oss-{REGION}-internal.aliyuncs.com")
    .enableHiveSupport()
    .getOrCreate()
)
# Verify that the source files exist
def glob_count(path_glob: str) -> int:
    """Count files matching an OSS glob via the Hadoop FileSystem API."""
    jvm = spark._jvm
    hconf = spark.sparkContext._jsc.hadoopConfiguration()
    Path = jvm.org.apache.hadoop.fs.Path
    p = Path(path_glob)
    fs = p.getFileSystem(hconf)
    stats = fs.globStatus(p)
    return 0 if stats is None else len(stats)
print("profile matched:", glob_count(PROFILE_SRC_GLOB))
print("event matched: ", glob_count(EVENT_SRC_GLOB))
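The same existence check can be mimicked locally with Python's standard glob module, which is handy for smoke-testing a path pattern before submitting a job (the local directory below is a hypothetical stand-in for an OSS URI):

```python
import glob
import os
import tempfile

def local_glob_count(path_glob: str) -> int:
    """Count files matching a glob pattern, mirroring glob_count() above."""
    return len(glob.glob(path_glob))

# Demo against a throwaway directory holding two empty fake parquet files.
with tempfile.TemporaryDirectory() as d:
    for name in ("part-0.parquet", "part-1.parquet"):
        open(os.path.join(d, name), "w").close()
    n = local_glob_count(os.path.join(d, "*.parquet"))
    print("matched:", n)  # → matched: 2
```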
Step 3: Read the raw data and write it to the Paimon ODS layer
Advantage: Paimon automatically manages file compaction, indexing, and schema evolution; no manual maintenance of Parquet partitions is required.
df_profile = spark.read.parquet(PROFILE_SRC_GLOB)
df_event = spark.read.parquet(EVENT_SRC_GLOB)
spark.sql("CREATE DATABASE IF NOT EXISTS game_db")
# Write to Paimon tables (ODS layer)
(df_profile.write
    .format("paimon")
    .mode("overwrite")
    .saveAsTable("game_db.ods_user_profile"))

(df_event.write
    .format("paimon")
    .mode("overwrite")
    .saveAsTable("game_db.ods_user_event"))
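Schema evolution, which Paimon handles automatically on write, boils down to merging the incoming schema into the existing table schema. A conceptual sketch, using plain Python dicts as stand-ins for real schemas:

```python
def merge_schema(table_schema: dict, incoming: dict) -> dict:
    """Conceptual schema evolution: new columns are appended;
    existing columns must keep their type (no implicit narrowing)."""
    merged = dict(table_schema)
    for col, typ in incoming.items():
        if col in merged and merged[col] != typ:
            raise TypeError(f"type conflict on {col}: {merged[col]} vs {typ}")
        merged.setdefault(col, typ)
    return merged

ods = {"user_id": "string", "event_type": "string"}
new_batch = {"user_id": "string", "event_type": "string", "channel": "string"}
evolved = merge_schema(ods, new_batch)
print(sorted(evolved))  # → ['channel', 'event_type', 'user_id']
```

This is only an illustration of the idea; the actual merge rules (type widening, nullability, column order) are defined by Paimon itself.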
Step 4: Build the DWD detail layer (Spark SQL ETL)
4.1 User detail table dwd_user_details
DROP TABLE IF EXISTS game_db.dwd_user_details;
CREATE TABLE game_db.dwd_user_details
USING paimon
AS
WITH p AS (
SELECT
user_id,
decode(gender, 'UTF-8') AS gender,
decode(os_version, 'UTF-8') AS os_version,
CAST(decode(current_level, 'UTF-8') AS INT) AS current_level,
decode(device_type, 'UTF-8') AS device_type,
TO_DATE(decode(last_login_date, 'UTF-8')) AS last_login_date,
decode(favorite_game_mode, 'UTF-8') AS favorite_game_mode,
decode(language_preference, 'UTF-8') AS language_preference,
decode(active_time, 'UTF-8') AS active_time,
TO_DATE(decode(registration_date, 'UTF-8')) AS registration_date,
CAST(decode(total_deaths, 'UTF-8') AS INT) AS total_deaths,
CAST(decode(game_hours, 'UTF-8') AS DOUBLE) AS game_hours,
decode(location, 'UTF-8') AS location,
decode(play_frequency, 'UTF-8') AS play_frequency,
ROW_NUMBER() OVER (
PARTITION BY user_id
ORDER BY TO_DATE(decode(last_login_date, 'UTF-8')) DESC NULLS LAST
) AS rn
FROM game_db.ods_user_profile
)
SELECT * EXCEPT (rn)
FROM p
WHERE rn = 1; -- dedup: keep only the record with the latest last_login_date per user_id

4.2 User event detail table dwd_user_event
DROP TABLE IF EXISTS game_db.dwd_user_event;
CREATE TABLE game_db.dwd_user_event
USING paimon
AS
WITH e AS (
SELECT
user_id,
LOWER(decode(event_type, 'UTF-8')) AS event_type,
TRIM(decode(timestamp, 'UTF-8')) AS ts_str,
decode(event_details, 'UTF-8') AS event_details,
decode(location, 'UTF-8') AS event_location
FROM game_db.ods_user_event
),
e_ts AS (
SELECT *,
CASE
WHEN ts_str RLIKE '^[0-9]{13}$' THEN TO_TIMESTAMP(FROM_UNIXTIME(CAST(ts_str AS BIGINT)/1000))
WHEN ts_str RLIKE '^[0-9]{10}$' THEN TO_TIMESTAMP(FROM_UNIXTIME(CAST(ts_str AS BIGINT)))
ELSE TO_TIMESTAMP(ts_str)
END AS event_ts
FROM e
)
SELECT
e.user_id,
e.event_ts,
TO_DATE(e.event_ts) AS event_date,
e.event_type,
COALESCE(NULLIF(e.event_location,''), d.location) AS location,
-- extract the amount from the JSON event details
COALESCE(CAST(get_json_object(event_details, '$.amount') AS DOUBLE), 0.0) AS amount,
d.gender, d.device_type
FROM e_ts e
LEFT JOIN game_db.dwd_user_details d ON e.user_id = d.user_id
WHERE e.event_ts IS NOT NULL;

Step 5: Build the ADS application layer (metric aggregation)
5.1 Daily retention table ads_retention_daily
DROP TABLE IF EXISTS game_db.ads_retention_daily;
CREATE TABLE game_db.ads_retention_daily
USING paimon
AS
WITH dau AS (
SELECT event_date AS dt, user_id
FROM game_db.dwd_user_event
GROUP BY event_date, user_id
)
SELECT
base.dt,
base.dau,
COALESCE(d1.d1_retained, 0) AS d1_retained,
ROUND(COALESCE(d1.d1_retained,0) * 1.0 / base.dau, 4) AS d1_retention_rate,
COALESCE(d7.d7_retained, 0) AS d7_retained,
ROUND(COALESCE(d7.d7_retained,0) * 1.0 / base.dau, 4) AS d7_retention_rate
FROM (
SELECT dt, COUNT(DISTINCT user_id) AS dau FROM dau GROUP BY dt
) base
LEFT JOIN (
SELECT a.dt, COUNT(DISTINCT a.user_id) AS d1_retained
FROM dau a JOIN dau b ON a.user_id = b.user_id AND b.dt = DATE_ADD(a.dt, 1)
GROUP BY a.dt
) d1 ON base.dt = d1.dt
LEFT JOIN (
SELECT a.dt, COUNT(DISTINCT a.user_id) AS d7_retained
FROM dau a JOIN dau b ON a.user_id = b.user_id AND b.dt = DATE_ADD(a.dt, 7)
GROUP BY a.dt
) d7 ON base.dt = d7.dt;

5.2 Other ADS tables (omitted; see the original code)
ads_purchase_trends_daily: daily GMV and paying-user count
ads_device_preference_daily: device distribution and share
ads_region_distribution_daily: DAU distribution by province and city
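The D1 retention join in 5.1 can be verified in plain Python on a tiny hypothetical dataset before trusting the SQL at scale:

```python
from datetime import date, timedelta

# Hypothetical (user_id, active_date) pairs, i.e. the `dau` CTE in 5.1.
activity = {
    ("u1", date(2024, 5, 1)), ("u1", date(2024, 5, 2)),
    ("u2", date(2024, 5, 1)),
    ("u3", date(2024, 5, 1)), ("u3", date(2024, 5, 2)),
}

def d1_retention(day: date) -> float:
    """Share of `day` actives who are also active on day + 1."""
    base = {u for u, d in activity if d == day}
    next_day = {u for u, d in activity if d == day + timedelta(days=1)}
    return round(len(base & next_day) / len(base), 4) if base else 0.0

# 3 users active on May 1; u1 and u3 return on May 2 → 2/3.
print(d1_retention(date(2024, 5, 1)))  # → 0.6667
```

The D7 calculation is identical with a 7-day offset; the SQL simply expresses both as self-joins on the `dau` CTE.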
Step 6: Integrate with StarRocks
Create a StarRocks node and query the lake tables directly.
-- Query the lake table directly
SELECT * FROM paimon_catalog.game_db.ads_retention_daily LIMIT 10;
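StarRocks speaks the MySQL wire protocol, so the same query can also be issued from Python. A hedged sketch (the helper and the commented connection parameters are placeholders, not part of the original solution; adapt host/user/port to your instance):

```python
# Hypothetical helper: build the three-part table reference StarRocks uses
# for external (DLF/Paimon) catalogs: catalog.database.table.
def paimon_ref(catalog: str, db: str, table: str) -> str:
    return f"{catalog}.{db}.{table}"

sql = f"SELECT * FROM {paimon_ref('paimon_catalog', 'game_db', 'ads_retention_daily')} LIMIT 10"
print(sql)

# To actually run it, any MySQL-protocol client works, e.g. (untested placeholder):
# import pymysql
# conn = pymysql.connect(host="<fe-host>", port=9030, user="admin", password="...")
# with conn.cursor() as cur:
#     cur.execute(sql)
#     print(cur.fetchall())
```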
Step 7: Visualization (Quick BI)
Create a StarRocks data source in the Quick BI console.
Create datasets with the following SQL statements:
Dataset SQL 1:
SELECT * FROM game_db.ADS_MV_USER_RETENTION;
Dataset SQL 2:
SELECT * FROM game_db.ADS_MV_USER_GEOGRAPHIC_DISTRIBUTION;
Dataset SQL 3:
SELECT * FROM game_db.ADS_MV_USER_DEVICE_PREFERENCE;
Dataset SQL 4:
SELECT * FROM game_db.ADS_MV_USER_PURCHASE_TRENDS;
Build line charts, maps, and dashboards to monitor the core metrics.
Summary of Benefits

| Dimension | Traditional approach | This solution |
| --- | --- | --- |
| Storage cost | Expensive HDFS | Low-cost OSS + automatic small-file compaction in Paimon |
| Compute elasticity | Always-on clusters | EMR Serverless pay-as-you-go |
| Data consistency | Manual partition/version maintenance | Paimon ACID + time travel |
| Multi-engine collaboration | Data silos | Unified metadata in DLF, shared by Spark/Flink/StarRocks |
| Development efficiency | Complex scheduling dependencies | One-stop Notebook ETL + SQL modeling |