本教程以MySQL中的用户基本信息ods_user_info_d
表和OSS中的网站访问日志数据user_log.txt
文件为例,通过数据集成离线同步任务分别同步至StarRocks的ods_user_info_d_starrocks
、ods_raw_log_d_starrocks
表。旨在介绍如何通过DataWorks数据集成实现异构数据源间的数据同步,完成数仓数据同步操作。
前提条件
在进行数据同步之前,请确保已准备好所需的工作环境。具体操作步骤请参见准备环境。
章节目标
将案例提供的公共数据源中的数据同步至StarRocks,完成业务流程设计中的数据同步部分的内容。
源端数据源类型 | 源端待同步数据 | 源端表结构 | 目标端数据源类型 | 接收源端数据的目标表 | 目标表结构 |
MySQL | 表:ods_user_info_d 用户基本信息数据 |
| StarRocks |
|
|
HttpFile | 文件:user_log.txt 用户网站访问日志数据 | 一行为一条用户访问记录
| StarRocks |
|
|
进入数据开发
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的 ,在下拉框中选择对应工作空间后单击进入数据开发。
操作步骤
步骤一:设计业务流程
设计业务流程
新建业务流程。
数据开发需基于业务流程使用对应的开发组件进行具体开发操作。在创建节点之前,您需要先新建业务流程。具体操作方法可参见创建业务流程。
该业务流程的命名为:用户画像分析_StarRocks。
设计业务流程
业务流程新建完成后,将自动展开该业务流程画布。请根据业务流程设计,在业务流程画布中单击新建节点,通过将节点组件拖拽至业务流程画布,并通过拉线设置节点上下游依赖的方式,设计数据同步阶段的业务流程。
在本教程中,由于虚拟节点和同步节点之间并无血缘关系,因此我们通过业务流程拉线的方式来设置节点的依赖关系。有关更多依赖关系设置方式的详细信息,详情请参见调度依赖配置指引。以下为各个节点的类型、命名以及作用的介绍。
节点分类
节点类型
节点命名
(以最终产出表命名)
节点作用
通用
虚拟节点
workshop_start_starrocks
用于统筹管理整个用户画像分析业务流程,例如业务流程起调时间。当空间业务流程较复杂时,可使数据流转路径更清晰。该节点为空跑任务,无须编辑代码。
数据库
StarRocks
ddl_ods_user_info_d_starrocks
同步任务前创建,用于接收源端MySQL存储的用户基本信息数据的StarRocks表
ods_user_info_d_starrocks
。数据库
StarRocks
ddl_ods_raw_log_d_starrocks
同步任务前创建,用于接收源端OSS存储的用户网站访问记录的StarRocks表
ods_raw_log_d_starrocks
。数据集成
离线同步
ods_user_info_d_starrocks
用于将存储于MySQL的用户基本信息数据同步至StarRocks表
ods_user_info_d_starrocks
。数据集成
离线同步
ods_raw_log_d_starrocks
用于将存储于OSS的用户网站访问记录同步至StarRocks表
ods_raw_log_d_starrocks
。
配置调度逻辑
本案例通过虚拟节点workshop_start_starrocks
控制整个业务流程每天00:30调度执行,以下为虚拟节点关键调度配置,其他节点调度无须变更,实现逻辑详情请参见:场景:如何配置业务流程定时时间。其他调度配置相关说明,请参见:任务调度属性配置概述。
调度配置 | 图片展示 | 说明 |
调度时间配置 | 虚拟节点配置调度时间为00:30,该虚拟节点会在每日00:30调起当前业务流程并执行。 | |
调度依赖配置 | 由于虚拟节点 |
DataWorks中的所有节点都需要依赖于上游节点,数据同步阶段的所有任务都以虚拟节点workshop_start_starrocks
为依赖。换句话说,通过workshop_start_starrocks
节点来触发数据同步业务流程的执行。
步骤二:搭建同步链路
创建目标StarRocks表
在进行同步操作之前,需要提前创建目标StarRocks表,以便存储后续同步过来的原始数据。
本案例Starrocks表基于源端表结构生成,具体请参见章节目标。在业务流程面板,双击数据库ddl_ods_user_info_d_starrocks
节点和数据库ddl_ods_raw_log_d_starrocks
节点,进入节点编辑页,分别填入对应的StarRocks建表命令,并单击保存。
ddl_ods_user_info_d_starrocks
CREATE TABLE IF NOT EXISTS ods_user_info_d_starrocks ( uid STRING COMMENT '用户ID', gender STRING COMMENT '性别', age_range STRING COMMENT '年龄段', zodiac STRING COMMENT '星座', dt STRING not null COMMENT '时间' ) DUPLICATE KEY(uid) COMMENT '用户行为分析案例-用户基本信息表' PARTITION BY(dt) PROPERTIES("replication_num" = "1");
ddl_ods_raw_info_d_starrocks
CREATE TABLE IF NOT EXISTS ods_raw_log_d_starrocks ( col STRING COMMENT '日志', dt DATE not null COMMENT '时间' ) DUPLICATE KEY(col) COMMENT '用户行为分析案例-网站访问日志原始数据表' PARTITION BY(dt) PROPERTIES ("replication_num" = "1");
配置用户数据同步链路
在业务流程面板,双击离线同步ods_user_info_d_starrocks
节点,进入ods_user_info_d_starrocks
节点的配置面板,完成用户基本信息数据从案例提供的MySQL表ods_user_info_d
同步至StarRocks表ods_user_info_d_starrocks
的同步链路配置操作。
网络与资源配置。
在配置好数据来源、我的资源组、数据去向后,单击下一步,根据页面提示完成连通性测试。详细配置如下。
配置项
配置内容
数据来源
数据来源:MySQL
数据源名称:
user_behavior_analysis_mysql
我的资源组
选择在准备环境阶段新建的Serverless资源组。
数据去向
数据去向:StarRocks
数据源名称:
Doc_StarRocks_Storage_Compute_Tightly_01
配置任务。
配置数据来源与去向。
模块
配置项
配置内容
数据来源
表
选择MySQL表
ods_user_info_d
。切分键
建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。
此处配置切分键为
uid
字段。数据去向
表
选择StarRocks表
ods_user_info_d_starrocks
。导入前准备语句
本案例按
dt
字段动态分区,为避免节点重跑数据重复写入,通过以下SQL语句实现每次同步前删除已有目标分区。ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE
其中${var}
为参数,后续在调度配置阶段为其赋值调度参数,以实现调度场景下的动态入参,详情请参见调度设置。StreamLoad请求参数
StreamLoad 的请求参数,需为JSON格式。
{ "row_delimiter": "\\x02", "column_separator": "\\x01" }
字段映射。
通过字段映射关系确定源端字段与目标端字段的写入关系,并通过变量赋值调度参数的方式,实现动态为StarRocks分区字段赋值,即每日数据写入StarRocks对应业务分区。
单击同名映射,源端MySQL字段将自动映射目标表同名字段,即源端字段数据将默认写入与源端字段同名的目标端字段。
单击添加一行,输入
'${var}'
,并手动设置该字段与StarRocks的dt字段进行映射。调度配置。
配置页面单击右侧调度配置,可进入调度配置面板配置调度与节点信息,详情可参见任务调度配置。以下为配置的内容。
配置项
配置内容
图示
调度参数
在调度参数项中单击新增参数,添加:
参数名:var
参数值:$[yyyymmdd-1]
调度依赖
在调度依赖确认产出表已作为本节点输出。
格式为
worksspacename.tablename
配置用户日志同步链路
在业务流程面板,双击离线同步ods_raw_log_d_starrocks
节点,进入ods_raw_log_d_starrocks
节点的配置面板,完成用户网站访问信息数据从平台提供的公共数据源HttpFile文件user_log.txt
同步到StarRocks表ods_raw_log_d_starrocks
的同步链路配置操作。
网络与资源配置。
在配置好数据来源、我的资源组和数据去向后,请单击下一步,根据页面提示完成连通性测试。详细配置如下。
配置项
配置内容
数据来源
数据来源:HttpFile
数据源名称:
user_behavior_analysis_HttpFile
我的资源组
选择在准备环境阶段购买的Serverless资源组。
数据去向
数据去向:StarRocks
数据源名称:
Doc_StarRocks_Storage_Compute_Tightly_01
任务配置。
配置数据来源与去向
模块
配置项
配置内容
数据来源
文件路径
/user_log.txt
文件类型
text
列分隔符
|
No
完成以上数据来源配置后,单击确认表数据结构。
数据去向
表
ods_raw_log_d_starrocks
导入前准备语句
本案例按
dt
字段动态分区,为避免节点重跑数据重复写入,通过以下SQL语句实现每次同步前删除已有目标分区。ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE
其中
${var}
为变量参数,后续在调度配置阶段为其赋值调度参数,以实现调度场景下的动态入参。StreamLoad请求参数
{ "row_delimiter": "\\x02", "column_separator": "\\x01" }
字段映射。
单击节点页面工具栏,将任务配置方式从向导模式转换为脚本模式,完成HttpFile数据源的字段映射与StarRocks动态分区字段dt的动态赋值。
来源HttpFile端Column配置新增:
{ "type": "STRING", "value": "${var}" }
ods_raw_log_d_starrocks
节点完整脚本示例:{ "type": "job", "version": "2.0", "steps": [ { "stepType": "httpfile", "parameter": { "fileName": "/user_log.txt", "nullFormat": "", "compress": "", "requestMethod": "GET", "connectTimeoutSeconds": 60, "column": [ { "index": 0, "type": "STRING" }, { "type": "STRING", "value": "${var}" } ], "skipHeader": "false", "encoding": "UTF-8", "fieldDelimiter": "|", "fieldDelimiterOrigin": "|", "socketTimeoutSeconds": 3600, "envType": 0, "datasource": "user_behavior_analysis", "bufferByteSizeInKB": 1024, "fileFormat": "text" }, "name": "Reader", "category": "reader" }, { "stepType": "starrocks", "parameter": { "loadProps": { "row_delimiter": "\\x02", "column_separator": "\\x01" }, "envType": 0, "datasource": "Doc_StarRocks_Storage_Compute_Tightly_01", "column": [ "col", "dt" ], "tableComment": "", "table": "ods_raw_log_d_starrocks", "preSql": "ALTER TABLE ods_raw_log_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE ; " }, "name": "Writer", "category": "writer" }, { "copies": 1, "parameter": { "nodes": [], "edges": [], "groups": [], "version": "2.0" }, "name": "Processor", "category": "processor" } ], "setting": { "errorLimit": { "record": "0" }, "locale": "zh", "speed": { "throttle": false, "concurrent": 2 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
调度设置。
配置页面单击右侧调度配置,可进入调度配置面板配置调度与节点信息。以下为配置的内容。
配置项
配置内容
图示
调度参数
在调度参数项中单击新增参数,添加:
参数名:var
参数值:$[yyyymmdd-1]
调度依赖
在调度依赖确认产出表已作为本节点输出。
格式为
worksspacename.tablename
步骤三:验证同步数据
运行业务流程
进入业务流程面板。
双击业务流程下的用户画像分析_Starrocks版,即可进入该业务流程画布。
运行业务流程。
在业务流程画布,单击工具栏中的图标,将按照上下游依赖关系运行数据集成阶段的业务流程。
查看任务运行状态。
节点处于状态,即代表同步执行过程无问题。
查看任务执行日志。
右键画布中的
ods_user_info_d_starrocks
节点、ods_raw_log_d_starrocks
节点,选择查看日志,即可查看详细的同步过程。
查看同步结果
新建临时查询文件。
在数据开发页面的左侧导航栏,单击,进入临时查询面板。右键单击临时查询,选择。
查询同步结果表。
--查询语句中的分区列需要更新为业务日期。例如,任务运行的日期为20240102,则业务日期为20240101,即任务运行日期的前一天。 SELECT * from ods_raw_log_d_starrocks where dt=业务日期; SELECT * from ods_user_info_d_starrocks where dt=业务日期;
后续步骤
现在,您已经完成了同步数据,您可以继续下一个教程。在下一个教程中,您将学习将用户基本信息数据、用户网站访问日志数据在StarRocks中进行加工处理。详情请参见加工数据。