本文以GitHub公开事件数据为例,为您介绍使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(本文以DataV为例),实现海量数据实时分析的通用架构与核心步骤。
示例架构
搭建实时数仓时,Flink可对待处理数据进行实时清洗,完成后Hologres可直接读取Flink中的数据,并对接BI分析工具将数据实时展示在大屏中,示例架构如图所示。
其中:
待处理数据:
本实践使用GitHub公开事件作为示例数据,更多关于数据集的介绍请参见业务与数据认知。
实时清洗:
Flink是流式计算引擎。Flink与Hologres原生深度集成,支持高吞吐、低延时、有模型、高质量的实时数仓开发。
数据服务:
Hologres是兼容PostgreSQL协议的实时数仓引擎,支持海量数据实时写入与更新、实时数据写入即可查。
实时大屏:
本实践以DataV为例,为您展示搭建实时大屏后查看并分析数据的效果。
实践步骤
准备工作
本实践使用已存储在Flink中的Github公共事件作为示例数据,因此您无需操作数据集成步骤,可使用Hologres直接读取Flink中的示例数据。本实践需准备的环境如下所示。
创建Hologres内部表
您需要先创建一个Hologres内部表并创建相应的索引,用于后续数据实时写入。本实践以实时写入Github公开事件中今日最活跃项目数据为例,需提前创建的内部表示例代码如下。
-- 新建schema用于创建内表并导入数据
CREATE SCHEMA IF NOT EXISTS hologres_dataset_github_event;
DROP TABLE IF EXISTS hologres_dataset_github_event.hologres_github_event;
BEGIN;
CREATE TABLE hologres_dataset_github_event.hologres_github_event (
id text,
created_at_bigint bigint,
created_at timestamp with time zone NOT NULL,
type text,
actor_id text,
actor_login text,
repo_id text,
repo_name text,
org text,
org_login text
);
CALL set_table_property ('hologres_dataset_github_event.hologres_github_event', 'distribution_key', 'id');
CALL set_table_property ('hologres_dataset_github_event.hologres_github_event', 'event_time_column', 'created_at');
CALL set_table_property ('hologres_dataset_github_event.hologres_github_event', 'clustering_key', 'created_at');
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.id IS '事件ID';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.created_at_bigint IS '事件发生时间戳';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.created_at IS '事件发生时间';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.type IS '事件类型';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.actor_id IS '事件发起人ID';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.actor_login IS '事件发起人登录名';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.repo_id IS 'repoID';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.repo_name IS 'repo名称';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.org IS 'repo所属组织ID';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.org_login IS 'repo所属组织名称';
COMMIT;
通过Flink实时写入数据至Hologres
如果您申请了免费试用版的Flink,可参见教程文档使用内置公开数据集快速体验实时计算Flink版进入Flink SQL作业编辑运行页面。本实践的Flink SQL作业示例代码如下,其中SLS的project、endpoint、logstore等参数值请参见教程文档使用内置公开数据集快速体验实时计算Flink版,填入对应Region的固定值。
CREATE TEMPORARY TABLE sls_input (
id STRING, -- 每个事件的唯一ID。
created_at BIGINT, -- 事件时间,单位秒。
created_at_ts as TO_TIMESTAMP(created_at * 1000), -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
type STRING, -- Github事件类型,如:ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
actor_id STRING, -- Github用户ID。
actor_login STRING, -- Github用户名。
repo_id STRING, -- Github仓库ID。
repo_name STRING, -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
org STRING, -- Github组织ID。
org_login STRING -- Github组织名,如: apache,google,alibaba等。
)
WITH (
'connector' = 'sls', -- 实时采集的Github事件存放在阿里云SLS中。
'project' = '<yourSlsProject>', -- 存放公开数据的SLS项目。例如'github-events-hangzhou'。
'endPoint' = '<yourSlsEndpoint>', -- 公开数据仅限VVP通过私网地址访问。例如'https://cn-hangzhou-intranet.log.aliyuncs.com'。
'logStore' = 'realtime-github-events', -- 存放公开数据的SLS logStore。
'accessId' = '****', -- 仅有内置数据集只读权限的AK,参见Flink教程。
'accessKey' = '****', -- 仅有内置数据集只读权限的SK,参见Flink教程。
'batchGetSize' = '500' -- 批量读取数据,每批最多拉取500条。
);
CREATE TEMPORARY TABLE hologres_sink (
id STRING,
created_at_bigint BIGINT,
created_at TIMESTAMP,
type STRING,
actor_id STRING,
actor_login STRING,
repo_id STRING,
repo_name STRING,
org STRING,
org_login STRING
)
WITH (
'connector' = 'hologres',
'dbname' = 'holo db name', --Hologres的数据库名称
'tablename' = 'schema_name.table_name', --Hologres用于接收数据的表名称
'username' = 'access id', --当前阿里云账号的AccessKey ID
'password' = 'access key', --当前阿里云账号的AccessKey Secret
'endpoint' = 'holo vpc endpoint', --当前Hologres实例VPC网络的Endpoint
'jdbcretrycount' = '1', --连接故障时的重试次数
'partitionrouter' = 'true', --是否写入分区表
'createparttable' = 'true', --是否自动创建分区
'mutatetype' = 'insertorignore' --数据写入模式
);
INSERT INTO hologres_sink
SELECT
*
FROM
sls_input
WHERE
id IS NOT NULL
AND created_at_ts IS NOT NULL
AND created_at_ts >= date_add(CURRENT_DATE, - 1);
查询实时数据
在Hologres中通过内部表查询今日最活跃项目。
SELECT
repo_name,
COUNT(*) AS events
FROM
hologres_dataset_github_event.hologres_github_event
WHERE
created_at >= CURRENT_DATE
GROUP BY
repo_name
ORDER BY
events DESC
LIMIT 5;
(可选)通过DataV搭建实时大屏
您可以通过DataV的数据大屏模板,基于Hologres数据源来快速搭建GitHub事件数据实时大屏。
创建Hologres数据源。
将数据所在的Hologres实例和数据库创建为DataV的数据源,详情请参见DataV。
创建可视化应用。
登录DataV控制台。
在工作台页面,单击创建PC端看板。
选择使用Hologres实时分析GitHub事件数据模板。
修改模板中相关组件的数据源。
以左上角的今日公开事件总数为例:
单击今日公开事件总数对应的数字框,点击右侧数据源,选择数据源类型为实时数仓Hologres。
选择已有数据源为您已创建的数据源。
如果您在Hologres中的表名和Schema与本实践相同,则不需修改SQL。
修改完成后,数据响应结果刷新,大屏中成功展示实时数据。
按照示例更新大屏中的数据源和表名,需更新组件及更新后效果如下图所示。
单击右上角发布,完成大屏搭建。
您也可以单击预览,预览实时更新的数据大屏。