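This topic describes how to build a real-time data analytics platform from Alibaba Cloud big data products. The platform processes and analyzes player behavior logs in real time and visualizes the results in Quick BI.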
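Background
This platform combines several Alibaba Cloud big data products. It collects player behavior logs, processes and analyzes them in real time, and presents the results to business users as charts. DLF provides the underlying metadata management and table read/write capabilities, EMR Serverless StarRocks handles real-time processing and analysis, and Quick BI provides the data visualization.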
Prerequisites
Procedure
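Step 1: Load the Notebook case
Find the card for this case and click Load Case on the card.
Select the workspace and instance to load the case into, and click OK to open the DataWorks Data Development page.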
Step 2: Initialize parameters
# Initialize parameters
# 1. Create a catalog in the DLF console and use its ID to replace [your_dlf_catalog_id].
# DLF console: https://dlf-next.console.aliyun.com/
DLF_CATALOG_ID="[your_dlf_catalog_id]"
# 2. Replace [your-region] with the region of this demo, for example cn-beijing, cn-hangzhou, cn-shanghai, or cn-shenzhen.
REGION="[your-region]"
Be sure to run this cell so that the variables take effect.
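The SQL in steps 4 and 7 references these variables as `${REGION}` and `${DLF_CATALOG_ID}`, so a value left as its placeholder only shows up later as a load or catalog error. The following Python sketch, meant to run in the same Notebook, checks both variables up front and builds the OSS paths and internal endpoint that step 4 uses. The helper names `check_params` and `oss_locations` are hypothetical, and the region-ID pattern is an assumption based on the example regions above.

```python
import re

# Hypothetical checks for the two Notebook variables defined above.
PLACEHOLDER = re.compile(r"^\[.*\]$")  # an unreplaced value such as "[your_dlf_catalog_id]"
# Assumption: region IDs look like cn-hangzhou or ap-southeast-1.
REGION_ID = re.compile(r"^[a-z]{2,}(-[a-z0-9]+)+$")


def check_params(catalog_id: str, region: str) -> None:
    """Raise ValueError if a parameter was left as a placeholder or looks malformed."""
    if not catalog_id or PLACEHOLDER.match(catalog_id):
        raise ValueError("DLF_CATALOG_ID has not been replaced")
    if not REGION_ID.match(region):
        raise ValueError(f"REGION does not look like a region ID: {region!r}")


def oss_locations(region: str) -> dict:
    """Build the OSS source paths and internal endpoint referenced in step 4."""
    bucket = f"emr-starrocks-benchmark-resource-{region}"
    return {
        "user_profile": f"oss://{bucket}/sr_game_demo/user_profile/*",
        "user_event": f"oss://{bucket}/sr_game_demo/user_event/*",
        "endpoint": f"oss-{region}-internal.aliyuncs.com",
    }
```

For example, after running the parameter cell, call `check_params(DLF_CATALOG_ID, REGION)` and then `print(oss_locations(REGION))` to see the exact locations the import will read from.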
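Step 3: Create StarRocks tables to receive the data imported from OSS
Run the following SQL to create the user profile table (ods_user_profile) and the user event table (ods_user_event).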
CREATE DATABASE IF NOT EXISTS game_db;
use game_db;
-- User profile table
CREATE TABLE IF NOT EXISTS ods_user_profile (
user_id INT NOT NULL,
registration_date DATE NOT NULL,
last_login_date DATE,
age_group VARCHAR(20),
gender VARCHAR(10),
location VARCHAR(50),
game_hours INT,
favorite_game_mode VARCHAR(20),
play_frequency VARCHAR(20),
device_type VARCHAR(20),
os_version VARCHAR(20),
current_level INT,
total_deaths INT,
active_time VARCHAR(20),
language_preference VARCHAR(10)
)
PRIMARY KEY (user_id)
DISTRIBUTED BY HASH(user_id)
PROPERTIES (
"replication_num" = "1"
);
-- User event table
CREATE TABLE IF NOT EXISTS ods_user_event (
`user_id` INT,
`event_type` STRING,
`timestamp` datetime,
`location` STRING,
`level` INT,
`event_details` STRING
)
DISTRIBUTED BY HASH(user_id)
PROPERTIES (
"replication_num" = "1"
);
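The two tables use different StarRocks table types. ods_user_profile declares PRIMARY KEY (user_id), so loading a row whose user_id already exists replaces the stored row; ods_user_event declares no key, so StarRocks creates it as a Duplicate Key table that keeps every loaded row. The Python sketch below models only that difference on in-memory rows (last write wins versus append). The `Row` shape and the function names are hypothetical simplifications, not StarRocks APIs.

```python
from typing import Dict, List, Tuple

# Simplified stand-in for a table row: (user_id, rest_of_row).
Row = Tuple[int, str]


def load_into_primary_key_table(table: Dict[int, str], rows: List[Row]) -> None:
    """Primary Key semantics (simplified): a row with an existing user_id replaces the old one."""
    for user_id, rest in rows:
        table[user_id] = rest


def load_into_duplicate_key_table(table: List[Row], rows: List[Row]) -> None:
    """Duplicate Key semantics: every loaded row is kept, including exact repeats."""
    table.extend(rows)


profiles: Dict[int, str] = {}
events: List[Row] = []
batch = [(1, "a"), (2, "b")]
# Load the same batch twice, as a rerun of the import would.
for _ in range(2):
    load_into_primary_key_table(profiles, batch)
    load_into_duplicate_key_table(events, batch)
```

One consequence: if the same OSS files are imported a second time, ods_user_profile still holds one row per user, while every row in ods_user_event is stored twice, which inflates row counts such as COUNT(user_id) but not COUNT(DISTINCT user_id).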
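Step 4: Use Broker Load to import OSS data into the StarRocks tables
Run the following SQL to import the data. ${REGION} in the OSS path and endpoint refers to the REGION variable set in step 2.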
use game_db;
-- Load new data
LOAD LABEL game_db.user_profile_20240902_22
(
DATA INFILE("oss://emr-starrocks-benchmark-resource-${REGION}/sr_game_demo/user_profile/*")
INTO TABLE ods_user_profile
FORMAT AS "parquet"
)
WITH BROKER
(
"fs.oss.endpoint" = "oss-${REGION}-internal.aliyuncs.com"
)
PROPERTIES
(
"timeout" = "3600"
);
LOAD LABEL game_db.user_event_20240902_22
(
DATA INFILE("oss://emr-starrocks-benchmark-resource-${REGION}/sr_game_demo/user_event/*")
INTO TABLE ods_user_event
FORMAT AS "parquet"
)
WITH BROKER
(
"fs.oss.endpoint" = "oss-${REGION}-internal.aliyuncs.com"
)
PROPERTIES
(
"timeout" = "3600"
);
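Each load job is identified by its label, and StarRocks rejects a new job whose label was already used by a successful load in the same database. Running the statements above a second time with the fixed labels user_profile_20240902_22 and user_event_20240902_22 is therefore rejected. One option is to derive the label from the current time. The Python sketch below renders the same Broker Load statement with a time-based label; `make_label` and `broker_load_sql` are hypothetical helpers, and the seconds-level suffix is an assumption chosen to keep reruns distinct.

```python
from datetime import datetime


def make_label(stem: str, now: datetime) -> str:
    """Append a timestamp so that each run gets a label that has not been used yet."""
    return f"{stem}_{now:%Y%m%d_%H%M%S}"


def broker_load_sql(db: str, stem: str, target: str, region: str, now: datetime) -> str:
    """Render the Broker Load statement from step 4 with a freshly generated label."""
    path = f"oss://emr-starrocks-benchmark-resource-{region}/sr_game_demo/{stem}/*"
    return "\n".join([
        f"LOAD LABEL {db}.{make_label(stem, now)}",
        "(",
        f'DATA INFILE("{path}")',
        f"INTO TABLE {target}",
        'FORMAT AS "parquet"',
        ")",
        "WITH BROKER",
        "(",
        f'"fs.oss.endpoint" = "oss-{region}-internal.aliyuncs.com"',
        ")",
        "PROPERTIES",
        "(",
        '"timeout" = "3600"',
        ");",
    ])
```

For example, `broker_load_sql("game_db", "user_event", "ods_user_event", REGION, datetime.now())` produces the statement for the event table. Keep in mind that a fresh label only avoids the label conflict: ods_user_event has no primary key, so importing the same files again appends duplicate rows.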
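Step 5: Analyze player retention with ad hoc queries
StarRocks is a high-performance lakehouse query engine that can query large volumes of ODS-layer data quickly, so in some scenarios you can run ad hoc queries directly against the ODS tables for day-to-day analysis. The following query computes the next-day retention rate of users who registered in the last 30 days: for each registration date, the percentage of new users who generated at least one event on the following day.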
USE game_db;
WITH daily_new_users AS (
SELECT
user_id,
registration_date
FROM
ods_user_profile
WHERE
registration_date BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY) AND CURRENT_DATE()
),
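daily_login_events AS (
-- Any event on a given day counts as activity (a "login") for that day.
SELECT
user_id,
DATE(timestamp) AS login_date
FROM
ods_user_event
WHERE
-- Half-open range: comparing with CURRENT_DATE() alone would drop events from later today.
timestamp >= DATE_SUB(CURRENT_DATE(), INTERVAL 31 DAY) AND timestamp < DATE_ADD(CURRENT_DATE(), INTERVAL 1 DAY)
),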
retention AS (
SELECT
n.user_id,
n.registration_date,
l.login_date
FROM
daily_new_users n
LEFT JOIN
daily_login_events l ON n.user_id = l.user_id AND l.login_date = DATE_ADD(n.registration_date, INTERVAL 1 DAY)
)
SELECT
registration_date,
COUNT(DISTINCT user_id) AS new_users,
COUNT(DISTINCT CASE WHEN login_date IS NOT NULL THEN user_id END) AS retained_users,
COUNT(DISTINCT CASE WHEN login_date IS NOT NULL THEN user_id END) / COUNT(DISTINCT user_id) * 100.0 AS retention_rate
FROM
retention
GROUP BY
registration_date
ORDER BY
registration_date;
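To make the query's logic easy to check, the following Python sketch applies the same rules to in-memory data: a user counts as new on their registration_date if it falls within the last 30 days, counts as retained if any event exists on the following day, and the rate is retained users divided by new users, times 100. The function name and input shapes are hypothetical, and the sketch omits the SQL's time filter on ods_user_event and simply considers all events.

```python
from collections import defaultdict
from datetime import date, timedelta


def next_day_retention(registrations, events, today):
    """Next-day retention per registration date, following the SQL above.

    registrations: iterable of (user_id, registration_date)
    events: iterable of (user_id, event_date); any event counts as a login
    Returns {registration_date: (new_users, retained_users, retention_rate)}.
    """
    window_start = today - timedelta(days=30)
    # Distinct (user, day) pairs, so repeated events on one day count once.
    active_days = {(user_id, day) for user_id, day in events}
    new_users = defaultdict(set)
    retained = defaultdict(set)
    for user_id, reg_date in registrations:
        if window_start <= reg_date <= today:  # BETWEEN is inclusive on both ends
            new_users[reg_date].add(user_id)
            if (user_id, reg_date + timedelta(days=1)) in active_days:
                retained[reg_date].add(user_id)
    return {
        reg_date: (len(users), len(retained[reg_date]),
                   len(retained[reg_date]) * 100.0 / len(users))
        for reg_date, users in sorted(new_users.items())
    }
```

Because the join in the SQL is a LEFT JOIN and both counts use DISTINCT, a new user with no next-day event still counts toward new_users, and several events on the same next day still count as one retained user; the sets above reproduce both effects.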
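Step 6: Use StarRocks materialized views to build the DWD and ADS layers automatically
Build the DWD layer
To keep the logic simple, each DWD materialized view below copies its ODS table as is. In a real project, the DWD layer usually applies additional business logic.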
use game_db;
DROP MATERIALIZED VIEW IF EXISTS dwd_mv_user_profile;
CREATE MATERIALIZED VIEW IF NOT EXISTS dwd_mv_user_profile
DISTRIBUTED BY RANDOM
REFRESH ASYNC EVERY(INTERVAL 1 HOUR) -- refresh once every hour
AS
SELECT * FROM ods_user_profile;
DROP MATERIALIZED VIEW IF EXISTS dwd_mv_user_event;
CREATE MATERIALIZED VIEW IF NOT EXISTS dwd_mv_user_event
DISTRIBUTED BY RANDOM
REFRESH ASYNC EVERY(INTERVAL 1 HOUR) -- refresh once every hour
AS
SELECT * FROM ods_user_event;
Build the ADS layer
use game_db;
-- 1. Create ADS_MV_USER_RETENTION (user retention)
CREATE MATERIALIZED VIEW IF NOT EXISTS ADS_MV_USER_RETENTION
DISTRIBUTED BY RANDOM
REFRESH ASYNC EVERY(INTERVAL 1 HOUR)
AS
SELECT
DATE_TRUNC('day', registration_date) AS registration_day,
DATE_TRUNC('day', last_login_date) AS last_login_day,
COUNT(DISTINCT user_id) AS users_retained
FROM dwd_mv_user_profile
GROUP BY
DATE_TRUNC('day', registration_date),
DATE_TRUNC('day', last_login_date);
-- 2. ADS_MV_USER_GEOGRAPHIC_DISTRIBUTION (user geographic distribution)
CREATE MATERIALIZED VIEW IF NOT EXISTS ADS_MV_USER_GEOGRAPHIC_DISTRIBUTION
DISTRIBUTED BY RANDOM
REFRESH ASYNC EVERY(INTERVAL 1 HOUR)
AS
SELECT
location AS geographic_location,
COUNT(DISTINCT user_id) AS total_users
FROM dwd_mv_user_profile
GROUP BY
location;
-- 3. ADS_MV_USER_DEVICE_PREFERENCE (device usage habits)
CREATE MATERIALIZED VIEW IF NOT EXISTS ADS_MV_USER_DEVICE_PREFERENCE
DISTRIBUTED BY RANDOM
REFRESH ASYNC EVERY(INTERVAL 1 HOUR)
AS
SELECT
device_type,
COUNT(DISTINCT user_id) AS total_users
FROM dwd_mv_user_profile
GROUP BY
device_type;
-- 4. ADS_MV_USER_PURCHASE_TRENDS (user purchase trends)
-- This view tracks how the number of purchase events changes from day to day.
CREATE MATERIALIZED VIEW IF NOT EXISTS ADS_MV_USER_PURCHASE_TRENDS
DISTRIBUTED BY RANDOM
REFRESH ASYNC EVERY(INTERVAL 1 HOUR)
AS
SELECT
DATE(timestamp) AS purchase_date,
COUNT(user_id) AS daily_purchase_events
FROM dwd_mv_user_event
-- '购买' means "purchase"; the literal must match the event_type values stored in the source data.
WHERE event_type = '购买'
GROUP BY
purchase_date
ORDER BY
purchase_date;
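The ADS views are plain aggregations over the DWD views. As a cross-check, the following Python sketch reproduces two of them on in-memory rows: the per-day COUNT(user_id) of ADS_MV_USER_PURCHASE_TRENDS, which counts purchase rows rather than distinct buyers, and the COUNT(DISTINCT user_id) ... GROUP BY pattern shared by the geographic and device views. The function names and input shapes are hypothetical.

```python
from collections import Counter, defaultdict
from datetime import date


def daily_purchase_events(events):
    """Like ADS_MV_USER_PURCHASE_TRENDS: rows per day where event_type is '购买'.

    events: iterable of (user_id, event_type, event_date). COUNT(user_id) counts
    rows with a non-NULL user_id, so a user who buys twice is counted twice.
    """
    counts = Counter(
        day for user_id, event_type, day in events
        if event_type == "购买" and user_id is not None
    )
    return dict(sorted(counts.items()))


def distinct_users_by(profiles, key):
    """Like the geographic and device views: COUNT(DISTINCT user_id) GROUP BY key."""
    groups = defaultdict(set)
    for row in profiles:
        if row["user_id"] is not None:  # COUNT(DISTINCT ...) ignores NULLs
            groups[row[key]].add(row["user_id"])
    return {value: len(users) for value, users in groups.items()}


sample = [(1, "购买", date(2024, 9, 1)), (1, "购买", date(2024, 9, 1)), (2, "购买", date(2024, 9, 2))]
print(daily_purchase_events(sample))
```

If the business question is how many distinct players buy each day, COUNT(DISTINCT user_id) in the view definition would answer it instead; the view as written measures purchase volume.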
Step 7: Write data to the data lake (Paimon format)
Create an external catalog in StarRocks.
-- You can rename myfirstcatalog as needed.
-- DROP CATALOG `myfirstcatalog`;
CREATE EXTERNAL CATALOG `myfirstcatalog`
PROPERTIES (
"type" = "paimon",
"paimon.catalog.type" = "dlf-paimon",
"dlf.catalog.id" = "${DLF_CATALOG_ID}"
);
-- If you see "Unexpected exception: Catalog 'myfirstcatalog' doesn't exist", comment out
-- the DROP CATALOG `myfirstcatalog`; statement and run the statements again.
dlf.catalog.id: the ID of the catalog you created in the Data Lake Formation (DLF) console.
Write data to the data lake in Paimon format.
CREATE DATABASE IF NOT EXISTS myfirstcatalog.game_db;
CREATE TABLE IF NOT EXISTS myfirstcatalog.game_db.ADS_USER_PURCHASE_TRENDS(
purchase_date DATE COMMENT 'Purchase date',
daily_purchase_events BIGINT COMMENT 'Number of purchase events per day'
);
-- ADS: write the ETL results to the data lake
INSERT INTO myfirstcatalog.game_db.ADS_USER_PURCHASE_TRENDS
SELECT purchase_date, daily_purchase_events FROM game_db.ADS_MV_USER_PURCHASE_TRENDS;
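The catalog name myfirstcatalog appears in every statement of this step, so renaming it means editing several lines consistently. The Python sketch below renders the CREATE EXTERNAL CATALOG statement from a catalog name and the DLF catalog ID and refuses an unreplaced placeholder. `paimon_catalog_ddl` is a hypothetical helper, the identifier pattern is a conservative assumption rather than the documented StarRocks naming rule, and the properties are exactly the ones used above.

```python
import re

# Assumption: a conservative naming rule (letter first, then letters, digits, underscores).
IDENTIFIER = re.compile(r"^[A-Za-z][A-Za-z0-9_]*$")


def paimon_catalog_ddl(catalog: str, dlf_catalog_id: str) -> str:
    """Render the CREATE EXTERNAL CATALOG statement from step 7 for a chosen catalog name."""
    if not IDENTIFIER.match(catalog):
        raise ValueError(f"unsupported catalog name: {catalog!r}")
    if not dlf_catalog_id or dlf_catalog_id.startswith("["):
        raise ValueError("DLF_CATALOG_ID has not been replaced")
    return (
        f"CREATE EXTERNAL CATALOG `{catalog}`\n"
        "PROPERTIES (\n"
        '"type" = "paimon",\n'
        '"paimon.catalog.type" = "dlf-paimon",\n'
        f'"dlf.catalog.id" = "{dlf_catalog_id}"\n'
        ");"
    )
```

For example, `print(paimon_catalog_ddl("myfirstcatalog", DLF_CATALOG_ID))` reproduces the statement above with the catalog ID filled in; the same catalog name must then be used in the CREATE DATABASE, CREATE TABLE, and INSERT statements.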
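Step 8: Analyze and present reports in Quick BI
Quick BI can query the final ADS-layer data in StarRocks directly and display it on report pages.
Log on to the Quick BI console.
Configure a StarRocks data source. For details, see Alibaba Cloud StarRocks data source.
Create datasets and analyze the data. For details, see Create and manage datasets.
Dataset SQL 1:
select * from game_db.ADS_MV_USER_RETENTION;
Dataset SQL 2:
select * from game_db.ADS_MV_USER_GEOGRAPHIC_DISTRIBUTION;
Dataset SQL 3:
select * from game_db.ADS_MV_USER_DEVICE_PREFERENCE;
Dataset SQL 4:
select * from game_db.ADS_MV_USER_PURCHASE_TRENDS;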