本文将为您介绍使用Flink、Paimon和Hologres搭建流式湖仓分析系统,实时分析GitHub公开事件行为数据。该系统使用Flink完成GitHub公开行为事件以Paimon格式实时入湖,并使用Hologres SQL完成实时探查,通过Hologres Dynamic Table完成湖上数据实时入仓,并进行仓内数据聚合分析和漏斗分析。
背景信息
OpenLake简介
阿里云OpenLake解决方案建立在可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。基于OSS的公共湖仓,结合元数据管理平台DLF,支持结构化、半结构化及非结构化数据的管理,确保数据表和文件的安全访问,并具备增删改查与IO加速能力。该方案支持大数据、搜索和AI多引擎对接,实现引擎平台协同计算。通过DataWorks一体化IDE或Notebook,用于统一进行多引擎SQL或Python开发,享受多任务可视化调度与大规模模型并发执行的保障。
客户可以便捷地建立OpenLake湖仓,跨不同计算引擎进行数据操作,并通过构建模型索引,实现搜索和RAG能力的数据透出。在用户开放环境中,用户可结合AI特征工程、模型训练和在线预测,全面提升数据处理和分析效能。
本方案通过与以下产品整合,完成流式湖仓分析系统的搭建。
产品名称 | 产品简介 |
阿里云提供的全托管Serverless Flink云服务,具备实时应用的作业开发、数据调试、运行与监控、自动调优、智能诊断等全生命周期能力。 | |
阿里巴巴自主研发的一站式实时湖仓引擎(Real-Time Data Lakehouse),支持海量数据实时写入、加工与分析,支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),支持近实时湖仓数据交互式分析及联邦查询,为用户提供离在线一体、分析服务一体、湖仓一体的全栈数仓解决方案。 | |
一种流批统一的湖存储格式,结合Flink及Spark构建流批处理的实时湖仓一体架构。Paimon创新的结合湖格式与LSM技术,给数据湖带来了实时流更新以及完整的流处理能力。 | |
一款全托管的快速帮助用户构建云上数据湖及Lakehouse的服务,为客户提供了统一的元数据管理、统一的权限与安全管理、便捷的数据入湖能力以及一键式数据探索能力。DLF可以帮助用户快速完成云原生数据湖及Lakehouse方案的构建与管理,并可无缝对接多种计算引擎,打破数据孤岛,洞察业务价值。 | |
智能湖仓一体数据开发治理平台,内置阿里巴巴15年大数据建设方法论,深度适配阿里云MaxCompute、E-MapReduce、Hologres、Flink、PAI 等数十种大数据和AI计算服务,为数据仓库、数据湖、OpenLake湖仓一体数据架构提供智能化ETL开发、数据分析与主动式数据资产治理服务,助力“Data+AI”全生命周期的数据管理。 |
环境准备
您需要开通阿里云实时数仓Hologres、实时计算Flink、湖仓一体数据开发治理平台DataWorks、数据湖构建Data Lake Formation四款产品。
上述产品在阿里云官网均提供免费或低价试用,如您符合试用条件,可前往免费试用专区或对应产品详情页试用入口来申请试用。
实时数仓Hologres
单击立即购买。
在购买页面,配置相关参数。其他选项保持默认或根据需要选择。
参数类型
说明
商品类型
选择独享实例(按量付费)。
地域
选择华东1(杭州)。
可用区
选择可用区J。
专有网络(VPC)
选择与阿里云云服务位于同一地域和同一VPC,进行内网互连。首次需要创建,详情请参见创建和管理专有网络。
专有网络交换机
交换机的选择对连通性和性能没有特殊影响。首次需要创建,详情请参见创建和管理交换机。
实例名称
自定义填写。
服务关联角色
首次购买Hologres,需在购买页底部单击创建服务关联角色。
单击立即购买,确认订单后完成开通。
实例开通成功后,进入实例列表页,单击数据湖加速,开启数据湖能力。
单击所创建实例ID进入实例详情页。
单击数据库管理进入HoloWeb。
选择
。在新建库页面,输入表名
db_demo_kmx
。选择SPM简单权限模型,单击确认完成数据库创建。
实时计算Flink版
数据湖构建(DLF)
登录DLF控制台。
完成云资源访问授权、开通数据湖构建。
在Catalog列表页面,单击新建Catalog。
输入相关信息,勾选服务协议,并单击创建Catalog。
在Catalog列表页面查看已创建Catalog。
DataWorks
开通DataWorks服务,地域选择华东1(杭州)。详情请参见开通DataWorks服务。
登录DataWorks控制台。
选择工作空间,在工作空间列表单击创建工作空间。详情请参见创建工作空间。
相关参数配置如下,其他选项保持默认或根据需要选择。
参数名称
说明
工作空间名称
自定义个人空间名称。
参加数据开发(新版)公测
打开开关。
默认资源组配置
没有独享资源组,可单击新建资源组。
在绑定计算资源页面,选择Hologres,单击绑定Hologres计算资源来绑定Hologres计算资源。
相关参数配置如下,其他选项保持默认或根据需要选择。
参数名称
说明
Hologres实例
下拉选择上文创建的实例。
数据库名称
下拉选择内部DB,如上文创建的openlake_db。
默认访问身份
选择主账号访问。
计算资源实例名
输入
hologres_datasource
。您也可以自定义设置。在绑定计算资源页面,选择全托管Flink,单击绑定全托管Flink计算资源来绑定Flink计算资源。
相关参数配置如下,其他选项保持默认或根据需要选择。
参数名称
说明
Flink工作空间
下拉选择上文创建的工作空间。
Flink项目空间
下拉选择默认项目空间。
默认部署目标
下拉选择
default-queue
。计算资源实例名
输入
flink_datasource
。您也可以自定义设置。选择
,在工作空间列表页面,查看已创建工作空间。
GitHub公开行为事件实时入湖
在DataWorks中新建Notebook
DataWorks Notebook是智能化交互式数据开发和分析工具,能够面向多种数据引擎开展SQL或Python分析,即时运行或调试代码,获取可视化数据结果。同时,DataWorks Notebook能够与其他任务节点混合编排为工作流,提交至调度系统运行,助力复杂业务场景的灵活实现。
登录DataWorks控制台。
选择
。选择
,单击。输入Notebook名称,敲击键盘回车键或单击页面空白位置,使Notebook名称生效。
选择
。进入新建实例页面,配置相关参数。
参数名称
说明
实例名称
自定义设置。
资源组
选择资源组。如果没有对应资源组,您可以直接新建资源组。
专有网络
选择与阿里云云服务位于同一地域和同一VPC。
安全组
选择安全组。如果没有对应资源组,您可以直接新建安全组。
交换机
选择交换机。如果没有对应交换机,您可以直接新建交换机。
实例RAM角色
选择DataWorks默认角色。
单击确定,完成实例创建。
单击Notebook名称,即可打开并进入Notebook编辑页面,进行后续步骤。
使用Hologres在DLF中创建DB和表
登录DataWorks控制台,完成Notebook新建。
单击目标Notebook名称,进入编辑页面。
单击,Cell类型选择Hologres SQL,选择上文绑定的计算资源。
输入SQL,单击,创建External Database来映射上文创建的DLF Catalog。
CREATE EXTERNAL DATABASE ext_db_dlf WITH metastore_type 'dlf-paimon' catalog_type 'paimon' dlf_region 'cn-hangzhou' dlf_endpoint 'dlfnext-share.cn-hangzhou.aliyuncs.com' dlf_catalog 'clg-paimon-d24*********'; --修改为自己在DLF中创建的Paimon Catalog的ID
新建SQL Cell,在上文创建的EXTERNAL DATABASE
ext_db_dlf
数据库中创建External Schema,创建后DLF Catalog可以看到名为github_events
的Schema。CREATE EXTERNAL SCHEMA IF NOT EXISTS ext_db_dlf.github_events;
新建SQL Cell,在上文创建的EXTERNAL SCHEMA
github_events
下创建External Table,同样,DLF对应位置可以看到此表。CREATE EXTERNAL TABLE ext_db_dlf.github_events.gh_event_ods( id TEXT, -- 每个事件的唯一ID。 created_at BIGINT, -- 事件时间,单位秒。 type TEXT, -- GitHub事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。 actor_id TEXT, -- GitHub用户ID。 actor_login TEXT, -- GitHub用户名。 repo_id TEXT, -- GitHub仓库ID。 repo_name TEXT, -- GitHub仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。 org TEXT, -- GitHub组织ID。 org_login TEXT, -- GitHub组织名,如: apache,google,alibaba等。 PRIMARY KEY(id) ) WITH ( "changelog-producer"='input', "bucket"=6, "bucket-key"='id' );
使用Flink完成GitHub公开行为事件实时入湖
登录Flink控制台。
选择
。进入配置Catalog页面。单击确定完成创建。
说明metastore:dlf。
catalog name:上文创建的Paimon Catalog名称。
登录DataWorks控制台。在目标Notebook中新建SQL Cell,输入SQL创建Flink作业来实时摄取SLS中的数据。
说明为方便您更好地体验实时计算Flink版产品服务,本文代码示例中提供了仅有只读权限的AK供您试用,涉及的AK仅可用于本文示例中读取样例数据,真实数据生产环境请替换为您自己的数据源AK。
-- @conf executionMode = STREAMING -- @conf name = flink_vvp_job_github_events -- @conf engineVersion=vvr-8.0.9-flink-1.17 -- @conf flinkConf."execution.checkpointing.interval"=10second -- @conf flinkConf."execution.checkpointing.min-pause"=10second -- 创建一张临时表来实时摄取sls中的数据 CREATE TEMPORARY TABLE sls_input ( id STRING, -- 每个事件的唯一ID。 created_at BIGINT, -- 事件时间,单位秒。 type STRING, -- GitHub事件类型,如:ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。 actor_id STRING, -- GitHub用户ID。 actor_login STRING, -- GitHub用户名。 repo_id STRING, -- GitHub仓库ID。 repo_name STRING, -- GitHub仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。 org STRING, -- GitHub组织ID。 org_login STRING -- GitHub组织名,如: apache,google,alibaba等。 ) WITH ( 'connector' = 'sls', -- 实时采集的GitHub事件存放在阿里云SLS中 'project' = 'github-events-hangzhou', -- 存放公开数据的SLS项目。需改为您的实际地域信息 'endPoint' = 'https://cn-hangzhou-intranet.log.aliyuncs.com', -- 公开数据仅限VVP通过私网地址访问。本示例以杭州为例,您需要修改为您的实际地域信息。 'logStore' = 'realtime-github-events', -- 存放公开数据的SLS logStore 'accessId' = 'LTAI5tNF1rP8PKVyYjr9TKgh', -- 访问公开数据集的只读AK,无需替换 'accessKey' = 'FDgsh6fjSmkbFsx083tN6HOiqNVWTP' -- 访问公开数据集的只读secret,无需替换 ); -- 将临时表数据写入Hologres中创建的Paimon格式的External Table INSERT INTO ext_db_dlf.github_events.gh_event_ods SELECT * FROM sls_input WHERE id IS NOT NULL AND created_at IS NOT NULL AND TO_TIMESTAMP(created_at * 1000) >= date_add(CURRENT_DATE,-1);
新建SQL Cell,Cell类型选择Hologres SQL,输入SQL查询Flink实时写入的数据。
-- 查看数据 SELECT * FROM ext_db_dlf.github_events.gh_event_ods limit 10;
使用Hologres完成湖上数据实时入仓以及聚合分析
使用Hologres Dynamic Table完成湖上数据实时入仓
Dynamic Table是一种声明式数据处理架构,可以自动处理并存储一个或者多个基表对象的数据聚合结果,内置不同的数据刷新策略,业务可以根据业务需求设置不同的数据刷新策略,实现数据从基表对象到Dynamic Table的自动流转,满足业务统一开发、数据自动流转、处理时效性等诉求。
登录DataWorks控制台。在目标Notebook编辑页面单击+SQL,Cell类型选择Hologres SQL,输入SQL创建Dynamic Table。
DROP TABLE IF EXISTS hologres_github_event ;
CREATE DYNAMIC TABLE hologres_github_event
WITH (
auto_refresh_enable='true',
refresh_mode = 'incremental',
incremental_auto_refresh_schd_start_time='immediate',
incremental_auto_refresh_interval = '1 minute',
clustering_key='created_at_tz',
distribution_key='id'
) AS
SELECT
id::BIGINT ,
TO_TIMESTAMP(created_at)::TIMESTAMPTZ AS created_at_tz,
type::TEXT, -- GitHub事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
actor_id::TEXT, -- GitHub用户ID。
actor_login::TEXT, -- GitHub用户名。
repo_id::TEXT, -- GitHub仓库ID。
repo_name::TEXT, -- GitHub仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
org::TEXT, -- GitHub组织ID。
org_login::TEXT,
to_char(TO_TIMESTAMP(created_at),'YYYY-MM-DD')::text AS ds
FROM ext_db_dlf.github_events.gh_event_ods;
对于仓内数据进行聚合分析
登录DataWorks控制台。在目标Notebook编辑页面单击+SQL,Cell类型选择Hologres SQL,输入SQL来分析GitHub行为事件。
查询今日最活跃项目。
SELECT repo_name, COUNT(*) AS events FROM hologres_github_event WHERE created_at_tz >= CURRENT_DATE GROUP BY repo_name ORDER BY events DESC LIMIT 10;
查询1天内最活跃(事件数最多)的几位开发者。
SELECT actor_login, COUNT(*) AS events FROM hologres_github_event WHERE created_at_tz >= now() - interval '1 day' AND actor_login NOT LIKE '%[bot]' GROUP BY actor_login ORDER BY events DESC LIMIT 10;
查询过去1天最多的10项操作。
SELECT type, count(*) total FROM hologres_github_event WHERE created_at_tz > now() - interval '1 day' GROUP BY 1 ORDER BY total DESC LIMIT 10;
查看当天漏斗转化情况。
CREATE EXTENSION IF NOT EXISTS flow_analysis; --开启Extension -- 查看当天的转换漏斗 WITH level_detail AS ( SELECT level, COUNT(1) AS count_user FROM ( SELECT actor_id, windowFunnel (1800, 'default', created_at_tz, type = 'CreateEvent', type = 'PushEvent',type = 'IssuesEvent') AS level FROM hologres_github_event WHERE created_at_tz >= now() - interval '1 day' AND created_at_tz < now() GROUP BY actor_id) AS basic_table GROUP BY level ORDER BY level ASC ) SELECT CASE level WHEN 0 THEN 'total' WHEN 1 THEN 'CreateEvent' WHEN 2 THEN 'PushEvent' WHEN 3 THEN 'IssuesEvent' END ,SUM(count_user) OVER ( ORDER BY level DESC ) FROM level_detail GROUP BY level, count_user ORDER BY level ASC;