文档

加工数据

更新时间:

本文将为您介绍如何通过DataWorks中的EMR Hive节点加工采集的日志数据。

前提条件

开始本实验前,请首先完成采集数据中的操作。

创建函数

上传资源

  1. 下载ip2region-emr.jar

  2. 数据开发页面打开WorkShop业务流程,右键单击EMR,选择新建资源 > EMR JAR,配置新建资源参数。image

    关键参数配置如下:

    • 存储路径:选择准备环境中创建的Bucket

    • 上传文件:选择已下载ip2region-emr.jar文件。

    其他参数保持默认或根据实际情况配置。

  3. 单击工具栏image.png,将资源提交至开发环境对应的EMR引擎项目。

注册函数

  1. 数据开发页面打开业务流程,右键单击EMR,选择新建函数

  2. 新建函数对话框中,函数名称输入getregion,单击新建,配置函数信息。image

    关键参数配置如下:

    • 所属资源:选择ip2region-emr.jar。

    • 类名:输入org.alidata.emr.udf.Ip2Region。

    其他参数保持默认或根据实际情况配置。

  3. 单击工具栏image.png,将函数提交至开发环境对应的EMR引擎项目。

设计业务流程

业务流程节点间依赖关系的配置请参见采集数据

双击新建的业务流程打开编辑页面,鼠标单击EMR Hive并拖拽至右侧的编辑页面。在新建节点对话框中,输入节点名称,单击提交

此处需要新建3个EMR Hive节点,依次命名为ods_log_info_ddw_user_info_all_drpt_user_info_d,并配置如下图所示的依赖关系。EMR

