同步数据

本教程以MySQL中的用户基本信息(ods_user_info_d)表及OSS中的网站访问日志数据(user_log.txt)文件,通过数据集成离线同步任务分别同步至MaxCompute的ods_user_info_d、ods_raw_log_d表为例,为您介绍如何通过DataWorks数据集成实现异构数据源间的数据同步,完成数仓数据同步操作。

前提条件

已为您准备该实验所需的用户信息数据和用户网站访问记录数据,并分别存放于平台提供的RDS MySQL及OSS上,您可直接在DataWorks注册使用。无需单独开通RDS服务与OSS服务,无需单独准备测试数据,但需自行创建DataWorks工作空间,并为工作空间绑定MaxCompute数据源。

  • 创建DataWorks工作空间

    本教程以标准模式工作空间为例进行说明,创建的工作空间名称为WorkShop2024_01,您也可以自定义该名称。

  • 创建MaxCompute数据源

    本教程中创建的MaxCompute数据源的名称为odps_first,创建数据源时生产环境使用的MaxCompute项目名称为workshop2024_01,开发环境使用的MaxCompute项目名称为workshop2024_01_dev

  • (可选)如果您使用RAM账号操作本实践教程,您需要确保操作的RAM账号拥有AliyunBSSOrderAccessAliyunDataWorksFullAccess权限,授权操作详情请参见为RAM用户授权

快速体验

本案例中,数据同步和数据加工的部分任务可以通过ETL工作流模板一键导入。在导入模板后,您可以前往目标空间,并自行完成后续的数据质量监控和数据可视化操作。

背景信息

数据集成是稳定高效、弹性伸缩的数据同步平台,致力于提供复杂网络环境下丰富的异构数据源之间高速稳定的数据移动及同步能力,提供离线同步、增量同步、全增量数据实时同步等多种同步方案。

本教程采用离线同步方案,DataWorks将数据集成离线同步能力封装为离线同步节点,一个离线同步节点代表一个同步任务,节点内通过数据来源与数据去向定义数据源之间的数据传输,通过字段映射的方式定义源端字段与目标端字段的数据读取与写入关系。

重要
  • 平台已提供本教程所需的测试数据及数据源,您需将该数据源添加至您的工作空间,即可在工作空间访问平台提供的测试数据。

  • 本教程提供数据仅作为阿里云大数据开发治理平台DataWorks数据应用实操使用,所有数据均为人工Mock数据,并且只支持在数据集成模块读取数据。

章节目标

将MySQL存储的用户基本信息及OSS存储的网站访问日志数据,通过数据集成服务同步至MaxCompute,具体数据写入情况如下:

源端数据源

目标数据源(MaxCompute)

MySQL

表:ods_user_info_d

  • uid 用户名

  • gender 性别

  • age_range 年龄分段

  • zodiac 星座

表:ods_user_info_d

  • uid string

  • gender string

  • age_range string

  • zodiac string

  • 分区字段:dt

  • 生命周期:7天

OSS

文件:user_log.txt

$remote_addr - $remote_user [$time_local] "$request" $status 
$body_bytes_sent"$http_referer" "$http_user_agent" [unknown_content];

表:ods_raw_log_d

  • col string

  • 分区字段:dt

  • 生命周期:7天

步骤一:购买并配置Serverless资源组

本教程需将存储在OSS、MySQL中的数据同步至MaxCompute,同步任务需使用DataWorks的Serverless资源组,因此您需要先购买Serverless资源组,并完成前期的准备工作。

  1. 购买Serverless资源组。

    1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的资源组,进入资源组列表页面。

    2. 单击新建资源组,在资源组购买页面,选择地域和可用区华东2(上海)、设置资源组名称,其他参数可根据界面提示进行配置,完成后根据界面提示完成付款。Serverless资源组的计费说明请参见Serverless资源组计费

      说明

      本教程将使用华东2(上海)地域的Serverless资源组进行示例演示,需注意Serverless资源组不支持跨地域操作。

  2. 配置Serverless资源组。

    1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的资源组,进入资源组列表页面。

    2. 找到购买的Serverless资源组,单击操作列的修改归属工作空间,根据界面提示将资源组绑定至已创建的DataWorks工作空间。

    3. 为资源组配置公网访问能力。

      1. 登录专有网络-公网NAT网关控制台,在顶部菜单栏切换至华东2(上海)地域。

      2. 单击创建NAT网关。配置相关参数。

        参数

        取值

        所属地域

        华东2(上海)。

        所属专有网络

        选择资源组绑定的VPC和交换机。

        您可以前往DataWorks管理控制台,切换地域后,在左侧导航栏单击资源组列表,找到已创建的资源组,然后单击操作列的网络设置,在数据调度 & 数据集成区域查看绑定的专有网络交换机。VPC和交换机的更多信息,请参见什么是专有网络

        关联交换机

        访问模式

        VPC全通模式(SNAT)。

        弹性公网IP

        新购弹性公网IP。

        关联角色创建

        首次创建NAT网关时,需要创建服务关联角色,请单击创建关联角色

        说明

        上表中未说明的参数保持默认值即可。

      3. 单击立即购买,勾选服务协议后,单击确认订单,完成购买。

