本教程以MySQL中的用户基本信息ods_user_info_d
表和OSS中的网站访问日志数据user_log.txt
文件为例,通过数据集成离线同步任务分别同步至StarRocks的ods_user_info_d_starrocks
、ods_raw_log_d_starrocks
表。旨在介绍如何通过DataWorks数据集成实现异构数据源间的数据同步,完成数仓数据同步操作。
前提条件
请确保已准备好所需的工作环境。具体操作步骤请参见:准备环境。
步骤一:新建数据源
为确保后续数据处理流程的顺利进行,您需要在DataWorks工作空间中创建如下数据源,用于获取平台提供的初始数据。
MySQL数据源:本教程将数据源命名为
user_behavior_analysis_mysql
,用于获取存储在MySQL的用户基本信息数据(ods_user_info_d
)。Httpfile数据源:本教程将数据源命名为
user_behavior_analysis_httpfile
,用于获取存储在OSS的用户网站访问记录(user_log.txt
)。
新建MySQL数据源(user_behavior_analysis_mysql)
本教程提供的用户基本信息存储在MySQL数据库中,您需要创建MySQL数据源,以便在后续将MySQL数据库中的用户基本信息数据(ods_user_info_d
)同步至StarRocks
进入数据源页面。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的 ,在下拉框中选择对应工作空间后单击进入管理中心。
单击左侧导航栏的数据源,进入数据源页面。
单击新增数据源,搜索选择数据源类型为MySQL。
在创建MySQL数据源页面,配置相关参数。在本教程中开发环境和生产环境都按如下示例值填写。
以下为本教程所需配置的关键参数,未说明参数保持默认即可。
参数
描述
参数
描述
数据源名称
输入数据源名称,本教程请填写
user_behavior_analysis_mysql
。数据源描述
DataWorks案例体验专用数据源,在离线同步配置时读取该数据源即可访问平台提供的测试数据,该数据源只支持数据集成场景读取,其他模块不支持使用。
配置模式
选择连接串模式。
连接地址
主机地址IP:
rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com
端口号:
3306
数据库名称
输入数据库名,本教程请填写
workshop
。用户名
输入用户名,本教程请填写
workshop
。密码
输入密码,本教程请填写
workshop#2017
。认证选项
无认证。
在连通配置区域,分别单击生产环境和开发环境的测试连通性,确保连通状态为可连通。
需确保资源组已绑定至工作空间,并配置了公网访问能力,否则后续数据同步时将会报错。配置步骤请参见准备环境。
如果您无可选的资源组,可参考链接配置区域的说明提示,单击前往购买和绑定已购资源组。
单击完成创建。
新建HttpFile数据源(user_behavior_analysis_httpfile)
本教程提供的用户网站访问记录数据存储在OSS中,您需要创建Httpfile数据源,以便在后续将OSS中的用户网站访问记录(user_log.txt
)同步至StarRocks。
在管理中心页面,单击左侧导航栏的数据源。
单击新增数据源,在新增数据源对话框中,搜索选择数据源类型为HttpFile。
在创建HttpFile数据源页面,配置相关参数。在本教程中开发环境和生产环境都按如下示例值填写。
以下为本教程所需配置的关键参数,未说明参数保持默认即可。
参数
描述
参数
描述
数据源名称
输入数据源名称,本教程请填写
user_behavior_analysis_httpfile
。数据源描述
DataWorks案例体验专用数据源,在离线同步配置时读取该数据源即可访问平台提供的测试数据,该数据源只支持数据集成场景读取,其他模块不支持使用。
URL域名
开发环境和生产环境的URL域名均配置为
https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com
。在连通配置区域,分别单击生产环境和开发环境的测试连通性,确保连通状态为可连通。
需确保资源组已绑定至工作空间,并配置了公网访问能力,否则后续数据同步时将会报错。配置步骤请参见准备环境。
如果您无可选的资源组,可参考链接配置区域的说明提示,单击前往购买和绑定已购资源组。
单击完成创建。
步骤二:搭建同步链路
单击左上方的
图标,选择 。然后在页面顶部切换至本教程创建好的工作空间。
在左侧导航栏单击
,在项目目录区域,单击
,选择新建工作流,设置工作流名称。本教程设置为
user_profile_analysis_starrocks
。在工作流开发页面,从左侧拖拽虚拟节点、StarRocks节点和离线同步节点至画布中,分别设置节点名称。
本教程节点名称示例及作用如下:
节点类型
节点名称
节点作用
节点类型
节点名称
节点作用
虚拟节点
workshop_start_starrocks
用于统筹管理整个用户画像分析工作流,可使数据流转路径更清晰。该节点为空跑任务,无须编辑代码。
StarRocks节点
ddl_ods_user_info_d_starrocks
同步任务前创建,用于接收源端MySQL存储的用户基本信息数据的StarRocks表
ods_user_info_d_starrocks
。StarRocks节点
ddl_ods_raw_log_d_starrocks
同步任务前创建,用于接收源端OSS存储的用户网站访问记录的StarRocks表
ods_raw_log_d_starrocks
。离线同步节点
ods_user_info_d_starrocks
用于将存储于MySQL的用户基本信息数据同步至StarRocks表
ods_user_info_d_starrocks
。离线同步节点
ods_raw_log_d_starrocks
用于将存储于OSS的用户网站访问记录同步至StarRocks表
ods_raw_log_d_starrocks
。手动拖拽连线,将
workshop_start_starrocks
节点设置为两个离线同步节点的上游节点。最终效果如下:工作流调度配置。
在工作流编辑页面右侧单击调度配置,配置相关参数。以下为本教程所需配置的关键参数,未说明参数保持默认即可。
调度配置参数
说明
调度配置参数
说明
调度参数
为整个工作流设置调度参数,工作流中的内部节点可直接使用。本教程配置为
bizdate=$[yyyymmdd-1]
,获取前一天的日期。调度周期
本教程配置为
日
。调度时间
本教程配置调度时间为
00:30
,该工作流会在每日00:30
启动。节点依赖配置
Workflow无上游依赖,可不配置。为了方便统一管理,您可以单击使用工作空间根节点,将工作流挂载到工作空间根节点下。
工作空间根节点命名格式为:
工作空间名_root
。在顶部工具栏单击保存,保存工作流。
步骤三:配置同步任务
配置初始节点
在工作流画布中,鼠标悬停至
workshop_start_starrocks
节点上,单击打开节点。在
workshop_start_starrocks
节点编辑页面右侧单击调度配置,配置相关参数。以下为本示例所需配置的关键参数,未说明参数保持默认即可。调度配置参数
说明
调度配置参数
说明
调度类型
本教程配置为
空跑调度
。调度资源组
本教程配置为准备环境阶段创建的Serverless资源组。
节点依赖配置
由于
workshop_start_starrocks
为初始节点,无上游依赖,此时可以单击使用工作空间根节点,由工作空间根节点触发工作流执行。工作空间根节点命名为:
工作空间名_root
。
创建用户表ddl_ods_user_info_d_starrocks
同步数据至StarRocks前,需创建ddl_ods_user_info_d_starrocks
表,以接收从MySQL数据源中同步的用户基本信息数据,可通过以下方式在节点内建表,也可以通过数据目录手动创建以下数据表。
在工作流画布中,鼠标悬停至
ddl_ods_user_info_d_starrocks
节点上,单击打开节点。编辑建表语句。
CREATE TABLE IF NOT EXISTS ods_user_info_d_starrocks ( uid STRING COMMENT '用户ID', gender STRING COMMENT '性别', age_range STRING COMMENT '年龄段', zodiac STRING COMMENT '星座', dt STRING not null COMMENT '时间' ) DUPLICATE KEY(uid) COMMENT '用户行为分析案例-用户基本信息表' PARTITION BY(dt) PROPERTIES("replication_num" = "1");
配置调试参数。
在StarRocks节点配置页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。
(可选)配置调度属性。
本教程调度配置相关参数保持默认即可,您可以在节点编辑页面右侧单击调度配置。调度配置中参数的详细说明,详情可参见调度配置。
调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。
调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。
在顶部工具栏单击保存,保存当前节点。
创建日志表ddl_ods_raw_log_d_starrocks
同步数据至StarRocks前,需创建ddl_ods_raw_log_d_starrocks
表,以接收从Httpfile数据源中同步的用户网站访问记录数据,可通过以下方式在节点内建表,也可以通过数据目录手动创建以下数据表。
在工作流画布中,鼠标悬停至
ddl_ods_raw_log_d_starrocks
节点上,单击打开节点。编辑建表语句。
CREATE TABLE IF NOT EXISTS ods_raw_log_d_starrocks ( col STRING COMMENT '日志', dt DATE not null COMMENT '时间' ) DUPLICATE KEY(col) COMMENT '用户行为分析案例-网站访问日志原始数据表' PARTITION BY(dt) PROPERTIES ("replication_num" = "1");
配置调试参数。
在离线同步任务配置页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。
(可选)配置调度属性。
本教程调度配置相关参数保持默认即可,您可以在节点编辑页面右侧单击调度配置。调度配置中参数的详细说明,详情可参见调度配置。
调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。
调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。
在顶部工具栏单击保存,保存当前节点。
配置用户数据同步链路(ods_user_info_d_starrocks
)
在工作流画布中,鼠标悬停至
ods_user_info_d_starrocks
节点上,单击打开节点。配置同步链路网络与资源。
参数
描述
参数
描述
数据来源
数据来源:MySQL
数据源名称:
user_behavior_analysis_mysql
我的资源组
选择在准备环境阶段新建的Serverless资源组。
数据去向
数据去向:StarRocks
数据源名称:
doc_starrocks_storage_compute_tightly_01
配置任务
配置数据来源与去向。
模块
配置项
配置内容
模块
配置项
配置内容
数据来源
表
选择MySQL表
ods_user_info_d
。切分键
建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。
此处配置切分键为
uid
字段。数据去向
表
选择StarRocks表
ods_user_info_d_starrocks
。导入前准备语句
本案例按
dt
字段动态分区,为避免节点重跑数据重复写入,通过以下SQL语句实现每次同步前删除已有目标分区。ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE
其中${var}
为参数,后续在调度配置阶段为其赋值调度参数,以实现调度场景下的动态入参,详情请参见调度设置。StreamLoad请求参数
StreamLoad 的请求参数,需为JSON格式。
{ "row_delimiter": "\\x02", "column_separator": "\\x01" }
字段映射。
通过字段映射关系确定源端字段与目标端字段的写入关系,并通过变量赋值调度参数的方式,实现动态为StarRocks分区字段赋值,即每日数据写入StarRocks对应业务分区。
单击同名映射,源端MySQL字段将自动映射目标表同名字段,即源端字段数据将默认写入与源端字段同名的目标端字段。
单击添加一行,输入
'${var}'
,并手动设置该字段与StarRocks的dt字段进行映射。
通道控制。
本教程脏数据策略配置为不容忍脏数据,其他配置保持默认。更多信息,请参见通过向导模式配置离线同步任务。
配置调试参数。
在离线同步任务配置页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。
配置项
配置说明
配置项
配置说明
资源组
选择准备环境阶段购买的Serverless资源组。
脚本参数
单击添加参数,配置为
var=yyyymmdd格式的具体常量
(例如var=20250223
)。在调试时,Data Studio将会使用此常量替换任务中的定义的变量。(可选)配置调度属性。
本教程调度配置相关参数保持默认即可,您可以在节点编辑页面右侧单击调度配置。调度配置中参数的详细说明,详情可参见调度配置。
调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。
调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。
在顶部工具栏单击保存,保存当前节点。
配置日志数据同步链路(ods_raw_log_d_starrocks)
在工作流画布中,鼠标悬停至
ods_raw_log_d_starrocks
节点上,单击打开节点。配置同步链路网络与资源。
在配置好数据来源、我的资源组、数据去向后,单击下一步,根据页面提示完成连通性测试。详细配置如下。
配置项
配置内容
配置项
配置内容
数据来源
数据来源:HttpFile
数据源名称:
user_behavior_analysis_HttpFile
我的资源组
选择在准备环境阶段新建的Serverless资源组。
数据去向
数据去向:StarRocks
数据源名称:
doc_starrocks_storage_compute_tightly_01
单击下一步,配置同步任务。
配置数据来源与去向。
模块
配置项
配置内容
模块
配置项
配置内容
数据来源
文件路径
/user_log.txt
文件类型
text
列分隔符
|
No
完成以上数据来源配置后,单击确认表数据结构。
数据去向
表
选择StarRocks表
ods_raw_log_d_starrocks
。导入前准备语句
本案例按
dt
字段动态分区,为避免节点重跑数据重复写入,通过以下SQL语句实现每次同步前删除已有目标分区。ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE
其中${var}
为参数,后续在调度配置阶段为其赋值调度参数,以实现调度场景下的动态入参,详情请参见调度设置。StreamLoad请求参数
StreamLoad 的请求参数,需为JSON格式。
{ "row_delimiter": "\\x02", "column_separator": "\\x01" }
通道控制。
本教程脏数据策略配置为不容忍脏数据,其他配置保持默认。更多信息,请参见通过向导模式配置离线同步任务。
字段映射。
单击节点页面工具栏
按钮,将任务配置方式从向导模式转换为脚本模式,完成HttpFile数据源的字段映射与StarRocks动态分区字段dt的动态赋值。
来源HttpFile端Column配置新增:
{ "type": "STRING", "value": "${var}" }
ods_raw_log_d_starrocks
节点完整脚本示例:{ "type": "job", "version": "2.0", "steps": [ { "stepType": "httpfile", "parameter": { "fileName": "/user_log.txt", "nullFormat": "", "compress": "", "requestMethod": "GET", "connectTimeoutSeconds": 60, "column": [ { "index": 0, "type": "STRING" }, { "type": "STRING", "value": "${var}" } ], "skipHeader": "false", "encoding": "UTF-8", "fieldDelimiter": "|", "fieldDelimiterOrigin": "|", "socketTimeoutSeconds": 3600, "envType": 0, "datasource": "user_behavior_analysis", "bufferByteSizeInKB": 1024, "fileFormat": "text" }, "name": "Reader", "category": "reader" }, { "stepType": "starrocks", "parameter": { "loadProps": { "row_delimiter": "\\x02", "column_separator": "\\x01" }, "envType": 0, "datasource": "Doc_StarRocks_Storage_Compute_Tightly_01", "column": [ "col", "dt" ], "tableComment": "", "table": "ods_raw_log_d_starrocks", "preSql": "ALTER TABLE ods_raw_log_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE ; " }, "name": "Writer", "category": "writer" }, { "copies": 1, "parameter": { "nodes": [], "edges": [], "groups": [], "version": "2.0" }, "name": "Processor", "category": "processor" } ], "setting": { "errorLimit": { "record": "0" }, "locale": "zh", "speed": { "throttle": false, "concurrent": 2 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
配置调试参数。
在离线同步任务配置页面右侧单击调试配置,配置以下参数,用于在步骤四调试运行中使用调试配置的相关参数测试运行。
配置项
配置说明
配置项
配置说明
资源组
选择准备环境阶段购买的Serverless资源组。
脚本参数
单击添加参数,配置为
var=yyyymmdd格式的具体常量
(例如var=20250223
)。在调试时,Data Studio将会使用此常量替换任务中的定义的变量。(可选)配置调度属性。
本教程调度配置相关参数保持默认即可,您可以在节点编辑页面右侧单击调度配置。调度配置中参数的详细说明,详情可参见调度配置。
调度参数:本教程已在工作流调度参数中统一配置,工作流内部节点无需配置,在任务或代码中可直接使用。
调度策略:您可以在延时执行时间参数中指定子节点在工作流执行后,延迟多久再执行,本教程不设置。
在顶部工具栏单击保存,保存当前节点。
步骤四:运行任务
同步数据。
在工作流工具栏中,单击运行,设置各节点定义的参数变量在本次运行中的取值(本教程使用
20250223
,您可以按需修改),单击确定后,等待运行完成。查询结果。
进入SQL查询页面。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的 ,单击进入数据分析页面,单击左侧导航栏的SQL查询进入SQL查询页面。
配置SQL查询文件。
单击我的文件后的
按钮新建文件,自定义SQL查询文件名。
单击已新建的文件,进入文件编辑页面。
在文件编辑页面单击右上角的
按钮,配置需进行SQL查询的工作空间等信息,配置详情如下:
配置项
说明
配置项
说明
工作空间
选择
user_profile_analysis_starrocks
工作流所在的工作空间。数据源类型
下拉选择
StarRocks
。数据源名称
选择在准备环境时绑定的Starrocks开发环境。
单击确认按钮,完成查询数据源的配置。
编辑查询SQL。
在确保该章节内的所有节点运行成功的情况下,编写以下SQL查询以检查StarRocks节点创建的外部表是否正常产出。
--查询语句中的分区列需要更新为业务日期。例如,任务运行的日期为20250223,则业务日期为20250222,即任务运行日期的前一天。 SELECT * FROM ods_raw_log_d_starrocks WHERE dt=业务日期; SELECT * FROM ods_user_info_d_starrocks WHERE dt=业务日期;
后续步骤
现在,您已经学习了如何进行数据同步,并已完成数据的同步,您可以继续下一个教程。在该教程中,您将学习如何对同步的数据进行计算与分析。详情请参见加工数据。
- 本页导读 (1)
- 前提条件
- 步骤一:新建数据源
- 新建MySQL数据源(user_behavior_analysis_mysql)
- 新建HttpFile数据源(user_behavior_analysis_httpfile)
- 步骤二:搭建同步链路
- 步骤三:配置同步任务
- 配置初始节点
- 创建用户表ddl_ods_user_info_d_starrocks
- 创建日志表ddl_ods_raw_log_d_starrocks
- 配置用户数据同步链路(ods_user_info_d_starrocks)
- 配置日志数据同步链路(ods_raw_log_d_starrocks)
- 步骤四:运行任务
- 后续步骤