加工数据

本文为您介绍如何用Spark SQL创建外部用户信息表ods_user_info_d_spark以及日志信息表ods_raw_log_d_spark访问存储在私有OSS中的用户与日志数据,通过DataWorksEMR Spark SQL节点进行加工得到目标用户画像数据,阅读本文后,您可以了解如何通过Spark SQL来计算和分析已同步的数据,完成数仓简单数据加工场景。

前提条件

开始本实验前,请先完成同步数据中的操作。

  • 已通过EMR Spark SQL节点创建ods_user_info_d_spark外部表,可成功访问同步至私有OSS的用户基本信息。

  • 已通过EMR Spark SQL节点创建ods_raw_log_d_spark外部表,可成功访问同步至私有OSS的日志信息。

注意事项

由于EMR Serverless Spark空间不支持注册函数,无法通过注册新函数的方式将日志信息进行切分并转换IP为地域。本案例通过Spark SQL自有的函数对ods_raw_log_d_spark日志表进行切分的方式生成dwd_log_info_di_spark,从而实现进一步的用户画像分析。

章节目标

本小节将对ods_user_info_d_sparkods_raw_log_d_spark外部表进行加工处理,并生成基本用户画像表。

  1. 通过Spark SQLods_raw_log_d_spark表进行处理,生成新的明细日志表dwd_log_info_di_spark

  2. 利用明细日志表dwd_log_info_di_spark和用户表ods_user_info_d_sparkuid字段进行关联,生成汇总用户日志表dws_user_info_all_di_spark

  3. dws_user_info_all_di_spark表,直接应用于数据消费,表数据较多,所以将其进一步加工为ads_user_info_1d_spark表。

步骤一:设计工作流程

在上一同步数据章节中完成了用户表用户画像分析(Spark版)的数据同步流程。在数据加工阶段将会新增dwd_log_info_di_spark节点对日志表进行细分,dws_user_info_all_di_spark节点对日志明细表和用户表进行连接生成新表后,再通过ads_user_info_1d_spark节点对用户日志明细表进一步处理,实现用户画像表的输出。

  1. 进入数据开发。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 同步数据阶段,已经成功用EMR Spark SQL节点创建外部表访问私有OSS数据,接下来的流程的目标是对数据进行进一步加工,以输出基本用户画像数据。

    • 各层级节点以及工作逻辑。

      在业务流程画布中单击新建节点,创建以下节点,以供加工数据使用。

      节点分类

      节点类型

      节点名称

      (以最终产出表命名)

      代码逻辑

      EMR

      imageEMR Spark SQL

      dwd_log_info_di_spark

      ods_raw_log_d_spark日志表进行拆分,生成新的日志表,以供后续关联使用。

      EMR

      imageEMR Spark SQL

      dws_user_info_all_di_spark

      用户基本信息初步加工后的日志数据进行汇总,合并为一张表。

      EMR

      imageEMR Spark SQL

      ads_user_info_1d_spark

      进一步加工产出基本用户画像。

    • 流程DAG图。

      将节点组件拖拽至业务流程画布,并通过拉线设置节点上下游依赖的方式,设计数据加工阶段的业务流程。

      image

步骤二:配置EMR Spark SQL节点

配置完成业务流程后,在EMR Spark SQL节点中利用Spark SQL函数对ods_raw_log_d_spark表进行切分处理,从而通过对切分后的日志表与用户表进行关联成新的明细表之后进一步对数据表进行清洗、处理操作后,从而实现对不同用户的一个用户画像。

配置dwd_log_info_di_spark节点

