基于Flink+MongoDB+Hologres的游戏行业用户行为分析

用户行为数据通常具有庞大的数据量,存储成本较高,且缺乏统一的格式,导致处理难度较大。常用的宽表模型虽查询效率高,但冗余度高、存储空间大、维护复杂,更新慢。本文基于Flink+MongoDB+Hologres更好地实现宽表的数据分析,以游戏行业的用户行为数据分析为示例,构建用户行为数据宽表进行数据分析的方案。

方案架构和优势

架构

实时计算Flink版是强大的流式计算引擎,支持对海量实时数据高效处理。云数据库MongoDB版是一种文档型的NoSQL数据库,具有数据存储结构灵活、读写性能高、支持复杂的查询条件等特点。实时数仓Hologres是一站式实时数仓,支持数据实时写入与更新,实时数据写入即可查。三者结合相辅相成,能够将复杂多变的数据进行集合计算处理,实现对大型数据即时更新、查询与分析。本文的示例方案架构如下:

  1. MongoDB的业务数据更新后,通过Flink将数据写入Hologres,更新宽表部分列,将数据主键通过消息队列Kafka采用Upsert的方式更新写入。

  2. Flink通过消费消息队列Kafka中的主键信息,根据主键获取MongoDB的完整的业务数据后更新写入到Hologres。

  3. 由实时数仓Hologres对外提供应用查询分析。

image

优势

该方案有如下优势:

  • 云数据库MongoDB版适用于高并发读写的场景的分片集群架构,针对高频写入的海量数据,可无限扩展性能及存储空间,解决写入效率低和存储空间不足的问题。

  • 实时计算Flink版监控数据发生变更后,将主键信息写入消息队列Kafka,并及时进行消费,将完整数据更新写入实时数仓Hologres。这一过程确保了数据更新的及时性,并且在处理海量数据时,仅对发生变更的数据进行更新,从而有效解决了更新效率低下的问题。

  • 实时数仓Hologres支持高效更新与修正,写入即可查,由其对外提供宽表模型的数据查询,有效提升数据查询与分析的效率。

实践场景

本文以某游戏厂商为例,实现了将用户在平台上购买游戏的行为数据进行实时预处理并写入Hologres及时查询的业务场景。

image
  1. MongoDB业务数据更新:

    MongoDB有3张表,包括主体表game_sales(游戏销售表)、维表game_dimension(游戏维度表)、维表platform_dimension(平台维度表)。业务数据更新,将同步更新游戏销售事实表数据。

  2. Kafka数据更新记录:

    主体表或维表更新,其对应所影响数据的主键将更新到消息队列。

  3. Hologres更新:

    通过消费消息队列的主键信息,将其与三张表相互关联,从而得到完整的用户行为数据以更新事实表。

前提条件

步骤一:准备数据

  1. 在MongoDB创建数据库和三张业务表并插入数据。

    1. 登录MongoDB控制台,选择单击创建好的分片集群实例。

    2. 将Flink工作空间网段设置为白名单,详情请参见设置白名单如何设置白名单

    3. 单击右上角登录数据库,选择单击目标实例。

    4. 创建mongo_test数据库。

      use mongo_test;
    5. 在mongo_test创建game_sales(游戏销售表),game_dimension(游戏维度表),platform_dimension(平台维度表)并插入数据。

      //游戏销售表
      db.game_sales.insert(
        [
      	{sale_id:0,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500},
        ]
      );
      
      //游戏维度表
      db.game_dimension.insert(
        [
      	{game_id:101,"game_name":"SpaceInvaders","release_date":"2023-06-15","developer":"DevCorp","publisher":"PubInc"},
      	{game_id:102,"game_name":"PuzzleQuest","release_date":"2023-07-20","developer":"PuzzleDev","publisher":"QuestPub"},
      	{game_id:103,"game_name":"RacingFever","release_date":"2023-08-10","developer":"SpeedCo","publisher":"RaceLtd"},
      	{game_id:104,"game_name":"AdventureLand","release_date":"2023-09-05","developer":"Adventure","publisher":"LandCo"},
        ]
      );
      
      //平台维度表
      db.platform_dimension.insert(
        [
      	{platform_id:1,"platform_name":"PCGaming","type":"PC"},
      	{platform_id:2,"platform_name":"PlayStation","type":"Console"},
      	{platform_id:3,"platform_name":"Mobile","type":"Mobile"}
        ]
      );
    6. 查询创建的表信息。

      db.game_sales.find();
      db.game_dimension.find();
      db.platform_dimension.find();

      image

  2. Hologres创建数据分析宽表。

    1. 登录Hologres控制台,在实例列表页面,单击目标实例,单击右上角的登录实例

    2. 单击上方导航栏的新建库,新建数据库名称为test,本示例权限策略选择SPM,详情请参见使用Hologres管理控制台创建数据库

      image

    3. 选择上方导航栏中的SQL编辑器,选择对应实例名和数据库,填入下方代码创建数据分析宽表。

      CREATE TABLE game_sales_fact(
        sale_id INT not null primary key,
        game_id INT,
        platform_id INT,
        sale_date VARCHAR(50),
        units_sold INT,
        sale_amt INT,
        game_name VARCHAR(50),
        release_date VARCHAR(50),
        developer VARCHAR(50),
        publisher VARCHAR(50),
        platform_name VARCHAR(50),
        type VARCHAR(50)
      );
  3. 消息队列Kafka创建Topic。

    1. 登录Kafka控制台,在实例列表页面,单击目标实例名称。

    2. 单击左侧导航栏的白名单管理,添加或修改白名单分组,将Flink工作空间网段设置为白名单。

    3. 左侧导航栏选择Topic管理,单击创建Topic,名称为game_sales_fact,其余选择默认设置,单击确认

