本文利用DataWorks赋值节点与for-each节点的特性,实现将MaxCompute中以年月日和地域划分的二级分区数据,批量同步到以二级分区内容为特征的MySQL表中,本文以「年月日_地域」为后缀特征介绍。
场景介绍
在生产过程中,MaxCompute数据仓库每天会产生大量数据,这些数据通常按照年月日和地域进行分区拆分,并同步至带有年月日及地域后缀的MySQL表中,以便进行业务处理。由于DataWorks的标准离线同步任务无法直接实现这一需求,我们可以通过创建赋值节点来获取每日的地域二级分区信息,并利用for-each节点的循环特性,将这些二级分区数据作为参数传递给离线同步脚本。离线同步脚本会根据获取的分区参数信息,将数据同步到相应命名的MySQL表中。
前提条件
已在DataWorks工作空间的数据源管理页面,新增MaxCompute来源数据源。
说明本文使用的测试数据,请参见MaxCompute示例数据。
数据源支持的数据同步能力,请参见MaxCompute数据源。
已在DataWorks工作空间的数据源管理页面,新增MySQL去向数据源。
说明数据源支持的数据同步能力,请参见MySQL数据源。
已在MySQL数据源中执行以
_年月日_地域
为后缀的建表语句,请参见MySQL建表语句。已购买Serverless资源组,并为资源组绑定工作空间、完成网络连通配置。
说明本文仅支持Serverless资源组。
购买与配置操作,请参见新增和使用Serverless资源组。
实现原理
业务实现原理,如下图所示:
使用赋值节点来获取MaxCompute表中当前日期的所有二级分区信息。
利用for-each节点的循环遍历特性,将从赋值节点获取的分区信息作为参数,循环传递给循环体内的离线同步节点。
通过离线同步节点脚本内的动态参数配置,实现从MaxCompute二级分区到MySQL特定表的数据迁移。
操作步骤
步骤一:创建赋值节点
进入数据开发页面,在下拉框中选择对应工作空间后单击进入数据开发。
新建赋值节点。
右键单击目标业务流程,选择
。在新建节点对话框中,输入名称,并选择节点类型及路径。单击确认,进入赋值节点编辑页面。
配置调度信息。
单击赋值节点编辑页面右侧的调度配置,进入调度配置页面。
在调度参数的参数名中输入
dt_time
,在参数值内点击下拉选择今天$[yyyymmdd]
,将当前时间年月日赋值给参数dt_time
。在调度依赖的依赖的上游节点对话框内点击勾选使用工作空间根节点为节点配置上游依赖。
开发赋值节点任务。
赋值节点支持ODPS SQL、SHELL和Python三种语言编写开发任务,本文采用ODPS SQL语言进行示例说明。
SELECT region FROM sales_data WHERE dt = ${dt_time} GROUP BY region;
说明使用动态参数
dt_time
获取每日年月日信息,可完成对每日全部地域分区的获取。该节点输出参数默认为右侧调度配置中的本节点输出参数
outputs
,节点输出值会自动被DataWorks捕获,传递给下游for-each节点。如果您配置了多个MaxCompute数据源,则需在节点上方的MaxCompute引擎实例对话框中选择所需执行的数据源实例。
保存提交节点任务。
单击工具栏中的图标,保存编写的SQL语句。
单击工具栏中的图标,提交节点任务。
说明如您创建的工作空间为标准模式,则需单击节点上方发布按钮进行任务发布。
步骤二:创建for-each节点
新建for-each节点。
右键单击目标业务流程,选择
。在新建节点对话框中,输入名称,并选择节点类型及路径。单击确认,进入for-each节点编辑页面。
配置调度信息。
单击赋值节点编辑页面右侧的调度配置,进入调度配置页面。
在调度依赖的依赖的上游节点下拉选择节点名称,在输入框中输入步骤一中创建的赋值节点名称,选择后缀名为
_out
的赋值节点,单击添加按钮。单击打开节点上下文参数内容,在本节点输入参数中找到参数名为
loopDataArray
的内容,单击内容右侧编辑按钮。单击选择取值来源信息并进行保存。
系统支持的最大循环次数为128次。如果您实际业务情况循环次数大于128次,您可在页面提示框进行配置修改。
步骤三:创建离线同步节点
新建离线同步节点。
单击for-each节点编辑页面左侧的
节点。在新建节点对话框中,输入节点名称,单击新建。
网络与资源配置。
选择离线同步节点,右键单击打开节点,进入网络与资源配置页面。
配置来源数据源,数据来源选择
MaxCompute(ODPS)
,在数据源名称下拉选择您所需同步的MaxCompute数据源。在新建独享数据集成资源组模块选择您所创建的资源组。
配置去向数据源,数据去向选择
MySQL
,在数据源名称下拉选择您所需接收的MySQL数据源。
配置数据来源与去向。
点击下一步进入配置数据来源与去向页面。
在数据来源中配置schema和表名信息,在schema下拉框选择您所创建的schema,在表名下拉框选择示例表名
sales_data
。在数据去向中配置表名信息,在表名下拉框选择示例表名
prefix_20240913_beijing
。
配置动态分区参数。
找到配置数据来源与去向页面上方的转换脚本按钮,单击提示框的确定按钮。
找到参数
partition
,将里面的参数值dt=${bizdate},region=
信息更换为动态分区dt=${offline_time},region=${dag.foreach.current}
。说明在for-each节点内,可以通过参数
${dag.foreach.current}
获取当前遍历值。找到MySQL表名,将里面的表名
prefix_20240912_shanghai
信息更换为动态表名prefix_${offline_time}_${dag.foreach.current}
。在右侧的调度配置对话框中,设置调度参数
offline_time
,在其参数值内通过下拉菜单选择今天$[yyyymmdd]
,从而将当前日期的年月日赋值给参数offline_time
。在调度依赖的依赖的上游节点的节点输出对话框中输入
out
,选择节点名称:start
的节点进行添加。
单击工具栏中的图标,保存离线同步任务。
步骤四:保存提交for-each循环流程
进入步骤二中创建的for-each节点页面。
如下图所示,配置连接for-each节点与离线同步节点。
单击工具栏中的图标,保存循环流程。
单击工具栏中的图标,在提交页面勾选全部节点信息并点击确定按钮进行提交。
如您创建的工作空间为标准模式,则需单击节点上方发布按钮进行任务发布。
步骤五:执行节点任务
单击for-each节点页面左上角的运维,进入运维中心,并在左侧任务运维导航栏中选择
。找到您所创建的赋值节点,单击
进入补数据页面。勾选选择下游任务的任务信息,点击提交并跳转按钮执行该任务。说明您可以在跳转页面查看对应任务的运行状况。
结果验证
任务执行成功后,您可以在MySQL数据库中检查prefix_20240913_beijing
、prefix_20240913_shanghai
、prefix_20240913_hangzhou
这三个表的数据,确认它们与MaxCompute表当前日对应的二级分区数据是否一致。
MaxCompute表数据:
您可根据以下示例SQL语句查询出相应结果。
SELECT * FROM sales_data where dt = '20240913';
MySQL表数据:
您可根据以下示例SQL语句查询出相应结果。
SELECT * FROM `prefix_20240913_beijing`;
SELECT * FROM `prefix_20240913_shanghai`;
SELECT * FROM `prefix_20240913_hangzhou`;
示例数据
MaxCompute示例数据
-- 创建分区表
-- dt为一级年月日分区,region为二级地域分区。
CREATE TABLE IF NOT EXISTS sales_data (
id BIGINT,
product_name STRING,
quantity INT,
price DECIMAL(10,2)
)
PARTITIONED BY (dt STRING, region STRING);
-- 插入测试数据
INSERT INTO TABLE sales_data PARTITION (dt='20240913', region='beijing')
VALUES (1, 'phone', 10, 99.99);
INSERT INTO TABLE sales_data PARTITION (dt='20240913', region='shanghai')
VALUES (2, 'book', 5, 70.00);
INSERT INTO TABLE sales_data PARTITION (dt='20240913', region='hangzhou')
VALUES (3, 'book', 5, 70.00);
MySQL建表语句
-- MySql 建表
-- 表名格式prefix_年月日_地域
CREATE TABLE IF NOT EXISTS prefix_20240913_beijing (
id BIGINT,
product_name varchar(100),
quantity INT,
price DECIMAL(10,2)
);
CREATE TABLE IF NOT EXISTS prefix_20240913_shanghai (
id BIGINT,
product_name varchar(100),
quantity INT,
price DECIMAL(10,2)
);
CREATE TABLE IF NOT EXISTS prefix_20240913_hangzhou (
id BIGINT,
product_name varchar(100),
quantity INT,
price DECIMAL(10,2)
);
在创建MySQL数据表时,需确保其按照prefix_年月日_地域
的命名规则,并且数量与MaxCompute表当前日的二级分区数量一致,否则同步过程会报错。