本文将介绍如何创建HttpFile和MySQL数据源以访问用户信息和网站日志数据,配置数据同步链路将这些数据同步到在环境准备阶段创建的OSS存储中,并通过创建Spark外表解析OSS中存储的数据。通过查询验证数据同步结果,确认是否完成整个数据同步操作。
前提条件
开始本文的操作前,请准备好需要使用的环境。详情请参见准备环境。
步骤一:新建数据源
为确保后续数据处理流程的顺利进行,您需要在DataWorks工作空间中创建如下数据源,用于获取平台提供的初始数据。
MySQL数据源:本教程将数据源命名为
user_behavior_analysis_mysql
,用于获取存储在MySQL的用户基本信息数据(ods_user_info_d
)。Httpfile数据源:本教程将数据源命名为
user_behavior_analysis_httpfile
,用于获取存储在OSS的用户网站访问记录(user_log.txt
)。OSS数据源:用于存储从MySQL数据源和Httpfile数据源同步的用户基本信息数据和用户网站访问记录数据,以供后续Spark创建外部表后进行读取。
新建MySQL数据源(user_behavior_analysis_mysql)
本教程提供的用户基本信息存储在MySQL数据库中,您需要创建MySQL数据源,以便在后续将MySQL数据库中的用户基本信息数据(ods_user_info_d
)同步至准备环境阶段创建的自有OSS对象存储内。
进入数据源页面。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的 ,在下拉框中选择对应工作空间后单击进入管理中心。
单击左侧导航栏的数据源,进入数据源页面。
单击新增数据源,搜索选择数据源类型为MySQL。
在创建MySQL数据源页面,配置相关参数。在本教程中开发环境和生产环境都按如下示例值填写。
以下为本教程所需配置的关键参数,未说明参数保持默认即可。
参数
描述
参数
描述
数据源名称
输入数据源名称,本教程请填写
user_behavior_analysis_mysql
。数据源描述
DataWorks案例体验专用数据源,在离线同步配置时读取该数据源即可访问平台提供的测试数据,该数据源只支持数据集成场景读取,其他模块不支持使用。
配置模式
选择连接串模式。
连接地址
主机地址IP:
rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com
端口号:
3306
数据库名称
输入数据库名,本教程请填写
workshop
。用户名
输入用户名,本教程请填写
workshop
。密码
输入密码,本教程请填写
workshop#2017
。认证选项
无认证。
在连通配置区域,分别单击生产环境和开发环境的测试连通性,确保连通状态为可连通。
需确保资源组已绑定至工作空间,并配置了公网访问能力,否则后续数据同步时将会报错。配置步骤请参见准备环境。
如果您无可选的资源组,可参考链接配置区域的说明提示,单击前往购买和绑定已购资源组。
单击完成创建。
新建HttpFile数据源(user_behavior_analysis_httpfile)
本教程提供的用户网站访问记录数据存储在OSS中,您需要创建Httpfile数据源,以便在后续将OSS中的用户网站访问记录(user_log.txt
)同步至准备环境阶段创建的自有OSS对象存储内。
在管理中心页面,单击左侧导航栏的数据源。
单击新增数据源,在新增数据源对话框中,搜索选择数据源类型为HttpFile。
在创建HttpFile数据源页面,配置相关参数。在本教程中开发环境和生产环境都按如下示例值填写。
以下为本教程所需配置的关键参数,未说明参数保持默认即可。
参数
描述
参数
描述
数据源名称
输入数据源名称,本教程请填写
user_behavior_analysis_httpfile
。数据源描述
DataWorks案例体验专用数据源,在离线同步配置时读取该数据源即可访问平台提供的测试数据,该数据源只支持数据集成场景读取,其他模块不支持使用。
URL域名
开发环境和生产环境的URL域名均配置为
https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com
。在连通配置区域,分别单击生产环境和开发环境的测试连通性,确保连通状态为可连通。
需确保资源组已绑定至工作空间,并配置了公网访问能力,否则后续数据同步时将会报错。配置步骤请参见准备环境。
如果您无可选的资源组,可参考链接配置区域的说明提示,单击前往购买和绑定已购资源组。
单击完成创建。
新建OSS数据源
本次实验将MySQL数据源的用户信息和HttpFile数据源的日志信息同步至OSS数据源。
在管理中心页面,单击进入数据源页面后单击新增数据源。
在新增数据源对话框中,搜索选择数据源类型为OSS。
在创建OSS数据源页面,配置各项参数。在本教程中开发环境和生产环境都按示例值填写。
参数
描述
参数
描述
数据源名称
输入数据源的名称,本示例为test_g。
数据源描述
对数据源进行简单描述。
访问模式
选择Access Key模式。
AccessKey ID
当前登录账号的AccessKey ID,您可以进入AccessKey页面复制AccessKey ID。
AccessKey Secret
输入当前登录账号的AccessKey Secret。
AccessKey Secret只在创建时显示,不支持后续再次查看,请妥善保管。如果AccessKey泄露或丢失,请删除并创建新的AccessKey。
Endpoint
输入
http://oss-cn-shanghai-internal.aliyuncs.com
。Bucket
您准备环境时准备的私有OSS Bucket的名称,示例为
dw-spark-demo
。单击指定资源组连通状态(开发环境)和连通状态(生产环境)列的测试连通性,等待界面提示测试完成,连通状态为可连通。
需确保至少一个资源组为可连通状态,否则此数据源无法使用向导模式创建同步任务。
单击完成创建,创建OSS数据源。
步骤二:搭建同步链路
单击左上方的
图标,选择 。
在左侧导航栏单击
,在项目目录区域,单击
,选择新建工作流,设置工作流名称。本教程设置为
User_profile_analysis_spark
。在工作流开发页面,从左侧拖拽虚拟节点、离线同步以及EMR SPARK SQL节点至画布中,分别设置节点名称。
本教程节点名称示例及作用如下:
节点类型
节点名称
节点作用
节点类型
节点名称
节点作用
虚拟节点
workshop_start_spark
用于统筹管理整个用户画像分析工作流,例如工作流内部节点的启动时间。当工作流较复杂时,可使数据流转路径更清晰。该节点为空跑任务,无须编辑代码。
离线同步节点
ods_raw_log_d_2oss_spark
用于将存储于OSS的用户网站访问记录同步至您创建的OSS中。
离线同步节点
ods_raw_user_d_2oss_spark
用于将存储于MySQL的用户基本信息同步至您创建的OSS中。
EMR SPARK SQL
ods_raw_log_d_spark
用于创建
ods_raw_log_d_spark
外表,读取存储在OSS中的用户网站访问记录。EMR SPARK SQL
ods_user_info_d_spark
用于创建
ods_user_info_d_spark
外表,读取存储在OSS中的用户基本信息。手动拖拽连线,将
workShop_start_spark
节点设置为两个离线同步节点的上游节点。最终效果如下:工作流调度配置。
在工作流编辑页面右侧单击调度配置,配置相关参数。以下为本教程所需配置的关键参数,未说明参数保持默认即可。
调度配置参数
说明
调度配置参数
说明
调度参数
为整个工作流设置调度参数,工作流中的内部节点可直接使用。本教程配置为
bizdate=$[yyyymmdd-1]
,获取前一天的日期。调度周期
本教程配置为
日
。调度时间
本教程配置调度时间为
00:30
,该工作流会在每日00:30
启动。节点依赖配置
Workflow无上游依赖,可不配置。为了方便统一管理,您可以单击使用工作空间根节点,将工作流挂载到工作空间根节点下。
工作空间根节点命名格式为:
工作空间名_root
。在顶部工具栏单击保存,保存工作流。
步骤三:配置同步任务
配置初始节点
在Workflow画布中,鼠标悬停至
workshop_start_spark
节点上,单击打开节点。在
workshop_start_spark
节点编辑页面右侧单击调度配置,配置相关参数。以下为本示例所需配置的关键参数,未说明的参数请保持默认值可。调度配置参数
说明
调度配置参数
说明
调度类型
本教程配置为
空跑调度
。调度资源组
本教程配置为准备环境阶段创建的Serverless资源组。
节点依赖配置
由于
workshop_start_spark
为初始节点,无上游依赖,此时可以单击使用工作空间根节点,由工作空间根节点触发工作流执行。工作空间根节点命名为:
工作空间名_root
。
配置用户数据同步链路(ods_user_info_d_2oss_spark)
离线ods_user_info_d_2oss_Spark
节点实现将MySQL数据源内用户数据信息,同步至私有OSS数据源中。
在Workflow画布中,鼠标悬停至
ods_user_info_d_2oss_Spark
节点上,单击打开节点。配置同步链路网络与资源。
参数
描述
参数
描述
数据来源
数据来源:
MySQL
。数据源名称:
user_behavior_analysis_mysql
。
我的资源组
选择准备环境节点购买的Serverless资源组。
数据去向
数据去向:
OSS
。数据源名称:选择前文创建的私有OSS数据源,此处示例为
test_g
。
单击下一步,配置同步任务。
配置数据来源与去向
以下为本示例所需配置的关键参数,未说明的参数请保持默认值。
参数
描述
参数
描述
数据来源
表:选择数据源中的ods_user_info_d。
切分键:建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。此处设置切分键为uid。
数据去向
文本类型:选择text类型。
文件名(含路径):根据您自建OSS的目录进行输入,示例为ods_user_info_d/user_${bizdate}/user_${bizdate}.txt。其中ods_user_info_d为您自建的目录名,$bizdate表示获取前一天的日期。
列分隔符:输入列分隔符为|。
确认字段映射及通道控制。
DataWorks通过配置源端与目标端字段映射关系,实现源端指定字段数据写入目标端指定字段,同时支持设置任务并发数、脏数据策略等。本教程脏数据策略配置为不容忍脏数据,其他配置保持默认。更多信息,请参见通过向导模式配置离线同步任务。
配置调试参数。
在离线同步任务配置页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。
配置项
配置说明
配置项
配置说明
资源组
选择准备环境阶段购买的Serverless资源组。
脚本参数
单击添加参数,将bizdate赋值为
yyyymmdd
格式(例如bizdate=20250223
)。在调试时,Data Studio将会使用此常量替换任务中的定义的变量。(可选)配置调度属性。
本教程调度配置相关参数保持默认即可,您可以在离线同步任务配置页面右侧单击调度配置,确认如下关键参数取值是否与本教程一致。调度配置中其他参数的详细说明,详情可参见调度配置。
调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。
调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。
在顶部工具栏单击保存,保存当前节点。
配置用户日志同步链路(ods_raw_log_d_2oss_spark)
离线ods_raw_log_d_2oss_spark
节点实现将HttpFile数据源内获取的用户日志信息,同步至私有OSS数据源中。
在业务流程开发面板上,鼠标悬浮在
ods_raw_log_d_2oss_spark
节点上,单击打开节点按钮,进入节点配置页面。配置网络与资源配置。
参数
描述
参数
描述
数据来源
数据来源:
HttpFile
。数据源名称:
user_behavior_analysis_httpfile
。
我的资源组
选择准备环境节点购买的Serverless资源组。
数据去向
数据去向:
OSS
。数据源名称:选择前文创建的私有OSS数据源,此处示例为test_g。
单击下一步,配置同步任务。
配置数据来源与去向
以下为本示例所需配置的关键参数,未说明的参数请保持默认值。
参数
描述
参数
描述
数据来源
文件路径:/user_log.txt。
文本类型:选择text类型。
列分隔符:输入列分隔符为|。
压缩格式:包括None、Gzip、Bzip2和Zip四种类型,此处选择None。
是否跳过表头:选择No。
数据去向
文本类型:选择text类型。
文件名(含路径):根据您自建OSS的目录进行输入,示例为ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt,其中ods_raw_log_d为您自建的目录名,$bizdate表示获取前一天的日期。
列分隔符:输入列分隔符为|。
确认字段映射及通道控制。
DataWorks通过配置源端与目标端字段映射关系,实现源端指定字段数据写入目标端指定字段,同时支持设置任务并发数、脏数据策略等。本教程脏数据策略配置为不容忍脏数据,其他配置保持默认。更多信息,请参见通过向导模式配置离线同步任务。
配置调试参数。
在离线同步任务配置页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。
配置项
配置说明
配置项
配置说明
资源组
选择准备环境阶段购买的Serverless资源组。
脚本参数
单击添加参数,将bizdate赋值为
yyyymmdd
格式(例如bizdate=20250223
)。在调试时,Data Studio将会使用此常量替换任务中的定义的变量。(可选)配置调度属性。
本教程调度配置相关参数保持默认即可,您可以在离线同步任务配置页面右侧单击调度配置,确认如下关键参数取值是否与本教程一致。调度配置中其他参数的详细说明,详情可参见调度配置。
调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。
调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。
在顶部工具栏单击保存,保存当前节点。
步骤四:同步数据
步骤五:解析数据
完成数据同步后,需使用Spark SQL创建外部表并解析OSS中存储的用户基本信息数据和用户网站访问记录数据。
创建日志表(ods_raw_log_d_spark)并解析数据
数据通过离线集成任务同步至私有OSS数据源后,基于生成的OSS文件,通过EMR SPARK SQL创建的外部表ods_raw_log_d_spark
,用LOCATION
来访问离线数据集成任务写入私有OSS对象存储Bucket的日志信息。
在Workflow画布中,鼠标悬停至
ods_raw_log_d_spark
节点上,单击打开节点。编辑建表语句。
-- 场景:以下SQL为Spark SQL,通过EMR Spark SQL创建的外部表ods_raw_log_d_spark,用LOCATION来获取离线数据集成任务写入私有OSS对象存储Bucket的日志信息,并添加对应的dt分区。 -- 补充: -- DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应的业务分区内。 -- 在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过变量赋值调度参数的方式,实现调度场景下代码动态入参。 CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark ( `col` STRING ) PARTITIONED BY ( dt STRING ) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/'; ALTER TABLE ods_raw_log_d ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/'
上述代码中的location地址,需根据您实际情况替换,其中
dw-spark-demo
是您OSS对象存储环境准备时创建的OSS Bucket名。配置调试参数。
在EMR SPARK SQL任务配置页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。
(可选)配置调度属性。
本教程调度配置相关参数保持默认即可,您可以在离线同步任务配置页面右侧单击调度配置,确认如下关键参数取值是否与本教程一致。调度配置中其他参数的详细说明,详情可参见调度配置。
调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。
调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。
在顶部工具栏单击保存,保存当前节点。
创建用户表(ods_user_info_d_spark)并解析数据
数据通过离线集成任务同步至私有OSS数据源后,基于生成的OSS文件,通过EMR SPARK SQL节点创建的外部表ods_user_info_d_spark
,用LOCATION
来访问离线数据集成任务写入私有OSS对象存储Bucket的用户信息。
在Workflow画布中,鼠标悬停至
ods_user_info_d_spark
节点上,单击打开节点。配置同步链路网络与资源。
-- 场景:以下SQL为Spark SQL,通过EMR Spark SQL节点创建的外部表ods_user_info_d_spark,用LOCATION来获取离线数据集成任务写入私有OSS对象存储Bucket的用户信息,并写入对应的dt分区。 -- 补充: -- DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应的业务分区内。 -- 在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过变量赋值调度参数的方式,实现调度场景下代码动态入参。 CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark ( `uid` STRING COMMENT '用户ID' ,`gender` STRING COMMENT '性别' ,`age_range` STRING COMMENT '年龄段' ,`zodiac` STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY'|' STORED AS TEXTFILE LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/' ; ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') LOCATION'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/' ;
上述代码中的location地址,需根据您实际情况替换,其中
dw-spark-demo
是您OSS对象存储环境准备时创建的OSS Bucket名。配置调试参数。
在EMR SPARK SQL任务配置页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。
(可选)配置调度属性。
本教程调度配置相关参数保持默认即可,您可以在离线同步任务配置页面右侧单击调度配置,确认如下关键参数取值是否与本教程一致。调度配置中其他参数的详细说明,详情可参见调度配置。
调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。
调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。
在顶部工具栏单击保存,保存当前节点。
步骤六:运行任务
同步数据。
在工作流工具栏中,单击运行,设置各节点定义的参数变量在本次运行中的取值(本教程使用
20250223
,您可以按需修改),单击确定后,等待运行完成。查询数据同步结果。
进入SQL查询页面。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的 ,单击进入数据分析页面,单击左侧导航栏的SQL查询进入SQL查询页面。
配置SQL查询文件。
单击我的文件后的
按钮新建文件,自定义SQL查询文件名。
单击已新建的文件,进入文件编辑页面。
在文件编辑页面单击右上角的
按钮,配置需进行SQL查询的工作空间等信息,配置详情如下:
配置项
说明
配置项
说明
工作空间
选择
User_profile_analysis_spark
工作流所在的工作空间。数据源类型
下拉选择
EMR Spark SQL
。数据源名称
选择在准备环境时绑定的EMR Serverless Spark为计算资源开发环境。
SQL Compute
选择EMR Serverless Spark 中创建的SQL会话。
单击确认按钮,完成查询数据源的配置。
编辑查询SQL。
在确保该章节内的所有节点运行成功的情况下,编写以下SQL查询以检查EMR SPARK SQL节点创建的外部表是否正常产出。
-- 您需要将分区过滤条件更新为您当前操作的实际业务日期。例如,任务运行的日期为20250223,则业务日期为20250222,即任务运行日期的前一天。 SELECT * FROM ods_raw_log_d_spark WHERE dt ='业务日期';--查询ods_raw_log_d_spark表 SELECT * FROM ods_user_info_d_spark WHERE dt ='业务日期';--查询ods_user_info_d_spark表
后续步骤
现在,您已经学习了如何进行日志数据同步,完成数据的同步,您可以继续下一个教程。在该教程中,您将学习如何对同步的数据进行计算与分析。详情请参见加工数据。
- 本页导读
- 前提条件
- 步骤一:新建数据源
- 新建MySQL数据源(user_behavior_analysis_mysql)
- 新建HttpFile数据源(user_behavior_analysis_httpfile)
- 新建OSS数据源
- 步骤二:搭建同步链路
- 步骤三:配置同步任务
- 配置初始节点
- 配置用户数据同步链路(ods_user_info_d_2oss_spark)
- 配置用户日志同步链路(ods_raw_log_d_2oss_spark)
- 步骤四:同步数据
- 步骤五:解析数据
- 创建日志表(ods_raw_log_d_spark)并解析数据
- 创建用户表(ods_user_info_d_spark)并解析数据
- 步骤六:运行任务
- 后续步骤