步骤二:创建流作业

作业一:销售表同步数据到Hologres和消息队列

游戏销售表的部分数据直接更新到销售事实表中,将表主键(sale_id)存入消息队列Kafka用于增量更新销售事实表,流程如下图:

image
  1. 登录实时计算控制台

  2. 单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击数据开发 > ETL

  4. 单击新建后,在新建作业草稿对话框,选择空白的流作业草稿,单击下一步。

    下列代码中使用MongoDB连接器创建了源表game_sales,使用Hologres连接器创建了结果表game_sales_fact,使用Upsert Kafka连接器创建了Kafka的Topic结果表game_sales_fact。为避免作业中出现明文密码,造成安全隐患,可以参见变量管理来配置密码和地址等变量。

    //创建MongoDB游戏销售表
    CREATE TEMPORARY TABLE mongo_game_sales
    (
      `_id`       STRING,    --MongoDB自生成Id
      sale_id     INT,       --销售Id
      game_id     INT,       --游戏Id
      platform_id INT,       --平台Id
      sale_date   STRING,    --销售日期
      units_sold  INT,       --销售量
      sale_amt    INT,       --销售金额
      PRIMARY KEY (_id) NOT ENFORCED
    )
    WITH (
      'connector' = 'mongodb',      --使用的连接器
      'hosts' = '${secret_values.MongoDB-hosts}',        --MongoDB连接地址
      'username' = '${secret_values.MongoDB-username}',  --MongoDB用户名
      'password' = '${secret_values.MongoDB-password}',  --MongoDB密码
      'database' = 'mongo_test',    --数据库名称
      'collection' = 'game_sales'   --数据库表名
    );
    
    //创建Hologres结果宽表(游戏销售事实表)
    CREATE TEMPORARY TABLE IF NOT EXISTS game_sales_fact
    (
      sale_id       INT,       --销售Id
      game_id       INT,       --游戏Id
      platform_id   INT,       --平台Id
      sale_date     STRING,    --销售日期
      units_sold    INT,       --销售量
      sale_amt      INT,       --销售金额
      game_name     STRING,    --游戏名称
      release_date  STRING,    --发售日期
      developer     STRING,    --开发者
      publisher     STRING,    --发行商
      platform_name STRING,    --平台名称
      type          STRING,    --终端类型
      PRIMARY KEY (sale_id) NOT ENFORCED
    )
    WITH (
      'connector' = 'hologres', --使用的连接器
      'dbname' = 'test', --Hologres的数据库名称
      'tablename' = 'public.game_sales_fact', --Hologres用于接收数据的表名称
      'username' = '${secret_values.AccessKeyID}', --当前阿里云账号的AccessKey ID
      'password' = '${secret_values.AccessKeySecret}', --当前阿里云账号的AccessKey Secret
      'endpoint' = '${secret_values.Hologres-endpoint}', --当前Hologres实例VPC网络的Endpoint
      'ignoredelete' = 'false', -- 是否忽略撤回消息。宽表merge需要关闭。
      'mutatetype' = 'insertorupdate', -- 更新模式。宽表merge需要开启此参数,实现部分列更新。
      'partial-insert.enabled' = 'true', -- 是否只插入INSERT语句中定义的字段。宽表merge需要开启此参数,实现部分列更新。
      'ignoreNullWhenUpdate' = 'true' --是否忽略更新写入数据中的Null值。
    );
    
    
    //创建Kafka存储主键信息的事实表
    CREATE TEMPORARY TABLE kafka_game_sales_fact (
      sale_id     INT,
      PRIMARY KEY (sale_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',    --使用的连接器
      'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}',   --Kafka连接地址
      'topic' = 'game_sales_fact',    --Topic名称
      'key.format' = 'json',          --key部分数据格式
      'value.format' = 'json'         --value部分数据格式
    );
    
    BEGIN STATEMENT SET;   --事务块起始,下列操作全部成功,或者全部回滚
    
    // 插入Hologres游戏事实表的部分列
    INSERT INTO game_sales_fact(
      sale_id,
      game_id,
      platform_id,
      sale_date,
      units_sold,
      sale_amt
    )
    SELECT
      sale_id,
      game_id,
      platform_id,
      sale_date,
      units_sold,
      sale_amt
    FROM mongo_game_sales;
    
    // 将主键信息插入Kafka的Topic
    INSERT INTO kafka_game_sales_fact (
      sale_id
    )
    SELECT
      sale_id
    FROM mongo_game_sales;
    
    END;
    
    
    说明

    本示例使用的是Upsert Kafka连接器,以Upsert方式向Kafka写入数据,其与Kafka连接器的区别请参见Kafka、Upsert Kafka或Kafka JSON catalog的选择

  5. 单击右上方的部署,单击确认

    更多部署参数详情请参见部署作业

作业二:同步更新主键信息到消息队列

维表与销售表相互进行连接(join)。若维表或销售表发生变更,将会把销售表中受影响数据的主键(sale_id)更新至消息队列Kafka。作业流程如下图:

image

参考作业一新建并部署作业。

//创建游戏维度表
CREATE TEMPORARY TABLE mongo_game_dimension
(
  `_id`        STRING,
  game_id      INT,
  game_name    STRING,
  release_date STRING,
  developer    STRING,
  publisher    STRING,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'hosts' = '${secret_values.MongoDB-hosts}',
  'username' = '${secret_values.MongoDB-username}',
  'password' = '${secret_values.MongoDB-password}',
  'database' = 'mongo_test',
  'collection' = 'game_dimension'
);

//创建平台维度表
CREATE TEMPORARY TABLE mongo_platform_dimension
(
  `_id`         STRING,
  platform_id   INT,
  platform_name STRING,
  type          STRING,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'hosts' = '${secret_values.MongoDB-hosts}',
  'username' = '${secret_values.MongoDB-username}',
  'password' = '${secret_values.MongoDB-password}',
  'database' = 'mongo_test',
  'collection' = 'platform_dimension'
);

//创建游戏销售表
CREATE TEMPORARY TABLE mongo_game_sales
(
  `_id`       STRING,
  sale_id     INT,
  game_id     INT,
  platform_id INT,
  sale_date   STRING,
  units_sold  INT,
  sale_amt    INT,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'hosts' = '${secret_values.MongoDB-hosts}',
  'username' = '${secret_values.MongoDB-username}',
  'password' = '${secret_values.MongoDB-password}',
  'database' = 'mongo_test',
  'collection' = 'game_sales'
);

//创建Kafka存储主键信息的事实表
CREATE TEMPORARY TABLE kafka_game_sales_fact (
  sale_id      INT,
  PRIMARY KEY (sale_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}',
  'topic' = 'game_sales_fact',
  'key.format' = 'json',
  'value.format' = 'json'
);


BEGIN STATEMENT SET;

//游戏维度表与销售表关联,有数据更新则将所影响的销售主键插入Kafka事实表
INSERT INTO kafka_game_sales_fact (
   sale_id
)
select
  gsf.sale_id
from mongo_game_dimension as gd
inner join mongo_game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gsf
on gd.game_id = gsf.game_id;

//平台维度表与销售表关联,有数据更新则将所影响的销售主键插入Kafka事实表
INSERT INTO kafka_game_sales_fact (
   sale_id
)
select
  gsf.sale_id
from mongo_platform_dimension as pd
inner join mongo_game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gsf
on pd.platform_id = gsf.platform_id;

END;

作业三:同步更新Hologres宽表

通过消费Kafka当中的主键信息(sale_id),识别该主键的数据是新增或发生变更的数据,然后通过三张业务表相互join,将得到的数据更新写入到事实表中,作业流程如下图:

image

参考作业一新建并部署作业。

//创建Kafka存储主键的事实表,消费主键信息
CREATE TEMPORARY TABLE kafka_game_sales_fact (
  sale_id      INT,
  PRIMARY KEY (sale_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}',
  'topic' = 'game_sales_fact',    --消费的topic
  'format' = 'json',              --数据类型
  'properties.group.id' = 'game_sales_fact',  --消费者组
  'properties.auto.offset.reset' = 'earliest' --如果消费者组首次使用,则从最早位点开始消费
);


//游戏销售表
CREATE TEMPORARY TABLE mongo_game_sales
(
  `_id`       STRING,
  sale_id     INT,
  game_id     INT,
  platform_id INT,
  sale_date   STRING,
  units_sold  INT,
  sale_amt    INT,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'hosts' = '${secret_values.MongoDB-hosts}',
  'username' = '${secret_values.MongoDB-username}',
  'password' = '${secret_values.MongoDB-password}',
  'database' = 'mongo_test',
  'collection' = 'game_sales'
);

//游戏维度表
CREATE TEMPORARY TABLE mongo_game_dimension
(
  `_id`        STRING,
  game_id      INT,
  game_name    STRING,
  release_date STRING,
  developer    STRING,
  publisher    STRING,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'hosts' = '${secret_values.MongoDB-hosts}',
  'username' = '${secret_values.MongoDB-username}',
  'password' = '${secret_values.MongoDB-password}',
  'database' = 'mongo_test',
  'collection' = 'game_dimension'
);


//平台维度表
CREATE TEMPORARY TABLE mongo_platform_dimension
(
  `_id`          STRING
  ,platform_id   INT
  ,platform_name STRING
  ,type          STRING
  ,PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'hosts' = '${secret_values.MongoDB-hosts}',
  'username' = '${secret_values.MongoDB-username}',
  'password' = '${secret_values.MongoDB-password}',
  'database' = 'mongo_test',
  'collection' = 'platform_dimension'
)
;

//Hologres结果表
CREATE TEMPORARY TABLE IF NOT EXISTS game_sales_fact
(
  sale_id       INT,
  game_id       INT,
  platform_id   INT,
  sale_date     STRING,
  units_sold    INT,
  sale_amt      INT,
  game_name     STRING,
  release_date  STRING,
  developer     STRING,
  publisher     STRING,
  platform_name STRING,
  type          STRING,
  PRIMARY KEY (sale_id) NOT ENFORCED
)
WITH (
  'connector' = 'hologres',
  'dbname' = 'test', --Hologres的数据库名称
  'tablename' = 'public.game_sales_fact', --Hologres用于接收数据的表名称
  'username' = '${secret_values.AccessKeyID}', --当前阿里云账号的AccessKey ID
  'password' = '${secret_values.AccessKeySecret}', --当前阿里云账号的AccessKey Secret
  'endpoint' = '${secret_values.Hologres-endpoint}', --当前Hologres实例VPC网络的Endpoint
  'ignoredelete' = 'false', -- 是否忽略撤回消息。宽表merge需要关闭。
  'mutatetype' = 'insertorupdate', -- 更新模式。宽表merge需要开启此参数,实现部分列更新。
  'partial-insert.enabled' = 'true', -- 是否只插入INSERT语句中定义的字段。宽表merge需要开启此参数,实现部分列更新。
  'ignoreNullWhenUpdate' = 'true' --是否忽略更新写入数据中的Null值。
);


INSERT INTO game_sales_fact (
  sale_id,
  game_id,
  platform_id,
  sale_date,
  units_sold,
  sale_amt,
  game_name,
  release_date,
  developer,
  publisher,
  platform_name,
  type
)
select
  kgsf.sale_id,
  gs.game_id,
  gs.platform_id,
  gs.sale_date,
  gs.units_sold,
  gs.sale_amt,
  gd.game_name,
  gd.release_date,
  gd.developer,
  gd.publisher,
  pd.platform_name,
  pd.type
from kafka_game_sales_fact as kgsf
inner join mongo_game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on kgsf.sale_id = gs.sale_id

inner join mongo_game_dimension FOR SYSTEM_TIME AS OF PROCTIME() as gd
on gs.game_id = gd.game_id

inner join mongo_platform_dimension FOR SYSTEM_TIME AS OF PROCTIME() as pd
on gs.platform_id = pd.platform_id;

步骤三:启动作业

  1. 完成三个作业开发部署后,在左侧导航选择运维中心 > 作业运维启动三个作业。

  2. 作业运行后,前往Hologres控制台,对宽表game_sales_fact进行查询。

    SELECT * FROM game_sales_fact;

    此时game_sales_fact表中插入了一条数据。

    image

步骤四:数据更新和查询

对于销售表和维表的数据变更,能直接更新反馈到宽表上进行查询分析。下面将列举几种常见的数据变更操作,以展示数据实时更新的情况。

销售表数据变更

  1. 对MongoDB的game_sales表插入五条数据,并观察Hologres的game_sale_fact表的结果。

    db.game_sales.insert(
      [
    	{sale_id:1,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500},
    	{sale_id:2,game_id:102,platform_id:2,"sale_date":"2024-08-02",units_sold:400,sale_amt:2000},
    	{sale_id:3,game_id:103,platform_id:1,"sale_date":"2024-08-03",units_sold:300,sale_amt:1500},
    	{sale_id:4,game_id:101,platform_id:3,"sale_date":"2024-08-04",units_sold:200,sale_amt:1000},
    	{sale_id:5,game_id:104,platform_id:2,"sale_date":"2024-08-05",units_sold:100,sale_amt:3000}
      ]
    );

    查询Hologres的game_sale_fact表,可以看到,该表增加了五条相应的数据。

    image

  2. 将MongoDB表game_sales中sale_date=2024-01-01的数据均修改为2024-08-01。

    db.game_sales.updateMany({"sale_date": "2024-01-01"}, {$set: {"sale_date": "2024-08-01"}});

    而后查询Hologres的game_sale_fact表。可以看到,该表中sale_date=2024-01-01的数据都被修改成2024-08-01了。

    image

  3. 对MongoDB的game_sales表删除数据,将日期为2024-08-01的数据删除。

    db.game_sales.remove({"sale_date": "2024-08-01"});

    而后查询Hologres的game_sale_fact表。可以看到,该表中删除了两条日期为2024-08-01的数据。

    image

维度表数据变更

  1. 对MongoDB的game_dimension表和platform_dimension表插入数据,插入新的游戏和新的平台数据。

    //游戏维度表
    db.game_dimension.insert(
      [
    	{game_id:105,"game_name":"HSHWK","release_date":"2024-08-20","developer":"GameSC","publisher":"GameSC"},
    	{game_id:106,"game_name":"HPBUBG","release_date":"2018-01-01","developer":"BLUE","publisher":"KK"}
      ]
    );
    
    //平台维度表
    db.platform_dimension.insert(
      [
    	{platform_id:4,"platform_name":"Steam","type":"PC"},
    	{platform_id:5,"platform_name":"Epic","type":"PC"}
      ]
    );

    新增游戏数据和新增平台数据,并不会增加宽表数据。用户需要产生购买或者下载行为才可以,所以继续添加game_sales表的数据。

    // 游戏销售表
    db.game_sales.insert(
      [
    	{sale_id:6,game_id:105,platform_id:4,"sale_date":"2024-09-01",units_sold:400,sale_amt:2000},
    	{sale_id:7,game_id:106,platform_id:1,"sale_date":"2024-09-01",units_sold:300,sale_amt:1500}
      ]
    );

    而后查询Hologres的game_sale_fact表,可以看到宽表新增了两条关于新游戏和新平台的数据记录。

    image

  2. 对MongoDB的game_dimension表和platform_dimension表更新数据,更新游戏信息和平台信息。

    //更新开发日期
    db.game_dimension.updateMany({"release_date": "2018-01-01"}, {$set: {"release_date": "2024-01-01"}});
    
    //更新平台类型
    db.platform_dimension.updateMany({"type": "PC"}, {$set: {"type": "Swich"}});

    可以看到相关的字段信息已经被修改了。

    image

相关文档