使用内置公开数据集快速体验实时计算Flink版

本文为您介绍如何基于阿里云实时计算Flink版内置的GitHub公开事件样例数据,快速体验实时计算Flink版产品的实时数据分析功能。

注意事项

  • 如果您是实时计算Flink版新用户,可以申请该产品的免费资源包,免费试用体验实时计算Flink版产品的实时数据分析功能。但免费资源包有用量、时长等限制,领取前请务必确认已了解。申领入口请参见实时计算Flink版免费试用

  • 为方便您更好地体验实时计算Flink版产品服务,本文代码示例中提供了仅有只读权限的AK供您试用,涉及的AK仅可用于本文示例中读取样例数据,真实数据生产环境请替换为您自己的数据源AK。

使用限制

本文示例仅适用于北京、杭州、上海和深圳地域。

步骤一:创建Session集群

  1. 登录实时计算控制台

  2. 单击目标工作空间名称对应操作列下的控制台

  3. 在左侧导航栏,单击运维中心 > Session管理

  4. 单击创建Session集群

    表格中未提及的参数保持默认值即可,需要配置的参数说明请参见下表。

    配置项

    说明

    配置示例

    名称

    Session集群名称。

    flink-sql-test-session

    状态

    设置当前集群的期望运行状态:

    • STOPPED:当集群配置完成后保持停止状态,同样会停止所有在运行中的作业。

    • RUNNING:当集群配置完成后保持运行状态。

    RUNNING

    引擎版本

    Session集群引擎版本号。

    vvr-8.0.9-flink-1.17

    Task Managers数量

    默认与并行度保持一致。

    4

  5. 单击创建Session集群

    当Session集群状态(页面上方集群名称旁边)从启动中变为运行中后,您可以进入后续步骤。

步骤二:创建SQL作业

  1. 在左侧导航栏,单击数据开发 > ETL

  2. 在页面顶部,单击新建

  3. 单击空白的流作业草稿后,单击下一步

  4. 新建文件对话框,填写作业配置信息。

    作业参数

    说明

    配置示例

    文件名称

    作业的名称。

    flink-sql-test

    存储位置

    指定该作业的代码文件所属的文件夹。默认存放在作业草稿目录。

    作业开发

    引擎版本

    选择作业的引擎版本,需要和Session集群引擎版本号一致。

    vvr-8.0.9-flink-1.17

  5. 单击创建

步骤三:体验数据实时分析

统计当天各个仓库新增Star数Top 10排行榜。

  1. 复制如下代码到作业开发文本编辑区。

    -- 通过DDL语句创建SLS源表,SLS中存放了Github的实时数据。
    CREATE TEMPORARY TABLE gh_event(
      id STRING,                                        -- 每个事件的唯一ID。
      created_at BIGINT,                                -- 事件时间,单位秒。
      created_at_ts as TO_TIMESTAMP(created_at*1000),   -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
      type STRING,                                      -- Github事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
      actor_id STRING,                                  -- Github用户ID。
      actor_login STRING,                               -- Github用户名。
      repo_id STRING,                                   -- Github仓库ID。
      repo_name STRING,                                 -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
      org STRING,                                       -- Github组织ID。
      org_login STRING                                  -- Github组织名,如: apache,google,alibaba等。
    ) WITH (
      'connector' = 'sls',                              -- 实时采集的Github事件存放在阿里云SLS中。
      'project' = 'github-events-shenzhen',             -- 存放公开数据的SLS项目。本示例以深圳为例,您需要修改为您的实际地域信息。
      'endPoint' = 'https://cn-shenzhen-intranet.log.aliyuncs.com', -- 公开数据仅限VVP通过私网地址访问。本示例以深圳为例,您需要修改为您的实际地域信息。
      'logStore' = 'realtime-github-events',            -- 存放公开数据的SLS logStore。
      'accessId' =  'LTAI5tNF1rP8PKVyYjr9TKgh',         -- 只读账号的AK,无需修改。
      'accessKey' = 'FDgsh6fjSmkbFsx083tN6HOiqNVWTP',   -- 只读账号的SK,无需修改。
      'batchGetSize' = '500'                            -- 批量读取数据,每批最多拉取500条。
    );
    
    -- 配置开启mini-batch, 每2s处理一次。
    SET 'table.exec.mini-batch.enabled'='true'; 
    SET 'table.exec.mini-batch.allow-latency'='2s'; 
    SET 'table.exec.mini-batch.size'='4096'; 
    
    -- 作业设置4个并发,聚合更快。
    SET 'parallelism.default' = '4';
    
    -- 查看当天Github新增star数Top 10仓库。
    SELECT DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') as `date`, repo_name, COUNT(*) as num
    FROM gh_event
    WHERE type = 'WatchEvent' AND DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') = DATE_FORMAT(NOW(), 'yyyy-MM-dd')
    GROUP BY DATE_FORMAT(created_at_ts, 'yyyy-MM-dd'), repo_name
    ORDER BY num DESC
    LIMIT 10; 
  2. 按照实际地域配置WITH参数中的project和endPoint(本示例以深圳地域为例),其他参数保持默认值,无需修改。

    地域

    参数

    说明

    北京

    project

    github-events-beijing

    endPoint

    https://cn-beijing-intranet.log.aliyuncs.com

    杭州

    project

    github-events-hangzhou

    endPoint

    https://cn-hangzhou-intranet.log.aliyuncs.com

    上海

    project

    github-events-shanghai

    endPoint

    https://cn-shanghai-intranet.log.aliyuncs.com

    深圳

    project

    github-events-shenzhen

    endPoint

    https://cn-shenzhen-intranet.log.aliyuncs.com

  3. 单击调试

  4. 在弹出的对话框中,选择调试集群为flink-sql-test-session(第一步中创建的Session集群),确认调试数据状态为线上数据后,单击确定

  5. 查看可视化结果。

    image..png

    如果需要继续下一步,请单击结果区域左侧红色停止image按钮。

    因为这是一个流式作业,作业会持续运行,结果会持续更新。此步骤完成后如果您忘记单击停止按钮且继续了后续示例的执行,可能会出现资源不足无法显示结果的情况。这时,您需要在Session集群页面先停止Session集群,再启动Session集群至运行状态。

