本文为您介绍如何基于阿里云实时计算Flink版内置的GitHub公开事件样例数据,快速体验实时计算Flink版产品的实时数据分析功能。
注意事项
如果您是实时计算Flink版新用户,可以申请该产品的免费资源包,免费试用体验实时计算Flink版产品的实时数据分析功能。但免费资源包有用量、时长等限制,领取前请务必确认已了解。申领入口请参见实时计算Flink版免费试用。
为方便您更好地体验实时计算Flink版产品服务,本文代码示例中提供了仅有只读权限的AK供您试用,涉及的AK仅可用于本文示例中读取样例数据,真实数据生产环境请替换为您自己的数据源AK。
使用限制
本文示例仅适用于北京、杭州、上海和深圳地域。
步骤一:创建Session集群
登录实时计算控制台。
单击目标工作空间名称对应操作列下的控制台。
在左侧导航栏,单击
。单击创建Session集群。
表格中未提及的参数保持默认值即可,需要配置的参数说明请参见下表。
配置项
说明
配置示例
名称
Session集群名称。
flink-sql-test-session
状态
设置当前集群的期望运行状态:
STOPPED:当集群配置完成后保持停止状态,同样会停止所有在运行中的作业。
RUNNING:当集群配置完成后保持运行状态。
RUNNING
引擎版本
Session集群引擎版本号。
vvr-8.0.9-flink-1.17
Task Managers数量
默认与并行度保持一致。
4
单击创建Session集群。
当Session集群状态(页面上方集群名称旁边)从启动中变为运行中后,您可以进入后续步骤。
步骤二:创建SQL作业
在左侧导航栏,单击
。在页面顶部,单击新建。
单击空白的流作业草稿后,单击下一步。
在新建文件对话框,填写作业配置信息。
作业参数
说明
配置示例
文件名称
作业的名称。
flink-sql-test
存储位置
指定该作业的代码文件所属的文件夹。默认存放在作业草稿目录。
作业开发
引擎版本
选择作业的引擎版本,需要和Session集群引擎版本号一致。
vvr-8.0.9-flink-1.17
单击创建。
步骤三:体验数据实时分析
统计当天各个仓库新增Star数Top 10排行榜。
复制如下代码到作业开发文本编辑区。
-- 通过DDL语句创建SLS源表,SLS中存放了Github的实时数据。 CREATE TEMPORARY TABLE gh_event( id STRING, -- 每个事件的唯一ID。 created_at BIGINT, -- 事件时间,单位秒。 created_at_ts as TO_TIMESTAMP(created_at*1000), -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。 type STRING, -- Github事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。 actor_id STRING, -- Github用户ID。 actor_login STRING, -- Github用户名。 repo_id STRING, -- Github仓库ID。 repo_name STRING, -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。 org STRING, -- Github组织ID。 org_login STRING -- Github组织名,如: apache,google,alibaba等。 ) WITH ( 'connector' = 'sls', -- 实时采集的Github事件存放在阿里云SLS中。 'project' = 'github-events-shenzhen', -- 存放公开数据的SLS项目。本示例以深圳为例,您需要修改为您的实际地域信息。 'endPoint' = 'https://cn-shenzhen-intranet.log.aliyuncs.com', -- 公开数据仅限VVP通过私网地址访问。本示例以深圳为例,您需要修改为您的实际地域信息。 'logStore' = 'realtime-github-events', -- 存放公开数据的SLS logStore。 'accessId' = 'LTAI5tNF1rP8PKVyYjr9TKgh', -- 只读账号的AK,无需修改。 'accessKey' = 'FDgsh6fjSmkbFsx083tN6HOiqNVWTP', -- 只读账号的SK,无需修改。 'batchGetSize' = '500' -- 批量读取数据,每批最多拉取500条。 ); -- 配置开启mini-batch, 每2s处理一次。 SET 'table.exec.mini-batch.enabled'='true'; SET 'table.exec.mini-batch.allow-latency'='2s'; SET 'table.exec.mini-batch.size'='4096'; -- 作业设置4个并发,聚合更快。 SET 'parallelism.default' = '4'; -- 查看当天Github新增star数Top 10仓库。 SELECT DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') as `date`, repo_name, COUNT(*) as num FROM gh_event WHERE type = 'WatchEvent' AND DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') = DATE_FORMAT(NOW(), 'yyyy-MM-dd') GROUP BY DATE_FORMAT(created_at_ts, 'yyyy-MM-dd'), repo_name ORDER BY num DESC LIMIT 10;
按照实际地域配置WITH参数中的project和endPoint(本示例以深圳地域为例),其他参数保持默认值,无需修改。
地域
参数
说明
北京
project
github-events-beijing
endPoint
https://cn-beijing-intranet.log.aliyuncs.com
杭州
project
github-events-hangzhou
endPoint
https://cn-hangzhou-intranet.log.aliyuncs.com
上海
project
github-events-shanghai
endPoint
https://cn-shanghai-intranet.log.aliyuncs.com
深圳
project
github-events-shenzhen
endPoint
https://cn-shenzhen-intranet.log.aliyuncs.com
单击调试。
在弹出的对话框中,选择调试集群为flink-sql-test-session(第一步中创建的Session集群),确认调试数据状态为线上数据后,单击确定。
查看可视化结果。
如果需要继续下一步,请单击结果区域左侧红色停止按钮。
因为这是一个流式作业,作业会持续运行,结果会持续更新。此步骤完成后如果您忘记单击停止按钮且继续了后续示例的执行,可能会出现资源不足无法显示结果的情况。这时,您需要在Session集群页面先停止Session集群,再启动Session集群至运行状态。
查看某组织近7天的开发者活跃度实时趋势
复制如下代码到作业开发文本编辑区。
-- 通过DDL语句创建SLS源表,SLS中存放了Github的实时数据。 CREATE TEMPORARY TABLE gh_event( id STRING, -- 每个事件的唯一ID。 created_at BIGINT, -- 事件时间,单位秒。 created_at_ts as TO_TIMESTAMP(created_at*1000), -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。 type STRING, -- Github事件类型,如:ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。 actor_id STRING, -- Github用户ID。 actor_login STRING, -- Github用户名。 repo_id STRING, -- Github仓库ID。 repo_name STRING, -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。 org STRING, -- Github组织ID。 org_login STRING -- Github组织名,如: apache,google,alibaba等。 ) WITH ( 'connector' = 'sls', -- 实时采集的Github事件存放在阿里云SLS中。 'project' = 'github-events-shenzhen', -- 存放公开数据的SLS项目。本示例以深圳为例,您需要修改为您的实际地域信息。 'endPoint' = 'https://cn-shenzhen-intranet.log.aliyuncs.com', -- 公开数据仅限VVP通过私网地址访问。本示例以深圳为例,您需要修改为您的实际地域信息。 'logStore' = 'realtime-github-events', -- 存放公开数据的SLS logStore。 'accessId' = 'LTAI5tNF1rP8PKVyYjr9TKgh', -- 只读账号的AK,无需修改。 'accessKey' = 'FDgsh6fjSmkbFsx083tN6HOiqNVWTP', -- 只读账号的SK,无需修改。 'batchGetSize' = '500', -- 批量读取数据,每批最多拉取500条。 'startTime' = '2023-07-24 00:00:00' -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长。默认值为当前值。 ); -- 配置开启mini-batch, 每2s处理一次。 SET 'table.exec.mini-batch.enabled'='true'; SET 'table.exec.mini-batch.allow-latency'='2s'; SET 'table.exec.mini-batch.size'='4096'; -- 作业设置4个并发,聚合更快。 SET 'parallelism.default' = '4'; -- 查看某组织(如:apache)最近7天的开发者活跃度实时趋势。 SELECT NOW() as `current_ts`, COUNT(*) as pr_count FROM gh_event WHERE org_login = 'apache' and type = 'PullRequestEvent' and created_at_ts >= NOW() - INTERVAL '7' DAY;
按照实际地域配置WITH参数中的project和endPoint(本示例以深圳地域为例),其他参数保持默认值,无需修改。
地域
参数
说明
北京
project
github-events-beijing
endPoint
https://cn-beijing-intranet.log.aliyuncs.com
杭州
project
github-events-hangzhou
endPoint
https://cn-hangzhou-intranet.log.aliyuncs.com
上海
project
github-events-shanghai
endPoint
https://cn-shanghai-intranet.log.aliyuncs.com
深圳
project
github-events-shenzhen
endPoint
https://cn-shenzhen-intranet.log.aliyuncs.com
单击调试。
在弹出的对话框中,选择调试集群为flink-sql-test-session(第一步中创建的Session集群),确认调试数据状态为线上数据后,单击确定。
查看可视化结果。
如果需要继续下一步,请单击结果区域左侧红色停止按钮。
因为这是一个流式作业,作业会持续运行,结果会持续更新。此步骤完成后如果您忘记单击停止按钮且继续了后续示例的执行,可能会出现资源不足无法显示结果的情况。这时,您需要在Session集群页面先停止Session集群,再启动Session集群。
统计过去一周里Github小时级别的评论热力图
复制如下代码到作业开发文本编辑区。
-- 通过DDL语句创建SLS源表,SLS中存放了Github的实时数据。 CREATE TEMPORARY TABLE gh_event( id STRING, -- 每个事件的唯一ID。 created_at BIGINT, -- 事件时间,单位秒。 created_at_ts as TO_TIMESTAMP(created_at*1000), -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。 type STRING, -- Github事件类型,如:ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。 actor_id STRING, -- Github用户ID。 actor_login STRING, -- Github用户名。 repo_id STRING, -- Github仓库ID。 repo_name STRING, -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。 org STRING, -- Github组织ID。 org_login STRING -- Github组织名,如: apache,google,alibaba等。 ) WITH ( 'connector' = 'sls', -- 实时采集的Github事件存放在阿里云SLS中。 'project' ='github-events-shenzhen', -- 存放公开数据的SLS项目。本示例以深圳为例,您需要修改为您的实际地域信息。 'endPoint' = 'https://cn-shenzhen-intranet.log.aliyuncs.com', -- 公开数据仅限VVP通过私网地址访问。本示例以深圳为例,您需要修改为您的实际地域信息。 'logStore' = 'realtime-github-events', -- 存放公开数据的SLS logStore。 'accessId' = 'LTAI5tNF1rP8PKVyYjr9TKgh', -- 只读账号的AK,无需修改。 'accessKey' = 'FDgsh6fjSmkbFsx083tN6HOiqNVWTP', -- 只读账号的SK,无需修改。 'batchGetSize' = '500', -- 批量读取数据,每批最多拉取500条。 'startTime' = '2023-07-24 00:00:00' -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长。默认值为当前值。 ); -- 配置开启mini-batch, 每2s处理一次。 SET 'table.exec.mini-batch.enabled'='true'; SET 'table.exec.mini-batch.allow-latency'='2s'; SET 'table.exec.mini-batch.size'='4096'; -- 作业设置4个并发,聚合更快。 SET 'parallelism.default' = '4'; -- 实时统计过去一周里Github小时级别的评论热力图。 SELECT DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') as comment_date, HOUR(created_at_ts) AS comment_hour ,COUNT(*) AS comment_count FROM gh_event WHERE created_at_ts >= NOW() - INTERVAL '7' DAY AND (type='CommitCommentEvent' OR type='IssueCommentEvent' or type = 'PullRequestReviewCommentEvent') GROUP BY DATE_FORMAT(created_at_ts, 'yyyy-MM-dd'), HOUR(created_at_ts) ;
按照实际地域配置WITH参数中的project和endPoint(本示例以深圳地域为例),其他参数保持默认值,无需修改。
地域
参数
说明
北京
project
github-events-beijing
endPoint
https://cn-beijing-intranet.log.aliyuncs.com
杭州
project
github-events-hangzhou
endPoint
https://cn-hangzhou-intranet.log.aliyuncs.com
上海
project
github-events-shanghai
endPoint
https://cn-shanghai-intranet.log.aliyuncs.com
深圳
project
github-events-shenzhen
endPoint
https://cn-shenzhen-intranet.log.aliyuncs.com
单击调试。
在弹出的对话框中,选择调试集群为flink-sql-test-session(第一步中创建的Session集群),确认调试数据状态为线上数据后,单击确定。
查看可视化结果。
完成本示例后,请单击结果区域左侧红色停止按钮。
因为这是一个流式作业,作业会持续运行,结果会持续更新。体验完成之后,请及时单击停止按钮,否则会继续消耗计算资源。
延伸阅读
产品介绍查看什么是阿里云实时计算Flink版。
产品优势查看功能与优势。
更多连接器查看支持的连接器。