在业务流程面板,双击EMR Spark SQL节点dwd_log_info_di_spark节点,进入dwd_log_info_di_spark节点的编辑页面,编写处理上游ods_raw_log_d_spark表,将日志明细数据写入dwd_log_info_di_spark表中。

  1. 配置代码

    双击dwd_log_info_di_spark节点,进入节点配置页面,编写如下语句。

    -- 场景:以下SQLSpark SQL,通过Spark SQL函数将加载至Spark中的ods_raw_log_d_spark按"##@@"进行切分后生成多个字段,并写入新表dwd_log_info_di_spark。
    -- 补充:
    --      DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应的业务分区内。
    --      在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过变量赋值调度参数的方式,实现调度场景下代码动态入参。  
    
    CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
      ip STRING COMMENT 'ip地址',
      uid STRING COMMENT '用户ID',
      tm STRING COMMENT '时间yyyymmddhh:mi:ss',
      status STRING COMMENT '服务器返回状态码',
      bytes STRING COMMENT '返回给客户端的字节数',
      method STRING COMMENT'请求方法',
      url STRING COMMENT 'url',
      protocol STRING COMMENT '协议',
      referer STRING ,
      device STRING,
      identity STRING
      )
    
    PARTITIONED BY (
      dt STRING
    );
    
    ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
    
    INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt='${bizdate}')
    SELECT ip, 
           uid, 
           tm, 
           status, 
           bytes, 
           regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
           regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
           regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
           regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
           CASE 
               WHEN lower(agent) RLIKE 'android' THEN 'android' 
               WHEN lower(agent) RLIKE 'iphone' THEN 'iphone' 
               WHEN lower(agent) RLIKE 'ipad' THEN 'ipad' 
               WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh' 
               WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone' 
               WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc' 
               ELSE 'unknown' 
           END AS device, 
           CASE 
               WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' 
               WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed' 
               WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user' 
               ELSE 'unknown' 
           END AS identity
    FROM (
        SELECT 
            SPLIT(col, '##@@')[0] AS ip, 
            SPLIT(col, '##@@')[1] AS uid, 
            SPLIT(col, '##@@')[2] AS tm, 
            SPLIT(col, '##@@')[3] AS request, 
            SPLIT(col, '##@@')[4] AS status, 
            SPLIT(col, '##@@')[5] AS bytes, 
            SPLIT(col, '##@@')[6] AS referer, 
            SPLIT(col, '##@@')[7] AS agent
        FROM ods_raw_log_d_spark
        WHERE dt = '${bizdate}'
    ) a;
  2. 配置调度属性

    配置项

    图示

    新增参数

    调度参数项中单击新增参数,添加:

    • 参数名:bizdate

    • 参数值:$[yyyymmdd-1]

    详情可参见:配置调度参数

    image

    调度依赖

    调度依赖确认产出表已作为本节点输出。

    格式为worksspacename.节点名

    详情可参见:配置调度依赖

    image

    说明

    时间属性的配置,配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日调起时间由业务流程虚拟节点workshop_start_spark的定时调度时间控制,即每日00:30后才会调度。

  3. Spark系统参数配置(可选)

    您可在节点高级设置处配置Spark特有属性参数,本实验基于EMR Serverless SparkSpark SQL任务的系统参数,可参考以下表格内容配置高级参数:

    高级参数

    配置说明

    SERVERLESS_RELEASE_VERSION

    变更Serverless Spark引擎版本,示例如下:

    "SERVERLESS_RELEASE_VERSION": "esr-2.1 (Spark 3.3.1, Scala 2.12, Java Runtime)"

    SERVERLESS_QUEUE_NAME

    变更资源队列,示例如下:

    "SERVERLESS_QUEUE_NAME": "dev_queue"

    SERVERLESS_SQL_COMPUTE

    修改SQL Compute,示例如下:

    "SERVERLESS_SQL_COMPUTE": "sc-b4356b0af6039727"

    FLOW_SKIP_SQL_ANALYZE

    SQL语句执行方式。取值如下:

    • true:表示每次执行多条SQL语句。

    • false:表示每次执行一条SQL语句。

    说明

    该参数仅支持用于数据开发环境测试运行流程。

    其他

    • 您可以直接在高级配置里追加自定义SPARK参数。例如, spark.eventLog.enabled : false ,DataWorks会自动在最终下发到Spark工作空间的EMR集群代码将代码补全为Spark工作空间所支持的代码,格式为:--conf key=value

    • 还支持配置全局Spark参数,详情请参见设置全局Spark参数

    若您想查看更多Spark属性参数设置,可参考Spark Configuration

  4. 保存配置

    本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的image.png按钮,保存当前配置。

  5. 验证日志表拆分情况

    在确保上游节点以及本节点运行成功的情况下,在左侧导航栏的临时查询中新建EMR Spark SQL临时查询,编写SQL查看EMR Spark SQL节点创建的表是否正常产出。

    -- 您需要将分区过滤条件更新为您当前操作的实际业务日期。例如,任务运行的日期为20230222,则业务日期为20230221,即任务运行日期的前一天。
    SELECT * FROM dwd_log_info_di_spark WHERE dt ='业务日期';
    说明

    在本教程在SQL中配置了调度参数${bizdate},并将其赋值为T-1。在离线计算场景下bizdate为业务交易发生的日期,也常被称为业务日期(business date)。例如,今天统计前一天的营业额,此处的前一天指的是交易发生的日期,也就是业务日期。

配置dws_user_info_all_di_spark节点