配置EMR Hive节点

  1. 配置ods_log_info_d节点。

    1. 双击ods_log_info_d节点,进入节点配置页面。

    2. 在节点编辑页面,编写如下语句。

      说明

      如果您的工作空间绑定多个EMR引擎,需要选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。

      --创建ODS层表
      CREATE TABLE IF NOT EXISTS ods_log_info_d (
        ip STRING COMMENT 'ip地址',
        uid STRING COMMENT '用户ID',
        `time` STRING COMMENT '时间yyyymmddhh:mi:ss',
        status STRING COMMENT '服务器返回状态码',
        bytes STRING COMMENT '返回给客户端的字节数',
        region STRING COMMENT '地域,根据ip得到',
        method STRING COMMENT 'http请求类型',
        url STRING COMMENT 'url',
        protocol STRING COMMENT 'http协议版本号',
        referer STRING COMMENT '来源url',
        device STRING COMMENT '终端类型 ',
        identity STRING COMMENT '访问类型 crawler feed user unknown'
      )
      PARTITIONED BY (
        dt STRING
      );
      
      ALTER TABLE ods_log_info_d ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
      
      set hive.vectorized.execution.enabled = false;
      INSERT OVERWRITE TABLE ods_log_info_d PARTITION (dt='${bizdate}')
      SELECT ip
        , uid
        , tm
        , status
        , bytes 
        , getregion(ip) AS region --使用自定义UDF通过ip得到地域。 
        , regexp_extract(request, '(^[^ ]+) .*') AS method --通过正则把request差分为三个字段。
        , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
        , regexp_extract(request, '.* ([^ ]+$)') AS protocol 
        , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer  --通过正则清洗refer,得到更精准的url。
        , CASE
          WHEN lower(agent) RLIKE 'android' THEN 'android' --通过agent得到终端信息和访问形式。
          WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
          WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
          WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
          WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
          WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
          ELSE 'unknown'
        END AS device
        , CASE
          WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
          WHEN lower(agent) RLIKE 'feed'
          OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
          WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
          AND agent RLIKE '^[Mozilla|Opera]'
          AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
          ELSE 'unknown'
        END AS identity
        FROM (
          SELECT SPLIT(col, '##@@')[0] AS ip
          , SPLIT(col, '##@@')[1] AS uid
          , SPLIT(col, '##@@')[2] AS tm
          , SPLIT(col, '##@@')[3] AS request
          , SPLIT(col, '##@@')[4] AS status
          , SPLIT(col, '##@@')[5] AS bytes
          , SPLIT(col, '##@@')[6] AS referer
          , SPLIT(col, '##@@')[7] AS agent
          FROM ods_raw_log_d
        WHERE dt = '${bizdate}'
      ) a;
    3. 调度配置调度资源组选择为独享调度资源组。

    4. 单击工具栏中的保存

  2. 配置dw_user_info_all_d节点。

    1. 双击dw_user_info_all_d节点,进入节点配置页面。

    2. 在节点编辑页面,编写如下语句。

      说明

      如果您的工作空间绑定多个EMR引擎,需要选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。

      --创建DW层表
      CREATE TABLE IF NOT EXISTS dw_user_info_all_d (
        uid STRING COMMENT '用户ID',
        gender STRING COMMENT '性别',
        age_range STRING COMMENT '年龄段',
        zodiac STRING COMMENT '星座',
        region STRING COMMENT '地域,根据ip得到',
        device STRING COMMENT '终端类型 ',
        identity STRING COMMENT '访问类型 crawler feed user unknown',
        method STRING COMMENT 'http请求类型',
        url STRING COMMENT 'url',
        referer STRING COMMENT '来源url',
        `time` STRING COMMENT '时间yyyymmddhh:mi:ss'
      )
      PARTITIONED BY (
        dt STRING
      );
      
      ALTER TABLE dw_user_info_all_d ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
      
      INSERT OVERWRITE TABLE dw_user_info_all_d PARTITION (dt='${bizdate}')
      SELECT COALESCE(a.uid, b.uid) AS uid
        , b.gender
        , b.age_range
        , b.zodiac
        , a.region
        , a.device
        , a.identity
        , a.method
        , a.url
        , a.referer
        , a.`time`
      FROM (
        SELECT *
        FROM ods_log_info_d
        WHERE dt = '${bizdate}'
      ) a
      LEFT OUTER JOIN (
        SELECT *
        FROM ods_user_info_d
        WHERE dt = '${bizdate}'
      ) b
      ON a.uid = b.uid;
    3. 调度配置调度资源组选择为独享调度资源组。

    4. 单击工具栏中的保存

  3. 配置rpt_user_info_d节点。

    1. 双击rpt_user_info_d节点,进入节点配置页面。

    2. 在节点编辑页面,编写如下语句。

      说明

      如果您的工作空间绑定多个EMR引擎,需要选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。

      --创建RPT层表
      CREATE TABLE IF NOT EXISTS rpt_user_info_d (
        uid STRING COMMENT '用户ID',
        region STRING COMMENT '地域,根据ip得到',
        device STRING COMMENT '终端类型 ',
        pv BIGINT COMMENT 'pv',
        gender STRING COMMENT '性别',
        age_range STRING COMMENT '年龄段',
        zodiac STRING COMMENT '星座'
      )
      PARTITIONED BY (
        dt STRING
      );
      
      ALTER TABLE rpt_user_info_d ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
      
      INSERT OVERWRITE TABLE rpt_user_info_d PARTITION (dt='${bizdate}')
      SELECT uid
        , MAX(region)
        , MAX(device)
        , COUNT(0) AS pv
        , MAX(gender)
        , MAX(age_range)
        , MAX(zodiac)
      FROM dw_user_info_all_d
      WHERE dt = '${bizdate}'
      GROUP BY uid;
    3. 调度配置调度资源组选择为独享调度资源组。

    4. 单击工具栏中的保存

提交业务流程

  1. 在业务流程的编辑页面,单击运行,运行业务流程。

  2. 待业务流程中的所有节点后出现成功,单击提交,提交运行成功的业务流程。

  3. 选择提交对话框中需要提交的节点,勾选忽略输入输出不一致的告警

  4. 单击提交

在生产环境运行任务

  1. 任务发布成功后,单击右上角的运维中心

    您也可以进入业务流程的编辑页面,单击工具栏中的前往运维,进入运维中心页面。

  2. 单击左侧导航栏中的周期任务运维 > 周期任务,进入周期任务页面,单击workstart虚节点。

  3. 在右侧的DAG图中,右键单击workstart节点,选择补数据 > 当前节点及下游节点

  4. 勾选需要补数据的任务,输入业务日期,单击确定,自动跳转至补数据实例页面。

  5. 单击刷新,直至SQL任务全部运行成功即可。

后续步骤

任务周期性调度场景下,为保障任务产出的表数据符合预期,我们可以对任务产出的表数据进行数据质量监控,详情请参见配置数据质量监控

  • 本页导读 (1)
文档反馈