GitHub近实时数据同步以及增全量数据一体化分析

本文介绍如何通过MaxCompute实现GitHub近实时数据同步以及增全量数据一体化分析。

方案概述

基于GitHub Archive公开数据集,通过DataWorks数据集成、FlinkCDC和Flink等多种实时数据写入方式,将GitHub中的项目、行为等超过十种事件类型的数据实时采集至MaxCompute进行增全量更新。借助MCQA 2.0的资源隔离能力,同时构建批处理资源组(QuotaA组)与交互式资源组(QuotaB组),从而实现MaxCompute中增全量数据的写入与更新,并同时进行交互式查询分析。此外,通过使用TopConsole和DataWorks Notebook,从开发者、项目和编程语言等多个维度,对GitHub实时数据的变化情况进行深入分析与挖掘。

方案架构与优势

image.png

以上图示基于典型数据分析场景设计,可以满足当日近实时数据的写入以及历史离线数据的更新与查询分析场景。

  • 实现增全量数据的统一校正(包括数据聚合、去重和反作弊等),定期将全量数据回写至DWD,并对应地更新DWS和ADS的增量MV。通过FlinkCDC和DataWorks实时数据集成,将数据写入Delta Table的增量数据表,实现增量查询与更新。

  • 实现交互式数据查询(DWS/ADS层),支持增量物化视图(MV)在DWS和ADS中自动刷新,以确保数据的时效性。同时,与TopConsole和DataWorks对接,以便进行数据查询和展示。

  • 借助MCQA 2.0查询加速引擎,在资源隔离架构下配置不同的Quota组,以分别支持增量数据计算和交互式查询分析场景。

近实时数仓-Delta Table增量表格式

针对分钟级或者小时级的近实时数据处理叠加海量数据批处理的场景,MaxCompute基于Delta Table的统一表格式特性,提供近实时的增全量一体的数据存储和计算解决方案,支持分钟级数据实时Upsert写入和TimeTravel数据回溯等能力。其核心特性包括:

  • 支持近实时写入,并且能够实现Checkpoint间隔达到分钟级别以内。

  • 支持SQL近实时查询(Incremental Query),且在完成写入后,可在分钟级别内进行查询。

  • 通过StorageService、AutoCompaction和AutoSorting功能,实现对数据文件的自动管理。

近实时数仓-增量计算&增量物化视图(Incremental MV)

MaxCompute的增量计算结合了CDC和Stream增量查询能力,使用户可以通过自定义SQL来构建自己的增量数据处理链路。增量物化视图(MV)可有效地构建增量计算模型,用户只需使用声明式SQL表达预期的数据结果,便可通过配置不同的刷新参数来指定刷新频率或数据新鲜度,后台引擎将自动进行增量刷新和内部优化,从而实现近实时数据分析Pipeline。其主要核心特性包括:

  • 声明式SQL。

  • 增量与全量数据一体化,支持统一的SQL、存储和计算。

  • 增量物化视图(MV)支持智能Pipeline编排 。

  • 增量CDC应用,支持周期性任务及流处理特性。

  • 数据新鲜度,提供实时或自定义的增量数据刷新。公式为:MV(T1) = delta(T0, T1) + MV(T0)

近实时数仓-MCQA2.0查询加速

MaxCompute的MCQA2.0查询加速引擎旨在满足对性能、隔离性以及稳定性有更高要求的业务需求。它构建类似Virtual Warehouse的资源隔离管理引擎,从而显著提升了交互式查询的性能。支持租户级独占计算资源,使用多线程Pipeline执行,以充分利用精准独享的计算资源管理。此外,还支持全链路的Cache能力,全类型的SQL作业、屏显作业以及DDL、DML作业等特性。

  • 支持单租户环境下构建多Quota组进行资源隔离,并支持对Quota组进行交互式查询。

  • 支持分时资源的分组管理。

  • 交互式查询性能加速,与上一版本相比,性能提升1倍。

