加工数据

本文为您介绍如何将同步至MaxCompute的用户信息表ods_user_info_d_odps及访问日志数据ods_raw_log_d_odps,通过DataWorksMaxCompute节点加工得到目标用户画像数据,阅读本文后,您可以了解如何通过DataWorks+MaxCompute产品组合来计算和分析已同步的数据,完成数仓简单数据加工场景。

前提条件

开始本实验前,请先完成同步数据中的操作。

步骤一:搭建数据加工链路

同步数据阶段,已经成功将数据同步至MaxCompute,本阶段需要对数据进行进一步加工,以输出基本用户画像数据。

  1. Data Studio左侧导航栏单击image,进入数据开发页面,然后在项目目录区域找到已创建好的工作流,单击进入工作流编辑页。

  2. 单击编辑工作流,在工作流编辑页面,从左侧拖拽MaxCompute SQL节点至画布中,分别设置节点名称。

    本教程节点名称示例及作用如下:

    节点类型

    节点名称

    节点作用

    imageMaxCompute SQL

    dwd_log_info_di_odps

    使用内置函数、自定义函数(getregion)等完成原始日志ods_raw_log_d_odps数据拆分写入dwd_log_info_di_odps表多个字段。

    imageMaxCompute SQL

    dws_user_info_all_di_odps

    用户基本信息表ods_user_info_d_odps)和初步加工后的日志数据表dwd_log_info_di_odps)进行汇总,将数据写入dws_user_info_all_di_odps表中。

    imageMaxCompute SQL

    ads_user_info_1d_odps

    dws_user_info_all_di_odps表中数据进一步加工,将数据写入ads_user_info_1d_odps表,产出基本用户画像。

  3. 手动拖拽连线,配置各节点的上游节点。最终效果如下:

    image
    说明

    工作流中支持通过手动连线方式设置各节点的上下游依赖关系,也支持在子节点中,使用代码解析自动识别节点上下游依赖关系。本教程采用手动连线方式,代码解析的更多信息,请参见自动解析机制

  4. 在顶部工具栏单击保存

步骤二:注册自定义函数

为了后续数据处理任务的顺利进行,您需要注册MaxCompute自定义函数(getregion),将同步数据阶段同步至MaxCompute日志数据结构拆解成表格。

重要
  • 本教程已为您提供用于将IP解析为地域的函数所需资源,您仅需将其下载至本地,并在DataWorks注册函数前,将函数涉及的资源上传至DataWorks空间即可。

  • 该函数仅为本教程使用(IP资源样例),若需在正式业务中实现IP到地理位置的映射功能,需前往专业IP网站获取相关IP转换服务。

上传资源(ip2region.jar)

  1. 下载ip2region.jar

    说明

    ip2region.jar资源样例仅为教程使用。

  2. Data Studio页面左侧导航栏单击image,进入资源管理页面,单击image > 新建资源 > MaxCompute Jar,设置资源名称后,进入资源上传页面。

    说明

    资源名称无需与上传的文件名保持一致。

  3. 文件来源选择本地,单击文件内容后的点击上传,选择已下载至本地的ip2region.jar

  4. 数据源选择准备环境阶段绑定的MaxCompute计算资源。

  5. 在节点工具栏单击保存,然后单击发布,根据发布面板提示,将资源发布至开发环境和生产环境对应的MaxCompute项目中。

注册函数(getregion)

  1. 在资源管理页面,单击image > 新建函数 > MaxCompute Function,设置资源名称后,进入注册函数页面(本教程函数命名为getregion)。

  2. 注册函数页面,配置相关参数。以下为本教程所需配置的关键参数,未说明参数保持默认即可。

    参数

    描述

    函数类型

    选择OTHER

    数据源

    选择准备环境阶段绑定的MaxCompute计算资源。

    类名

    输入org.alidata.odps.udf.Ip2Region

    资源列表

    选择ip2region.jar

    描述

    IP地址转换地域。

    命令格式

    输入getregion('ip')

    参数说明

    IP地址。

  3. 在节点工具栏单击的保存,然后单击发布,根据发布面板提示,将函数发布至开发环境和生产环境对应的MaxCompute项目中。

步骤三:配置数据加工节点

数据加工需要将每层加工逻辑通过MaxCompute SQL调度实现,本教程已提供完整的数据加工SQL示例代码,您需要依次为dwd_log_info_di_odpsdws_user_info_all_di_odpsads_user_info_1d_odps节点配置。

配置dwd_log_info_di_odps节点

