文档

OpenLake:Flink+Paimon+Hologres一体化实时湖仓分析

更新时间:

本文将为您介绍使用Flink、Paimon和Hologres搭建流式湖仓分析系统,实时分析GitHub公开事件行为数据。该系统使用Flink完成GitHub公开行为事件以Paimon格式实时入湖,并使用Hologres SQL完成实时探查,通过Hologres Dynamic Table完成湖上数据实时入仓,并进行仓内数据聚合分析和漏斗分析。

背景信息

OpenLake简介

阿里云OpenLake解决方案建立在可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。基于OSS的公共湖仓,结合元数据管理平台DLF,支持结构化、半结构化及非结构化数据的管理,确保数据表和文件的安全访问,并具备增删改查与IO加速能力。该方案支持大数据、搜索和AI多引擎对接,实现引擎平台协同计算。通过DataWorks一体化IDE或Notebook,用于统一进行多引擎SQL或Python开发,享受多任务可视化调度与大规模模型并发执行的保障。

客户可以便捷地建立OpenLake湖仓,跨不同计算引擎进行数据操作,并通过构建模型索引,实现搜索和RAG能力的数据透出。在用户开放环境中,用户可结合AI特征工程、模型训练和在线预测,全面提升数据处理和分析效能。

本方案通过与以下产品整合,完成流式湖仓分析系统的搭建。

产品名称

产品简介

实时计算Flink版

阿里云提供的全托管Serverless Flink云服务,具备实时应用的作业开发、数据调试、运行与监控、自动调优、智能诊断等全生命周期能力。

实时数仓 Hologres

阿里巴巴自主研发的一站式实时湖仓引擎(Real-Time Data Lakehouse),支持海量数据实时写入、加工与分析,支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),支持近实时湖仓数据交互式分析及联邦查询,为用户提供离在线一体、分析服务一体、湖仓一体的全栈数仓解决方案。

Apache Paimon

一种流批统一的湖存储格式,结合Flink及Spark构建流批处理的实时湖仓一体架构。Paimon创新的结合湖格式与LSM技术,给数据湖带来了实时流更新以及完整的流处理能力。

数据湖构建(DLF)

一款全托管的快速帮助用户构建云上数据湖及Lakehouse的服务,为客户提供了统一的元数据管理、统一的权限与安全管理、便捷的数据入湖能力以及一键式数据探索能力。DLF可以帮助用户快速完成云原生数据湖及Lakehouse方案的构建与管理,并可无缝对接多种计算引擎,打破数据孤岛,洞察业务价值。

DataWorks

智能湖仓一体数据开发治理平台,内置阿里巴巴15年大数据建设方法论,深度适配阿里云MaxCompute、E-MapReduce、Hologres、Flink、PAI 等数十种大数据和AI计算服务,为数据仓库、数据湖、OpenLake湖仓一体数据架构提供智能化ETL开发、数据分析与主动式数据资产治理服务,助力“Data+AI”全生命周期的数据管理。

环境准备

您需要开通阿里云实时数仓Hologres、实时计算Flink、湖仓一体数据开发治理平台DataWorks、数据湖构建Data Lake Formation四款产品。

说明
  • 上述产品在阿里云官网均提供免费或低价试用,如您符合试用条件,可前往免费试用专区或对应产品详情页试用入口来申请试用。

实时数仓Hologres

  1. 进入Hologres产品详情页

  2. 单击立即购买

  3. 在购买页面,配置相关参数。其他选项保持默认或根据需要选择。

    参数类型

    说明

    商品类型

    选择独享实例(按量付费)

    地域

    选择华东1(杭州)

    可用区

    选择可用区J

    专有网络(VPC)

    选择与阿里云云服务位于同一地域和同一VPC,进行内网互连。首次需要创建,详情请参见创建和管理专有网络

    专有网络交换机

    交换机的选择对连通性和性能没有特殊影响。首次需要创建,详情请参见创建和管理交换机

    实例名称

    自定义填写。

    服务关联角色

    首次购买Hologres,需在购买页底部单击创建服务关联角色

  4. 单击立即购买,确认订单后完成开通。

  5. 实例开通成功后,进入实例列表页,单击数据湖加速,开启数据湖能力。

    image

  6. 单击所创建实例ID进入实例详情页。

  7. 单击数据库管理进入HoloWeb。

  8. 选择元数据管理 > 新建库

  9. 在新建库页面,输入表名db_demo_kmx。选择SPM简单权限模型,单击确认完成数据库创建。