查看某组织近7天的开发者活跃度实时趋势

  1. 复制如下代码到作业开发文本编辑区。

    -- 通过DDL语句创建SLS源表,SLS中存放了Github的实时数据。
    CREATE TEMPORARY TABLE gh_event(
      id STRING,                                        -- 每个事件的唯一ID。
      created_at BIGINT,                                -- 事件时间,单位秒。
      created_at_ts as TO_TIMESTAMP(created_at*1000),   -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
      type STRING,                                      -- Github事件类型,如:ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
      actor_id STRING,                                  -- Github用户ID。 
      actor_login STRING,                               -- Github用户名。
      repo_id STRING,                                   -- Github仓库ID。
      repo_name STRING,                                 -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
      org STRING,                                       -- Github组织ID。
      org_login STRING                                  -- Github组织名,如: apache,google,alibaba等。
    ) WITH (
      'connector' = 'sls',                              -- 实时采集的Github事件存放在阿里云SLS中。
      'project' = 'github-events-shenzhen',             -- 存放公开数据的SLS项目。本示例以深圳为例,您需要修改为您的实际地域信息。
      'endPoint' = 'https://cn-shenzhen-intranet.log.aliyuncs.com',  -- 公开数据仅限VVP通过私网地址访问。本示例以深圳为例,您需要修改为您的实际地域信息。
      'logStore' = 'realtime-github-events',            -- 存放公开数据的SLS logStore。
      'accessId' =  'LTAI5tNF1rP8PKVyYjr9TKgh',         -- 只读账号的AK,无需修改。
      'accessKey' = 'FDgsh6fjSmkbFsx083tN6HOiqNVWTP',   -- 只读账号的SK,无需修改。
      'batchGetSize' = '500',                           -- 批量读取数据,每批最多拉取500条。
      'startTime' = '2023-07-24 00:00:00'               -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长。默认值为当前值。
    );
    
    -- 配置开启mini-batch, 每2s处理一次。
    SET 'table.exec.mini-batch.enabled'='true'; 
    SET 'table.exec.mini-batch.allow-latency'='2s'; 
    SET 'table.exec.mini-batch.size'='4096'; 
    
    -- 作业设置4个并发,聚合更快。
    SET 'parallelism.default' = '4'; 
    
    -- 查看某组织(如:apache)最近7天的开发者活跃度实时趋势。
    SELECT NOW() as `current_ts`, COUNT(*) as pr_count
    FROM gh_event
    WHERE org_login = 'apache' and  type = 'PullRequestEvent' and
    created_at_ts >= NOW() - INTERVAL '7' DAY;
  2. 按照实际地域配置WITH参数中的project和endPoint(本示例以深圳地域为例),其他参数保持默认值,无需修改。

    地域

    参数

    说明

    北京

    project

    github-events-beijing

    endPoint

    https://cn-beijing-intranet.log.aliyuncs.com

    杭州

    project

    github-events-hangzhou

    endPoint

    https://cn-hangzhou-intranet.log.aliyuncs.com

    上海

    project

    github-events-shanghai

    endPoint

    https://cn-shanghai-intranet.log.aliyuncs.com

    深圳

    project

    github-events-shenzhen

    endPoint

    https://cn-shenzhen-intranet.log.aliyuncs.com

  3. 单击调试

  4. 在弹出的对话框中,选择调试集群为flink-sql-test-session(第一步中创建的Session集群),确认调试数据状态为线上数据后,单击确定

  5. 查看可视化结果。

    image..png

    如果需要继续下一步,请单击结果区域左侧红色停止image按钮。

    因为这是一个流式作业,作业会持续运行,结果会持续更新。此步骤完成后如果您忘记单击停止按钮且继续了后续示例的执行,可能会出现资源不足无法显示结果的情况。这时,您需要在Session集群页面先停止Session集群,再启动Session集群。