基于dwd_log_info_di_spark日志表和ods_user_info_d_spark用户表,通过uid对两表进行关联,产出新的用户日志明细表dws_user_info_all_di_spark

  1. 编辑代码

    双击dws_user_info_all_di_spark节点,进入节点配置页面。在节点编辑页面,编写如下语句。

    -- 场景:以下SQLSpark SQL,通过uiddwd_log_info_di_sparkods_user_info_d_spark进行关联,并写入对应的dt分区。
    -- 补充:
    --      DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应的业务分区内。
    --      在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过变量赋值调度参数的方式,实现调度场景下代码动态入参。
    CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
        uid        STRING COMMENT '用户ID',
        gender     STRING COMMENT '性别',
        age_range  STRING COMMENT '年龄段',
        zodiac     STRING COMMENT '星座',
        device     STRING COMMENT '终端类型 ',
        method     STRING COMMENT 'http请求类型',
        url        STRING COMMENT 'url',
        `time`     STRING COMMENT '时间yyyymmddhh:mi:ss'
    )
    PARTITIONED BY (dt STRING);
    
    --添加分区
    ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
    
    --插入user表与日志表数据
    INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
    SELECT 
        COALESCE(a.uid, b.uid) AS uid,
        b.gender AS gender,    
        b.age_range AS age_range,
        b.zodiac AS zodiac,
        a.device AS device,
        a.method AS method,
        a.url AS url,
        a.tm
    FROM (
      SELECT * 
      FROM dwd_log_info_di_spark 
      WHERE dt='${bizdate}'
    ) a
    LEFT OUTER JOIN (
      SELECT * 
      FROM ods_user_info_d_spark 
      WHERE dt='${bizdate}'
    ) b
    ON 
        a.uid = b.uid;
  2. 配置调度属性

    配置项

    配置内容

    图示

    新增参数

    调度参数项中单击新增参数,添加:

    • 参数名:bizdate

    • 参数值:$[yyyymmdd-1]

    详情可参见:配置调度参数

    image

    调度依赖

    调度依赖确认产出表已作为本节点输出。

    格式为worksspacename.节点名

    详情可参见:配置调度依赖

    image

    说明

    时间属性的配置,配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日调起时间由业务流程虚拟节点workshop_start_spark的定时调度时间控制,即每日00:30后才会调度。

  3. Spark系统参数配置(可选)

    您可在节点高级设置处配置Spark特有属性参数,本实验基于EMR Serverless SparkSpark SQL任务的系统参数,可参考以下表格内容配置高级参数:

    高级参数

    配置说明

    SERVERLESS_RELEASE_VERSION

    变更Serverless Spark引擎版本,示例如下:

    "SERVERLESS_RELEASE_VERSION": "esr-2.1 (Spark 3.3.1, Scala 2.12, Java Runtime)"

    SERVERLESS_QUEUE_NAME

    变更资源队列,示例如下:

    "SERVERLESS_QUEUE_NAME": "dev_queue"

    SERVERLESS_SQL_COMPUTE

    修改SQL Compute,示例如下:

    "SERVERLESS_SQL_COMPUTE": "sc-b4356b0af6039727"

    FLOW_SKIP_SQL_ANALYZE

    SQL语句执行方式。取值如下:

    • true:表示每次执行多条SQL语句。

    • false:表示每次执行一条SQL语句。

    说明

    该参数仅支持用于数据开发环境测试运行流程。

    其他

    • 您可以直接在高级配置里追加自定义SPARK参数。例如, spark.eventLog.enabled : false ,DataWorks会自动在最终下发到Spark工作空间的EMR集群代码将代码补全为Spark工作空间所支持的代码,格式为:--conf key=value

    • 还支持配置全局Spark参数,详情请参见设置全局Spark参数

    若您想查看更多Spark属性参数设置,可参考Spark Configuration

  4. 保存配置

    本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的image.png按钮,保存当前配置。

  5. 验证用户日志明细表数据情况

    在确保上游节点以及本节点运行成功的情况下,在左侧导航栏的临时查询中新建EMR Spark SQL临时查询,编写SQL查看EMR Spark SQL节点创建的表是否正常产出。

    -- 您需要将分区过滤条件更新为您当前操作的实际业务日期。例如,任务运行的日期为20240808,则业务日期为20240807,即任务运行日期的前一天。
    SELECT * FROM dws_user_info_all_di_spark WHERE dt ='业务日期';
    说明

    在本教程在SQL中配置了调度参数${bizdate},并将其赋值为T-1。在离线计算场景下bizdate为业务交易发生的日期,也常被称为业务日期(business date)。例如,今天统计前一天的营业额,此处的前一天指的是交易发生的日期,也就是业务日期。

配置ads_user_info_1d_spark节点

