OpenLake:Flink+Paimon+Hologres一体化实时湖仓分析

本文将为您介绍使用Flink、Paimon和Hologres搭建实时湖仓分析系统,并对GitHub公开事件行为数据进行实时分析。本系统利用Flink处理GitHub公开行为事件数据,并将数据以Paimon格式写入数据湖。 系统同时使用Hologres SQL进行实时数据探查,并通过Hologres Dynamic Table将湖上数据实时同步至数据仓库,最终在数据仓库中进行聚合分析和漏斗分析。

方案介绍

阿里云OpenLake解决方案建立在可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。基于OSS的公共湖仓,结合元数据管理平台DLF,支持结构化、半结构化及非结构化数据的管理,确保数据表和文件的安全访问,并具备增删改查与IO加速能力。该方案支持大数据、搜索和AI多引擎对接,实现引擎平台协同计算。通过DataWorks的一体化开发环境(IDE)或Notebook,用户可以统一进行多引擎SQL和Python的开发工作,同时享受多任务的可视化调度和大规模模型的并发执行保障。用户可以便捷地建立OpenLake湖仓,跨不同计算引擎进行数据操作,并通过构建模型索引,实现搜索和RAG能力的数据透出。在用户开放环境中,用户可结合AI特征工程、模型训练和在线预测,全面提升数据处理和分析效能。

本方案通过与以下产品整合,完成流式湖仓分析系统的搭建。

产品名称

产品简介

实时计算Flink版

阿里云提供的全托管Serverless Flink云服务,具备实时应用的作业开发、数据调试、运行与监控、自动调优、智能诊断等全生命周期能力。关于产品详情,请参见实时计算Flink版

实时数仓Hologres

阿里巴巴自主研发的一站式实时湖仓引擎(Real-Time Data Lakehouse),支持海量数据实时写入、加工与分析,支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),支持近实时湖仓数据交互式分析及联邦查询,为用户提供离在线一体、分析服务一体、湖仓一体的全栈数仓解决方案。关于产品详情,请参见实时数仓Hologres

Apache Paimon

一种流批统一的湖存储格式,结合Flink及Spark构建流批处理的实时湖仓一体架构。Paimon创新性地结合湖格式与LSM技术,给数据湖带来了实时流更新以及完整的流处理能力。详情请参见Apache Paimon

数据湖构建(DLF)

一款全托管的快速帮助用户构建云上数据湖及Lakehouse的服务,为客户提供了统一的元数据管理、统一的权限与安全管理、便捷的数据入湖能力以及一键式数据探索能力。DLF可以帮助用户快速完成云原生数据湖及Lakehouse方案的构建与管理,并可无缝对接多种计算引擎,打破数据孤岛,洞察业务价值。关于产品详情,请参见数据湖构建

DataWorks

智能湖仓一体数据开发治理平台,内置阿里巴巴15年大数据建设方法论,深度适配阿里云MaxCompute、E-MapReduce、Hologres、Flink、PAI 等数十种大数据和AI计算服务,为数据仓库、数据湖、OpenLake湖仓一体数据架构提供智能化ETL开发、数据分析与主动式数据资产治理服务,助力“Data+AI”全生命周期的数据管理。关于产品详情,请参见DataWorks

环境准备

您需要开通阿里云实时数仓Hologres、实时计算Flink、湖仓一体数据开发治理平台DataWorks、数据湖构建Data Lake Formation四款产品。

说明

上述产品在阿里云官网均提供免费或低价试用,如您符合试用条件,可前往免费试用专区或对应产品详情页试用入口来申请试用。

创建专有网络VPC和交换机