实时计算Flink版

  1. 登录Flink控制台

  2. 单击Flink全托管旁的立即购买

    说明

    首次购买时,需要您授予Flink访问相应云资源的权限。

  3. 在购买页面,配置相关参数。其他选项保持默认或根据需要选择。

    参数类型

    说明

    付费模式

    选择按量付费

    地域

    选择华东1(杭州)

    可用区

    选择可用区J

    专有网络

    Flink工作空间必须和目标上下游存储在同一个专有网络下。

    虚拟交换机

    Flink控制台上只显示指定专有网络和指定可用区下的虚拟交换机。如果没有可供选择的虚拟交换机,您可以创建和管理交换机

    工作空间名称

    自定义填写。

  4. 单击确认订单并支付,即可开通成功。

  5. 单击管理控制台,即可看到正在创建的工作空间。

    image

数据湖构建(DLF)

  1. 登录DLF控制台

  2. 完成云资源访问授权开通数据湖构建

  3. Catalog列表页面,单击新建Catalog

  4. 输入相关信息,勾选服务协议,并单击创建Catalog

  5. Catalog列表页面查看已创建Catalog。

    image

DataWorks

  1. 开通DataWorks服务,地域选择华东1(杭州)。详情请参见开通DataWorks服务

  2. 登录DataWorks控制台

  3. 选择工作空间,在工作空间列表单击创建工作空间。详情请参见创建工作空间

    相关参数配置如下,其他选项保持默认或根据需要选择。

    参数名称

    说明

    工作空间名称

    自定义个人空间名称。

    参加数据开发(新版)公测

    打开开关。

    默认资源组配置

    没有独享资源组,可单击新建资源组。

  4. 在绑定计算资源页面,选择Hologres,单击绑定Hologres计算资源来绑定Hologres计算资源。

    相关参数配置如下,其他选项保持默认或根据需要选择。

    参数名称

    说明

    Hologres实例

    下拉选择上文创建的实例。

    数据库名称

    下拉选择内部DB,如上文创建的openlake_db。

    默认访问身份

    选择主账号访问。

    计算资源实例名

    输入hologres_datasource。您也可以自定义设置。

  5. 在绑定计算资源页面,选择全托管Flink,单击绑定全托管Flink计算资源来绑定Flink计算资源。

    相关参数配置如下,其他选项保持默认或根据需要选择。

    参数名称

    说明

    Flink工作空间

    下拉选择上文创建的工作空间。

    Flink项目空间

    下拉选择默认项目空间。

    默认部署目标

    下拉选择default-queue

    计算资源实例名

    输入flink_datasource。您也可以自定义设置。

  6. 选择下一步 > 完成,在工作空间列表页面,查看已创建工作空间。

    image

GitHub公开行为事件实时入湖

在DataWorks中新建Notebook

说明

DataWorks Notebook是智能化交互式数据开发和分析工具,能够面向多种数据引擎开展SQL或Python分析,即时运行或调试代码,获取可视化数据结果。同时,DataWorks Notebook能够与其他任务节点混合编排为工作流,提交至调度系统运行,助力复杂业务场景的灵活实现。

  1. 登录DataWorks控制台

  2. 选择数据开发与治理 > 数据开发 > 进入数据开发(新版)

  3. 选择数据开发 > 个人目录,单击image

    image

  4. 输入Notebook名称,敲击键盘回车键或单击页面空白位置,使Notebook名称生效。

  5. 选择个人开发环境 > 前往新建

    image

  6. 进入新建实例页面,配置相关参数。

    参数名称

    说明

    实例名称

    自定义设置。

    资源组

    选择资源组。如果没有对应资源组,您可以直接新建资源组

    专有网络

    选择与阿里云云服务位于同一地域和同一VPC。

    安全组

    选择安全组。如果没有对应资源组,您可以直接新建安全组

    交换机

    选择交换机。如果没有对应交换机,您可以直接新建交换机

    实例RAM角色

    选择DataWorks默认角色。

  7. 单击确定,完成实例创建。

  8. 单击Notebook名称,即可打开并进入Notebook编辑页面,进行后续步骤。