操作视频

操作步骤

步骤一:MaxCompute项目准备

步骤

操作

预期结果

Step 0

注册MaxCompute新功能邀测申请表单

  1. 打开MaxCompute新功能邀测申请表单

  2. 选中Delta Table增量表格式基于增量物化视图的增量计算MCQA2.0 SQL引擎查询加速image

  3. 单击注册

开通MaxCompute新功能。

Step 1

初始化MaxCompute新项目

  1. 打开MaxCompute控制台,单击新建项目

  2. 新建项目对话框中配置如下参数。

    项目名称设置为delta_compute_yunqi项目类型选择云栖新功能邀测项目,其他参数按需设置或保持默认。image (2)

    地域可选择北京、上海或杭州。

  3. 并单击确定

  1. 在控制台左侧导航栏中,选择工作区 > SQL分析,打开临时SQL分析页面。

  2. SQL分析页面左侧,可以看到新建项目delta_compute_yunqi2024-09-18_11-31-45.png

Step 2

配额(Quota)管理上新购Quota

  1. 在MaxCompute控制台左侧导航栏中,选择工作区 > 配额(Quota)管理,单击新购Quota

  2. 创建一个包年包月的一级Quota,并根据页面提示完成支付。

  3. 在已创建的一级Quota组下,创建2个不同二级Quota组:

    • 批处理Quota组:batch_demo_yunqi

    • 交互式Quota组:mcqa2_demo_yunqi

    截屏2024-09-13 17

  4. 分别设置批处理Quota组和交互式Quota组资源。image (5)

在配额(Quota)管理页面,确认批处理Quota组和交互式Quota组已成功创建。截屏2024-09-03 21.50.19.png

Step 3

新建MaxCompute Delta Table增量表

SQL分析页面,执行如下示例命令,创建2个MaxCompute内部表:

  • 创建yunqi_github_events_odps_cdc表,后续会将数据实时通过FlinkCDC写入至该表中。

    --设置ODPS SQL支持upsertable
    SET odps.sql.upsertable.table.enable=true;
    SET odps.storage.orc.enable.memcmp.sort.key=true; 
    
    --FlinkCDC
    CREATE TABLE IF NOT EXISTS yunqi_github_events_odps_cdc
    (
        id                     BIGINT NOT NULL COMMENT '事件ID'
        ,actor_id              BIGINT COMMENT '事件发起人ID'
        ,actor_login           STRING COMMENT '事件发起人登录名'
        ,repo_id               BIGINT COMMENT 'repoID'
        ,repo_name             STRING COMMENT 'repo全名:owner/Repository_name'
        ,org_id                BIGINT COMMENT 'repo所属组织ID'
        ,org_login             STRING COMMENT 'repo所属组织名称'
        ,`type`                STRING COMMENT '事件类型'
        ,created_at            TIMESTAMP NOT NULL COMMENT '事件发生时间'
        ,action                STRING COMMENT '事件行为'
        ,commit_id             STRING COMMENT '提交记录ID'
        ,member_id             BIGINT COMMENT '成员ID'
        ,language              STRING COMMENT '编程语言'
        ,PRIMARY KEY(id, created_at))
    STORED AS ALIORC  
    TBLPROPERTIES ('acid.data.retain.hours'='24',
         'acid.incremental.query.out.of.time.range.enabled'='true',
         'columnar.nested.type'='true',
         'transactional'='true',
         'write.bucket.num'='64',
         'acid.ingest.commit.num.check.limit'='10',
         'cdc.insert.into.passthrough.enable'='true', 
         'acid.cdc.mode.enable' = 'true',--打开异步的话,就要额外设置
         'acid.cdc.build.async'='true', 
         'acid.cdc.build.interval'='60') 
    LIFECYCLE 36500;
    
  • 创建yunqi_github_events_odps_dw表,后续会将数据实时通过DataWorks数据集成写入至该表中。

    --设置ODPS SQL支持upsertable
    SET odps.sql.upsertable.table.enable=true;
    SET odps.storage.orc.enable.memcmp.sort.key=true; 
    
    --DataWorks实时数据同步
    CREATE TABLE IF NOT EXISTS yunqi_github_events_odps_dw
    (
        id                     BIGINT NOT NULL COMMENT '事件ID'
        ,actor_id              BIGINT COMMENT '事件发起人ID'
        ,actor_login           STRING COMMENT '事件发起人登录名'
        ,repo_id               BIGINT COMMENT 'repoID'
        ,repo_name             STRING COMMENT 'repo全名:owner/Repository_name'
        ,org_id                BIGINT COMMENT 'repo所属组织ID'
        ,org_login             STRING COMMENT 'repo所属组织名称'
        ,`type`                STRING COMMENT '事件类型'
        ,created_at            TIMESTAMP COMMENT '事件发生时间'
        ,action                STRING COMMENT '事件行为'
        ,commit_id             STRING COMMENT '提交记录ID'
        ,member_id             BIGINT COMMENT '成员ID'
        ,language              STRING COMMENT '编程语言'
        ,PRIMARY KEY(id))
    STORED AS ALIORC  
    TBLPROPERTIES ('acid.data.retain.hours'='24',
         'acid.incremental.query.out.of.time.range.enabled'='true',
         'columnar.nested.type'='true',
         'transactional'='true',
         'write.bucket.num'='64',
         'acid.ingest.commit.num.check.limit'='10',
         'cdc.insert.into.passthrough.enable'='true', 
         'acid.cdc.mode.enable' = 'true',--默认同步CDC
         'acid.cdc.build.async'='true', --打开异步的话,就要额外设置
         'acid.cdc.build.interval'='60') 
    LIFECYCLE 36500;

