MaxCompute湖上数据加工和多场景联动实践

场景介绍

通过MaxLake实现数据入湖入仓及多场景分析联动,以车联网数据为例,通过车辆上报的GPS定位信息分析行驶里程和速度,并联动多引擎满足即时查询报表、跨团队协作与脱敏分享、AI训练等应用场景,实现一份数据,多重价值。整体流程如下图所示:

image

自动发现原始数据,建立ODS

ODS层存储在OSS中,本示例中数据即车联网原始数据表,存储原始车联网定位信息,按小时分区。

  1. MaxCompute中创建数据湖连接(CONNECTION)管理外部存储访问凭证。

  2. 创建数据发现(DataScan)任务,通过CONNECTION访问OSS,自动将parquetorc等数据文件有组织地注册成外部表。

增量数据加工,输出DWD、ADS

ODS层基础上,过滤无效数据,统一字段格式(时间格式化、经纬度校验等)。

多引擎计算,满足细分应用场景

  • OLAP+AI:使用StarRocks引擎做即时查询和报表,如查询车辆的总里程和平均时速。

  • 跨团队协作与数据安全分享:对第三方使用Spark引擎做数据分析的团队开启数据动态脱敏。

  • AI训练:处理过的数据可进一步应用于模型训练。

    对车辆总行驶里程表去标识化,对车辆唯一编号进行脱敏,只保留首尾字符,其他字符用“*”代替,使用脱敏后的数据统计每日最高时速