您需要创建专有网络VPC和交换机,为该系统构建云上的私有网络。

  1. 登录专有网络管理控制台

  2. 在顶部菜单栏,选择华东1(杭州)地域。

  3. 在左侧导航栏,单击专有网络

  4. 专有网络页面,单击创建专有网络

  5. 创建专有网络页面,配置1个专有网络和1台交换机。

    项目

    说明

    示例值

    VPC名称

    建议您在部署过程中新建一个VPC作为本方案的专有网络。部署过程中填写VPC名称即可创建对应名称的VPC。

    长度为2~128个字符,以英文大小写字母或中文开头,可包含数字、下划线(_)和连字符(-)。

    VPC_Openlake

    IPv4网段

    在创建VPC时,您必须按照无类域间路由块(CIDR block)的格式为您的专有网络划分私网网段。阿里云VPC支持的网段信息请参见组成部分

    在网络规划时可以按照管理网段-开发网段-测试网段-生产网段等规则做好规划。网段一旦投入使用,调整过程复杂,因此规划十分重要。

    192.168.0.0/16

    交换机名称

    建议您在部署过程中在新建的VPC内创建虚拟交换机。部署过程中填写交换机名称即可创建对应名称的虚拟交换机。

    长度为2~128个字符,以英文大小写字母或中文开头,可包含数字、下划线(_)和连字符(-)。

    vsw_Openlake

    可用区

    建议选择排序靠后的可用区,一般此类可用区较新。新的可用区资源更充沛,新规格也会在新的可用区优先上线。

    可用区 J

    IPv4网段

    每台虚拟交换机需要一个IPv4网段

    192.168.1.0/24

实时数仓Hologres

  1. 登录Hologres管理控制台,在实例列表页面,单击新增实例引擎

  2. 在购买页面,按照以下配置项完成相应参数配置。其他选项保持默认或根据需要选择。

    参数类型

    说明

    商品类型

    选择独享实例(按量付费)

    实例类型

    选择计算组型

    地域

    选择华东1(杭州)

    可用区

    选择可用区J

    专有网络(VPC)

    选择之前已创建的专有网络VPC。

    专有网络交换机

    选择之前已创建的专有网络交换机。

    实例名称

    自定义填写。

    服务关联角色

    首次购买Hologres,需在购买页底部单击创建服务关联角色

    • 角色名称:AliyunServiceRoleForHologresIdentityMgmt

    • 权限说明:Hologres使用此角色来访问您在其他云产品中的资源。

    资源组

    选择默认资源组

  3. 单击立即购买,然后根据页面提示完成开通。

  4. 实例开通成功后,进入实例列表页,单击数据湖加速,开启数据湖能力。

    image

  5. 创建Hologres数据库。

    1. 单击刚创建的实例名称,进入实例详情页面,然后单击右上方的登录实例

    2. HoloWeb页面,选择元数据管理 > 新建库

    3. 新建数据库对话框中配置如下参数,并单击确认

      配置项

      说明

      实例名

      选择在刚创建的Hologres实例上创建数据库。

      数据库名称

      本示例数据库名称设置为db_demo_kmx

      权限策略

      选择SPM。更多关于权限策略的说明,请参见:

      • SPM:简单权限模型,该权限模型授权是以DB为粒度,划分admin(管理员)、developer(开发者)、writer(读写者)以及viewer(分析师)四种角色,您可以通过少量的权限管理函数,即可对DB中的对象进行方便且安全的权限管理。

      • SLPM:基于Schema级别的简单权限模型,该权限模型以Schema为粒度,划分 <db>.admin(DB管理员)、<db>.<schema>.developer(开发者)、<db>.<schema>.writer(读写者)以及 <db>.<schema>.viewer(分析师),相比于简单权限模型更为细粒度。

      • 专家:Hologres兼容PostgreSQL,使用与Postgres完全一致的权限系统。

      立即登录

      选择

实时计算Flink版

  1. 登录Flink控制台

  2. 单击Flink全托管旁的立即购买

    说明

    首次购买时,需要您授予Flink访问相应云资源的权限。

  3. 在购买页面,配置相关参数。其他选项保持默认或根据需要选择。

    参数类型

    说明

    付费模式

    选择按量付费

    地域

    选择华东1(杭州)

    可用区

    选择可用区J

    专有网络

    选择之前已创建的专有网络VPC。

    虚拟交换机

    选择之前已创建的专有网络交换机。

    工作空间名称

    自定义填写。

  4. 单击确认订单并支付,即可开通成功。

  5. 单击管理控制台,即可看到正在创建的工作空间。

    image