SQL分析页面,选择项目delta_compute_yunqi,确认以下两个Delta Table表已存在:

  • yunqi_github_events_odps_cdc

  • yunqi_github_events_odps_dw

步骤二:实时增全量数据写入

通过FlinkCDC或DataWorks数据集成能力获取基于GitHub Archive公开实时数据集。

您可以按需选择以下任意一种方式写入数据。

Flink CDC

步骤

操作

预期结果

Step 1

实时MySQL数据源对接与验证

MySQL数据源连接信息格式如下。

配置模式: 选择连接串模式。
JDBC连接地址: 单击新增地址,配置信息如下:
主机地址IP:
rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com
端口号:3306

输入数据库名称后,完整的JDBC URL为
jdbc:mysql://rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com:3306/github_events_share
数据库名称: github_events_share
用户名: workshop
密码: workshop#2017
此密码仅为本教程示例,请勿在实际业务中使用。
说明

请确保MySQL数据源可以通过公网访问,如无法通过公网访问,可以配置公网NAT网关。

确认公开Github数据源可用。

--MySQL PublicData

id  '事件ID';
actor_id  '事件发起人ID';
actor_login  '事件发起人登录名';
repo_id  'repoID';
repo_name  'repo名称';
org_id  'repo所属组织ID';
org_login  'repo所属组织名称';
type  '事件类型';
created_at  '事件发生时间';
action  '事件行为';
commit_id  '提交记录ID';
member_id  '成员ID';
language  '编程语言';

Step 2

通过Flink

CDC写入数据