更多新增和使用Serverless资源组的操作指导请参见新增和使用Serverless资源组

步骤二:创建数据源

本教程需要分别在您当前空间创建名为user_behavior_analysis_httpfile的HttpFile数据源、名为user_behavior_analysis_mysql的MySQL数据源用于访问平台提供的测试数据,测试使用的数据源基本信息已提供。

说明
  • 在数据集成同步任务配置前,您可以在DataWorks的数据源页面,配置好您需要同步的源端和目标端数据库或数据仓库的相关信息,以便在同步任务配置过程中,可通过选择数据源名称来控制同步读取和写入的数据库或数据仓库。

  • 本教程提供数据仅作为阿里云大数据开发治理平台DataWorks数据应用实操使用,所有数据均为人工Mock数据,并且只支持在数据集成模块读取数据。

创建HttpFile数据源(user_behavior_analysis_httpfile)

在您的工作空间中新建HttpFile数据源,用于后续读取平台提供的存放于OSS中的用户网站访问测试数据,并测试该数据源与同步数据的资源组网络是否连通。

  1. 进入数据源页面。

    1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 管理中心,在下拉框中选择对应工作空间后单击进入管理中心

    2. 进入工作空间管理中心页面后,单击左侧导航栏的数据源 > 数据源列表,进入数据源页面。

  2. 新建HttpFile数据源。

    1. 管理中心页面,单击进入数据源 > 数据源列表页面后单击新增数据源

    2. 新增数据源对话框中,搜索选择数据源类型为HttpFile

    3. 创建HttpFile数据源对话框,配置各项参数,主要参数配置描述如下:

      主要参数

      描述

      数据源名称

      输入数据源名称,该名称为数据源在您工作空间中的标识,本案例中设置数据源名称为user_behavior_analysis_httpfile

      数据源描述

      输入DataWorks案例体验专用数据源,在离线同步配置时读取该数据源即可访问平台提供的测试数据,该数据源只支持数据集成场景去读取,其他模块不支持使用。

      适用环境

      勾选开发环境、生产环境

      说明

      您需要同步创建开发环境生产环境的数据源,否则任务生产执行会报错。

      URL域名

      输入https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com

      资源组连通性

      连接配置区域的Serverless资源组后,单击连通状态列的测试连通性,分别测试资源组与开发、生产环境数据源的网络是否连通,等待界面提示测试完成,连通状态为可连通

