加工数据

更新时间:2025-03-14 01:35:36

本文为您介绍如何使用DataWorks中的EMR Hive节点,对同步至OSS的用户信息表(ods_user_info_d_emr)及访问日志数据表(ods_raw_log_d_emr)中的数据进行加工,进而得到目标用户画像数据。

前提条件

开始本实验前,您需完成同步数据中的操作。

步骤一:搭建数据加工链路

双击打开您在同步数据中创建的工作流workshop_emr,单击工作流编辑页面上方的编辑工作流,对工作流进行重新编辑。

此处需要新建3EMR Hive节点,依次命名为dwd_log_info_di_emrdws_user_info_all_di_emrads_user_info_1d_emr,并配置如下图所示的依赖关系。

本教程节点名称示例及作用如下:

节点类型

节点名称

节点作用

节点类型

节点名称

节点作用

imageEMR Hive节点

dwd_log_info_di_emr

用于对原始OSS日志数据进行清洗,使用内置函数、自定义函数(getregion)等完成原始日志ods_raw_log_d_emr数据拆分写入dwd_log_info_di_emr表多个字段。

imageEMR Hive节点

dws_user_info_all_di_emr

用于将清洗后的日志数据和用户基本信息数据进行汇总,对用户基本信息表ods_user_info_d_emr)和初步加工后的日志数据表dwd_log_info_di_emr)进行汇总,将数据写入dws_user_info_all_di_emr表中。

imageEMR Hive节点

ads_user_info_1d_emr

dws_user_info_all_di_emr表中数据进一步加工,将数据写入ads_user_info_1d_emr表,用于生成最终用户画像数据。

image

步骤二:注册自定义函数

为了后续数据处理任务的顺利进行,您需要注册EMR自定义函数(getregion),将同步数据阶段同步至EMR日志数据结构拆解成表格。

上传EMR Jar资源(ip2region.jar)

  1. 下载资源JAR包。

    下载ip2region.jar

  2. 创建EMR Jar资源。

    1. 进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

    2. 在左侧导航栏单击image,进入资源管理。

    3. 资源管理页面,单击新建,选择EMR Jar资源类型,自定义资源名。

    4. 进入资源编辑页配置EMR Jar资源信息,关键参数配置如下:

      参数

      说明

      参数

      说明

      文件来源

      选择本地。

      文件内容

      点击上传下载的资源JARip2region-emr.jar

      存储路径

      选择OSS

      选择准备环境中创建EMR集群配置的OSS Bucket。

      数据源

      选择您在同步数据中绑定的计算资源。

      资源组

      选择您在准备环境中创建的Serverless资源组。

    5. 单击保存,并发布资源至开发、生产环境。

注册EMR函数(getregion)

  1. 新建函数。

    右键您所创建的EMR Jar资源,选择新建函数... > EMR Function,自定义函数名为getregion,敲击回车确认新建。

  2. 注册函数。

    进入新建函数配置页配置EMR Function函数信息,关键参数配置如下:

    参数

    说明

    参数

    说明

    函数类型

    选择OTHER

    数据源

    选择您在同步数据中绑定的计算资源。

    EMR数据库

    选择default

    资源组

    选择您在准备环境中创建的Serverless资源组。

    责任人

    您可在此选择有权限的责任人。

    类名

    org.alidata.emr.udf.Ip2Region

    资源列表

    选择您创建的EMR Jar资源名称。

  3. 发布函数。

    单击保存,并发布函数至开发、生产环境。

步骤三:配置EMR节点

数据加工需要将每层加工逻辑通过EMR Hive调度实现,本教程已提供完整的数据加工SQL示例代码,您需要依次为dwd_log_info_di_emrdws_user_info_all_di_emrads_user_info_1d_emr节点配置。