实时GitHub Event数据通过FlinkCDC实时数据写入MaxCompute Delta Table增量表,更多信息,请参见利用Flink CDC实现数据同步至Delta Table

  1. 选择数据链接器。

  2. 编写任务配置YAML文件。整库同步的示例文件rds-to-maxcompute.yaml如下。

    # Author:         openlake@test.aliyunid.com
    # Created Time:   2024-08-27 12:46:11
    # Description:    Write your description here
    
    source:
      type: mysql
      hostname: rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com
      port: 3306
      username: workshop
      password: workshop#2017
      tables: github_events_share.\.*
      server-id: 5400-5404
    
    sink:
       type: maxcompute
       name: MaxComputeSink
       accessId: xxxxxxxxxxxxxxxxxxxxxx
       accessKey: xxxxxxxxxxxxxxxxxxxxxxx
       endpoint: http://xxx.xxx.xxx.xxx:8008
       tunnelEndpoint: http://xxx.xxx.xxx.xxx:8009
       project: delta_compute_yunqi
       bucketSize: 8
    
    pipeline:
       parallelism: 4
  3. 执行flink-cdc命令,启动数据实时同步。

    ./bin/flink-cdc.sh rds-to-maxcompute.yaml

查看MaxCompute的yunqi_github_events_odps_cdc数据表的数据变化。

SELECT COUNT(1) FROM yunqi_github_events_odps_cdc; 

DataWorks数据集成

步骤

操作

预期结果

Step 1

实时MySQL数据源对接与验证

MySQL数据源连接信息格式如下。

配置模式: 选择连接串模式。
JDBC连接地址: 单击新增地址,配置信息如下:
主机地址IP:
rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com
端口号:3306

输入数据库名称后,完整的JDBC URL为
jdbc:mysql://rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com:3306/github_events_share
数据库名称: github_events_share
用户名: workshop
密码: workshop#2017
此密码仅为本教程示例,请勿在实际业务中使用。
说明

请确保MySQL数据源可以通过公网访问,如无法通过公网访问,可以配置公网NAT网关。

确认公开Github数据源可用。

--MySQL PublicData

id  '事件ID';
actor_id  '事件发起人ID';
actor_login  '事件发起人登录名';
repo_id  'repoID';
repo_name  'repo名称';
org_id  'repo所属组织ID';
org_login  'repo所属组织名称';
type  '事件类型';
created_at  '事件发生时间';
action  '事件行为';
commit_id  '提交记录ID';
member_id  '成员ID';
language  '编程语言';

Step 2

配置DataWorks实时数据源

实时GitHub Event数据通过DataWorks数据集成实时数据写入MaxCompute Delta Table增量表(实时数据源)。

  1. 创建同步任务所需的数据源。

    登录DataWorks控制台,切换至北京或上海地域,在左侧导航栏选择数据开发与治理 > 数据集成

    2024-09-18_14-29-21.png

    您可以选择已有工作空间或新建工作空间,然后单击进入数据集成

  2. DataWorks数据集成页面,单击左侧导航栏中的数据源,单击新增数据源

  3. 新增数据源对话框中,根据界面引导分配创建MySQL和MaxCompute数据源。

    • MySQL配置页面示例如下,填入Step 1中提供的MySQL数据源信息。更多说明,请参见MySQL数据源截屏2024-09-13 20.29.12.png

    • MaxCompute配置页面示例如下,更多说明,请参见创建MaxCompute数据源1111111.png

确认来源和去向数据源联通正常。

Step 3

DataWorks实时任务同步

