用户行为数据通常具有庞大的数据量,存储成本较高,且缺乏统一的格式,导致处理难度较大。常用的宽表模型虽查询效率高,但冗余度高、存储空间大、维护复杂,更新慢。本文基于Flink+MongoDB+Hologres更好地实现宽表的数据分析,以游戏行业的用户行为数据分析为示例,构建用户行为数据宽表进行数据分析的方案。
方案架构和优势
架构
实时计算Flink版是强大的流式计算引擎,支持对海量实时数据高效处理。云数据库MongoDB版是一种文档型的NoSQL数据库,具有数据存储结构灵活、读写性能高、支持复杂的查询条件等特点。实时数仓Hologres是一站式实时数仓,支持数据实时写入与更新,实时数据写入即可查。三者结合相辅相成,能够将复杂多变的数据进行联合计算处理,实现对大型数据即时更新、查询与分析。本文的示例方案架构如下:
MongoDB的业务数据更新后,将数据主键通过消息队列Kafka采用Upsert的方式更新写入。
无论是事实表或维度表的更新,都会将所影响的数据主键通过Flink写入消息队列Kafka。
Flink通过消费消息队列Kafka中的主键信息,根据主键获取MongoDB的完整的业务数据后更新写入到Hologres。
优势
该方案有如下优势:
云数据库MongoDB版适用于高并发读写的场景的分片集群架构,针对高频写入的海量数据,可无限扩展性能及存储空间,解决写入效率低和存储空间不足的问题。
无论是事实表还是维度表的更新,都会让结果表的最新数据进行更新。这一过程确保了数据更新的及时性,并且在处理海量数据时,仅对发生变更的数据进行更新,从而有效解决了更新效率低下的问题。
实时数仓Hologres支持高效更新与修正,写入即可查,由其对外提供宽表模型的数据查询,有效提升数据查询与分析的效率。
实践场景
本文以某游戏厂商为例,实现了将用户在平台上购买游戏的行为数据进行实时预处理并写入Hologres即时查询的业务场景。
game_sales(游戏销售表)
sale_id(销售ID) | game_id(游戏ID) | platform_id(平台ID) | sale_date(销售日期) | units_sold(销售量) | sale_amt(销售金额) |
game_dimension(游戏维度表)
game_id(游戏ID) | game_name(游戏名称) | release_date(发售日期) | developer(开发者) | publisher(发行商) |
gameplatform_dimension(平台维度表)
platform_id(平台ID) | platform_name(平台名称) | type(终端类型) |
game_sales_details(游戏销售明细表)
sale_id(销售ID) | game_id(游戏ID) | platform_id(平台ID) | sale_date(销售日期) | units_sold(销售量) | sale_amt(销售金额) |
game_name(游戏名称) | release_date(发售日期) | developer(开发者) | publisher(发行商) | platform_name(平台名称) | type(终端类型) |
MongoDB业务数据更新:
业务数据更新,无论是游戏销售表或维度表更新,都将同步更新游戏销售明细表数据。
Kafka数据更新记录:
游戏销售表或维度表更新,其对应所影响数据的主键将更新到消息队列。
Hologres更新:
通过消费消息队列的主键信息,将其与三张表相互关联,从而得到完整的用户行为数据以更新游戏销售明细表。
前提条件
已开通Flink工作空间,仅实时计算引擎VVR 8.0.5及以上版本支持该方案,详情请参见开通实时计算Flink版。
已创建云数据库MongoDB,仅云数据库 MongoDB 4.0及以上版本支持该方案,详情请参见创建MongoDB分片集群实例。
已创建实时数仓Hologres,仅1.3及以上版本的独享Hologres实例支持该方案,详情请参见购买实时数仓Hologres。
已创建消息队列Kafka,详情请参见部署消息队列Kafka实例。
实时计算Flink版、云数据库 MongoDB、实时数仓Hologres和云消息队列Kafka需要在同一VPC下。如果不在同一VPC,需要先打通跨VPC的网络或者使用公网的形式访问,详情请参见如何访问跨VPC的其他服务?和实时计算Flink版如何访问公网?
通过RAM用户或RAM角色等身份访问对应资源时,需要其具备对应资源的权限。
步骤一:准备数据
在MongoDB创建数据库和三张业务表并插入数据。
登录MongoDB控制台,选择单击创建好的分片集群实例。
单击右上角登录数据库,选择单击目标实例。
创建mongo_test数据库。
use mongo_test;
在mongo_test创建game_sales(游戏销售表),game_dimension(游戏维度表)和platform_dimension(平台维度表)并插入数据。
//游戏销售表 db.game_sales.insert( [ {sale_id:0,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500}, ] ); //游戏维度表 db.game_dimension.insert( [ {game_id:101,"game_name":"SpaceInvaders","release_date":"2023-06-15","developer":"DevCorp","publisher":"PubInc"}, {game_id:102,"game_name":"PuzzleQuest","release_date":"2023-07-20","developer":"PuzzleDev","publisher":"QuestPub"}, {game_id:103,"game_name":"RacingFever","release_date":"2023-08-10","developer":"SpeedCo","publisher":"RaceLtd"}, {game_id:104,"game_name":"AdventureLand","release_date":"2023-09-05","developer":"Adventure","publisher":"LandCo"}, ] ); //平台维度表 db.platform_dimension.insert( [ {platform_id:1,"platform_name":"PCGaming","type":"PC"}, {platform_id:2,"platform_name":"PlayStation","type":"Console"}, {platform_id:3,"platform_name":"Mobile","type":"Mobile"} ] );
查询创建的表信息。
db.game_sales.find(); db.game_dimension.find(); db.platform_dimension.find();
Hologres创建数据分析宽表。
登录Hologres控制台,在实例列表页面,单击目标实例,单击右上角的登录实例。
单击上方导航栏的新建库,新建数据库名称为test,本示例权限策略选择SPM,详情请参见使用Hologres管理控制台创建数据库。
选择上方导航栏中的SQL编辑器,单击左侧导航栏上方的SQL图标,新建SQL查询,选择对应实例名和数据库,填入下方代码创建销售明细表。
CREATE TABLE game_sales_details( sale_id INT not null primary key, game_id INT, platform_id INT, sale_date VARCHAR(50), units_sold INT, sale_amt INT, game_name VARCHAR(50), release_date VARCHAR(50), developer VARCHAR(50), publisher VARCHAR(50), platform_name VARCHAR(50), type VARCHAR(50) );
消息队列Kafka创建Topic。
登录Kafka控制台,在实例列表页面,单击目标实例名称。
单击左侧导航栏的白名单管理,添加或修改白名单分组,将Flink工作空间网段设置为白名单。
左侧导航栏选择Topic管理,单击创建Topic,名称为
game_sales_fact
,其余选择默认设置,单击确认。
步骤二:创建流作业
作业一:将销售表主键写入消息队列
游戏销售表将表主键(sale_id)存入消息队列Kafka,流程如下图:
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击
。单击新建后,在新建作业草稿对话框,选择空白的流作业草稿,单击下一步。
修改文件名称为MongoDB_to_Kafka,单击创建。
下列代码中使用MongoDB连接器创建了源表game_sales,使用Upsert Kafka连接器创建了Kafka的Topic为game_sales_fact。为避免作业中出现明文密码,造成安全隐患,可以参见变量管理来配置密码和地址等变量。
//创建MongoDB游戏销售表 CREATE TEMPORARY TABLE game_sales ( `_id` STRING, --MongoDB自生成Id sale_id INT, --销售Id game_id INT, --游戏Id platform_id INT, --平台Id sale_date STRING, --销售日期 units_sold INT, --销售量 sale_amt INT, --销售金额 PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', --使用的连接器 'hosts' = '${secret_values.MongoDB-hosts}', --MongoDB连接地址 'username' = '${secret_values.MongoDB-username}', --MongoDB用户名 'password' = '${secret_values.MongoDB-password}', --MongoDB密码 'database' = 'mongo_test', --数据库名称 'collection' = 'game_sales' --数据库表名 ); //创建Kafka Topic存储主键信息的事实表 CREATE TEMPORARY TABLE game_sales_fact ( sale_id INT, PRIMARY KEY (sale_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', --使用的连接器 'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}', --Kafka连接地址 'topic' = 'game_sales_fact', --Topic名称 'key.format' = 'json', --key部分数据格式 'value.format' = 'json' --value部分数据格式 ); BEGIN STATEMENT SET; --事务块起始,下列操作全部成功,或者全部回滚 // 将主键信息插入Kafka的Topic INSERT INTO game_sales_fact ( sale_id ) SELECT sale_id FROM game_sales; END;
说明本示例使用的是Upsert Kafka连接器,以Upsert方式向Kafka写入数据,其与Kafka连接器的区别请参见Kafka、Upsert Kafka或Kafka JSON catalog的选择。
单击右上方的部署,单击确认。
更多部署参数详情请参见部署作业。
作业二:同步更新主键信息到消息队列
维度表与销售表相互进行连接(维表JOIN)。若维度表或销售表发生变更,将会把销售表中受影响数据的主键(sale_id)更新至消息队列Kafka。作业流程如下图:
维表JOIN:通常用于使用从外部系统查询的数据来丰富表。join 要求一个表具有处理时间属性,还需要一个强制的相等连接条件,详情请参见维表JOIN语句。在下列作业中,带有process time属性的FOR SYSTEM_TIME AS OF 子句在确保联接处理game_sales表每一行时,都能与join条件匹配的维表行连接。它还防止连接的维表在未来发生更新时变更连接的结果。强制的连接条件则是gd.game_id = gsf.game_id和pd.platform_id = gsf.platform_id。
参考作业一新建并部署作业(MongoDB_joinTo_Kafka)。
//创建游戏维度表
CREATE TEMPORARY TABLE game_dimension
(
`_id` STRING,
game_id INT,
game_name STRING,
release_date STRING,
developer STRING,
publisher STRING,
PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'hosts' = '${secret_values.MongoDB-hosts}',
'username' = '${secret_values.MongoDB-username}',
'password' = '${secret_values.MongoDB-password}',
'database' = 'mongo_test',
'collection' = 'game_dimension'
);
//创建平台维度表
CREATE TEMPORARY TABLE platform_dimension
(
`_id` STRING,
platform_id INT,
platform_name STRING,
type STRING,
PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'hosts' = '${secret_values.MongoDB-hosts}',
'username' = '${secret_values.MongoDB-username}',
'password' = '${secret_values.MongoDB-password}',
'database' = 'mongo_test',
'collection' = 'platform_dimension'
);
//创建游戏销售表
CREATE TEMPORARY TABLE game_sales
(
`_id` STRING,
sale_id INT,
game_id INT,
platform_id INT,
sale_date STRING,
units_sold INT,
sale_amt INT,
PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'hosts' = '${secret_values.MongoDB-hosts}',
'username' = '${secret_values.MongoDB-username}',
'password' = '${secret_values.MongoDB-password}',
'database' = 'mongo_test',
'collection' = 'game_sales'
);
//创建Kafka Topic存储主键信息的事实表
CREATE TEMPORARY TABLE game_sales_fact (
sale_id INT,
PRIMARY KEY (sale_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}',
'topic' = 'game_sales_fact',
'key.format' = 'json',
'value.format' = 'json'
);
BEGIN STATEMENT SET;
//游戏维度表与游戏销售表关联,有数据更新则将所影响的销售表主键插入Kafka Topic
INSERT INTO game_sales_fact (
sale_id
)
select
gs.sale_id
from game_dimension as gd
join game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on gd.game_id = gs.game_id;
//平台维度表与销售表关联,有数据更新则将所影响的销售主键插入Kafka事实表
INSERT INTO game_sales_fact (
sale_id
)
select
gs.sale_id
from platform_dimension as pd
join game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on pd.platform_id = gs.platform_id;
END;
作业三:同步更新Hologres明细表
消费Kafka Topic当中的主键信息(sale_id),通过三个维表Join操作得到销售明细信息,写入最终的明细表。作业流程如下图:
参考作业一新建并部署作业(MongoDB_joinKafkaTo_Holo)。下列代码中使用Hologres连接器创建了结果表game_sales_details。
//创建Kafka Topic存储主键的事实表,消费主键信息
CREATE TEMPORARY TABLE game_sales_fact (
sale_id INT,
PRIMARY KEY (sale_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}',
'topic' = 'game_sales_fact', --消费的topic
'format' = 'json', --数据格式
'properties.group.id' = 'game_sales_fact', --消费者组
'properties.auto.offset.reset' = 'earliest' --如果消费者组首次使用,则从最早位点开始消费
);
//游戏销售表
CREATE TEMPORARY TABLE game_sales
(
`_id` STRING,
sale_id INT,
game_id INT,
platform_id INT,
sale_date STRING,
units_sold INT,
sale_amt INT,
PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'hosts' = '${secret_values.MongoDB-hosts}',
'username' = '${secret_values.MongoDB-username}',
'password' = '${secret_values.MongoDB-password}',
'database' = 'mongo_test',
'collection' = 'game_sales'
);
//游戏维度表
CREATE TEMPORARY TABLE game_dimension
(
`_id` STRING,
game_id INT,
game_name STRING,
release_date STRING,
developer STRING,
publisher STRING,
PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'hosts' = '${secret_values.MongoDB-hosts}',
'username' = '${secret_values.MongoDB-username}',
'password' = '${secret_values.MongoDB-password}',
'database' = 'mongo_test',
'collection' = 'game_dimension'
);
//平台维度表
CREATE TEMPORARY TABLE platform_dimension
(
`_id` STRING
,platform_id INT
,platform_name STRING
,type STRING
,PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'hosts' = '${secret_values.MongoDB-hosts}',
'username' = '${secret_values.MongoDB-username}',
'password' = '${secret_values.MongoDB-password}',
'database' = 'mongo_test',
'collection' = 'platform_dimension'
);
//游戏销售明细表
CREATE TEMPORARY TABLE IF NOT EXISTS game_sales_details
(
sale_id INT,
game_id INT,
platform_id INT,
sale_date STRING,
units_sold INT,
sale_amt INT,
game_name STRING,
release_date STRING,
developer STRING,
publisher STRING,
platform_name STRING,
type STRING,
PRIMARY KEY (sale_id) NOT ENFORCED
)
WITH (
'connector' = 'hologres',
'dbname' = 'test', --Hologres的数据库名称
'tablename' = 'public.game_sales_details', --Hologres用于接收数据的表名称
'username' = '${secret_values.AccessKeyID}', --当前阿里云账号的AccessKey ID
'password' = '${secret_values.AccessKeySecret}', --当前阿里云账号的AccessKey Secret
'endpoint' = '${secret_values.Hologres-endpoint}', --当前Hologres实例VPC网络的Endpoint
'ignoredelete' = 'false', -- 是否忽略撤回消息。宽表merge需要关闭。
'mutatetype' = 'insertorupdate', -- 更新模式。宽表merge需要开启此参数,实现部分列更新。
'partial-insert.enabled' = 'true', -- 是否只插入INSERT语句中定义的字段。宽表merge需要开启此参数,实现部分列更新。
'ignoreNullWhenUpdate' = 'true' --是否忽略更新写入数据中的Null值。
);
INSERT INTO game_sales_details (
sale_id,
game_id,
platform_id,
sale_date,
units_sold,
sale_amt,
game_name,
release_date,
developer,
publisher,
platform_name,
type
)
select
gsf.sale_id,
gs.game_id,
gs.platform_id,
gs.sale_date,
gs.units_sold,
gs.sale_amt,
gd.game_name,
gd.release_date,
gd.developer,
gd.publisher,
pd.platform_name,
pd.type
from game_sales_fact as gsf
join game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on gsf.sale_id = gs.sale_id
join game_dimension FOR SYSTEM_TIME AS OF PROCTIME() as gd
on gs.game_id = gd.game_id
join platform_dimension FOR SYSTEM_TIME AS OF PROCTIME() as pd
on gs.platform_id = pd.platform_id;
步骤三:启动作业
完成三个作业开发部署后,在左侧导航选择
,启动三个作业。作业运行后,前往Hologres控制台,对明细表game_sales_details进行查询。
SELECT * FROM game_sales_details;
此时game_sales_details表中插入了一条数据。
步骤四:数据更新和查询
对于销售表和维度表的数据变更,能直接更新反馈到明细表上进行查询分析。下面将列举几种常见的数据变更操作,以展示数据实时更新的情况。
销售表数据变更
对MongoDB的game_sales表插入五条数据,并观察Hologres的game_sale_details表的结果。
db.game_sales.insert( [ {sale_id:1,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500}, {sale_id:2,game_id:102,platform_id:2,"sale_date":"2024-08-02",units_sold:400,sale_amt:2000}, {sale_id:3,game_id:103,platform_id:1,"sale_date":"2024-08-03",units_sold:300,sale_amt:1500}, {sale_id:4,game_id:101,platform_id:3,"sale_date":"2024-08-04",units_sold:200,sale_amt:1000}, {sale_id:5,game_id:104,platform_id:2,"sale_date":"2024-08-05",units_sold:100,sale_amt:3000} ] );
查询Hologres的game_sale_details表,可以看到,该表增加了五条相应的数据。
将MongoDB表game_sales中sale_date=2024-01-01的数据均修改为2024-08-01。
db.game_sales.updateMany({"sale_date": "2024-01-01"}, {$set: {"sale_date": "2024-08-01"}});
而后查询Hologres的game_sale_details表。可以看到,该表中sale_date=2024-01-01的数据都被修改成2024-08-01了。
对MongoDB的game_sales表删除数据,将日期为2024-08-01的数据删除。
db.game_sales.remove({"sale_date": "2024-08-01"});
而后查询Hologres的game_sale_details表。可以看到,该表中删除了两条日期为2024-08-01的数据。
维度表数据变更
对MongoDB的game_dimension表和platform_dimension表插入数据,插入新的游戏和新的平台数据。
//游戏维度表 db.game_dimension.insert( [ {game_id:105,"game_name":"HSHWK","release_date":"2024-08-20","developer":"GameSC","publisher":"GameSC"}, {game_id:106,"game_name":"HPBUBG","release_date":"2018-01-01","developer":"BLUE","publisher":"KK"} ] ); //平台维度表 db.platform_dimension.insert( [ {platform_id:4,"platform_name":"Steam","type":"PC"}, {platform_id:5,"platform_name":"Epic","type":"PC"} ] );
新增游戏数据和新增平台数据,并不会增加明细表数据。用户需要产生购买或者下载行为才可以,所以继续添加game_sales表的数据。
// 游戏销售表 db.game_sales.insert( [ {sale_id:6,game_id:105,platform_id:4,"sale_date":"2024-09-01",units_sold:400,sale_amt:2000}, {sale_id:7,game_id:106,platform_id:1,"sale_date":"2024-09-01",units_sold:300,sale_amt:1500} ] );
而后查询Hologres的game_sale_details表,可以看到,明细表新增了两条关于新游戏和新平台的数据记录。
对MongoDB的game_dimension表和platform_dimension表更新数据,更新游戏信息和平台信息。
//更新开发日期 db.game_dimension.updateMany({"release_date": "2018-01-01"}, {$set: {"release_date": "2024-01-01"}}); //更新平台类型 db.platform_dimension.updateMany({"type": "PC"}, {$set: {"type": "Swich"}});
可以看到相关的字段信息已经被修改了。