数据湖构建(DLF)

  1. 登录DLF控制台

  2. 完成云资源访问授权开通数据湖构建

  3. Catalog列表页面,单击新建Catalog

  4. 输入相关信息,勾选服务协议,并单击创建Catalog

  5. Catalog列表页面查看已创建Catalog。

    image

DataWorks

  1. 开通DataWorks服务,地域选择华东1(杭州)。详情请参见开通DataWorks服务

  2. 登录DataWorks控制台

  3. 单击左侧导航栏工作空间,在工作空间列表页面单击创建工作空间。详情请参见创建工作空间

    相关参数配置如下,其他选项保持默认或根据需要选择。

    参数名称

    说明

    工作空间名称

    自定义个人空间名称。

    参加数据开发(新版)公测

    打开开关。

    默认资源组配置

    选择默认资源组。

    • 开通DataWorks服务时,可以创建资源组。

    • 首次开通服务后,会提供一个新的默认资源组。用户无需单独创建。

  4. 绑定计算资源页面,选择Hologres,单击绑定Hologres计算资源进行资源绑定。

    相关参数配置如下,其他选项保持默认或根据需要选择。

    参数名称

    说明

    Hologres实例

    下拉选择上文创建的实例。

    数据库名称

    下拉选择内部DB,如上文创建的openlake_db。

    默认访问身份

    选择主账号访问。

    计算资源实例名

    输入hologres_datasource。您也可以自定义设置。

  5. 绑定计算资源页面,选择全托管Flink,单击绑定全托管Flink计算资源进行资源绑定。

    相关参数配置如下,其他选项保持默认或根据需要选择。

    参数名称

    说明

    Flink工作空间

    下拉选择上文创建的工作空间。

    Flink项目空间

    下拉选择默认项目空间。

    默认部署目标

    下拉选择default-queue

    计算资源实例名

    输入flink_datasource。您也可以自定义设置。

  6. 选择下一步 > 完成,在工作空间列表页面,查看已创建工作空间。

    image

GitHub公开行为事件实时入湖

在DataWorks中新建Notebook

说明

DataWorks Notebook是智能化交互式数据开发和分析工具,能够面向多种数据引擎开展SQL或Python分析,即时运行或调试代码,获取可视化数据结果。同时,DataWorks Notebook能够与其他任务节点混合编排为工作流,提交至调度系统运行,助力复杂业务场景的灵活实现。

  1. 登录DataWorks控制台

  2. 单击控制台左侧导航栏数据开发与治理 > 数据开发 > 进入数据开发(新版)进入DataWorks数据开发页面

  3. 单击左侧个人目录标题栏中image图标,新建Notebooks。

    image

  4. 输入Notebook名称,按下回车键或单击页面空白位置,使Notebook名称生效。

  5. 单击页面顶部个人开发环境 > 前往新建

    image

  6. 进入新建实例面板,配置相关参数。

    参数名称

    说明

    实例名称

    自定义设置。

    资源组

    选择资源组。如果没有对应资源组,您可以直接新建资源组

    专有网络

    选择与阿里云云服务位于同一地域和同一VPC。

    安全组

    选择安全组。如果没有对应资源组,您可以直接新建安全组

    交换机

    选择交换机。如果没有对应交换机,您可以直接新建交换机

    实例RAM角色

    选择DataWorks默认角色。

  7. 单击确定,完成实例创建。

  8. 单击Notebook名称,即可打开并进入Notebook编辑页面,进行后续步骤。