实时GitHub Event数据通过DataWorks数据集成实时数据写入MaxCompute Delta Table增量表。

  1. 在DataWorks数据集成页面,单击左侧导航栏中的同步任务,然后单击新建同步任务

  2. 新建同步任务页面,配置以下信息。

    • 数据来源选择MySQL。

    • 数据去向选择MaxCompute。

    • 示例任务名为sync_mysql_to_odps。

    • 同步类型为整库实时

    • 资源组选择开通DataWorks时创建的资源组,占用的CU量配置为5CU

      说明
      • 为保持公共数据源连接稳定,资源组与公共MySQL数据源创建连接后7天将进行释放,不影响资源组与您的MySQL创建的连接。

      • 使用新版资源组运行数据集成整库任务,最低要求配置2CU,详情请参见Serverless资源组计费

    • 来源数据源选择mysql_github_source。

    • 去向数据源选择delta_compute_demo。

    2222.png

  3. 单击测试连通性,保障数据源与资源组网络连通。更多网络连通介绍,请参见网络连通方案

  4. 测试通过后,单击下一步

  5. 选择要同步的库表页面,选中MySQL中的github表,移动至已选库表5eecdaf48460cde5216439d7342f912e6fbae11900bdcd4058e70b814913bc360a414d3de9277d871abf3af1cbd752494530dd9cb3fcc75c56ea1a4ad9c1ccc252b12b1c048286d7ebcf1c5741ba735290cf61cee1c45abefc653b69905bac42.png

  6. 目标表映射区域,勾选对应表,单击批量刷新映射。基于上述已创建的MaxCompute Delta表,将目标表名改为 yunqi_github_events_odps_dw,单击完成配置

    image

  7. 任务列表页面启动任务,查看执行详情。

    公共数据源MySQL中保留近7天数据,离线数据将通过全量进行同步,实时数据将在全量初始化完成后,实时写入MaxCompute。

    image

  8. 待数据同步成功后,前往MaxCompute TopConsole进行近实时数据分析。

    image

查看MaxCompute的yunqi_github_events_odps_dw数据表的数据变化。

SELECT COUNT(1) FROM yunqi_github_events_odps_dw; 

步骤三:近实时查询分析和增量计算

  • 近实时数据分析:使用交互式Quota组-mcqa2_demo_yunqi(64CU)在MaxCompute TopConsole SQL数据分析。

  • 增量MV-自动化动态表:使用批处理Quota组-batch_demo_yunqi(128CU)在MaxCompute TopConsole进行动态表增量计算。

  • 增量计算-CDC/Stream/周期性Tasks:使用批处理Quota组-batch_demo_yunqi在MaxCompute TopConsole进行自定义增量计算。

步骤

操作

预期结果

Step 1

使用交互式资源组-近实时数据分析

  1. 在MaxCompute控制台左侧导航栏中,选择工作区 > SQL分析,打开临时SQL分析页面。

  2. 选择步骤一创建的交互式Quota组,进行近实时查询分析。555.png

具体SQL分析步骤如下:

  • Q1:过去24小时最活跃项目

    --Q1:过去24小时最活跃项目
    SELECT
        repo_name,
        COUNT(*) AS events
    FROM
        yunqi_github_events_odps_dw
    WHERE
        date_add(created_at,-1)>'2024-08-26'
    GROUP BY
        repo_name
    ORDER BY
        events DESC
    LIMIT 5;
  • Q2:过去24小时最活跃开发者

    --Q2:过去24小时最活跃开发者
    SELECT
        actor_login,
        COUNT(*) AS events
    FROM
        yunqi_github_events_odps_dw
    WHERE
        date_add(created_at,-1)>'2024-08-26'
    GROUP BY
        actor_login
    ORDER BY
        events 
    LIMIT 5;
  • Q3:今日公开事件总数

    --Q3:今日公开事件总数
    SELECT count(*) FROM yunqi_github_events_odps_dw WHERE TO_DATE(created_at) >= date_add(now(),-1);
  • Q4:实时事件展示

    --Q4:实时事件展示
    SELECT
        cast(created_at as STRING ),
        actor_login,
        type,
        repo_name
    FROM
        yunqi_github_events_odps_dw
    ORDER BY
        created_at DESC
    LIMIT 5;

观察查询SQL结果。

Step 2

使用批处理资源组-增量计算-增量物化视图MV聚合查询

  1. 在MaxCompute控制台左侧导航栏中,选择工作区 > SQL分析,打开临时SQL分析页面。

  2. 选择步骤一创建的批处理Quota组,批量MV应用。666.png