使用Hologres在DLF中创建DB和表

  1. 登录DataWorks控制台,完成Notebook新建。

  2. 单击目标Notebook名称,进入编辑页面。

  3. 单击image,Cell类型选择Hologres SQL,选择上文绑定的计算资源。

    image

  4. 输入SQL,单击image,创建External Database来映射上文创建的DLF Catalog。

    CREATE EXTERNAL DATABASE ext_db_dlf
    WITH 
      metastore_type 'dlf-paimon' 
      catalog_type 'paimon' 
      dlf_region 'cn-hangzhou' 
      dlf_endpoint 'dlfnext-share.cn-hangzhou.aliyuncs.com' 
      dlf_catalog 'clg-paimon-d24*********'; 
     --修改为自己在DLF中创建的Paimon Catalog的ID
  5. 新建SQL Cell,在上文创建的EXTERNAL DATABASEext_db_dlf数据库中创建External Schema,创建后DLF Catalog可以看到名为github_events的Schema。

    CREATE EXTERNAL SCHEMA IF NOT EXISTS ext_db_dlf.github_events;
  6. 新建SQL Cell,在上文创建的EXTERNAL SCHEMAgithub_events 下创建External Table,同样,DLF对应位置可以看到此表。

    CREATE EXTERNAL TABLE ext_db_dlf.github_events.gh_event_ods(
        id TEXT,                                        -- 每个事件的唯一ID。
        created_at BIGINT,                              -- 事件时间,单位秒。
        type TEXT,                                      -- GitHub事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
        actor_id TEXT,                                  -- GitHub用户ID。
        actor_login TEXT,                               -- GitHub用户名。
        repo_id TEXT,                                   -- GitHub仓库ID。
        repo_name TEXT,                                 -- GitHub仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
        org TEXT,                                       -- GitHub组织ID。
        org_login TEXT,                                 -- GitHub组织名,如: apache,google,alibaba等。
        PRIMARY KEY(id)
    ) WITH (
      "changelog-producer"='input',
      "bucket"=6,
      "bucket-key"='id'
    );

使用Flink完成GitHub公开行为事件实时入湖

  1. 登录Flink控制台

  2. 选择元数据管理 > 创建Catalog > Apache Paimon > 下一步

  3. 进入配置Catalog页面。单击确定完成创建。

    说明

    metastore:dlf。

    catalog name:上文创建的Paimon Catalog名称。

  4. 登录DataWorks控制台。在目标Notebook中新建SQL Cell,输入SQL创建Flink作业来实时摄取SLS中的数据。

    说明

    为方便您更好地体验实时计算Flink版产品服务,本文代码示例中提供了仅有只读权限的AK供您试用,涉及的AK仅可用于本文示例中读取样例数据,真实数据生产环境请替换为您自己的数据源AK。

    -- @conf executionMode = STREAMING
    -- @conf name = flink_vvp_job_github_events
    -- @conf engineVersion=vvr-8.0.9-flink-1.17
    -- @conf flinkConf."execution.checkpointing.interval"=10second
    -- @conf flinkConf."execution.checkpointing.min-pause"=10second
    
    -- 创建一张临时表来实时摄取sls中的数据
    CREATE TEMPORARY TABLE sls_input (
        id STRING, -- 每个事件的唯一ID。
        created_at BIGINT, -- 事件时间,单位秒。
        type STRING, -- GitHub事件类型,如:ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
        actor_id STRING, -- GitHub用户ID。
        actor_login STRING, -- GitHub用户名。
        repo_id STRING, -- GitHub仓库ID。
        repo_name STRING, -- GitHub仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
        org STRING, -- GitHub组织ID。
        org_login STRING -- GitHub组织名,如: apache,google,alibaba等。
    ) 
    WITH (
        'connector' = 'sls', -- 实时采集的GitHub事件存放在阿里云SLS中
        'project' = 'github-events-hangzhou', -- 存放公开数据的SLS项目。需改为您的实际地域信息
        'endPoint' = 'https://cn-hangzhou-intranet.log.aliyuncs.com', -- 公开数据仅限VVP通过私网地址访问。本示例以杭州为例,您需要修改为您的实际地域信息。
        'logStore' = 'realtime-github-events',           -- 存放公开数据的SLS logStore
        'accessId' =  'LTAI5tNF1rP8PKVyYjr9TKgh',        -- 访问公开数据集的只读AK,无需替换     
        'accessKey' = 'FDgsh6fjSmkbFsx083tN6HOiqNVWTP'   -- 访问公开数据集的只读secret,无需替换
    );
    
    -- 将临时表数据写入Hologres中创建的Paimon格式的External Table
    
    INSERT INTO ext_db_dlf.github_events.gh_event_ods
    SELECT * FROM sls_input 
    WHERE id IS NOT NULL 
    AND created_at IS NOT NULL 
    AND TO_TIMESTAMP(created_at * 1000) >= date_add(CURRENT_DATE,-1);
  5. 新建SQL Cell,Cell类型选择Hologres SQL输入SQL查询Flink实时写入的数据。

    -- 查看数据
    SELECT * FROM ext_db_dlf.github_events.gh_event_ods limit 10;