配置dwd_log_info_di_emr节点

  1. 节点内容编辑。

    鼠标悬浮于EMR Hive节点dwd_log_info_di_emr上,单击上方出现的打开节点,在温馨提示弹框中选择保存并打开,进入EMR Hive节点编辑页面。输入以下示例SQL语句。

    --创建ODS层表
    CREATE TABLE IF NOT EXISTS dwd_log_info_di_emr (
      ip STRING COMMENT 'ip地址',
      uid STRING COMMENT '用户ID',
      `time` STRING COMMENT '时间yyyymmddhh:mi:ss',
      status STRING COMMENT '服务器返回状态码',
      bytes STRING COMMENT '返回给客户端的字节数',
      region STRING COMMENT '地域,根据ip得到',
      method STRING COMMENT 'http请求类型',
      url STRING COMMENT 'url',
      protocol STRING COMMENT 'http协议版本号',
      referer STRING COMMENT '来源url',
      device STRING COMMENT '终端类型 ',
      identity STRING COMMENT '访问类型 crawler feed user unknown'
    )
    PARTITIONED BY (
      dt STRING
    );
    
    ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    
    set hive.vectorized.execution.enabled = false;
    INSERT OVERWRITE TABLE dwd_log_info_di_emr PARTITION (dt='${bizdate}')
    SELECT ip
      , uid
      , tm
      , status
      , bytes 
      , getregion(ip) AS region --使用自定义UDF通过ip得到地域。 
      , regexp_extract(request, '(^[^ ]+) .*') AS method --通过正则把request差分为三个字段。
      , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
      , regexp_extract(request, '.* ([^ ]+$)') AS protocol 
      , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer  --通过正则清洗refer,得到更精准的url。
      , CASE
        WHEN lower(agent) RLIKE 'android' THEN 'android' --通过agent得到终端信息和访问形式。
        WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
        WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
        WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
        WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
        WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
        ELSE 'unknown'
      END AS device
      , CASE
        WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
        WHEN lower(agent) RLIKE 'feed'
        OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
        WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
        AND agent RLIKE '^[Mozilla|Opera]'
        AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
        ELSE 'unknown'
      END AS identity
      FROM (
        SELECT SPLIT(col, '##@@')[0] AS ip
        , SPLIT(col, '##@@')[1] AS uid
        , SPLIT(col, '##@@')[2] AS tm
        , SPLIT(col, '##@@')[3] AS request
        , SPLIT(col, '##@@')[4] AS status
        , SPLIT(col, '##@@')[5] AS bytes
        , SPLIT(col, '##@@')[6] AS referer
        , SPLIT(col, '##@@')[7] AS agent
        FROM ods_raw_log_d_emr
      WHERE dt = '${bizdate}'
    ) a;
  2. 配置调试参数。

    EMR Hive节点编辑页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。

    配置项

    配置说明

    计算资源

    选择准备环境阶段绑定的EMR计算资源以及其对应的计算配额。

    资源组

    选择准备环境阶段购买的Serverless资源组。

    脚本参数

    无需配置。本教程提供的示例代码中统一使用${bizdate}表示业务日期,在步骤四调试运行工作流时,设置本次运行值为具体常量(例如20250223),任务运行将会使用此常量替换任务中定义的变量。

  3. 配置完成后,单击工具栏中的image图标保存任务节点。

