同步数据

本教程以MySQL数据源中的用户基本信息ods_user_info_d表和HttpFile中的网站访问日志数据user_log.txt文件为例,通过数据集成离线同步任务分别同步至私有OSS中,再通过Spark SQL创建外部表来访问私有OSS数据存储。本章节旨在完成数据同步操作。

章节目标

  1. 本章节通过数据集成将平台提供的MySQL数据源内的用户基本信息数据HttpFile数据源内的用户网站访问日志数据同步至私有OSS对象存储创建的数据源中。

    源端数据源类型

    源端待同步数据

    源端表结构

    目标端数据源类型

    MySQL

    表:ods_user_info_d

    用户基本信息数据

    • uid 用户名

    • gender 性别

    • age_range 年龄分段

    • zodiac 星座

    OSS

    HttpFile

    文件:user_log.txt

    用户网站访问日志数据

    一行为一条用户访问记录。

    $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent"$http_referer" "$http_user_agent" [unknown_content];

    OSS

  2. 完成同步任务后,通过EMR Spark SQL创建外部表来访问私有OSS数据存储。

操作步骤

步骤一:设计业务流程

步骤二:搭建同步链路

步骤三:验证同步数据

步骤一:设计业务流程

本步骤内,将数据集成节点以及EMR Spark SQL 节点相结合,形成用户画像分析任务流程中获取数据部分的流程。主要是通过ods_raw_log_d_2oss_spark节点从HttpFile数据源获取日志数据至私有OSS数据源中,再通过ods_raw_log_d_spark节点生成简单的日志外部表,从私有OSS数据存储中获取用户日志数据。以及通过ods_user_info_d_2oss_spark从MySQL数据源同步用户基本信息至私有OSS数据源中,再通过ods_user_info_d_spark实现外部表的创建,从私有OSS数据存储中获取用户基本信息数据。

登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

设计业务流程

  1. 新建业务流程。

    数据开发需基于业务流程使用对应的开发组件进行具体开发操作。在创建节点之前,您需要先新建业务流程。具体操作方法可参见创建业务流程

    该业务流程的命名为:用户画像分析_Spark

    image

  2. 设计业务流程。

    业务流程新建完成后,将自动展开该业务流程画布。请根据业务流程设计,在业务流程画布中单击新建节点,通过将节点组件拖拽至业务流程画布,并通过拉线设置节点上下游依赖的方式,设计数据同步阶段的业务流程。

    image

  3. 在本教程中,由于虚拟节点和同步节点之间并无血缘关系,因此我们通过业务流程拉线的方式来设置节点的依赖关系。有关更多依赖关系设置方式的详细信息,详情请参见调度依赖配置指引。以下为各个节点的类型、命名以及作用的介绍。

    节点分类

    节点类型

    节点命名

    节点作用

    通用

    image虚拟节点

    workshop_start_spark

    用于统筹管理整个用户画像分析业务流程,例如业务流程起调时间。当业务流程较复杂时,可使数据流转路径更清晰。该节点为空跑任务,无须编辑代码。

    数据集成

    image离线同步

    ods_raw_log_d_2oss_spark

    用于将HttpFile数据源存储的用户网站访问记录,通过离线同步的方式同步至私有OSS数据源中,以供后续Spark SQL获取。

    数据集成

    image离线同步

    ods_user_info_d_2oss_spark

    用于将MySQL数据源存储的用户基本信息数据,通过离线同步的方式同步至私有OSS数据源中,以供后续Spark SQL获取。

    EMR

    imageEMR Spark SQL

    ods_raw_log_d_spark

    在EMR Spark SQL节点中,创建表ods_raw_log_d_spark,并通过该外部表访问私有OSS中的用户网站访问记录数据。

    EMR

    imageEMR Spark SQL

    ods_user_info_d_spark

    在EMR Spark SQL节点中,创建表ods_user_info_d_spark,并通过该外部表访问私有OSS中的用户基本信息数据。

配置调度逻辑

本案例通过虚拟节点workshop_start_Spark控制整个业务流程每天00:30调度执行,以下为虚拟节点关键调度配置,其他节点调度无须变更,实现逻辑详情请参见:场景:如何配置业务流程定时时间。其他调度配置相关说明,请参见:任务调度属性配置概述

调度配置

图片示例

说明

调度时间配置

image

虚拟节点配置调度时间为00:30,该虚拟节点会在每日00:30调起当前业务流程并执行。

调度依赖配置

image

由于虚拟节点workshop_start_spark无上游依赖,此时可以直接依赖工作空间根节点,由空间根节点触发workshop_start_spark节点执行。

说明

DataWorks中的所有节点都需要依赖于上游节点,数据同步阶段的所有任务都以虚拟节点workshop_start_spark为依赖,通过workshop_start_spark节点来触发数据同步业务流程的执行。

步骤二:搭建同步链路

配置完成业务流程后,分别双击ods_user_info_d_2oss_spark以及ods_raw_log_d_2oss_spark数据集成节点,配置用户数据同步至私有OSS和配置用户日志同步至私有OSS,并且通过ods_raw_log_d_sparkods_user_info_d_spark采用 Spark SQL代码,实现通过Spark SQL创建的外表来访问存储于私有OSS的数据。