具体SQL分析步骤如下:

  1. 确认Delta Table数据表已生成,历史和增量数据更新中。

    创建跟踪Delta Table数据表的CDC信息 (可选:默认CDC自动生成)。

    --Q1: 创建跟踪Delta Table数据表的CDC信息
    ALTER TABLE yunqi_github_events_odps_dw build cdc; --触发继续CDC后续流程
  2. 支持实时增量物化视图MV查询。基于历史和增量数据,创建轻度聚合-增量MV:按照repo_name/language/create_date聚合统计事件数量。

    --Q2: 生成增量物化视图MV1-yunqi_incre_mv1
    SET useTunnel=FALSE;
    CREATE materialized VIEW IF NOT EXISTS yunqi_incre_mv1
    refresh every 2 minutes
    tblproperties("enable_auto_refresh"="true", "refresh_mode"="incremental")
    AS SELECT
        repo_name,
        language,
        COUNT(*) AS events,
        to_date(created_at) AS create_date
    FROM
        yunqi_github_events_odps_dw
    WHERE
        date_add(created_at,-1)>'2024-07-26' AND language IS NOT NULL
    GROUP BY
      repo_name,language,to_date(created_at);
    --Q3: 观测源数据表状态 - 验证show history查看表行为 / CDC Build状态确认
    SHOW history FOR TABLE yunqi_github_events_odps_dw;
    SELECT * FROM table_changes('yunqi_github_events_odps_dw', 1);
  3. SQL分析:按照repo_name/language信息,查询在一定时间内的各repo的总共事件数量和语言。

    --Q4:按照repo_name / language 信息,降序查询总共事件数据与Language
    SELECT repo_name, language, SUM(events) AS events_sum, COUNT(create_date) AS day_count 
    FROM yunqi_incre_mv1 
    WHERE events > 100 
    GROUP BY repo_name,language ORDER BY events_sum DESC;
    --Q5:增量物化视图MV1历史操作变更
    SHOW history FOR TABLE yunqi_incre_mv1;
    --数据更新批次5分钟间隔,如无变更需执行
    ALTER materialized VIEW yunqi_incre_mv1 rebuild;
    SELECT * FROM table_changes('yunqi_github_events_odps_dw', 1);
  4. 构建级联增量物化视图MV2。基于已有的增量物化视图MV1,创建Aggragate重度汇聚层。

    • 生成增量物化视图MV2-yunqi_incre_mv2。在增量MV1的基础上,物化SQL分析结果到增量MV2。参考Q4 SQL。

      --Q6:生成级联增量物化视图MV2-yunqi_incre_mv2
      SET useTunnel=FALSE;
      CREATE materialized VIEW IF NOT EXISTS yunqi_incre_mv2
      refresh every 2 minutes
      tblproperties("enable_auto_refresh"="true", "refresh_mode"="incremental")
      AS 
      SELECT repo_name, language, SUM(events) AS events_sum, COUNT(create_date) AS day_count 
      FROM yunqi_incre_mv1 
      WHERE events > 100 
      GROUP BY repo_name,language;
    • SQL分析:在一定时间内按照编程language分类,按照Repo数量多少来排序查询每个语言对应的总共事件数量。

      --Q7: 编程Language热门排序
      SELECT language, COUNT(repo_name) AS repo_cnt,MAX(events_sum) AS max_events FROM yunqi_incre_mv2 
      GROUP BY language ORDER BY repo_cnt DESC;
      --Q8:观测增量MV2历史操作状态 / 表变化信息
      SHOW history FOR TABLE yunqi_incre_mv2;
      --数据更新批次5分钟间隔,如无变更需执行
      ALTER materialized VIEW yunqi_incre_mv2 rebuild;
      SELECT * FROM table_changes('yunqi_incre_mv2', 2);

观察查询SQL结果。

Step 3

使用批处理资源组-增量计算-Stream&Task应用

  1. 在MaxCompute控制台左侧导航栏中,选择工作区 > SQL分析,打开临时SQL分析页面。

  2. 选择步骤一创建的批处理Quota组,增量CDC。