配置dws_user_info_all_di_emr节点

  1. 节点内容编辑。

    鼠标悬浮于EMR Hive节点dws_user_info_all_di_emr上,单击上方出现的打开节点,在温馨提示弹框中选择保存并打开,进入EMR Hive节点编辑页面。输入以下示例SQL语句。

    --创建DW层表
    CREATE TABLE IF NOT EXISTS dws_user_info_all_di_emr (
      uid STRING COMMENT '用户ID',
      gender STRING COMMENT '性别',
      age_range STRING COMMENT '年龄段',
      zodiac STRING COMMENT '星座',
      region STRING COMMENT '地域,根据ip得到',
      device STRING COMMENT '终端类型 ',
      identity STRING COMMENT '访问类型 crawler feed user unknown',
      method STRING COMMENT 'http请求类型',
      url STRING COMMENT 'url',
      referer STRING COMMENT '来源url',
      `time` STRING COMMENT '时间yyyymmddhh:mi:ss'
    )
    PARTITIONED BY (
      dt STRING
    );
    
    ALTER TABLE dws_user_info_all_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    
    INSERT OVERWRITE TABLE dws_user_info_all_di_emr PARTITION (dt='${bizdate}')
    SELECT COALESCE(a.uid, b.uid) AS uid
      , b.gender
      , b.age_range
      , b.zodiac
      , a.region
      , a.device
      , a.identity
      , a.method
      , a.url
      , a.referer
      , a.`time`
    FROM (
      SELECT *
      FROM dwd_log_info_di_emr
      WHERE dt = '${bizdate}'
    ) a
    LEFT OUTER JOIN (
      SELECT *
      FROM ods_user_info_d_emr
      WHERE dt = '${bizdate}'
    ) b
    ON a.uid = b.uid;
  2. 配置调试参数。

    EMR Hive节点编辑页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。

    配置项

    配置说明

    计算资源

    选择准备环境阶段绑定的EMR计算资源以及其对应的计算配额。

    资源组

    选择准备环境阶段购买的Serverless资源组。

    脚本参数

    无需配置。本教程提供的示例代码中统一使用${bizdate}表示业务日期,在步骤四调试运行工作流时,设置本次运行值为具体常量(例如20250223),任务运行将会使用此常量替换任务中定义的变量。

  3. 配置完成后,单击工具栏中的image图标保存任务节点。

配置ads_user_info_1d_emr节点

  1. 节点内容编辑。

    鼠标悬浮于EMR Hive节点ads_user_info_1d_emr上,单击上方出现的打开节点,在温馨提示弹框中选择保存并打开,进入EMR Hive节点编辑页面。输入以下示例SQL语句。

    --创建RPT层表
    CREATE TABLE IF NOT EXISTS ads_user_info_1d_emr (
      uid STRING COMMENT '用户ID',
      region STRING COMMENT '地域,根据ip得到',
      device STRING COMMENT '终端类型 ',
      pv BIGINT COMMENT 'pv',
      gender STRING COMMENT '性别',
      age_range STRING COMMENT '年龄段',
      zodiac STRING COMMENT '星座'
    )
    PARTITIONED BY (
      dt STRING
    );
    
    ALTER TABLE ads_user_info_1d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    
    INSERT OVERWRITE TABLE ads_user_info_1d_emr PARTITION (dt='${bizdate}')
    SELECT uid
      , MAX(region)
      , MAX(device)
      , COUNT(0) AS pv
      , MAX(gender)
      , MAX(age_range)
      , MAX(zodiac)
    FROM dws_user_info_all_di_emr
    WHERE dt = '${bizdate}'
    GROUP BY uid;
  2. 配置调试参数。

    EMR Hive节点编辑页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。

    配置项

    配置说明

    计算资源

    选择准备环境阶段绑定的EMR计算资源以及其对应的计算配额。

    资源组

    选择准备环境阶段购买的Serverless资源组。

    脚本参数

    无需配置。本教程提供的示例代码中统一使用${bizdate}表示业务日期,在步骤四调试运行工作流时,设置本次运行值为具体常量(例如20250223),任务运行将会使用此常量替换任务中定义的变量。

  3. 配置完成后,单击工具栏中的image图标保存任务节点。