统计过去一周里Github小时级别的评论热力图

  1. 复制如下代码到作业开发文本编辑区。

    -- 通过DDL语句创建SLS源表,SLS中存放了Github的实时数据。
    CREATE TEMPORARY TABLE gh_event(
      id STRING,                                        -- 每个事件的唯一ID。
      created_at BIGINT,                                -- 事件时间,单位秒。
      created_at_ts as TO_TIMESTAMP(created_at*1000),   -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
      type STRING,                                      -- Github事件类型,如:ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
      actor_id STRING,                                  -- Github用户ID。
      actor_login STRING,                               -- Github用户名。
      repo_id STRING,                                   -- Github仓库ID。
      repo_name STRING,                                 -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
      org STRING,                                       -- Github组织ID。
      org_login STRING                                  -- Github组织名,如: apache,google,alibaba等。
    ) WITH (
      'connector' = 'sls',                              -- 实时采集的Github事件存放在阿里云SLS中。
      'project' ='github-events-shenzhen',              -- 存放公开数据的SLS项目。本示例以深圳为例,您需要修改为您的实际地域信息。
      'endPoint' = 'https://cn-shenzhen-intranet.log.aliyuncs.com', -- 公开数据仅限VVP通过私网地址访问。本示例以深圳为例,您需要修改为您的实际地域信息。
      'logStore' = 'realtime-github-events',            -- 存放公开数据的SLS logStore。
      'accessId' =  'LTAI5tNF1rP8PKVyYjr9TKgh',         -- 只读账号的AK,无需修改。
      'accessKey' = 'FDgsh6fjSmkbFsx083tN6HOiqNVWTP',   -- 只读账号的SK,无需修改。
      'batchGetSize' = '500',                           -- 批量读取数据,每批最多拉取500条。
      'startTime' = '2023-07-24 00:00:00'               -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长。默认值为当前值。
    );
    
    -- 配置开启mini-batch, 每2s处理一次。
    SET 'table.exec.mini-batch.enabled'='true'; 
    SET 'table.exec.mini-batch.allow-latency'='2s'; 
    SET 'table.exec.mini-batch.size'='4096'; 
    
    -- 作业设置4个并发,聚合更快。
    SET 'parallelism.default' = '4'; 
    
    -- 实时统计过去一周里Github小时级别的评论热力图。
    SELECT  DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') as comment_date, HOUR(created_at_ts) AS comment_hour ,COUNT(*) AS comment_count
    FROM gh_event
    WHERE created_at_ts >= NOW() - INTERVAL '7' DAY AND (type='CommitCommentEvent'
        OR type='IssueCommentEvent' or type = 'PullRequestReviewCommentEvent')
    GROUP BY DATE_FORMAT(created_at_ts, 'yyyy-MM-dd'), HOUR(created_at_ts) ;
  2. 按照实际地域配置WITH参数中的project和endPoint(本示例以深圳地域为例),其他参数保持默认值,无需修改。

    地域

    参数

    说明

    北京

    project

    github-events-beijing

    endPoint

    https://cn-beijing-intranet.log.aliyuncs.com

    杭州

    project

    github-events-hangzhou

    endPoint

    https://cn-hangzhou-intranet.log.aliyuncs.com

    上海

    project

    github-events-shanghai

    endPoint

    https://cn-shanghai-intranet.log.aliyuncs.com

    深圳

    project

    github-events-shenzhen

    endPoint

    https://cn-shenzhen-intranet.log.aliyuncs.com

  3. 单击调试

  4. 在弹出的对话框中,选择调试集群为flink-sql-test-session(第一步中创建的Session集群),确认调试数据状态为线上数据后,单击确定

  5. 查看可视化结果。

    image..png

  6. 完成本示例后,请单击结果区域左侧红色停止image按钮。

    因为这是一个流式作业,作业会持续运行,结果会持续更新。体验完成之后,请及时单击停止按钮,否则会继续消耗计算资源。

延伸阅读