本文将为您介绍如何通过DataWorks中的EMR Hive节点加工采集的日志数据。
前提条件
开始本实验前,请首先完成
采集数据中的操作。
在OSS上传资源
- 下载ip2region-emr.jar存放至本地。
- 登录OSS控制台。
- 在左侧存储空间列表中,单击目标存储空间(示例为dw-emr-demo)。
- 单击文件管理,打开在 《环境准备》章节新建的用于存储JAR资源的目录,示例的目录名为ip2region。
- 单击上传文件,在上传文件对话框中,设置上传文件的参数。

参数 |
描述 |
上传到 |
选择当前目录,示例的目录路径为oss://dw-emr-demo/ip2region/。
|
文件ACL |
默认为继承Bucket,即单个文件的读写权限以Bucket的读写权限为准。
|
上传文件 |
单击直接上传,选择已下载的ip2region-emr.jar文件。
|
设计业务流程
业务流程节点间依赖关系的配置请参见采集数据。
双击新建的业务流程打开编辑页面,鼠标单击EMR Hive并拖拽至右侧的编辑页面。在新建节点对话框中,输入节点名称,单击提交。
此处需要新建3个EMR Hive节点,依次命名为
ods_log_info_d、
dw_user_info_all_d和
rpt_user_info_d,并配置如下图所示的依赖关系。