在本节点的示例代码中,利用创建的函数处理上游表ods_raw_log_d_odps字段的SQL代码,并将其写入dwd_log_info_di_odps表中。

  1. Data Studio左侧导航栏单击image,进入数据开发页面,然后在项目目录区域找到已创建好的工作流,单击进入工作流编辑页。

  2. 在工作流编辑页面中,鼠标悬停至dwd_log_info_di_odps节点上,单击打开节点

  3. 将如下代码粘贴至节点编辑页面。

    dwd_log_info_di_odps节点代码示例

    -- 创建dwd_log_info_di_odps表
    CREATE TABLE IF NOT EXISTS dwd_log_info_di_odps (
     ip STRING COMMENT 'ip地址',
     uid STRING COMMENT '用户ID',
     time STRING COMMENT '时间yyyymmddhh:mi:ss',
     status STRING COMMENT '服务器返回状态码',
     bytes STRING COMMENT '返回给客户端的字节数',
     region STRING COMMENT '地域,根据ip得到',
     method STRING COMMENT 'http请求类型',
     url STRING COMMENT 'url',
     protocol STRING COMMENT 'http协议版本号',
     referer STRING COMMENT '来源url',
     device STRING COMMENT '终端类型 ',
     identity STRING COMMENT '访问类型 crawler feed user unknown'
    )
    PARTITIONED BY (
     dt STRING
    )
    LIFECYCLE 14;
    
    -- 加工数据
    -- 场景:以下SQL使用函数getregion对原始日志数据中的ip进行解析,并通过正则等方式,将原始数据拆解为可分析字段写入并写入dwd_log_info_di_odps表。
    --      本教程已为您准备好用于将IP解析为地域的函数getregion。
    -- 补充:
    --     1. 在DataWorks节点中使用函数前,您需要先将注册函数所需资源上传至DataWorks,再通过可视化方式使用该资源注册函数。
    --        本教程注册函数getregion所用的资源为ip2region.jar。
    --     2. DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应业务分区。
    --        在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下代码动态入参。
    INSERT OVERWRITE TABLE dwd_log_info_di_odps PARTITION (dt='${bizdate}')
    SELECT ip 
      , uid
      , time
      , status
      , bytes 
      , getregion(ip) AS region --使用自定义UDF通过IP得到地域。
      , regexp_substr(request, '(^[^ ]+ )') AS method --通过正则把request差分为3个字段。
      , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
      , regexp_substr(request, '([^ ]+$)') AS protocol 
      , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer --通过正则清晰refer,得到更精准的URL。
      , CASE
        WHEN TOLOWER(agent) RLIKE 'android' THEN 'android' --通过agent得到终端信息和访问形式。
        WHEN TOLOWER(agent) RLIKE 'iphone' THEN 'iphone'
        WHEN TOLOWER(agent) RLIKE 'ipad' THEN 'ipad'
        WHEN TOLOWER(agent) RLIKE 'macintosh' THEN 'macintosh'
        WHEN TOLOWER(agent) RLIKE 'windows phone' THEN 'windows_phone'
        WHEN TOLOWER(agent) RLIKE 'windows' THEN 'windows_pc'
        ELSE 'unknown'
      END AS device
      , CASE
        WHEN TOLOWER(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
        WHEN TOLOWER(agent) RLIKE 'feed'
        OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
        WHEN TOLOWER(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
        AND agent RLIKE '^[Mozilla|Opera]'
        AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
        ELSE 'unknown'
      END AS identity
      FROM (
        SELECT SPLIT(col, '##@@')[0] AS ip
        , SPLIT(col, '##@@')[1] AS uid
        , SPLIT(col, '##@@')[2] AS time
        , SPLIT(col, '##@@')[3] AS request
        , SPLIT(col, '##@@')[4] AS status
        , SPLIT(col, '##@@')[5] AS bytes
        , SPLIT(col, '##@@')[6] AS referer
        , SPLIT(col, '##@@')[7] AS agent
      FROM ods_raw_log_d_odps  
      WHERE dt ='${bizdate}'
    ) a;
  4. 配置调试参数。

    MaxCompute SQL节点编辑页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。

    配置项

    配置说明

    计算资源

    选择准备环境阶段绑定的MaxCompute计算资源以及其对应的计算配额。

    资源组

    选择准备环境阶段购买的Serverless资源组。

    脚本参数

    无需配置。本教程提供的示例代码中统一使用${bizdate}表示业务日期,在步骤四调试运行工作流时,设置本次运行值为具体常量(例如20250223),任务运行将会使用此常量替换任务中定义的变量。

  5. (可选)配置调度属性。

    本教程调度配置相关参数保持默认即可,您可以在MaxCompute SQL页面右侧单击调度配置。调度配置中参数的详细说明,详情可参见调度配置

    • 调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。

    • 调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。

  6. 在节点工具栏单击保存

配置dws_user_info_all_di_odps节点

本节点对用户基本信息表(ods_user_info_d_odps)和初步加工后的日志数据表(dwd_log_info_di_odps)进行汇总,将数据写入dws_user_info_all_di_odps表中。

  1. 在工作流编辑页面中,鼠标悬停至dws_user_info_all_di_odps节点上,单击打开节点

  2. 将如下代码粘贴至节点编辑页面。

    dws_user_info_all_di_odps节点代码示例

    -- 创建dws_user_info_all_di_odps表
    CREATE TABLE IF NOT EXISTS dws_user_info_all_di_odps (
     uid STRING COMMENT '用户ID',
     gender STRING COMMENT '性别',
     age_range STRING COMMENT '年龄段',
     zodiac STRING COMMENT '星座',
     region STRING COMMENT '地域,根据ip得到',
     device STRING COMMENT '终端类型 ',
     identity STRING COMMENT '访问类型 crawler feed user unknown',
     method STRING COMMENT 'http请求类型',
     url STRING COMMENT 'url',
     referer STRING COMMENT '来源url',
     time STRING COMMENT '时间yyyymmddhh:mi:ss'
    )
    PARTITIONED BY (
     dt STRING
    )
    LIFECYCLE 14;
    
    -- 加工数据
    -- 场景:将加工后的日志数据dwd_log_info_di_odps与用户基本信息数据ods_user_info_d_odps汇总写入dws_user_info_all_di_odps表。
    -- 补充:DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应业务分区。
    --      在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下代码动态入参。
    INSERT OVERWRITE TABLE dws_user_info_all_di_odps  PARTITION (dt='${bizdate}')
    SELECT COALESCE(a.uid, b.uid) AS uid
      , b.gender
      , b.age_range
      , b.zodiac
      , a.region
      , a.device
      , a.identity
      , a.method
      , a.url
      , a.referer
      , a.time
    FROM (
      SELECT *
      FROM dwd_log_info_di_odps  
      WHERE dt = '${bizdate}'
    ) a
    LEFT OUTER JOIN (
      SELECT *
      FROM ods_user_info_d_odps 
      WHERE dt = '${bizdate}'
    ) b
    ON a.uid = b.uid;
  3. 配置调试参数。

    MaxCompute SQL节点编辑页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。

    配置项

    配置说明

    计算资源

    选择准备环境阶段绑定的MaxCompute计算资源以及其对应的计算配额。

    资源组

    选择准备环境阶段购买的Serverless资源组。

    脚本参数

    无需配置。本教程提供的示例代码中统一使用${bizdate}表示业务日期,在步骤四调试运行工作流时,设置本次运行值为具体常量(例如20250223),任务运行将会使用此常量替换任务中定义的变量。

  4. (可选)配置调度属性。

    本教程调度配置相关参数保持默认即可,您可以在MaxCompute SQL页面右侧单击调度配置。调度配置中参数的详细说明,详情可参见调度配置

    • 调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。

    • 调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。

  5. 在节点工具栏单击保存

配置ads_user_info_1d_odps节点

本节点对dws_user_info_all_di_odps表中数据进一步加工,将数据写入ads_user_info_1d_odps表,产出基本用户画像。

  1. 在工作流编辑页面中,鼠标悬停至ads_user_info_1d_odps节点上,单击打开节点

  2. 将如下代码粘贴至节点编辑页面。

    ads_user_info_1d_odps节点代码示例

    -- 创建ads_user_info_1d_odps表
    CREATE TABLE IF NOT EXISTS ads_user_info_1d_odps (
     uid STRING COMMENT '用户ID',
     region STRING COMMENT '地域,根据ip得到',
     device STRING COMMENT '终端类型 ',
     pv BIGINT COMMENT 'pv',
     gender STRING COMMENT '性别',
     age_range STRING COMMENT '年龄段',
     zodiac STRING COMMENT '星座'
    )
    PARTITIONED BY (
     dt STRING
    )
    LIFECYCLE 14;    
    
    -- 加工数据
    -- 场景:以下SQL用于对用户访问信息宽表dws_user_info_all_di_odps进一步加工产出基本的用户画像数据写入ads_user_info_1d_odps表。
    -- 补充:DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应业务分区。
    --      在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下代码动态入参。
    INSERT OVERWRITE TABLE ads_user_info_1d_odps  PARTITION (dt='${bizdate}')
    SELECT uid
      , MAX(region)
      , MAX(device)
      , COUNT(0) AS pv
      , MAX(gender)
      , MAX(age_range)
      , MAX(zodiac)
    FROM dws_user_info_all_di_odps 
    WHERE dt = '${bizdate}'
    GROUP BY uid; 
  3. 配置调试参数。

    MaxCompute SQL节点编辑页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。

    配置项

    配置说明

    计算资源

    选择准备环境阶段绑定的MaxCompute计算资源以及其对应的计算配额。

    资源组

    选择准备环境阶段购买的Serverless资源组。

    脚本参数

    无需配置。本教程提供的示例代码中统一使用${bizdate}表示业务日期,在步骤四调试运行工作流时,设置本次运行值为具体常量(例如20250223),任务运行将会使用此常量替换任务中定义的变量。

  4. (可选)配置调度属性。

    本教程调度配置相关参数保持默认即可,您可以在MaxCompute SQL页面右侧单击调度配置。调度配置中参数的详细说明,详情可参见调度配置

    • 调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。

    • 调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。

  5. 在节点工具栏单击保存

步骤四:加工数据

  1. 加工数据。

    在工作流工具栏中,单击运行,设置各节点定义的参数变量在本次运行中的取值(本教程使用20250223,您可以按需修改),单击确定后,等待运行完成。

  2. 查询数据加工结果。

    1. Data Studio左侧导航栏单击image,进入数据开发页面,然后在个人目录区域,单击image,创建一个后缀为.sql的文件(文件名称自定义即可)。

    2. 在页面底部如下位置确认语言模式是否为MaxCompute SQLimage

    3. SQL编辑窗口中输入如下SQL语句,查看最终结果表ads_user_info_1d_odps的记录数,确认是否产生数据加工结果。

      -- 此处您需要修改分区过滤条件为您当前操作的实际业务日期。本教程中,前文配置的调试参数bizdate(业务日期)为20250223。
      SELECT count(*) FROM ads_user_info_1d_odps WHERE dt='业务日期';
      • 上述命令查询存在数据,即表示数据加工已完成。

      • 如果没有数据,请确保运行工作流时,配置的本次运行值与此处查询时dt指定的业务日期一致,您可以单击工作流,单击右侧的运行历史,在运行记录右侧操作列单击查看,然后在工作流的运行日志中确认运行工作流时业务日期的取值(partition=[pt=xxx])。

步骤五:发布工作流

任务需要发布至生产环境后才可自动调度运行,您可以参考如下步骤,将工作流发布至生产环境。

说明

本教程已在工作流调度配置中统一配置了调度参数,发布前无需再为每个节点单独配置调度参数。

  1. Data Studio左侧导航栏单击image,进入数据开发页面,然后在项目目录区域找到已创建好的工作流,单击进入工作流编辑页面。

  2. 单击节点工具栏中的发布,打开发布面板。

  3. 单击开始发布生产,根据发布流程引导,完成发布。

步骤六:在生产环境运行任务

任务发布后,在次日才会生成实例运行,您可以通过补数据来对已发布流程进行补数据操作,以便查看任务在生产环境是否可以运行,详情可参见执行补数据并查看补数据实例(新版)

  1. 任务发布成功后,单击右上角的运维中心

    您也可以单击左上方的图标图标,选择全部产品 > 数据开发与运维 > 运维中心(工作流)

  2. 单击左侧导航栏中的周期任务运维 > 周期任务,进入周期任务页面,单击workshop_start_odps虚节点。

  3. 在右侧的DAG图中,右键单击workshop_start_odps节点,选择补数据 > 当前节点及下游节点

  4. 勾选需要补数据的任务,设置业务日期,单击提交并跳转

  5. 在补数据页面单击刷新,直至SQL任务全部运行成功即可。

说明

实验完成后,为了避免后续持续产生费用,您可以选择设置节点调度有效期或者冻结业务流程根节点(虚拟节点workshop_start_odps)。

后续步骤

  • 数据可视化展现:用户画像分析完成后,使用数据分析模块,将加工后的数据以图表形式直观展示,便于您快速提取关键信息,洞察数据背后的业务趋势。

  • 监控数据质量:为数据加工生成的表配置数据质量监控,提前识别脏数据并进行拦截,避免脏数据影响扩大。

  • 管理数据:用户画像分析任务流程完成后,在MaxCompute内将创建对应数据表。生成的数据表可在数据地图模块进行查看,可通过血缘查看生成表之间的关系。

  • API数据服务:获取最终加工后的数据后,使用数据服务模块,通过标准化的数据服务接口,实现数据的共享与应用,为其他使用API接收数据的业务模块提供数据。