具体SQL分析步骤如下:

  1. 创建stream对象,查询新增的cdc记录,进行后续的增量计算。

    --Q1: 查询最新的表数据的版本信息
    SELECT get_latest_version('yunqi_github_events_odps_dw'); 

    创建Stream对象。通过上面get_lastest_version信息,设置开始跟踪的增量数据的起始版本。选择小于等于最新版本的一个历史版本开始跟踪增量的CDC记录。

    --Q2:创建Stream对象
    CREATE stream stream_githubs ON TABLE yunqi_github_events_odps_dw version AS OF 10 strmproperties('read_mode'='cdc') ;
    
    DESC stream stream_githubs;

  2. 创建并执行Task,Task自动执行增量更新repo_name。

    • 创建聚合表yunqi_github_events_ads。

      --创建聚合表yunqi_github_events_ads
      SET odps.sql.upsertable.table.enable=true;
      CREATE TABLE IF NOT EXISTS 
      yunqi_github_events_ads
      (
          repo_name             STRING COMMENT 'repo全名:owner/Repository_name'
          ,language              STRING COMMENT '编程语言'
          ,events_cnt            BIGINT COMMENT '事件数量'
          ,create_date           DATE COMMENT '事件发生时间'
      )
      STORED AS ALIORC  
      TBLPROPERTIES (
           'transactional'='true',
           'write.bucket.num'='64') 
      LIFECYCLE 36500;
    • Task自动执行增量更新repo_name。

      --Task自动执行增量更新repo_name
      SET odps.sql.periodic.task.enabled=true;
      CREATE task yunqi_incre_task1
      schedule='45 second' 
      taskproperties('schedule_strategy'='200')
      WHEN stream_has_data('stream_githubs')
      AS INSERT overwrite yunqi_github_events_ads SELECT
          repo_name,
          language,
          COUNT(*) AS events_cnt,
          to_date(created_at) AS create_date
      FROM
          yunqi_github_events_odps_dw
      WHERE
          date_add(created_at,-1)>'2024-08-26' AND language IS NOT NULL
      GROUP BY
      repo_name,language,to_date(created_at);
      

      每隔一分钟,自动查询出来DWD的CDC增量数据,进行分析处理后写入ADS表-yunqi_github_events_ads,自定义Pipeline,写起来复杂但用户可以灵活通过SQL自定义增量计算分析处理逻辑。(时长设置:1分钟内可按照秒级设置schdule,超过1分钟按照分钟级设置schdule,最长设置为59分钟)

  3. 更新Task状态,Task自动执行增量更新repo_name。

    --更新Task状态,Task自动执行增量更新repo_name
    ALTER task  yunqi_incre_task1  suspend;  --暂停
    ALTER task  yunqi_incre_task1  resume; --恢复
  4. 查询CDC build task情况。

    --查询CDC build task情况
    SHOW history FOR task yunqi_incre_task1;
    SELECT *  FROM stream_githubs

观察查询SQL结果。

步骤四:多Quota资源配置和交互式查询加速

交互式Quota组扩容(64CU->96CU),在新规格的Quota组情况下,在MaxCompute TopConsole进行交互式查询, 观测查询加速性能提升。

步骤

操作

预期结果

Step 1

Quota扩容

  1. 在MaxCompute控制台左侧导航栏中,选择工作区 > 配额(Quota)管理,单击目标一级Quota组右侧的Quota配置2024-09-18_15-49-38.png

  2. 基础配置页面,单击编辑基础配置

  3. 修改交互式Quota组mcqa2_demo_yunqi,将其从64CU扩容至128CU。2024-09-18_15-54-23.png

目标Quota组资源扩容成功。

Step 2

查询对比分析与性能优化

  1. 在MaxCompute控制台左侧导航栏中,选择工作区 > SQL分析,打开临时SQL分析页面。

  2. 选择步骤一创建的交互式Quota组,进行近实时查询分析,观测数据变化。