基于dws_user_info_all_di_spark表,进行最大值、以及计数计算,产出ads_user_info_1d_spark表作为用户画像表进行消费。

  1. 编辑代码

    双击ads_user_info_1d_spark节点,进入节点配置页面。在节点编辑页面,编写如下语句。

    -- 场景:以下SQLSpark SQL,通过Spark SQL函数将Spark中的dws_user_info_all_di_spark表进一步的加工,并写入新表ads_user_info_1d_spark。
    -- 补充:
    --      DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应的业务分区内。
    --      在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过变量赋值调度参数的方式,实现调度场景下代码动态入参。
    CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
      uid STRING COMMENT '用户ID',
      device STRING COMMENT '终端类型 ',
      pv BIGINT COMMENT 'pv',
      gender STRING COMMENT '性别',
      age_range STRING COMMENT '年龄段',
      zodiac STRING COMMENT '星座'
    )
    PARTITIONED BY (
      dt STRING
    );
    ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}')
    SELECT uid
      , MAX(device)
      , COUNT(0) AS pv
      , MAX(gender)
      , MAX(age_range)
      , MAX(zodiac)
    FROM dws_user_info_all_di_spark
    WHERE dt = '${bizdate}'
    GROUP BY uid;
  2. 调度配置

    配置项

    配置内容

    图示

    新增参数

    调度参数项中单击新增参数,添加:

    • 参数名:bizdate

    • 参数值:$[yyyymmdd-1]

    详情可参见:配置调度参数

    image

    调度依赖

    调度依赖确认产出表已作为本节点输出。

    格式为worksspacename.节点名

    详情可参见:配置调度依赖

    image

    说明

    时间属性的配置,配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日调起时间由业务流程虚拟节点workshop_start_spark的定时调度时间控制,即每日00:30后才会调度。

  3. Spark系统参数配置(可选)

    您可在节点高级设置处配置Spark特有属性参数,本实验基于EMR Serverless SparkSpark SQL任务的系统参数,可参考以下表格内容配置高级参数:

    高级参数

    配置说明

    SERVERLESS_RELEASE_VERSION

    变更Serverless Spark引擎版本,示例如下:

    "SERVERLESS_RELEASE_VERSION": "esr-2.1 (Spark 3.3.1, Scala 2.12, Java Runtime)"

    SERVERLESS_QUEUE_NAME

    变更资源队列,示例如下:

    "SERVERLESS_QUEUE_NAME": "dev_queue"

    SERVERLESS_SQL_COMPUTE

    修改SQL Compute,示例如下:

    "SERVERLESS_SQL_COMPUTE": "sc-b4356b0af6039727"

    FLOW_SKIP_SQL_ANALYZE

    SQL语句执行方式。取值如下:

    • true:表示每次执行多条SQL语句。

    • false:表示每次执行一条SQL语句。

    说明

    该参数仅支持用于数据开发环境测试运行流程。

    其他

    • 您可以直接在高级配置里追加自定义SPARK参数。例如, spark.eventLog.enabled : false ,DataWorks会自动在最终下发到Spark工作空间的EMR集群代码将代码补全为Spark工作空间所支持的代码,格式为:--conf key=value

    • 还支持配置全局Spark参数,详情请参见设置全局Spark参数

    若您想查看更多Spark属性参数设置,可参考Spark Configuration

  4. 保存配置

    本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的image.png按钮,保存当前配置。

  5. 验证用户画像表数据情况

    在确保上游节点以及本节点运行成功的情况下,在左侧导航栏的临时查询中新建EMR Spark SQL临时查询,编写SQL查看该节点创建的表是否正常产出。

    -- 您需要将分区过滤条件更新为您当前操作的实际业务日期。例如,任务运行的日期为20230222,则业务日期为20230221,即任务运行日期的前一天。
    SELECT * FROM ads_user_info_1d_spark WHERE dt ='业务日期';
    说明

    在本教程在SQL中配置了调度参数${bizdate},并将其赋值为T-1。在离线计算场景下bizdate为业务交易发生的日期,也常被称为业务日期(business date)。例如,今天统计前一天的营业额,此处的前一天指的是交易发生的日期,也就是业务日期。

步骤四:提交业务流程

完成业务流程所有配置后,测试该流程是否能正常运行,测试成功后,需要提交流程等待发布。

  1. 在业务流程的编辑页面,单击运行,运行业务流程。

  2. 待业务流程中的所有节点后出现成功,单击提交,提交运行成功的业务流程。

  3. 选择提交对话框中需要提交的节点,勾选忽略输入输出不一致的告警

  4. 单击提交

  5. 提交成功后,即可在发布页面发布流程节点。

步骤五:在生产环境运行任务

任务发布后,次日才会生成实例运行,您可以通过补数据来对已发布流程进行补数据操作,以便查看任务在生产环境是否可以运行,详情可参见执行补数据并查看补数据实例(新版)

  1. 任务发布成功后,单击右上角的运维中心

    您也可以进入业务流程的编辑页面,单击工具栏中的前往运维,进入运维中心页面。

  2. 单击左侧导航栏中的周期任务运维 > 周期任务,进入周期任务页面,单击workshop_start_spark虚节点。

  3. 在右侧的DAG图中,右键单击workshop_start_spark节点,选择补数据 > 当前节点及下游节点

  4. 勾选需要补数据的任务,输入业务日期,单击确定,自动跳转至补数据实例页面。

  5. 单击刷新,直至SQL任务全部运行成功即可。