步骤四:加工数据

  1. 加工数据。

    Workflow画布顶部工具栏中,单击运行,设置各节点定义的参数变量在本次运行中的取值(本教程使用20250223,您可以按需修改),单击确定后,等待运行完成。

  2. 查询数据加工结果。

    1. 进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

    2. Data Studio一级功能入口单击image,进入数据开发,在二级功能入口找到项目目录区域。

    3. 右键单击您所创建的work目录,选择新建节点... > EMR > EMR Hive,自定义EMR Hive节点名,单击回车完成节点的创建。

    4. 进入EMR Hive节点编辑页面,将下面脚本中的业务日期替换为当前业务日期,确认同步数据写入结果。查看导入ods_raw_log_d_emrods_user_info_d_emr的记录数。

      说明

      查询语句中的分区列需要更新为业务日期。例如,任务运行的日期为20250223,则业务日期为20250222,即任务运行日期的前一天。

      SELECT * FROM ads_user_info_1d_emr WHERE dt=业务日期; 
      • 上述命令查询存在数据,即表示数据加工已完成。

      • 如果没有数据,请确保运行工作流时,配置的本次运行值与此处查询时dt指定的业务日期一致,您可以单击工作流,单击右侧的运行历史,在运行记录右侧操作列单击查看,然后在工作流的运行日志中确认运行工作流时业务日期的取值(partition=[pt=xxx])。

步骤五:发布工作流

任务需要发布至生产环境后才可自动调度运行,您可以参考如下步骤,将工作流发布至生产环境。

说明

本教程已在工作流调度配置中统一配置了调度参数,发布前无需再为每个节点单独配置调度参数。

  1. Data Studio左侧导航栏单击image,然后在项目目录区域找到已创建好的工作流,单击进入工作流看板。

  2. 单击页面顶部工具栏中的发布,打开发布面板。

  3. 单击开始发布生产,根据发布流程引导,完成发布。

步骤六:在生产环境运行任务

任务发布后,在次日才会生成实例运行,您可以通过补数据来对已发布流程进行补数据操作,以便查看任务在生产环境是否可以运行,详情可参见执行补数据并查看补数据实例(新版)

  1. 任务发布成功后,单击右上角的运维中心

    您也可以单击左上方的图标图标,选择全部产品 > 数据开发与运维 > 运维中心(工作流)

  2. 单击左侧导航栏中的周期任务运维 > 周期任务,进入周期任务页面,单击workshop_start_emr虚节点。

  3. 在右侧的DAG图中,右键单击workshop_start_emr节点,选择补数据 > 当前节点及下游节点

  4. 勾选需要补数据的任务,设置业务日期,单击提交并跳转

  5. 在补数据页面单击刷新,直至任务全部运行成功即可。

说明

实验完成后,为了避免后续持续产生费用,您可以选择设置节点调度有效期或者冻结业务流程根节点(虚拟节点workshop_start_emr)。

后续步骤

  • 数据可视化展现:用户画像分析完成后,使用数据分析模块,将加工后的数据以图表形式直观展示,便于您快速提取关键信息,洞察数据背后的业务趋势。

  • 监控数据质量:为数据加工生成的表配置数据质量监控,提前识别脏数据并进行拦截,避免脏数据影响扩大。

  • 管理数据:用户画像分析任务流程完成后,在EMR Hive节点内将创建对应数据表。生成的数据表可在数据地图模块进行查看,可通过血缘查看生成表之间的关系。

  • API数据服务:获取最终加工后的数据后,使用数据服务模块,通过标准化的数据服务接口,实现数据的共享与应用,为其他使用API接收数据的业务模块提供数据。

  • 本页导读 (1)
  • 前提条件
  • 步骤一:搭建数据加工链路
  • 步骤二:注册自定义函数
  • 上传EMR Jar资源(ip2region.jar)
  • 注册EMR函数(getregion)
  • 步骤三:配置EMR节点
  • 配置dwd_log_info_di_emr节点
  • 配置dws_user_info_all_di_emr节点
  • 配置ads_user_info_1d_emr节点
  • 步骤四:加工数据
  • 步骤五:发布工作流
  • 步骤六:在生产环境运行任务
  • 后续步骤