具体SQL分析步骤如下:

  • Q1(可选): 执行Major Compaction操作,优化查询。

    --Q1: 执行Major Compaction操作,优化查询
    SET odps.merge.task.mode=service;
    ALTER TABLE yunqi_github_events_odps_dw compact major;

    MCQA2.0 查询对比性能-交互式资源组。

    --Q2: 近实时查询-实时事件展示
    SELECT
        cast(created_at AS STRING ),
        actor_login,
        type,
        repo_name
    FROM
        yunqi_github_events_odps_dw
    ORDER BY
        created_at DESC
    LIMIT 5;
  • 增量MV1查询:按照repo_name/language信息,查询在一定时间内的各repo的总共事件数量和语言。

    --Q3:按照repo_name / language 信息,降序查询总共事件数据与Language
    SELECT repo_name, language, SUM(events) AS events_sum, COUNT(create_date) AS day_count 
    FROM yunqi_incre_mv1 
    WHERE events > 100 
    GROUP BY repo_name,language ORDER BY events_sum DESC;
  • 增量MV2查询:在一定时间内按照编程language分类,查询每个语言对应的repo数量与总共事件数量。

    --Q4: 编程Language热门排序
    SELECT language, COUNT(repo_name) AS repo_cnt,MAX(events_sum) AS max_events FROM yunqi_incre_mv2 
    GROUP BY language;

观察查询SQL结果。

步骤五(可选):增全量一体交互式分析

使用DataWorks IDE模块-实现增全量一体的近实时数据分析。

步骤

操作

预期结果

Step 1

在DataWorks IDE 数据开发模块配置MaxCompute计算资源

  1. 访问DataWorks IDE数据开发页面,在左侧导航栏单击数据目录.png图标,打开数据目录页面。777.png

  2. 单击右上角的工作空间管理,选择计算资源,单击绑定计算资源888.png

  3. 在页面右侧配置计算资源计算配额资源组999.png

  4. 绑定MaxCompute项目,执行MaxComputeSQL任务。000.png

Step 2

在DataWorks IDE数据开发平台进行数据分析

具体SQL分析步骤如下:

  • Q1:过去24小时最活跃项目

    SELECT
        repo_name,
        COUNT(*) AS events
    FROM
        yunqi_github_events_odps_dw
    WHERE
        date_add(created_at,-1)>'2024-08-26'
    GROUP BY
        repo_name
    ORDER BY
        events DESC
    LIMIT 5;
  • Q2:过去24小时最活跃开发者

    SELECT
        actor_login,
        COUNT(*) AS events
    FROM
        yunqi_github_events_odps_dw
    WHERE
        date_add(created_at,-1)>'2024-08-26'
    GROUP BY
        actor_login
    ORDER BY
        events 
    LIMIT 5;
  • Q3:今日公开事件总数

    SELECT count(*) FROM yunqi_github_events_odps_dw WHERE TO_DATE(created_at) >= date_add(now(),-1);
  • Q4:实时事件展示

    SELECT
        cast(created_at AS STRING ),
        actor_login,
        type,
        repo_name
    FROM
        yunqi_github_events_odps_dw
    ORDER BY
        created_at DESC
    LIMIT 5;

观察查询SQL结果。

总结

本次示例展示了MaxCompute全新构建的基于近实时数仓产品特性的产品方案实践。MaxCompute提供增全量一体的数据处理和近实时查询能力,从数据存储层(Delta Table统一表格式)、计算层(增量计算:CDC/Task/增量MV)、加速层(MCQA2.0查询加速引擎)等三层架构来实现MaxCompute近线计算能力的全面升级。通过此次典型的Demo创建,您能够深度了解如何在MaxCompute产品上构建完备的近实时以及增全量计算的计算任务,简化数据全生命周期的计算优化工作。