用户数据与日志同步至OSS数据源

使用数据集成将平台提供的用户数据与用户日志同步至私有OSS对象存储的Bucket目录下。

配置用户日志同步至OSS

通过离线数据集成任务,实现从平台的HttpFile数据源内的获取用户日志信息,同步至私有OSS数据源中。

同步HttpFile数据源的日志信息至自建的OSS。

  1. 数据开发页面,双击ods_raw_log_d_2oss_spark节点,进入节点配置页面。

  2. 配置同步网络链接。

    完成以下网络与资源配置后,单击下一步,并根据界面提示完成连通性测试。

    参数

    描述

    数据来源

    • 数据来源:HttpFile

    • 数据源名称:user_behavior_analysis_httpfile

    我的资源组

    选择已购买的Serverless资源组

    数据去向

    • 数据去向:OSS

    • 数据源名称:选择前文创建的私有OSS数据源,此处示例为test_g。

  3. 配置同步任务。

    参数

    描述

    数据来源

    • 文本类型:选择text类型。

    • 文件路径/user_log.txt

    • 列分隔符:输入列分隔符为|

    • 压缩格式:包括None、Gzip、Bzip2和Zip四种类型,此处选择None

    • 是否跳过表头:选择No。

    数据去向

    • 文本类型:选择text类型。

    • 文件名(含路径):根据您自建OSS的目录进行输入,示例为ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt,其中ods_raw_log_d为您自建的目录名,$bizdate表示获取前一天的日期。

    • 列分隔符:输入列分隔符为|

  4. 调度设置。

    配置页面单击右侧调度配置,可进入调度配置面板配置调度与节点信息。以下为配置的内容。

    说明

    DataWorks提供调度参数,可实现在调度场景下,将每日数据写入不同的OSS路径及文件下,并以业务日期对路径目录与文件进行命名。

    在实际场景下,您可以在数据去向的文件名(含目录)配置中通过${变量名}格式自定义路径中的变量,并通过在调度配置页面为变量赋值调度参数的方式,实现调度场景下动态生成数据去向目录与文件名。

    配置项

    配置内容

    图示

    调度参数

    调度参数项中单击新增参数,添加:

    • 参数名:bizdate

    • 参数值:$[yyyymmdd-1]

    详情可参见:配置调度参数

    image

    调度依赖

    调度依赖确认产出表已作为本节点输出。

    格式为workspacename.节点名

    详情可参见:配置调度依赖

    image

  5. 配置完成后,单击工具栏中的保存图标,进行保存。

配置用户数据同步至OSS

通过离线数据集成任务,实现从平台的MySQL数据源内的获取用户数据信息,同步至私有OSS数据源中。

  1. 数据开发页面,双击ods_user_info_d_2oss_spark节点,进入节点配置页面。

  2. 配置同步网络链接。

    完成以下网络与资源配置后,单击下一步,并根据界面提示完成连通性测试。

    参数

    描述

    数据来源

    • 数据来源:MySQL

    • 数据源名称:user_behavior_analysis_mysql

    我的资源组

    选择已购买的Serverless资源组

    数据去向

    • 数据去向:OSS

    • 数据源名称:选择前文创建的私有OSS数据源,此处示例为test_g。

  3. 配置同步任务。

    参数

    描述

    数据来源

    • :选择数据源中的ods_user_info_d

    • 切分键:建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。此处设置切分键为uid

    数据去向

    • 文本类型:选择text类型。

    • 文件名(含路径):根据您自建OSS的目录进行输入,示例为ods_user_info_d/user_${bizdate}/user_${bizdate}.txt。其中ods_user_info_d为您自建的目录名,$bizdate表示获取前一天的日期。

    • 列分隔符:输入列分隔符为|

  4. 调度设置

    配置页面单击右侧调度配置,可进入调度配置面板配置调度与节点信息。以下为配置的内容。

    说明

    DataWorks提供调度参数,可实现在调度场景下,将每日数据写入不同的OSS路径及文件下,并以业务日期对路径目录与文件进行命名。

    在实际场景下,您可以在数据去向的文件名(含目录)配置中通过${变量名}格式自定义路径中的变量,并通过在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下动态生成数据去向目录与文件名。

    配置项

    配置内容

    图示

    调度参数

    调度参数项中单击新增参数,添加:

    • 参数名:bizdate

    • 参数值:$[yyyymmdd-1]

    详情可参见:配置调度参数

    image

    调度依赖

    调度依赖确认产出表已作为本节点输出。

    格式为workspacename.节点名

    详情可参见:配置调度依赖

    image

  5. 配置完成后,单击工具栏中的保存图标。

创建Spark外部表加载OSS数据

数据通过离线集成任务同步至私有OSS数据源后,基于生成的OSS文件,通过Spark SQL的create语法创建ods_raw_log_d_sparkods_user_info_d_spark表,并通过LOCATION来获取OSS中的用户信息文件、用户日志信息以供后续数据加工使用。

