本教程以MySQL数据源中的用户基本信息ods_user_info_d
表和HttpFile中的网站访问日志数据user_log.txt
文件为例,通过数据集成离线同步任务分别同步至私有OSS中,再通过Spark SQL创建外部表来访问私有OSS数据存储。本章节旨在完成数据同步操作。
章节目标
本章节通过数据集成将平台提供的MySQL数据源内的用户基本信息数据与HttpFile数据源内的用户网站访问日志数据同步至私有OSS对象存储创建的数据源中。
源端数据源类型
源端待同步数据
源端表结构
目标端数据源类型
MySQL
表:ods_user_info_d
用户基本信息数据
uid 用户名
gender 性别
age_range 年龄分段
zodiac 星座
OSS
HttpFile
文件:user_log.txt
用户网站访问日志数据
一行为一条用户访问记录。
$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent"$http_referer" "$http_user_agent" [unknown_content];
OSS
完成同步任务后,通过EMR Spark SQL创建外部表来访问私有OSS数据存储。
操作步骤
步骤一:设计业务流程
本步骤内,将数据集成节点以及EMR Spark SQL 节点相结合,形成用户画像分析任务流程中获取数据部分的流程。主要是通过ods_raw_log_d_2oss_spark
节点从HttpFile数据源获取日志数据至私有OSS数据源中,再通过ods_raw_log_d_spark
节点生成简单的日志外部表,从私有OSS数据存储中获取用户日志数据。以及通过ods_user_info_d_2oss_spark
从MySQL数据源同步用户基本信息至私有OSS数据源中,再通过ods_user_info_d_spark
实现外部表的创建,从私有OSS数据存储中获取用户基本信息数据。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的 ,在下拉框中选择对应工作空间后单击进入数据开发。
设计业务流程
新建业务流程。
数据开发需基于业务流程使用对应的开发组件进行具体开发操作。在创建节点之前,您需要先新建业务流程。具体操作方法可参见创建业务流程。
该业务流程的命名为:
用户画像分析_Spark
。设计业务流程。
业务流程新建完成后,将自动展开该业务流程画布。请根据业务流程设计,在业务流程画布中单击新建节点,通过将节点组件拖拽至业务流程画布,并通过拉线设置节点上下游依赖的方式,设计数据同步阶段的业务流程。
在本教程中,由于虚拟节点和同步节点之间并无血缘关系,因此我们通过业务流程拉线的方式来设置节点的依赖关系。有关更多依赖关系设置方式的详细信息,详情请参见调度依赖配置指引。以下为各个节点的类型、命名以及作用的介绍。
节点分类
节点类型
节点命名
节点作用
通用
虚拟节点
workshop_start_spark
用于统筹管理整个用户画像分析业务流程,例如业务流程起调时间。当业务流程较复杂时,可使数据流转路径更清晰。该节点为空跑任务,无须编辑代码。
数据集成
离线同步
ods_raw_log_d_2oss_spark
用于将HttpFile数据源存储的用户网站访问记录,通过离线同步的方式同步至私有OSS数据源中,以供后续Spark SQL获取。
数据集成
离线同步
ods_user_info_d_2oss_spark
用于将MySQL数据源存储的用户基本信息数据,通过离线同步的方式同步至私有OSS数据源中,以供后续Spark SQL获取。
EMR
EMR Spark SQL
ods_raw_log_d_spark
在EMR Spark SQL节点中,创建表
ods_raw_log_d_spark
,并通过该外部表访问私有OSS中的用户网站访问记录数据。EMR
EMR Spark SQL
ods_user_info_d_spark
在EMR Spark SQL节点中,创建表
ods_user_info_d_spark
,并通过该外部表访问私有OSS中的用户基本信息数据。
配置调度逻辑
本案例通过虚拟节点workshop_start_Spark
控制整个业务流程每天00:30调度执行,以下为虚拟节点关键调度配置,其他节点调度无须变更,实现逻辑详情请参见:场景:如何配置业务流程定时时间。其他调度配置相关说明,请参见:任务调度属性配置概述。
调度配置 | 图片示例 | 说明 |
调度时间配置 | 虚拟节点配置调度时间为00:30,该虚拟节点会在每日00:30调起当前业务流程并执行。 | |
调度依赖配置 | 由于虚拟节点 |
DataWorks中的所有节点都需要依赖于上游节点,数据同步阶段的所有任务都以虚拟节点workshop_start_spark
为依赖,通过workshop_start_spark
节点来触发数据同步业务流程的执行。
步骤二:搭建同步链路
配置完成业务流程后,分别双击ods_user_info_d_2oss_spark
以及ods_raw_log_d_2oss_spark
数据集成节点,配置用户数据同步至私有OSS和配置用户日志同步至私有OSS,并且通过ods_raw_log_d_spark
和ods_user_info_d_spark
采用 Spark SQL代码,实现通过Spark SQL创建的外表来访问存储于私有OSS的数据。
用户数据与日志同步至OSS数据源
使用数据集成将平台提供的用户数据与用户日志同步至私有OSS对象存储的Bucket目录下。
配置用户日志同步至OSS
通过离线数据集成任务,实现从平台的HttpFile数据源内的获取用户日志信息,同步至私有OSS数据源中。
同步HttpFile数据源的日志信息至自建的OSS。
在数据开发页面,双击ods_raw_log_d_2oss_spark节点,进入节点配置页面。
配置同步网络链接。
完成以下网络与资源配置后,单击下一步,并根据界面提示完成连通性测试。
参数
描述
数据来源
数据来源:HttpFile
数据源名称:user_behavior_analysis_httpfile
我的资源组
选择已购买的Serverless资源组。
数据去向
数据去向:OSS
数据源名称:选择前文创建的私有OSS数据源,此处示例为test_g。
配置同步任务。
参数
描述
数据来源
文本类型:选择text类型。
文件路径:/user_log.txt。
列分隔符:输入列分隔符为|。
压缩格式:包括None、Gzip、Bzip2和Zip四种类型,此处选择None。
是否跳过表头:选择No。
数据去向
文本类型:选择text类型。
文件名(含路径):根据您自建OSS的目录进行输入,示例为ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt,其中ods_raw_log_d为您自建的目录名,$bizdate表示获取前一天的日期。
列分隔符:输入列分隔符为|。
调度设置。
配置页面单击右侧调度配置,可进入调度配置面板配置调度与节点信息。以下为配置的内容。
说明DataWorks提供调度参数,可实现在调度场景下,将每日数据写入不同的OSS路径及文件下,并以业务日期对路径目录与文件进行命名。
在实际场景下,您可以在数据去向的文件名(含目录)配置中通过
${变量名}
格式自定义路径中的变量,并通过在调度配置页面为变量赋值调度参数的方式,实现调度场景下动态生成数据去向目录与文件名。配置项
配置内容
图示
调度参数
在调度参数项中单击新增参数,添加:
参数名:
bizdate
参数值:
$[yyyymmdd-1]
详情可参见:配置调度参数。
调度依赖
在调度依赖确认产出表已作为本节点输出。
格式为
workspacename.节点名
。详情可参见:配置调度依赖。
配置完成后,单击工具栏中的图标,进行保存。
配置用户数据同步至OSS
通过离线数据集成任务,实现从平台的MySQL数据源内的获取用户数据信息,同步至私有OSS数据源中。
在数据开发页面,双击ods_user_info_d_2oss_spark节点,进入节点配置页面。
配置同步网络链接。
完成以下网络与资源配置后,单击下一步,并根据界面提示完成连通性测试。
参数
描述
数据来源
数据来源:MySQL
数据源名称:user_behavior_analysis_mysql
我的资源组
选择已购买的Serverless资源组。
数据去向
数据去向:OSS
数据源名称:选择前文创建的私有OSS数据源,此处示例为test_g。
配置同步任务。
参数
描述
数据来源
表:选择数据源中的ods_user_info_d。
切分键:建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。此处设置切分键为uid。
数据去向
文本类型:选择text类型。
文件名(含路径):根据您自建OSS的目录进行输入,示例为ods_user_info_d/user_${bizdate}/user_${bizdate}.txt。其中ods_user_info_d为您自建的目录名,$bizdate表示获取前一天的日期。
列分隔符:输入列分隔符为|。
调度设置
配置页面单击右侧调度配置,可进入调度配置面板配置调度与节点信息。以下为配置的内容。
说明DataWorks提供调度参数,可实现在调度场景下,将每日数据写入不同的OSS路径及文件下,并以业务日期对路径目录与文件进行命名。
在实际场景下,您可以在数据去向的文件名(含目录)配置中通过
${变量名}
格式自定义路径中的变量,并通过在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下动态生成数据去向目录与文件名。配置项
配置内容
图示
调度参数
在调度参数项中单击新增参数,添加:
参数名:
bizdate
参数值:
$[yyyymmdd-1]
详情可参见:配置调度参数。
调度依赖
在调度依赖确认产出表已作为本节点输出。
格式为
workspacename.节点名
。详情可参见:配置调度依赖。
配置完成后,单击工具栏中的图标。
创建Spark外部表加载OSS数据
数据通过离线集成任务同步至私有OSS数据源后,基于生成的OSS文件,通过Spark SQL的create
语法创建ods_raw_log_d_spark
与ods_user_info_d_spark
表,并通过LOCATION
来获取OSS中的用户信息文件、用户日志信息以供后续数据加工使用。
配置ods_raw_log_d_spark节点
基于通过EMR Spark SQL创建的外部表ods_raw_log_d_spark
,用LOCATION
来访问离线数据集成任务写入私有OSS对象存储Bucket的日志信息。
代码配置。
-- 场景:以下SQL为Spark SQL,通过EMR Spark SQL创建的外部表ods_raw_log_d_spark,用LOCATION来获取离线数据集成任务写入私有OSS对象存储Bucket的日志信息,并添加对应的dt分区。 -- 补充: -- DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应的业务分区内。 -- 在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过变量赋值调度参数的方式,实现调度场景下代码动态入参。 CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark ( `col` STRING ) PARTITIONED BY ( dt STRING ) LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/'; ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/' ;
说明上述代码中的location为示例路径,与之前配置离线同步任务时的数据去向相同,需要输入您建立的文件路径名称,其中dw-emr-demo是您准备环境时创建的OSS Bucket域名。
配置调度配置。
为
ods_raw_log_d_spark
节点配置任务调度,通过配置的调度参数来获取对应业务日期的私有OSS日志文件,并写入同样业务日期分区的Spark表内。配置项
配置内容
图示
调度参数
在调度参数项中单击新增参数,添加:
参数名:
bizdate
参数值:
$[yyyymmdd-1]
,详情可参见:配置调度参数。调度依赖
在调度依赖确认产出表已作为本节点输出。
格式为
workspacename.节点名
。详情可参见:配置调度依赖。
说明本章节在SQL中配置了调度参数
${bizdate}
,并将其赋值为T-1
。在离线计算场景下bizdate为业务交易发生的日期,也常被称为业务日期。例如,今天统计前一天的营业额,此处的前一天指的是交易发生的日期,也就是业务日期。完成配置后,单击保存节点。
配置ods_user_info_d_spark节点
基于通过EMR Spark SQL节点创建的外部表ods_user_info_d_spark
,用LOCATION
来访问离线数据集成任务写入私有OSS对象存储Bucket的用户信息。
代码配置。
-- 场景:以下SQL为Spark SQL,通过EMR Spark SQL节点创建的外部表ods_user_info_d_spark,用LOCATION来获取离线数据集成任务写入私有OSS对象存储Bucket的用户信息,并写入对应的dt分区。 -- 补充: -- DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应的业务分区内。 -- 在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过变量赋值调度参数的方式,实现调度场景下代码动态入参。 CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark ( `uid` STRING COMMENT '用户ID' ,`gender` STRING COMMENT '性别' ,`age_range` STRING COMMENT '年龄段' ,`zodiac` STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY'|' STORED AS TEXTFILE LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/' ; ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') LOCATION'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/' ;
说明上述代码中的location为示例路径,与之前配置离线同步任务时的数据去向相同,需要输入您建立的文件路径名称,其中dw-emr-demo是您准备环境时创建的OSS Bucket域名。
配置调度配置。
为
ods_user_info_d_spark
节点配置任务调度,通过配置的调度参数来获取对应业务日期的私有OSS用户信息文件,并写入同样业务日期分区的Spark表内。配置项
配置内容
图示
调度参数
在调度参数项中单击新增参数,添加:
参数名:
bizdate
参数值:
$[yyyymmdd-1]
,详情可参见:配置调度参数。调度依赖
在调度依赖确认产出表已作为本节点输出。
格式为
worksspacename.节点名
详情可参见:配置调度依赖。
完成配置后,单击保存节点。
步骤三:验证同步数据
在确保该章节内的所有节点运行成功的情况下,在左侧导航栏的临时查询中新建EMR Spark SQL临时查询,编写SQL查看EMR Spark SQL节点创建的外部表是否正常产出。
-- 您需要将分区过滤条件更新为您当前操作的实际业务日期。例如,任务运行的日期为20240808,则业务日期为20240807,即任务运行日期的前一天。
SELECT * FROM ods_raw_log_d_spark WHERE dt ='业务日期';--查询ods_raw_log_d_spark表
SELECT * FROM ods_user_info_d_spark WHERE dt ='业务日期';--查询ods_user_info_d_spark表
在验证同步数据的SQL中,可将WHERE条件替换为"dt = ${bizdate}"
,在临时查询任务中单击带参运行,为SQL占位符${bizdate}
赋值后运行即可。
后续步骤
现在,您已经完成了同步数据,您可以继续下一个教程。在下一个教程中,您将学习将用户基本信息数据、用户网站访问日志数据在Spark中进行加工处理。详情请参见加工数据。