操作步骤

  1. 上传测试数据

    1. 登录对象存储OSS控制台

    2. 在左侧导航栏单击Bucket 列表

      Bucket 列表页面,单击创建 Bucket

      本示例中Bucket名称为vehicle-raw

    3. Bucket 列表页面,单击目标Bucket 名称,进入文件列表页面。

      单击上传文件,上传解压后的测试数据Maxlake_example_parquet.zip

  2. 授权

    1. 如果以RAM用户身份创建和管理CONNECTION,请先获得租户级Connection_Admin角色。授权方式参考:租户级别角色授权

    2. 阿里云账号或者具备租户级Super_AdministratorAdmin角色的用户可执行Connection_Admin角色授权。

      1. 登录MaxCompute控制台,在左上角选择地域。

      2. 在左侧导航栏,选择管理配置 > 租户管理 。

      3. 租户管理页面,单击角色管理页签。

      4. 角色管理页签,选择Connection_AdminDatascan_Admin,单击对应的操作列的新增授权

      5. 在弹出的新增授权对话框,添加需要授权的用户,单击确定完成授权。

  3. MaxCompute中创建数据湖连接(CONNECTION)

    1. 登录MaxCompute控制台,在左上角选择地域。

    2. 在左侧导航栏,选择MaxLake > 数据湖连接

    3. 数据湖连接(CONNECTION)页面,单击创建数据湖连接

    4. 在弹出的创建数据湖连接对话框,填写如下参数,然后单击确定完成创建数据湖连接。

      参数名称

      说明

      数据湖连接名称

      数据湖连接名称,在租户内命名唯一。

      RAMRoleARN

      选择RAM Role中具有访问OSS权限的RAMRoleARN信息。

      可以创建和填写自定义角色的RAMRoleARN信息,创建详情请参见STS模式授权

      数据湖连接描述

      数据湖连接描述。

  4. 创建数据发现(DataScan)任务

    1. 登录MaxCompute控制台,在左上角选择地域。

    2. 在左侧导航栏,选择MaxLake > 数据发现

    3. 数据发现页面,单击创建数据发现任务

    4. 在弹出的创建任务对话框,填写如下参数,然后单击创建

      • 基本配置

        参数名称

        说明

        任务名称

        任务名称,在租户内命名唯一。

        任务周期

        5分钟

      • 湖数据配置

        参数名称

        说明

        数据湖连接

        选择数据湖连接(CONNECTION)作为外部存储访问凭证。

        LOCATION

        填写数据文件所在OSS路径。

        • 格式oss://<Bucket名称>/<OSS 路径>/

          OSS Bucket必须和MaxCompute数据发现任务归属于同一阿里云主账号的相同Region下。

        • 示例oss://vehicle-raw/Maxlake_example_parquet

      • Catalog配置

        参数名称

        说明

        项目

        选择开启Schema级语法开关的项目。

        Schema

        选择Schema。

        建议选择和将要发现的外部表表名不冲突的Schema ,如果新发现的外部表和Schema中用户创建的表同名,发现任务将不会继续创建同名的外部表。

    5. 数据发现每5分钟运行一次,查看数据发现结果有3张车联网表,并会不断追加新分区。

      说明
      • 如果解析失败,排查文件中是否有.DS_Store文件,不符合数据要求,会导致扫描失败,需要手动删除此类脏数据。

      • 下一个发布的服务版本会支持自动忽略.DS_Store以及用户设置忽略文件。

  5. SQL数据查询脚本

    1. ODS: 原始上报数据表

      --业务场景设计
      --场景:车联网平台获取车辆的实时位置数据(GPS),并统计每小时每辆车的行驶里程和平均速度
      
      --数据源:车载终端每隔几秒上报一次位置、速度等
      --ODS:直接落原始上报数据
      --DWD:清洗、解析后转换成“每次上报记录”
      --ADS:统计每小时每辆车总行驶里程、平均速度
      
      -- ODS: 原始上报数据表
      --存储原始车联网定位数据,按小时分区。
      
      SHOW PARTITIONS ods_vehicle_gps_raw;
      SET odps.sql.allow.fullscan=true;
      
      SELECT  * FROM ods_vehicle_gps_raw WHERE dt='2025-09-17' AND hh='23';
      
      -- 部分返回结果示例
      +------------+------------+-------------+------------+------------+------------+-------------+------------+------------+
      | vin        | device_id  | report_time | lng        | lat        | speed      | raw_payload | dt         | hh         | 
      +------------+------------+-------------+------------+------------+------------+-------------+------------+------------+
      | VIN001     | DEV001     | 2025-09-16 00:00:00 | 120.00023573730152 | 30.39975989605289 | 73.3843581906447 | mock_payload | 2025-09-17 | 23         | 
      | VIN002     | DEV002     | 2025-09-16 00:00:00 | 120.00517998985256 | 30.33811818824062 | 67.43035716350673 | mock_payload | 2025-09-17 | 23         | 
      | VIN003     | DEV003     | 2025-09-16 00:00:00 | 120.24295999679852 | 30.143229002199707 | 40.8918776553552 | mock_payload | 2025-09-17 | 23         | 
      | VIN001     | DEV001     | 2025-09-16 00:30:00 | 120.24754980497414 | 30.373484773735274 | 49.50436236779409 | mock_payload | 2025-09-17 | 23         | 
      | VIN002     | DEV002     | 2025-09-16 00:30:00 | 120.00510501582413 | 30.42486370328109 | 55.8400627485663 | mock_payload | 2025-09-17 | 23         | 
      | VIN003     | DEV003     | 2025-09-16 00:30:00 | 120.36073125682805 | 30.065016013833237 | 61.82996654036919 | mock_payload | 2025-09-17 | 23         | 
      | VIN002     | DEV002     | 2025-09-16 19:30:00 | 120.29488938268968 | 30.12474152125639 | 66.48209032904454 | mock_payload | 2025-09-17 | 23         | 
      | VIN002     | DEV002     | 2025-09-16 19:00:00 | 120.35157954057287 | 30.459823299646295 | 76.36574370617315 | mock_payload | 2025-09-17 | 23         | 
      | VIN001     | DEV001     | 2025-09-16 19:30:00 | 120.3113710027241 | 30.33402715522518 | 62.601762741153024 | mock_payload | 2025-09-17 | 23         | 
      +------------+------------+-------------+------------+------------+------------+-------------+------------+------------+
    2. DWD

      -- DWD 层(dwd_vehicle_gps)
      -- 在 ODS 基础上,过滤无效数据,统一字段格式(时间格式化、经纬度校验等)
      CREATE TABLE IF NOT EXISTS dwd_vehicle_gps (
        vin          STRING COMMENT '车辆唯一识别码',
        event_time   DATETIME COMMENT '上报时间',
        lng          DOUBLE COMMENT '经度',
        lat          DOUBLE COMMENT '纬度',
        speed        DOUBLE COMMENT '速度(km/h)',
        loc_valid    BOOLEAN COMMENT '定位是否有效'
      )
      PARTITIONED BY (dt STRING, hh STRING);
      
      -- 加工 SQL
      INSERT OVERWRITE TABLE dwd_vehicle_gps PARTITION (dt='2025-09-17', hh='23')
      SELECT
      vin,
      TO_DATE(report_time,'yyyy-MM-dd HH:mi:ss') AS event_time,
      lng, lat,
      speed,
      CASE WHEN lng BETWEEN 70 AND 140 AND lat BETWEEN 10 AND 60 THEN TRUE ELSE FALSE END AS loc_valid
      FROM ods_vehicle_gps_raw
      WHERE dt='2025-09-17' AND hh='23'
      AND speed >= 0
      AND vin IS NOT NULL;
      
      SELECT  * FROM dwd_vehicle_gps WHERE dt='2025-09-17' AND hh='23';
      
      -- 部分返回结果示例
      +------------+------------+------------+------------+------------+-----------+------------+------------+
      | vin        | event_time | lng        | lat        | speed      | loc_valid | dt         | hh         | 
      +------------+------------+------------+------------+------------+-----------+------------+------------+
      | VIN001     | 2025-09-16 00:00:00 | 120.00023573730152 | 30.39975989605289 | 73.3843581906447 | true      | 2025-09-17 | 23         | 
      | VIN002     | 2025-09-16 00:00:00 | 120.00517998985256 | 30.33811818824062 | 67.43035716350673 | true      | 2025-09-17 | 23         | 
      | VIN003     | 2025-09-16 00:00:00 | 120.24295999679852 | 30.143229002199707 | 40.8918776553552 | true      | 2025-09-17 | 23         | 
      | VIN001     | 2025-09-16 00:30:00 | 120.24754980497414 | 30.373484773735274 | 49.50436236779409 | true      | 2025-09-17 | 23         | 
      | VIN003     | 2025-09-16 00:30:00 | 120.36073125682805 | 30.065016013833237 | 61.82996654036919 | true      | 2025-09-17 | 23         | 
      | VIN001     | 2025-09-16 05:00:00 | 120.13891993725622 | 30.39267490566367 | 53.99676876794396 | true      | 2025-09-17 | 23         | 
      | VIN003     | 2025-09-16 05:30:00 | 120.04798104849084 | 30.012209889484666 | 65.01092831837522 | true      | 2025-09-17 | 23         | 
      | VIN002     | 2025-09-16 20:00:00 | 120.42721760246307 | 30.051330581564144 | 79.73892066615583 | true      | 2025-09-17 | 23         | 
      | VIN003     | 2025-09-16 20:00:00 | 120.47715870033818 | 30.302941456112517 | 58.61057150112957 | true      | 2025-09-17 | 23         | 
      | VIN001     | 2025-09-16 20:30:00 | 120.3067564206695 | 30.179763514166588 | 47.77533756931095 | true      | 2025-09-17 | 23         | 
      +------------+------------+-------------+------------+------------+------------+-------------+------------+------------+
    3. ADS 层

      -- ADS 层(ads_vehicle_hourly_stat)
      --按小时统计每辆车总行驶里程(简单用位置差计算)、平均速度
      CREATE TABLE IF NOT EXISTS ads_vehicle_hourly_stat (
        vin           STRING COMMENT '车辆唯一识别码',
        stat_hour     STRING COMMENT '统计小时(yyyy-MM-dd HH)',
        total_distance DOUBLE COMMENT '总行驶里程(公里)',
        avg_speed     DOUBLE COMMENT '平均速度(km/h)'
      )
      PARTITIONED BY (dt STRING, hh STRING);
      
      --统计加工 SQL
      --注意:这里的“距离”计算我用简化版公式(实际可用 haversine),以演示为主
      -- 简单按经纬度差近似求距离(展示用)
      SET odps.sql.type.system.odps2=true;
      
      WITH ordered AS (
        SELECT
        vin, event_time, lng, lat, speed,
        ROW_NUMBER() OVER (PARTITION BY vin ORDER BY event_time) AS rn
        FROM dwd_vehicle_gps
        WHERE dt='2025-09-17' AND hh='23' AND loc_valid = TRUE
      ),
      with_prev AS (
        SELECT
        a.vin, a.event_time, a.speed,
        -- 欧氏距离简单近似,这里1度经纬度差约111公里,不精准但可演示
        ABS(a.lng - b.lng)*111 AS dx,
        ABS(a.lat - b.lat)*111 AS dy
        FROM ordered a
        LEFT JOIN ordered b
        ON a.vin = b.vin AND a.rn = b.rn + 1
      )
      INSERT OVERWRITE TABLE ads_vehicle_hourly_stat PARTITION (dt='2025-09-17', hh='23')
        SELECT
        vin,
        '2025-09-17 23' AS stat_hour,
        SUM( SQRT( COALESCE(dx,0)*COALESCE(dx,0) + COALESCE(dy,0)*COALESCE(dy,0) ) ) AS total_distance,
        AVG(speed) AS avg_speed
        FROM with_prev
        GROUP BY vin;
      
      -- 查询VIN001车辆的行驶里程和平均时速
      SET odps.sql.allow.fullscan=true;
      SELECT * FROM ads_vehicle_hourly_stat WHERE vin='VIN001'
        ORDER BY stat_hour DESC;
        
      -- 返回结果。
      +------------+---------------+--------------------+-------------------+------------+------------+
      | vin        | stat_hour     | total_distance     | avg_speed         | dt         | hh         | 
      +------------+---------------+--------------------+-------------------+------------+------------+
      | VIN001     | 2025-09-17 23 | 1510.7384548492398 | 59.33624859907179 | 2025-09-17 | 23         | 
      +------------+---------------+--------------------+-------------------+------------+------------+
      
      SET odps.sql.allow.fullscan=true;
      SELECT
      vin AS 车辆编码,
      stat_hour AS 统计时间,
      CONCAT(CAST(ROUND(total_distance, 2) AS STRING), ' km') AS 行驶里程,
      CONCAT(CAST(ROUND(avg_speed, 2) AS STRING), ' km/h') AS 平均时速
      FROM ads_vehicle_hourly_stat WHERE vin='VIN001'
        ORDER BY stat_hour DESC;
        
      -- 返回结果。
      +------------+---------------+------------+------------+
      | 车辆编码    | 统计时间        | 行驶里程    | 平均时速     | 
      +------------+---------------+------------+------------+
      | VIN001     | 2025-09-17 23 | 1510.74 km | 59.34 km/h | 
      +------------+---------------+------------+------------+
    4. 跨团队协作与数据安全分享,对第三方使用Spark引擎做数据分析的团队开启数据动态脱敏。

      -- 跨团队协作与数据安全分享,对第三方使用Spark引擎做数据分析的团队开启数据动态脱敏
      CREATE role thirdparty;
      GRANT CreateInstance, List ON project <project_name> TO ROLE thirdparty;
      GRANT SELECT ON TABLE ods_vehicle_gps_raw TO ROLE thirdparty;
      
      ADD USER RAM$<your aliyun account>;
      GRANT thirdparty TO RAM$<your aliyun account>;
    5. 对车辆总行驶里程表去标识化,对车辆唯一编号脱敏,只保留首尾字符,其他字符用“*”代替

      -- 项目开启数据脱敏功能。
      setproject odps.data.masking.policy.enable=true;
      
      -- 对车辆总行驶里程表去标识化,对车辆唯一编号脱敏,只保留首尾字符,其他字符用“*”代替。
      CREATE data masking policy IF NOT EXISTS masking_vin
      TO role (thirdparty)
      USING MASKED_STRING_UNMASKED_BA(1, 1);
      
      apply data masking policy masking_vin bind TO
      TABLE ads_vehicle_hourly_stat COLUMN vin;

演示视频