本文将为您介绍如何通过DataWorks中的EMR Hive节点加工采集的日志数据。
前提条件
开始本实验前,请首先完成采集数据中的操作。
创建函数
上传资源
在数据开发页面打开WorkShop业务流程,右键单击EMR,选择
,配置新建资源参数。关键参数配置如下:
存储路径:选择准备环境中创建的Bucket。
上传文件:选择已下载ip2region-emr.jar文件。
其他参数保持默认或根据实际情况配置。
单击工具栏,将资源提交至开发环境对应的EMR引擎项目。
注册函数
在数据开发页面打开业务流程,右键单击EMR,选择新建函数。
在新建函数对话框中,函数名称输入getregion,单击新建,配置函数信息。
关键参数配置如下:
所属资源:选择ip2region-emr.jar。
类名:输入org.alidata.emr.udf.Ip2Region。
其他参数保持默认或根据实际情况配置。
单击工具栏,将函数提交至开发环境对应的EMR引擎项目。
设计业务流程
业务流程节点间依赖关系的配置请参见采集数据。
双击新建的业务流程打开编辑页面,鼠标单击EMR Hive并拖拽至右侧的编辑页面。在新建节点对话框中,输入节点名称,单击提交。
此处需要新建3个EMR Hive节点,依次命名为ods_log_info_d、dw_user_info_all_d和rpt_user_info_d,并配置如下图所示的依赖关系。
配置EMR Hive节点
配置ods_log_info_d节点。
双击ods_log_info_d节点,进入节点配置页面。
在节点编辑页面,编写如下语句。
说明如果您的工作空间绑定多个EMR引擎,需要选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。
--创建ODS层表 CREATE TABLE IF NOT EXISTS ods_log_info_d ( ip STRING COMMENT 'ip地址', uid STRING COMMENT '用户ID', `time` STRING COMMENT '时间yyyymmddhh:mi:ss', status STRING COMMENT '服务器返回状态码', bytes STRING COMMENT '返回给客户端的字节数', region STRING COMMENT '地域,根据ip得到', method STRING COMMENT 'http请求类型', url STRING COMMENT 'url', protocol STRING COMMENT 'http协议版本号', referer STRING COMMENT '来源url', device STRING COMMENT '终端类型 ', identity STRING COMMENT '访问类型 crawler feed user unknown' ) PARTITIONED BY ( dt STRING ); ALTER TABLE ods_log_info_d ADD IF NOT EXISTS PARTITION (dt='${bizdate}'); set hive.vectorized.execution.enabled = false; INSERT OVERWRITE TABLE ods_log_info_d PARTITION (dt='${bizdate}') SELECT ip , uid , tm , status , bytes , getregion(ip) AS region --使用自定义UDF通过ip得到地域。 , regexp_extract(request, '(^[^ ]+) .*') AS method --通过正则把request差分为三个字段。 , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url , regexp_extract(request, '.* ([^ ]+$)') AS protocol , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer --通过正则清洗refer,得到更精准的url。 , CASE WHEN lower(agent) RLIKE 'android' THEN 'android' --通过agent得到终端信息和访问形式。 WHEN lower(agent) RLIKE 'iphone' THEN 'iphone' WHEN lower(agent) RLIKE 'ipad' THEN 'ipad' WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh' WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone' WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc' ELSE 'unknown' END AS device , CASE WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed' WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^[Mozilla|Opera]' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user' ELSE 'unknown' END AS identity FROM ( SELECT SPLIT(col, '##@@')[0] AS ip , SPLIT(col, '##@@')[1] AS uid , SPLIT(col, '##@@')[2] AS tm , SPLIT(col, '##@@')[3] AS request , SPLIT(col, '##@@')[4] AS status , SPLIT(col, '##@@')[5] AS bytes , SPLIT(col, '##@@')[6] AS referer , SPLIT(col, '##@@')[7] AS agent FROM ods_raw_log_d WHERE dt = '${bizdate}' ) a;
在调度配置中调度资源组选择为独享调度资源组。
单击工具栏中的。
配置dw_user_info_all_d节点。
双击dw_user_info_all_d节点,进入节点配置页面。
在节点编辑页面,编写如下语句。
说明如果您的工作空间绑定多个EMR引擎,需要选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。
--创建DW层表 CREATE TABLE IF NOT EXISTS dw_user_info_all_d ( uid STRING COMMENT '用户ID', gender STRING COMMENT '性别', age_range STRING COMMENT '年龄段', zodiac STRING COMMENT '星座', region STRING COMMENT '地域,根据ip得到', device STRING COMMENT '终端类型 ', identity STRING COMMENT '访问类型 crawler feed user unknown', method STRING COMMENT 'http请求类型', url STRING COMMENT 'url', referer STRING COMMENT '来源url', `time` STRING COMMENT '时间yyyymmddhh:mi:ss' ) PARTITIONED BY ( dt STRING ); ALTER TABLE dw_user_info_all_d ADD IF NOT EXISTS PARTITION (dt='${bizdate}'); INSERT OVERWRITE TABLE dw_user_info_all_d PARTITION (dt='${bizdate}') SELECT COALESCE(a.uid, b.uid) AS uid , b.gender , b.age_range , b.zodiac , a.region , a.device , a.identity , a.method , a.url , a.referer , a.`time` FROM ( SELECT * FROM ods_log_info_d WHERE dt = '${bizdate}' ) a LEFT OUTER JOIN ( SELECT * FROM ods_user_info_d WHERE dt = '${bizdate}' ) b ON a.uid = b.uid;
在调度配置中调度资源组选择为独享调度资源组。
单击工具栏中的。
配置rpt_user_info_d节点。
双击rpt_user_info_d节点,进入节点配置页面。
在节点编辑页面,编写如下语句。
说明如果您的工作空间绑定多个EMR引擎,需要选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。
--创建RPT层表 CREATE TABLE IF NOT EXISTS rpt_user_info_d ( uid STRING COMMENT '用户ID', region STRING COMMENT '地域,根据ip得到', device STRING COMMENT '终端类型 ', pv BIGINT COMMENT 'pv', gender STRING COMMENT '性别', age_range STRING COMMENT '年龄段', zodiac STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ); ALTER TABLE rpt_user_info_d ADD IF NOT EXISTS PARTITION (dt='${bizdate}'); INSERT OVERWRITE TABLE rpt_user_info_d PARTITION (dt='${bizdate}') SELECT uid , MAX(region) , MAX(device) , COUNT(0) AS pv , MAX(gender) , MAX(age_range) , MAX(zodiac) FROM dw_user_info_all_d WHERE dt = '${bizdate}' GROUP BY uid;
在调度配置中调度资源组选择为独享调度资源组。
单击工具栏中的。
提交业务流程
在业务流程的编辑页面,单击,运行业务流程。
待业务流程中的所有节点后出现,单击,提交运行成功的业务流程。
选择提交对话框中需要提交的节点,勾选忽略输入输出不一致的告警。
单击提交。
在生产环境运行任务
任务发布成功后,单击右上角的运维中心。
您也可以进入业务流程的编辑页面,单击工具栏中的前往运维,进入运维中心页面。
单击左侧导航栏中的 ,进入周期任务页面,单击workstart虚节点。
在右侧的DAG图中,右键单击workstart节点,选择 。
勾选需要补数据的任务,输入业务日期,单击确定,自动跳转至补数据实例页面。
单击刷新,直至SQL任务全部运行成功即可。
后续步骤
任务周期性调度场景下,为保障任务产出的表数据符合预期,我们可以对任务产出的表数据进行数据质量监控,详情请参见配置数据质量监控。
- 本页导读 (1)