本文为您介绍如何用Spark SQL创建外部用户信息表ods_user_info_d_spark
以及日志信息表ods_raw_log_d_spark
访问存储在私有OSS中的用户与日志数据,通过DataWorks的EMR Spark SQL节点进行加工得到目标用户画像数据,阅读本文后,您可以了解如何通过Spark SQL来计算和分析已同步的数据,完成数仓简单数据加工场景。
前提条件
开始本实验前,请先完成同步数据中的操作。
步骤一:搭建数据加工链路
在同步数据阶段,已经成功将数据用Spark进行加载,接下来的流程的目标是对数据进行进一步加工,以输出基本用户画像数据。
在Data Studio左侧导航栏单击
,然后在项目目录区域找到已创建好的工作流
User_profile_analysis_Spark
,单击进入工作流看板。单击编辑工作流,在工作流开发页面,从左侧拖拽EMR Spark SQL节点至画布中,分别设置节点名称。
本教程节点名称示例及作用如下:
节点类型
节点名称
节点作用
节点类型
节点名称
节点作用
EMR Spark SQL
dwd_log_info_di_spark
通过Spark SQL对
ods_raw_log_d_spark
表进行处理,将数据写入dwd_log_info_di_spark
表内。EMR Spark SQL
dws_user_info_all_di_spark
利用明细日志表
dwd_log_info_di_spark
和用户表ods_user_info_d_spark
的uid字段进行关联,生成汇总用户日志表。对用户基本信息表(
ods_user_info_d_spark
)和初步加工后的日志数据表(dwd_log_info_di_spark
)进行汇总,将数据写入dws_user_info_all_di_spark
表中。EMR Spark SQL
ads_user_info_1d_spark
对
dws_user_info_all_di_spark
表中数据进一步加工,将数据写入ads_user_info_1d_spark
表,产出基本用户画像。手动拖拽连线,配置各节点的上游节点。最终效果如下:
在顶部工具栏单击保存。
步骤二:配置数据加工节点
配置完成业务流程后,使用EMR Spark SQL节点对用户基本信息表和明细日志表进行处理,最终生成初步用户画像表ads_user_info_1d_spark
。
配置dwd_log_info_di_spark节点
在本节点的示例代码中,利用Spark自带的函数处理上游表ods_raw_log_d_spark
字段的SQL代码,并将其写入dwd_log_info_di_spark
表中。
在Workflow画布中,鼠标悬停至
dwd_log_info_di_spark
节点上,单击打开节点。将如下代码粘贴至SQL编辑页面。
-- 场景:以下SQL为Spark SQL,通过Spark SQL函数将加载至Spark中的ods_raw_log_d_spark按"##@@"进行切分后生成多个字段,并写入新表dwd_log_info_di_spark。 -- 补充: -- DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应的业务分区内。 -- 在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过变量赋值调度参数的方式,实现调度场景下代码动态入参。 CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark ( ip STRING COMMENT 'ip地址', uid STRING COMMENT '用户ID', tm STRING COMMENT '时间yyyymmddhh:mi:ss', status STRING COMMENT '服务器返回状态码', bytes STRING COMMENT '返回给客户端的字节数', method STRING COMMENT'请求方法', url STRING COMMENT 'url', protocol STRING COMMENT '协议', referer STRING, device STRING, identity STRING ) PARTITIONED BY (dt STRING) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dwd_log_info_di_spark/log_${bizdate}/'; ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}'); INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}') SELECT ip, uid, tm, status, bytes, regexp_extract(request, '(^[^ ]+) .*', 1) AS method, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url, regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol, regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer, CASE WHEN lower(agent) RLIKE 'android' THEN 'android' WHEN lower(agent) RLIKE 'iphone' THEN 'iphone' WHEN lower(agent) RLIKE 'ipad' THEN 'ipad' WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh' WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone' WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc' ELSE 'unknown' END AS device, CASE WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed' WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user' ELSE 'unknown' END AS identity FROM ( SELECT SPLIT(col, '##@@')[0] AS ip, SPLIT(col, '##@@')[1] AS uid, SPLIT(col, '##@@')[2] AS tm, SPLIT(col, '##@@')[3] AS request, SPLIT(col, '##@@')[4] AS status, SPLIT(col, '##@@')[5] AS bytes, SPLIT(col, '##@@')[6] AS referer, SPLIT(col, '##@@')[7] AS agent FROM ods_raw_log_d_spark WHERE dt = '${bizdate}' ) a;
上述代码中的location地址,需根据您实际情况替换,其中
dw-spark-demo
是您OSS对象存储环境准备时创建的OSS Bucket名。在EMR Spark SQL编辑页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。
(可选)配置调度属性。
本教程调度配置相关参数保持默认即可,您可以在离线同步任务配置页面右侧单击调度配置,确认如下关键参数取值是否与本教程一致。调度配置中其他参数的详细说明,详情可参见调度配置。
调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。
调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。
在顶部工具栏单击保存,保存工作流。
配置dws_user_info_all_di_spark节点
本节点将用户基本信息表(ods_user_info_d_spark
)和初步加工后的日志数据表(dwd_log_info_di_spark
)进行汇总,并将结果写入dws_user_info_all_di_spark
表中。
在Workflow画布中,鼠标悬停至
dws_user_info_all_di_spark
节点上,单击打开节点。将如下代码粘贴至SQL编辑页面。
-- 场景:以下SQL为Spark SQL,通过uid将dwd_log_info_di_spark和ods_user_info_d_spark进行关联,并写入对应的dt分区。 -- 补充: -- DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应的业务分区内。 -- 在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过变量赋值调度参数的方式,实现调度场景下代码动态入参。 CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark ( uid STRING COMMENT '用户ID', gender STRING COMMENT '性别', age_range STRING COMMENT '年龄段', zodiac STRING COMMENT '星座', device STRING COMMENT '终端类型 ', method STRING COMMENT 'http请求类型', url STRING COMMENT 'url', `time` STRING COMMENT '时间yyyymmddhh:mi:ss' ) PARTITIONED BY (dt STRING) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dws_user_info_all_di_spark/log_${bizdate}/'; --添加分区 ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}'); --插入user表与日志表数据 INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}') SELECT COALESCE(a.uid, b.uid) AS uid, b.gender AS gender, b.age_range AS age_range, b.zodiac AS zodiac, a.device AS device, a.method AS method, a.url AS url, a.tm FROM ( SELECT * FROM dwd_log_info_di_spark WHERE dt='${bizdate}' ) a LEFT OUTER JOIN ( SELECT * FROM ods_user_info_d_spark WHERE dt='${bizdate}' ) b ON a.uid = b.uid;
上述代码中的location地址,需根据您实际情况替换,其中
dw-spark-demo
是您OSS对象存储环境准备时创建的OSS Bucket名。在EMR Spark SQL编辑页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。
(可选)配置调度属性。
本教程调度配置相关参数保持默认即可,您可以在离线同步任务配置页面右侧单击调度配置,确认如下关键参数取值是否与本教程一致。调度配置中其他参数的详细说明,详情可参见调度配置。
调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。
调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。
在顶部工具栏单击保存,保存工作流。
配置ads_user_info_1d_spark节点
本节点对dws_user_info_all_di_spark
表中数据进一步加工,将数据写入ads_user_info_1d_spark
表,产出基本用户画像。
在Workflow画布中,鼠标悬停至
ads_user_info_1d_spark
节点上,单击打开节点。将如下代码粘贴至SQL编辑页面。
-- 场景:以下SQL为Spark SQL,通过Spark SQL函数将Spark中的dws_user_info_all_di_spark表进一步的加工,并写入新表ads_user_info_1d_spark。 -- 补充: -- DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应的业务分区内。 -- 在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过变量赋值调度参数的方式,实现调度场景下代码动态入参。 CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark ( uid STRING COMMENT '用户ID', device STRING COMMENT '终端类型 ', pv BIGINT COMMENT 'pv', gender STRING COMMENT '性别', age_range STRING COMMENT '年龄段', zodiac STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ads_user_info_1d_spark/log_${bizdate}/'; ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt='${bizdate}'); INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}') SELECT uid , MAX(device) , COUNT(0) AS pv , MAX(gender) , MAX(age_range) , MAX(zodiac) FROM dws_user_info_all_di_spark WHERE dt = '${bizdate}' GROUP BY uid;
上述代码中的location地址,需根据您实际情况替换,其中
dw-spark-demo
是您OSS对象存储环境准备时创建的OSS Bucket名。在EMR Spark SQL编辑页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。
(可选)配置调度属性。
本教程调度配置相关参数保持默认即可,您可以在离线同步任务配置页面右侧单击调度配置,确认如下关键参数取值是否与本教程一致。调度配置中其他参数的详细说明,详情可参见调度配置。
调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。
调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。
在顶部工具栏单击保存,保存工作流。
步骤三:加工数据
同步数据。
在Workflow画布顶部工具栏中,单击运行,设置各节点定义的参数变量在本次运行中的取值(本教程使用
20250223
,您可以按需修改),单击确定后,等待运行完成。查询数据加工结果。
进入SQL查询页面。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的 ,单击进入数据分析页面,单击左侧导航栏的SQL查询进入SQL查询页面。
配置SQL查询文件。
单击我的文件后的
按钮新建文件,自定义SQL查询文件名。
单击已新建的文件,进入文件编辑页面。
在文件编辑页面单击右上角的
按钮,配置需进行SQL查询的工作空间等信息,配置详情如下:
配置项
说明
配置项
说明
工作空间
选择
User_profile_analysis_Spark
工作流所在的工作空间。数据源类型
下拉选择
EMR Spark SQL
。数据源名称
选择在准备环境时绑定的EMR Serverless Spark为计算资源开发环境。
SQL Compute
选择EMR Serverless Spark中创建的SQL会话。
单击确认按钮,完成查询数据源的配置。
编辑查询SQL。
在确保该章节内的所有节点运行成功的情况下,编写以下SQL查询以检查EMR Spark SQL节点创建的外部表是否正常产出。
-- 您需要将分区过滤条件更新为您当前操作的实际业务日期。例如,任务运行的日期为20250223,则业务日期为20250222,即任务运行日期的前一天。 SELECT * FROM dwd_log_info_di_spark WHERE dt ='业务日期';
步骤四:在生产环境运行任务
任务发布后,在次日才会生成实例运行,您可以通过补数据来对已发布流程进行补数据操作,以便查看任务在生产环境是否可以运行,详情可参见执行补数据并查看补数据实例(新版)。
任务发布成功后,单击右上角的运维中心。
您也可以单击左上方的
图标,选择 。
单击左侧导航栏中的
,进入周期任务页面,单击workshop_start_spark
虚节点。在右侧的DAG图中,右键单击
workshop_start_spark
节点,选择 。勾选需要补数据的任务,设置业务日期,单击提交并跳转。
在补数据页面单击刷新,直至SQL任务全部运行成功即可。
后续步骤
- 本页导读
- 前提条件
- 步骤一:搭建数据加工链路
- 步骤二:配置数据加工节点
- 配置dwd_log_info_di_spark节点
- 配置dws_user_info_all_di_spark节点
- 配置ads_user_info_1d_spark节点
- 步骤三:加工数据
- 步骤四:在生产环境运行任务
- 后续步骤