配置EMR Hive节点
- 配置ods_log_info_d节点。
- 双击ods_log_info_d节点,进入节点配置页面。
- 在节点编辑页面,编写如下语句。
说明 如果您的工作空间绑定多个EMR引擎,需要选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。
--创建ODS层表
CREATE TABLE IF NOT EXISTS ods_log_info_d (
ip STRING COMMENT 'ip地址',
uid STRING COMMENT '用户ID',
`time` STRING COMMENT '时间yyyymmddhh:mi:ss',
status STRING COMMENT '服务器返回状态码',
bytes STRING COMMENT '返回给客户端的字节数',
region STRING COMMENT '地域,根据ip得到',
method STRING COMMENT 'http请求类型',
url STRING COMMENT 'url',
protocol STRING COMMENT 'http协议版本号',
referer STRING COMMENT '来源url',
device STRING COMMENT '终端类型 ',
identity STRING COMMENT '访问类型 crawler feed user unknown'
)
PARTITIONED BY (
dt STRING
);
create function getregion as 'org.alidata.emr.udf.Ip2Region'
using jar 'oss://dw-emr-demo/ip2region/ip2region-emr.jar';
ALTER TABLE ods_log_info_d ADD IF NOT EXISTS PARTITION (dt=${bizdate});
set hive.vectorized.execution.enabled = false;
INSERT OVERWRITE TABLE ods_log_info_d PARTITION (dt=${bizdate})
SELECT ip
, uid
, tm
, status
, bytes
, getregion(ip) AS region --使用自定义UDF通过ip得到地域。
, regexp_extract(request, '(^[^ ]+) .*') AS method --通过正则把request差分为三个字段。
, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
, regexp_extract(request, '.* ([^ ]+$)') AS protocol
, regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer --通过正则清洗refer,得到更精准的url。
, CASE
WHEN lower(agent) RLIKE 'android' THEN 'android' --通过agent得到终端信息和访问形式。
WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
ELSE 'unknown'
END AS device
, CASE
WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
WHEN lower(agent) RLIKE 'feed'
OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
AND agent RLIKE '^[Mozilla|Opera]'
AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
ELSE 'unknown'
END AS identity
FROM (
SELECT SPLIT(col, '##@@')[0] AS ip
, SPLIT(col, '##@@')[1] AS uid
, SPLIT(col, '##@@')[2] AS tm
, SPLIT(col, '##@@')[3] AS request
, SPLIT(col, '##@@')[4] AS status
, SPLIT(col, '##@@')[5] AS bytes
, SPLIT(col, '##@@')[6] AS referer
, SPLIT(col, '##@@')[7] AS agent
FROM ods_raw_log_d
WHERE dt = ${bizdate}
) a;
- 单击工具栏中的
。
- 配置dw_user_info_all_d节点。
- 双击dw_user_info_all_d节点,进入节点配置页面。
- 在节点编辑页面,编写如下语句。
说明 如果您的工作空间绑定多个EMR引擎,需要选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。
--创建DW层表
CREATE TABLE IF NOT EXISTS dw_user_info_all_d (
uid STRING COMMENT '用户ID',
gender STRING COMMENT '性别',
age_range STRING COMMENT '年龄段',
zodiac STRING COMMENT '星座',
region STRING COMMENT '地域,根据ip得到',
device STRING COMMENT '终端类型 ',
identity STRING COMMENT '访问类型 crawler feed user unknown',
method STRING COMMENT 'http请求类型',
url STRING COMMENT 'url',
referer STRING COMMENT '来源url',
`time` STRING COMMENT '时间yyyymmddhh:mi:ss'
)
PARTITIONED BY (
dt STRING
);
ALTER TABLE dw_user_info_all_d ADD IF NOT EXISTS PARTITION (dt = ${bizdate});
INSERT OVERWRITE TABLE dw_user_info_all_d PARTITION (dt=${bizdate})
SELECT COALESCE(a.uid, b.uid) AS uid
, b.gender
, b.age_range
, b.zodiac
, a.region
, a.device
, a.identity
, a.method
, a.url
, a.referer
, a.`time`
FROM (
SELECT *
FROM ods_log_info_d
WHERE dt = ${bizdate}
) a
LEFT OUTER JOIN (
SELECT *
FROM ods_user_info_d
WHERE dt = ${bizdate}
) b
ON a.uid = b.uid;
- 单击工具栏中的
。
- 配置rpt_user_info_d节点。
- 双击rpt_user_info_d节点,进入节点配置页面。
- 在节点编辑页面,编写如下语句。
说明 如果您的工作空间绑定多个EMR引擎,需要选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。
--创建RPT层表
CREATE TABLE IF NOT EXISTS rpt_user_info_d (
uid STRING COMMENT '用户ID',
region STRING COMMENT '地域,根据ip得到',
device STRING COMMENT '终端类型 ',
pv BIGINT COMMENT 'pv',
gender STRING COMMENT '性别',
age_range STRING COMMENT '年龄段',
zodiac STRING COMMENT '星座'
)
PARTITIONED BY (
dt STRING
);
ALTER TABLE rpt_user_info_d ADD IF NOT EXISTS PARTITION (dt=${bizdate});
INSERT OVERWRITE TABLE rpt_user_info_d PARTITION (dt=${bizdate})
SELECT uid
, MAX(region)
, MAX(device)
, COUNT(0) AS pv
, MAX(gender)
, MAX(age_range)
, MAX(zodiac)
FROM dw_user_info_all_d
WHERE dt = ${bizdate}
GROUP BY uid;
- 单击工具栏中的
。
提交业务流程
- 在业务流程的编辑页面,单击
,运行业务流程。
- 待业务流程中的所有节点后出现
,单击
,提交运行成功的业务流程。
- 选择提交对话框中需要提交的节点,勾选忽略输入输出不一致的告警。
- 单击提交。
在生产环境运行任务
- 任务发布成功后,单击右上角的运维中心。
您也可以进入业务流程的编辑页面,单击工具栏中的前往运维,进入运维中心页面。
- 单击左侧导航栏中的,进入周期任务页面,单击workstart虚节点。
- 在右侧的DAG图中,右键单击workstart节点,选择。
- 勾选需要补数据的任务,输入业务日期,单击确定,自动跳转至补数据实例页面。
- 单击刷新,直至SQL任务全部运行成功即可。
后续步骤
现在,您已经学习了如何创建EMR Hive节点、如何处理原始日志数据。您可以继续下一个教程,学习如何在数据地图模块开启元数据收集功能,并查看数据表信息。详情请参见收集和查看元数据。