创建MySQL数据源(user_behavior_analysis_mysql

在您的工作空间中新建MySQL数据源,用于后续读取平台提供的存放于MySQL中的用户信息数据,并测试该数据源与同步数据的资源组网络是否连通。

  1. 进入数据源页面。

    1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 管理中心,在下拉框中选择对应工作空间后单击进入管理中心

    2. 进入工作空间管理中心页面后,单击左侧导航栏的数据源 > 数据源列表,进入数据源页面。

  2. 新建MySQL数据源。

    1. 管理中心页面,单击进入数据源 > 数据源列表页面后单击新增数据源

    2. 新增数据源对话框中,搜索选择数据源类型为MySQL

    3. 创建MySQL数据源对话框,配置各项参数。

      image

      参数

      描述

      数据源类型

      选择连接串模式

      数据源名称

      请输入user_behavior_analysis_mysql

      数据源描述

      输入DataWorks案例体验专用数据源,在离线同步配置时读取该数据源即可访问平台提供的测试数据,该数据源只支持数据集成场景读取,其他模块不支持使用。

      适用环境

      勾选开发、生产

      说明

      您需要同步创建开发环境生产环境的数据源,否则任务生产执行会报错。

      JDBC URL

      主机地址IP

      rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com

      端口号

      3306

      数据库名称

      workshop

      用户名

      workshop

      密码

      workshop#2017

      认证选项

      无认证。

      资源组连通性

      连接配置区域的Serverless资源组后,单击连通状态列的测试连通性,分别测试资源组与开发、生产环境数据源的网络是否连通,等待界面提示测试完成,连通状态为可连通

步骤三:新建业务流程

根据需求分析设计业务流程。新建两个离线同步节点ods_raw_log_dods_user_info_d,用于后续同步MySQL用户数据与OSS网站访问日志数据,再新建虚拟节点workshop_start统一管理该业务流程。本阶段仅涉及数据同步业务流程,不涉及具体同步任务配置。

1、新建业务流程

DataWorks默认预设了业务流程workflow,您可跳过该步骤直接使用该默认业务流程。

  1. 进入数据开发页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 新建业务流程。

    数据开发面板,右键单击业务流程,选择新建业务流程。并根据业务需要定义业务流程名称,本案例业务流程命名为WorkShop

2、设计业务流程

  1. 进入业务流程面板。

    双击上一步创建的业务流程名,进入业务流程开发面板。

  2. 新建节点。

    通过业务流程拖拽组件的方式,实现业务流程设计。单击新建节点,找到需要创建的节点类型并拖拽至右侧的业务流程画布。

    本案例需新建1个虚拟节点workshop_start,两个离线同步节点:ods_raw_log_d(用于同步网站访问日志数据)和ods_user_info_d(用于同步用户基本信息数据)。

  3. 设置节点依赖。

    workshop_start节点设置为两个离线同步节点的上游节点。本教程中,由于虚拟节点和同步节点无血缘关系,所以此处通过业务流程拉线设置虚拟节点workshop_start与OSS数据同步节点ods_raw_log_d、MySQL数据同步节点ods_user_info_d间的依赖关系,更多依赖关系设置方式,详情请参见调度依赖配置指引

    image.png

步骤四:新建MaxCompute表

提前新建用于接收通过数据集成同步过来的原始数据的MaxCompute表。本教程仅快速创建相关表,更多MaxCompute表相关操作,请参见创建并使用MaxCompute表

  1. 新建表入口。

    image.png

  2. 新建ods_raw_log_d表。

    在新建表弹窗中输入名称ods_raw_log_d,在表的编辑页面单击DDL,输入下述建表语句后,单击生成表结构,并确认覆盖当前操作。

    CREATE TABLE IF NOT EXISTS ods_raw_log_d
    (
     col STRING
    ) 
    PARTITIONED BY
    (
     dt STRING
    )
    LIFECYCLE 7;
  3. 新建ods_user_info_d表。

    在新建表弹窗中输入名称ods_user_info_d,在表的编辑页面单击DDL,输入下述建表语句后,单击生成表结构,并确认覆盖当前操作。

    CREATE TABLE IF NOT EXISTS ods_user_info_d (
     uid STRING COMMENT '用户ID',
     gender STRING COMMENT '性别',
     age_range STRING COMMENT '年龄段',
     zodiac STRING COMMENT '星座'
    )
    PARTITIONED BY (
     dt STRING
    )
    LIFECYCLE 7;
  4. 提交并发布表。

    表信息确认无误后,将ods_user_info_d表和ods_raw_log_d表分别单击提交到开发环境提交到生产环境此操作将根据您的节点配置在开发环境与生产环境对应计算引擎项目分别创建目标引擎物理表。

    说明

    表结构定义完成后,您需将其提交至开发环境与生产环境,提交成功后才可在对应环境的引擎项目查看该表。

    • 提交表至DataWorks的开发环境,即在开发环境所绑定的MaxCompute引擎中创建当前表。

    • 提交表至DataWorks的生产环境,即在生产环境所绑定的MaxCompute引擎中创建当前表。

步骤五:配置虚拟节点(workshop_start)

本教程通过workshop_start节点实现每天00:15统一调起用户画像分析业务流程执行,workshop_start属于管控类节点,不需要配置节点代码,节点调度配置如下。

  1. 进入workshop_start编辑页面。

    数据开发页面,双击相应的业务流程名,进入业务流程开发面板。双击虚拟节点workshop_start。进入该节点的编辑页面,单击右侧的调度配置

  2. 配置workshop_start调度属性。

    通过以下配置实现调度场景下,每日00:15,由工作空间根节点触发当前节点调度。

    • 配置调度时间:配置调度周期定时调度时间00:15

    • 配置重跑属性:配置为运行成功或失败后皆可重跑

    • 配置调度依赖:配置依赖的上游节点为使用工作空间根节点

      本教程中workshop_start作为流程管控节点,无其他强上游依赖,所以本教程直接依赖工作空间自带的根节点。即由工作空间根节点直接调起当前用户画像分析业务流程执行。

      image

      说明

      工作空间根节点为空间创建后默认生成节点,一般情况下,空间下所有任务均会挂在该节点下,工作空间根节点默认从0点开始调起挂在其下游的所有一级子节点,且该节点不支持变更。

步骤六:配置OSS数据同步节点(ods_raw_log_d)

此步骤中,通过配置ods_raw_log_d节点,实现将存储于OSS中的用户访问日志user_log.txt,通过数据集成同步任务写入MaxCompute的ods_raw_log_d表中。

在数据开发页面,双击相应业务流程下的ods_raw_log_d节点,进入该节点的编辑页面开始配置同步任务。

1、配置同步网络链接

此步骤中,使用Serverless资源组测试与源端HttpFile数据源user_behavior_analysis_httpfile,以及目标端MaxCompute数据源的网络连通。

image

  • 数据来源:数据来源选择HttpFile,数据源名称选择user_behavior_analysis_httpfile,即步骤二:创建数据源中新建的HttpFile数据源。

  • 我的资源组:选择购买的Serverless资源组

  • 数据去向:数据去向选择MaxCompute,然后选择对应数据源名称。

2、配置同步任务

  1. 同步任务基本配置。

    • 数据来源端指定要读取的文件user_log.txt

    • 数据去向端指定OSS文件写入MaxCompute数据源中的ods_raw_log_d表。并指定表分区信息分区信息通过${}格式定义了名为bizdate的变量,该变量将在后续步骤3中为其赋值。

      image

  2. 确认字段映射及通用配置。

    DataWorks通过配置源端与目标端字段映射关系,实现源端指定字段数据写入目标端指定字段,同时提供并发设置并发读写数据、提供限速功能避免同步对数据库造成影响、提供脏数据影响定义及分布式执行任务等功能。本教程使用默认配置。关于其他配置项说明,详情请参见通过向导模式配置离线同步任务

3、配置调度属性

通过以下配置实现调度场景下,每日00:15将存储在OSS的user_log.txt文件数据通过数据集成同步至MaxCompute的ods_raw_log_d表对应时间分区。

  • 配置调度参数bizdate=$bizdate,获取前一天的日期,格式为yyyymmdd

  • 配置调度时间:配置调度周期;无需单独配置当前节点定时调度时间,当前节点每日起调时间由业务流程根节点workshop_start的定时调度时间控制,即每日00:15后才会调度。

    image

  • 配置调度依赖

    • 确认依赖的上游节点:确认当前节点依赖的上游节点是否展示workshop_start节点,拉线设置的上游依赖将展示在此处。若未显示workshop_start节点,请确认是否已参照2、设计业务流程完成业务数据同步阶段的业务流程设计。

      本案例中,workshop_start节点定时时间到且执行完成后,将会触发当前节点执行。

    • 确认本节点输出:确认是否存在MaxCompute生产项目名称.ods_raw_log_d的节点输出,您可以进入管理中心查看MaxCompute生产项目名称

      说明

      DataWorks通过节点输出挂载节点依赖关系,为了方便下游SQL任务对同步任务产出表加工时,可通过自动解析机制基于表血缘快速为下游SQL节点添加当前同步任务的依赖,此处需要确认是否存在与同步产出表ods_raw_log_d同名的节点输出

      image

步骤七:配置MySQL数据同步节点(ods_user_info_d)

此步骤中,通过配置ods_user_info_d节点,实现将存储于MySQL中的用户信息ods_user_info_d表数据同步至MaxCompute的ods_user_info_d表中。

在数据开发页面,双击WorkShop业务流程下的ods_user_info_d节点,进入该节点的编辑页面开始配置同步任务。

1、配置同步网络链接

此步骤中,使用Serverless资源组测试与源端MySQL数据源user_behavior_analysis_mysql,以及目标端MaxCompute数据源的网络连通。

  • 数据来源:数据来源选择MySQL,数据源名称选择user_behavior_analysis_mysql

  • 我的资源组:选择购买的Serverless资源组

  • 数据去向:数据去向选择MaxCompute,然后选择对应数据源名称。

2、配置同步任务

  1. 同步任务基本配置。

    • 数据来源端指定要读取的MySQL数据源中的ods_user_info_d表。

    • 数据去向端指定写入MaxCompute数据源中的ods_user_info_d表,并配置表分区信息。分区信息通过${}格式定义了名为bizdate的变量,该变量将在后续步骤3中为其赋值。

      说明

      本案例默认全量读取MySQL表数据写入MaxCompute表指定日期分区。

      image

  2. 确认字段映射及通用配置。

    DataWorks通过配置源端与目标端字段映射关系,实现源端指定字段数据写入目标端指定字段,同时提供并发设置并发读写数据、提供限速功能避免同步对数据库造成影响、提供脏数据影响定义及分布式执行任务等功能。本教程使用默认配置。关于其他配置项说明,详情请参见通过向导模式配置离线同步任务

3、配置调度属性

通过以下配置实现调度场景下,每日00:15将存储在MySQL的ods_user_info_d用户信息数据通过数据集成同步至MaxCompute的ods_user_info_d表对应时间分区。

  • 配置调度参数bizdate=$bizdate,获取前一天的日期,格式为yyyymmdd。

  • 配置调度时间:配置调度周期;无需单独配置当前节点定时调度时间,当前节点每日起调时间由业务流程根节点WorkShop的定时调度时间控制,即每日00:15后才会调度。

    image

  • 配置调度依赖

    • 确认依赖的上游节点:确认当前节点依赖的上游节点是否展示workshop_start节点,拉线设置的上游依赖将展示在此处,若未显示workshop_start节点,请确认是否已参照2、设计业务流程完成业务数据同步阶段的业务流程设计。

      本案例中,workshop_start节点定时时间到且执行完成后,将会触发当前节点执行。

    • 确认本节点输出:确认是否存在MaxCompute生产项目名称.ods_user_info_d的节点输出。您可以进入管理中心查看MaxCompute生产项目名称

      说明

      DataWorks通过节点输出挂载节点依赖关系,为了方便下游SQL任务对同步任务产出表加工时,可通过自动解析机制基于表血缘快速为下游SQL节点添加当前同步任务的依赖,此处需要确认是否存在与同步产出表ods_user_info_d同名的节点输出。

      image

步骤八:运行并查看结果

运行当前业务流程,将MySQL中的用户基本信息和OSS中的用户网站访问日志数据写入MaxCompute对应表。

运行业务流程

  1. 数据开发页面,双击业务流程下的WorkShop,打开Workflow业务流程面板后,单击工具栏中的image.png图标,按照上下游依赖关系运行业务流程。

  2. 确认执行情况:

    • 查看任务运行状态:节点处于image.png状态,即代表同步执行过程无问题。

    • 查看任务执行日志:右键单击ods_user_info_d、ods_raw_log_d节点,选择查看日志。当日志中出现如下字样,表示同步节点运行成功,并成功同步数据。

      image

查询同步结果

预期运行业务流程后,MySQL中的用户基本信息ods_user_info_d表数据将全量同步至workshop2024_01_dev.ods_user_info_d昨天的分区中,OSS中的用户网站访问日志user_log.txt将全量同步至workshop2024_01_dev.ods_raw_log_d昨天的分区中。由于查询SQL不需要发布至生产环境执行,此时我们可以选择创建临时查询文件进行确认。

  1. 新建临时查询文件。

    数据开发页面的左侧导航栏,单击image.png,进入临时查询面板。右键单击临时查询,选择新建节点 > ODPS SQL

  2. 查询同步结果表。

    执行如下SQL语句,确认同步数据写入结果。查看导入ods_raw_log_dods_user_info_d的记录数。

    //此处您需要变更分区过滤条件为您当前操作的实际业务日期。例如,任务运行的日期为20230621,则业务日期为20230620,即任务运行日期的前一天。
    select count(*) from ods_user_info_d where dt=业务日期; 
    select count(*) from ods_raw_log_d where dt=业务日期;

    image

    说明

    本教程由于在DataStudio(开发环境)执行,所以该数据将默认写入开发环境对应的引擎项目workshop2024_01_dev的指定表中。

步骤九:提交业务流程

代码调试完成后,将用户画像分析业务流程提交至调度系统进行周期性调度,即周期性将原始业务数据同步至MaxCompute目标表。

  1. 进入业务流程面板。

    数据开发页面,双击业务流程名WorkShop,进入WorkShop业务流程面板。

  2. 提交业务流程。

    在业务流程画布中,单击工具栏中的image.png图标,提交该业务流程。

  3. 确认提交操作。

    提交对话框中选择当前业务流程中的所有节点,并勾选忽略输入输出不一致的告警。确认无误后单击确认,将提交该业务流程中的所有任务,并前往任务发布界面将其发布至生产环境。

后续步骤

现在,您已经学习了如何进行日志数据同步,完成数据的同步,您可以继续下一个教程。在该教程中,您将学习如何对同步的数据进行计算与分析。详情请参见数据加工