使用Hologres在DLF中创建DB和表

  1. 登录DataWorks控制台,完成Notebook新建。

  2. 单击目标Notebook名称,进入编辑页面。

  3. 单击imageCell类型选择Hologres SQL,选择上文绑定的计算资源。

    image

  4. 输入SQL,单击image,创建External Database来映射上文创建的DLF Catalog。

    CREATE EXTERNAL DATABASE ext_db_dlf
    WITH 
      metastore_type 'dlf-paimon' 
      catalog_type 'paimon' 
      dlf_region 'cn-hangzhou' 
      dlf_endpoint 'dlfnext-share.cn-hangzhou.aliyuncs.com' 
      dlf_catalog 'clg-paimon-d24*********'; 
     --修改为自己在DLF中创建的Paimon Catalog的ID
  5. 新建SQL Cell,在上文创建的EXTERNAL DATABASEext_db_dlf数据库中创建External Schema,创建后DLF Catalog可以看到名为github_events的Schema。

    CREATE EXTERNAL SCHEMA IF NOT EXISTS ext_db_dlf.github_events;
  6. 新建SQL Cell,在上文创建的EXTERNAL SCHEMAgithub_events 下创建External Table,同样,DLF对应位置可以看到此表。

    CREATE EXTERNAL TABLE ext_db_dlf.github_events.gh_event_ods(
        id TEXT,                                        -- 每个事件的唯一ID。
        created_at BIGINT,                              -- 事件时间,单位秒。
        type TEXT,                                      -- GitHub事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
        actor_id TEXT,                                  -- GitHub用户ID。
        actor_login TEXT,                               -- GitHub用户名。
        repo_id TEXT,                                   -- GitHub仓库ID。
        repo_name TEXT,                                 -- GitHub仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
        org TEXT,                                       -- GitHub组织ID。
        org_login TEXT,                                 -- GitHub组织名,如: apache,google,alibaba等。
        PRIMARY KEY(id)
    ) WITH (
      "changelog-producer"='input',
      "bucket"=6,
      "bucket-key"='id'
    );

使用Flink完成GitHub公开行为事件实时入湖

  1. 登录Flink控制台

  2. 选择元数据管理 > 创建Catalog > Apache Paimon > 下一步

  3. 进入配置Catalog页面。单击确定完成创建。

    说明

    metastore:dlf。

    catalog name:上文创建的Paimon Catalog名称。

  4. 登录DataWorks控制台。在目标Notebook中新建SQL Cell,输入SQL创建Flink作业来实时摄取SLS中的数据。

    说明

    为方便您更好地体验实时计算Flink版产品服务,本文代码示例中提供了仅有只读权限的AK供您试用,涉及的AK仅可用于本文示例中读取样例数据,真实数据生产环境请替换为您自己的数据源AK。

    -- @conf executionMode = STREAMING
    -- @conf name = flink_vvp_job_github_events
    -- @conf engineVersion=vvr-8.0.9-flink-1.17
    -- @conf flinkConf."execution.checkpointing.interval"=10second
    -- @conf flinkConf."execution.checkpointing.min-pause"=10second
    
    -- 创建一张临时表来实时摄取sls中的数据
    CREATE TEMPORARY TABLE sls_input (
        id STRING, -- 每个事件的唯一ID。
        created_at BIGINT, -- 事件时间,单位秒。
        type STRING, -- GitHub事件类型,如:ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
        actor_id STRING, -- GitHub用户ID。
        actor_login STRING, -- GitHub用户名。
        repo_id STRING, -- GitHub仓库ID。
        repo_name STRING, -- GitHub仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
        org STRING, -- GitHub组织ID。
        org_login STRING -- GitHub组织名,如: apache,google,alibaba等。
    ) 
    WITH (
        'connector' = 'sls', -- 实时采集的GitHub事件存放在阿里云SLS中
        'project' = 'github-events-hangzhou', -- 存放公开数据的SLS项目。需改为您的实际地域信息
        'endPoint' = 'https://cn-hangzhou-intranet.log.aliyuncs.com', -- 公开数据仅限VVP通过私网地址访问。本示例以杭州为例,您需要修改为您的实际地域信息。
        'logStore' = 'realtime-github-events',           -- 存放公开数据的SLS logStore
        'accessId' =  'LTAI5tNF1rP8PKVyYjr9TKgh',        -- 访问公开数据集的只读AK,无需替换     
        'accessKey' = 'FDgsh6fjSmkbFsx083tN6HOiqNVWTP'   -- 访问公开数据集的只读secret,无需替换
    );
    
    -- 将临时表数据写入Hologres中创建的Paimon格式的External Table
    
    INSERT INTO ext_db_dlf.github_events.gh_event_ods
    SELECT * FROM sls_input 
    WHERE id IS NOT NULL 
    AND created_at IS NOT NULL 
    AND TO_TIMESTAMP(created_at * 1000) >= date_add(CURRENT_DATE,-1);
  5. 新建SQL Cell,Cell类型选择Hologres SQL输入SQL查询Flink实时写入的数据。

    -- 查看数据
    SELECT * FROM ext_db_dlf.github_events.gh_event_ods limit 10;