配置ods_raw_log_d_spark节点

基于通过EMR Spark SQL创建的外部表ods_raw_log_d_spark,用LOCATION来访问离线数据集成任务写入私有OSS对象存储Bucket的日志信息。

  1. 代码配置。

    -- 场景:以下SQL为Spark SQL,通过EMR Spark SQL创建的外部表ods_raw_log_d_spark,用LOCATION来获取离线数据集成任务写入私有OSS对象存储Bucket的日志信息,并添加对应的dt分区。
    -- 补充:
    --      DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应的业务分区内。
    --      在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过变量赋值调度参数的方式,实现调度场景下代码动态入参。 
    CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark
    (
      `col` STRING
    ) 
    PARTITIONED BY (
      dt STRING
    )
    LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/';
    
    ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') 
    LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/'
    ;
    说明

    上述代码中的location为示例路径,与之前配置离线同步任务时的数据去向相同,需要输入您建立的文件路径名称,其中dw-emr-demo是您准备环境时创建的OSS Bucket域名。

  2. 配置调度配置。

    ods_raw_log_d_spark节点配置任务调度,通过配置的调度参数来获取对应业务日期的私有OSS日志文件,并写入同样业务日期分区的Spark表内。

    配置项

    配置内容

    图示

    调度参数

    调度参数项中单击新增参数,添加:

    参数名:bizdate

    参数值:$[yyyymmdd-1],详情可参见:配置调度参数

    image

    调度依赖

    调度依赖确认产出表已作为本节点输出。

    格式为workspacename.节点名

    详情可参见:配置调度依赖

    image

    说明

    本章节在SQL中配置了调度参数${bizdate},并将其赋值为T-1。在离线计算场景下bizdate为业务交易发生的日期,也常被称为业务日期。例如,今天统计前一天的营业额,此处的前一天指的是交易发生的日期,也就是业务日期。

  3. 完成配置后,单击image保存节点。

配置ods_user_info_d_spark节点

基于通过EMR Spark SQL节点创建的外部表ods_user_info_d_spark,用LOCATION来访问离线数据集成任务写入私有OSS对象存储Bucket的用户信息。

  1. 代码配置。

    -- 场景:以下SQL为Spark SQL,通过EMR Spark SQL节点创建的外部表ods_user_info_d_spark,用LOCATION来获取离线数据集成任务写入私有OSS对象存储Bucket的用户信息,并写入对应的dt分区。
    -- 补充:
    --      DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应的业务分区内。
    --      在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过变量赋值调度参数的方式,实现调度场景下代码动态入参。
    CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark
    (
        `uid`        STRING COMMENT '用户ID'
        ,`gender`    STRING COMMENT '性别'
        ,`age_range` STRING COMMENT '年龄段'
        ,`zodiac`    STRING COMMENT '星座'
    )
    PARTITIONED BY 
    (
        dt           STRING
    )
    ROW FORMAT DELIMITED 
    FIELDS
    TERMINATED
    BY'|'
    STORED AS TEXTFILE
    LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/'
    ;
    
    ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') 
    LOCATION'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/'
    ;
    说明

    上述代码中的location为示例路径,与之前配置离线同步任务时的数据去向相同,需要输入您建立的文件路径名称,其中dw-emr-demo是您准备环境时创建的OSS Bucket域名。

  2. 配置调度配置。

    ods_user_info_d_spark节点配置任务调度,通过配置的调度参数来获取对应业务日期的私有OSS用户信息文件,并写入同样业务日期分区的Spark表内。

    配置项

    配置内容

    图示

    调度参数

    调度参数项中单击新增参数,添加:

    参数名:bizdate

    参数值:$[yyyymmdd-1],详情可参见:配置调度参数

    image

    调度依赖

    调度依赖确认产出表已作为本节点输出。

    格式为worksspacename.节点名

    详情可参见:配置调度依赖

    image

  3. 完成配置后,单击image保存节点。

步骤三:验证同步数据

在确保该章节内的所有节点运行成功的情况下,在左侧导航栏的临时查询中新建EMR Spark SQL临时查询,编写SQL查看EMR Spark SQL节点创建的外部表是否正常产出。

-- 您需要将分区过滤条件更新为您当前操作的实际业务日期。例如,任务运行的日期为20240808,则业务日期为20240807,即任务运行日期的前一天。
SELECT  * FROM  ods_raw_log_d_spark  WHERE dt ='业务日期';--查询ods_raw_log_d_spark表
SELECT  * FROM  ods_user_info_d_spark   WHERE dt ='业务日期';--查询ods_user_info_d_spark表
说明

在验证同步数据的SQL中,可将WHERE条件替换为"dt = ${bizdate}",在临时查询任务中单击image带参运行,为SQL占位符${bizdate}赋值后运行即可。

后续步骤

现在,您已经完成了同步数据,您可以继续下一个教程。在下一个教程中,您将学习将用户基本信息数据、用户网站访问日志数据在Spark中进行加工处理。详情请参见加工数据