使用Hologres完成湖上数据实时入仓以及聚合分析

使用Hologres Dynamic Table完成湖上数据实时入仓

Dynamic Table是一种声明式数据处理架构,可以自动处理并存储一个或者多个基表对象的数据聚合结果,内置不同的数据刷新策略,业务可以根据业务需求设置不同的数据刷新策略,实现数据从基表对象到Dynamic Table的自动流转,满足业务统一开发、数据自动流转、处理时效性等诉求。

登录DataWorks控制台。在目标Notebook编辑页面单击+SQLCell类型选择Hologres SQL输入SQL创建Dynamic Table。

DROP TABLE IF EXISTS hologres_github_event ;

CREATE DYNAMIC TABLE hologres_github_event 
WITH (
    auto_refresh_enable='true',
    refresh_mode = 'incremental',
    incremental_auto_refresh_schd_start_time='immediate',
    incremental_auto_refresh_interval = '1 minute',
    clustering_key='created_at_tz',
    distribution_key='id'  
) AS
SELECT 
    id::BIGINT ,
    TO_TIMESTAMP(created_at)::TIMESTAMPTZ AS created_at_tz,
    type::TEXT,                                      -- GitHub事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
    actor_id::TEXT,                                  -- GitHub用户ID。
    actor_login::TEXT,                               -- GitHub用户名。
    repo_id::TEXT,                                   -- GitHub仓库ID。
    repo_name::TEXT,                                 -- GitHub仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
    org::TEXT,                                       -- GitHub组织ID。
    org_login::TEXT,
    to_char(TO_TIMESTAMP(created_at),'YYYY-MM-DD')::text AS ds
FROM ext_db_dlf.github_events.gh_event_ods;

对于仓内数据进行聚合分析

登录DataWorks控制台。在目标Notebook编辑页面单击+SQL,Cell类型选择Hologres SQL输入SQL来分析GitHub行为事件。

  • 查询今日最活跃项目。

    SELECT
        repo_name,
        COUNT(*) AS events
    FROM
        hologres_github_event
    WHERE
        created_at_tz >= CURRENT_DATE 
    GROUP BY
        repo_name
    ORDER BY
        events DESC
    LIMIT 10;
  • 查询1天内最活跃(事件数最多)的几位开发者。

    SELECT
        actor_login,
        COUNT(*) AS events
    FROM
        hologres_github_event
    WHERE
        created_at_tz >= now() - interval '1 day'
        AND actor_login NOT LIKE '%[bot]'
    GROUP BY
        actor_login
    ORDER BY
        events DESC
    LIMIT 10;
  • 查询过去1天最多的10项操作。

    SELECT
        type,
        count(*) total
    FROM
        hologres_github_event
    WHERE
        created_at_tz > now() - interval '1 day'
    GROUP BY
        1
    ORDER BY
        total DESC
    LIMIT 10;
  • 查看当天漏斗转化情况。

    CREATE EXTENSION IF NOT EXISTS flow_analysis; --开启Extension
    
    -- 查看当天的转换漏斗
    WITH level_detail AS (
        SELECT
            level,
            COUNT(1) AS count_user
        FROM (
            SELECT
                actor_id,
                windowFunnel (1800, 'default', created_at_tz, type = 'CreateEvent', type = 'PushEvent',type = 'IssuesEvent') AS level
            FROM
                hologres_github_event
            WHERE
                created_at_tz >= now() - interval '1 day'
                AND created_at_tz < now()
            GROUP BY
                actor_id) AS basic_table
        GROUP BY
            level
        ORDER BY
            level ASC
    )
    SELECT  CASE level    WHEN 0 THEN 'total'
                          WHEN 1 THEN 'CreateEvent'
                          WHEN 2 THEN 'PushEvent'
                          WHEN 3 THEN 'IssuesEvent'
                          END
            ,SUM(count_user) OVER ( ORDER BY level DESC )
    FROM
        level_detail
    GROUP BY
        level,
        count_user
    ORDER BY
        level ASC;