使用Hologres完成湖上数据实时入仓以及聚合分析

使用Hologres Dynamic Table完成湖上数据实时入仓

Dynamic Table是一种声明式数据处理架构,可以自动处理并存储一个或者多个基表对象的数据聚合结果,内置不同的数据刷新策略,业务可以根据业务需求设置不同的数据刷新策略,实现数据从基表对象到Dynamic Table的自动流转,满足业务统一开发、数据自动流转、处理时效性等诉求。

登录DataWorks控制台。在目标Notebook编辑页面单击+SQLCell类型选择Hologres SQL输入SQL创建Dynamic Table。

DROP TABLE IF EXISTS hologres_github_event ;

CREATE DYNAMIC TABLE hologres_github_event 
WITH (
    auto_refresh_enable='true',
    refresh_mode = 'incremental',
    incremental_auto_refresh_schd_start_time='immediate',
    incremental_auto_refresh_interval = '1 minute',
    clustering_key='created_at_tz',
    distribution_key='id'  
) AS
SELECT 
    id::BIGINT ,
    TO_TIMESTAMP(created_at)::TIMESTAMPTZ AS created_at_tz,
    type::TEXT,                                      -- GitHub事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
    actor_id::TEXT,                                  -- GitHub用户ID。
    actor_login::TEXT,                               -- GitHub用户名。
    repo_id::TEXT,                                   -- GitHub仓库ID。
    repo_name::TEXT,                                 -- GitHub仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
    org::TEXT,                                       -- GitHub组织ID。
    org_login::TEXT,
    to_char(TO_TIMESTAMP(created_at),'YYYY-MM-DD')::text AS ds
FROM ext_db_dlf.github_events.gh_event_ods;

对于仓内数据进行聚合分析

登录DataWorks控制台。在目标Notebook编辑页面单击+SQLCell类型选择Hologres SQL输入SQL来分析GitHub行为事件。

  • 查询今日最活跃项目。

    SELECT
        repo_name,
        COUNT(*) AS events
    FROM
        hologres_github_event
    WHERE
        created_at_tz >= CURRENT_DATE 
    GROUP BY
        repo_name
    ORDER BY
        events DESC
    LIMIT 10;
  • 查询1天内最活跃(事件数最多)的几位开发者。

    SELECT
        actor_login,
        COUNT(*) AS events
    FROM
        hologres_github_event
    WHERE
        created_at_tz >= now() - interval '1 day'
        AND actor_login NOT LIKE '%[bot]'
    GROUP BY
        actor_login
    ORDER BY
        events DESC
    LIMIT 10;
  • 查询过去1天最多的10项操作。

    SELECT
        type,
        count(*) total
    FROM
        hologres_github_event
    WHERE
        created_at_tz > now() - interval '1 day'
    GROUP BY
        1
    ORDER BY
        total DESC
    LIMIT 10;
  • 查看当天漏斗转化情况。

    CREATE EXTENSION IF NOT EXISTS flow_analysis; --开启Extension
    
    -- 查看当天的转换漏斗
    WITH level_detail AS (
        SELECT
            level,
            COUNT(1) AS count_user
        FROM (
            SELECT
                actor_id,
                windowFunnel (1800, 'default', created_at_tz, type = 'CreateEvent', type = 'PushEvent',type = 'IssuesEvent') AS level
            FROM
                hologres_github_event
            WHERE
                created_at_tz >= now() - interval '1 day'
                AND created_at_tz < now()
            GROUP BY
                actor_id) AS basic_table
        GROUP BY
            level
        ORDER BY
            level ASC
    )
    SELECT  CASE level    WHEN 0 THEN 'total'
                          WHEN 1 THEN 'CreateEvent'
                          WHEN 2 THEN 'PushEvent'
                          WHEN 3 THEN 'IssuesEvent'
                          END
            ,SUM(count_user) OVER ( ORDER BY level DESC )
    FROM
        level_detail
    GROUP BY
        level,
        count_user
    ORDER BY
        level ASC;