同步数据

本教程以MySQL中的用户基本信息ods_user_info_d表和OSS中的网站访问日志数据user_log.txt文件为例,通过数据集成离线同步任务分别同步至StarRocks的ods_user_info_d_starrocksods_raw_log_d_starrocks表。旨在介绍如何通过DataWorks数据集成实现异构数据源间的数据同步,完成数仓数据同步操作。

前提条件

在进行数据同步之前,请确保已准备好所需的工作环境。具体操作步骤请参见准备环境

章节目标

将案例提供的公共数据源中的数据同步至StarRocks,完成业务流程设计中的数据同步部分的内容。

源端数据源类型

源端待同步数据

源端表结构

目标端数据源类型

接收源端数据的目标表

目标表结构

MySQL

表:ods_user_info_d

用户基本信息数据

  • uid 用户名

  • gender 性别

  • age_range 年龄分段

  • zodiac 星座

StarRocks

ods_user_info_d_starrocks

  • uid 用户名

  • gender 性别

  • age_range 年龄分段

  • zodiac 星座

  • dt 分区字段

HttpFile

文件:user_log.txt

用户网站访问日志数据

一行为一条用户访问记录

$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent"$http_referer" "$http_user_agent" [unknown_content];

StarRocks

ods_raw_log_d_starrocks

  • col 原始日志

  • dt 分区字段

进入数据开发

登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

操作步骤

步骤一:设计业务流程

步骤二:搭建同步链路

步骤三:验证同步数据

步骤一:设计业务流程

设计业务流程

  1. 新建业务流程。

    数据开发需基于业务流程使用对应的开发组件进行具体开发操作。在创建节点之前,您需要先新建业务流程。具体操作方法可参见创建业务流程

    该业务流程的命名为:用户画像分析_StarRocks。image

  2. 设计业务流程

    业务流程新建完成后,将自动展开该业务流程画布。请根据业务流程设计,在业务流程画布中单击新建节点,通过将节点组件拖拽至业务流程画布,并通过拉线设置节点上下游依赖的方式,设计数据同步阶段的业务流程。

    image

  3. 在本教程中,由于虚拟节点和同步节点之间并无血缘关系,因此我们通过业务流程拉线的方式来设置节点的依赖关系。有关更多依赖关系设置方式的详细信息,详情请参见调度依赖配置指引。以下为各个节点的类型、命名以及作用的介绍。

    节点分类

    节点类型

    节点命名

    (以最终产出表命名)

    节点作用

    通用

    虚拟节点

    image

    workshop_start_starrocks

    用于统筹管理整个用户画像分析业务流程,例如业务流程起调时间。当空间业务流程较复杂时,可使数据流转路径更清晰。该节点为空跑任务,无须编辑代码。

    数据库

    StarRocks

    image

    ddl_ods_user_info_d_starrocks

    同步任务前创建,用于接收源端MySQL存储的用户基本信息数据的StarRocks表ods_user_info_d_starrocks

    数据库

    StarRocks

    image

    ddl_ods_raw_log_d_starrocks

    同步任务前创建,用于接收源端OSS存储的用户网站访问记录的StarRocks表ods_raw_log_d_starrocks

    数据集成

    离线同步

    image

    ods_user_info_d_starrocks

    用于将存储于MySQL的用户基本信息数据同步至StarRocks表ods_user_info_d_starrocks

    数据集成

    离线同步

    image

    ods_raw_log_d_starrocks

    用于将存储于OSS的用户网站访问记录同步至StarRocks表ods_raw_log_d_starrocks

配置调度逻辑

本案例通过虚拟节点workshop_start_starrocks控制整个业务流程每天00:30调度执行,以下为虚拟节点关键调度配置,其他节点调度无须变更,实现逻辑详情请参见:场景:如何配置业务流程定时时间。其他调度配置相关说明,请参见:任务调度属性配置概述

调度配置

图片展示

说明

调度时间配置

image

虚拟节点配置调度时间为00:30,该虚拟节点会在每日00:30调起当前业务流程并执行。

调度依赖配置

image

由于虚拟节点workshop_start_starrocks无上游依赖,此时可以直接依赖工作空间根节点,由空间根节点触发workshop_start_starrocks节点执行。

说明

DataWorks中的所有节点都需要依赖于上游节点,数据同步阶段的所有任务都以虚拟节点workshop_start_starrocks为依赖。换句话说,通过workshop_start_starrocks节点来触发数据同步业务流程的执行。

步骤二:搭建同步链路

创建目标StarRocks表

在进行同步操作之前,需要提前创建目标StarRocks表,以便存储后续同步过来的原始数据。

本案例Starrocks表基于源端表结构生成,具体请参见章节目标。在业务流程面板,双击数据库ddl_ods_user_info_d_starrocks节点和数据库ddl_ods_raw_log_d_starrocks节点,进入节点编辑页,分别填入对应的StarRocks建表命令,并单击image保存。

  • ddl_ods_user_info_d_starrocks

    CREATE TABLE IF NOT EXISTS ods_user_info_d_starrocks (
        uid STRING COMMENT '用户ID',
        gender STRING COMMENT '性别',
        age_range STRING COMMENT '年龄段',
        zodiac STRING COMMENT '星座',
        dt STRING not null COMMENT '时间'
    ) 
    DUPLICATE KEY(uid)
    COMMENT '用户行为分析案例-用户基本信息表'
    PARTITION BY(dt) 
    PROPERTIES("replication_num" = "1");
  • ddl_ods_raw_info_d_starrocks

    CREATE TABLE IF NOT EXISTS ods_raw_log_d_starrocks (
        col STRING COMMENT '日志',
        dt DATE  not null COMMENT '时间'
    ) DUPLICATE KEY(col) 
    COMMENT '用户行为分析案例-网站访问日志原始数据表' 
    PARTITION BY(dt) 
    PROPERTIES ("replication_num" = "1");

配置用户数据同步链路

在业务流程面板,双击离线同步ods_user_info_d_starrocks节点,进入ods_user_info_d_starrocks节点的配置面板,完成用户基本信息数据从案例提供的MySQL表ods_user_info_d同步至StarRocks表ods_user_info_d_starrocks的同步链路配置操作。

  1. 网络与资源配置。

    在配置好数据来源我的资源组数据去向后,单击下一步,根据页面提示完成连通性测试。详细配置如下。

    配置项

    配置内容

    数据来源

    • 数据来源:MySQL

    • 数据源名称user_behavior_analysis_mysql

    我的资源组

    选择在准备环境阶段新建的Serverless资源组。

    数据去向

    • 数据去向:StarRocks

    • 数据源名称Doc_StarRocks_Storage_Compute_Tightly_01

    image

  2. 配置任务。

    • 配置数据来源与去向。

      模块

      配置项

      配置内容

      数据来源

      选择MySQL表ods_user_info_d

      切分键

      建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。

      此处配置切分键为uid字段。

      数据去向

      选择StarRocks表ods_user_info_d_starrocks

      导入前准备语句

      本案例按dt字段动态分区,为避免节点重跑数据重复写入,通过以下SQL语句实现每次同步前删除已有目标分区。

      ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE其中${var}为参数,后续在调度配置阶段为其赋值调度参数,以实现调度场景下的动态入参,详情请参见调度设置

      StreamLoad请求参数

      StreamLoad 的请求参数,需为JSON格式。

      {
        "row_delimiter": "\\x02",
        "column_separator": "\\x01"
      }

      image

    • 字段映射。

      通过字段映射关系确定源端字段与目标端字段的写入关系,并通过变量赋值调度参数的方式,实现动态为StarRocks分区字段赋值,即每日数据写入StarRocks对应业务分区。

      单击同名映射,源端MySQL字段将自动映射目标表同名字段,即源端字段数据将默认写入与源端字段同名的目标端字段。

      单击添加一行,输入'${var}'并手动设置该字段与StarRocks的dt字段进行映射。

      image

    • 调度配置。

      配置页面单击右侧调度配置,可进入调度配置面板配置调度与节点信息,详情可参见任务调度配置。以下为配置的内容。

      配置项

      配置内容

      图示

      调度参数

      调度参数项中单击新增参数,添加:

      • 参数名:var

      • 参数值:$[yyyymmdd-1]

      image

      调度依赖

      调度依赖确认产出表已作为本节点输出。

      格式为worksspacename.tablename

      image

配置用户日志同步链路

在业务流程面板,双击离线同步ods_raw_log_d_starrocks节点,进入ods_raw_log_d_starrocks节点的配置面板,完成用户网站访问信息数据从平台提供的公共数据源HttpFile文件user_log.txt同步到StarRocks表ods_raw_log_d_starrocks的同步链路配置操作。

  1. 网络与资源配置。

    在配置好数据来源我的资源组数据去向后,请单击下一步,根据页面提示完成连通性测试。详细配置如下。

    配置项

    配置内容

    数据来源

    • 数据来源:HttpFile

    • 数据源名称user_behavior_analysis_HttpFile

    我的资源组

    选择在准备环境阶段购买的Serverless资源组。

    数据去向

    • 数据去向:StarRocks

    • 数据源名称Doc_StarRocks_Storage_Compute_Tightly_01

    image

  2. 任务配置。

    • 配置数据来源与去向

      模块

      配置项

      配置内容

      数据来源

      文件路径

      /user_log.txt

      文件类型

      text

      列分隔符

      |

      高级设置 > 是否跳过表头

      No

      完成以上数据来源配置后,单击确认表数据结构

      数据去向

      ods_raw_log_d_starrocks

      导入前准备语句

      本案例按dt字段动态分区,为避免节点重跑数据重复写入,通过以下SQL语句实现每次同步前删除已有目标分区。

      ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE

      其中${var}为变量参数,后续在调度配置阶段为其赋值调度参数,以实现调度场景下的动态入参。

      StreamLoad请求参数

      {
        "row_delimiter": "\\x02",
        "column_separator": "\\x01"
      }

      image

    • 字段映射。

      单击节点页面工具栏image,将任务配置方式从向导模式转换为脚本模式,完成HttpFile数据源的字段映射与StarRocks动态分区字段dt的动态赋值。

      来源HttpFile端Column配置新增:

       {
                 "type": "STRING",
                "value": "${var}"
              }
    • ods_raw_log_d_starrocks节点完整脚本示例:

      {
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "httpfile",
                  "parameter": {
                      "fileName": "/user_log.txt",
                      "nullFormat": "",
                      "compress": "",
                      "requestMethod": "GET",
                      "connectTimeoutSeconds": 60,
                      "column": [
                          {
                              "index": 0,
                              "type": "STRING"
                          },
                          {
                              "type": "STRING",
                              "value": "${var}"
                          }
                      ],
                      "skipHeader": "false",
                      "encoding": "UTF-8",
                      "fieldDelimiter": "|",
                      "fieldDelimiterOrigin": "|",
                      "socketTimeoutSeconds": 3600,
                      "envType": 0,
                      "datasource": "user_behavior_analysis",
                      "bufferByteSizeInKB": 1024,
                      "fileFormat": "text"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "starrocks",
                  "parameter": {
                      "loadProps": {
                          "row_delimiter": "\\x02",
                          "column_separator": "\\x01"
                      },
                      "envType": 0,
                      "datasource": "Doc_StarRocks_Storage_Compute_Tightly_01",
                      "column": [
                          "col",
                          "dt"
                      ],
                      "tableComment": "",
                      "table": "ods_raw_log_d_starrocks",
                      "preSql": "ALTER TABLE ods_raw_log_d_starrocks DROP PARTITION IF EXISTS  p${var} FORCE ; "
                  },
                  "name": "Writer",
                  "category": "writer"
              },
              {
                  "copies": 1,
                  "parameter": {
                      "nodes": [],
                      "edges": [],
                      "groups": [],
                      "version": "2.0"
                  },
                  "name": "Processor",
                  "category": "processor"
              }
          ],
          "setting": {
              "errorLimit": {
                  "record": "0"
              },
              "locale": "zh",
              "speed": {
                  "throttle": false,
                  "concurrent": 2
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }
    • 调度设置。

      配置页面单击右侧调度配置,可进入调度配置面板配置调度与节点信息。以下为配置的内容。

      配置项

      配置内容

      图示

      调度参数

      调度参数项中单击新增参数,添加:

      • 参数名:var

      • 参数值:$[yyyymmdd-1]

      image

      调度依赖

      调度依赖确认产出表已作为本节点输出。

      格式为worksspacename.tablename

      image

步骤三:验证同步数据

运行业务流程

  1. 进入业务流程面板。

    双击业务流程下的用户画像分析_Starrocks版,即可进入该业务流程画布。image

  2. 运行业务流程。

    在业务流程画布,单击工具栏中的image图标,将按照上下游依赖关系运行数据集成阶段的业务流程。

  3. 查看任务运行状态。

    节点处于image状态,即代表同步执行过程无问题。

  4. 查看任务执行日志。

    右键画布中的ods_user_info_d_starrocks节点、ods_raw_log_d_starrocks节点,选择查看日志,即可查看详细的同步过程。image

查看同步结果

  1. 新建临时查询文件。

    数据开发页面的左侧导航栏,单击image.png,进入临时查询面板。右键单击临时查询,选择新建节点 > StarRocksimage

  2. 查询同步结果表。

    --查询语句中的分区列需要更新为业务日期。例如,任务运行的日期为20240102,则业务日期为20240101,即任务运行日期的前一天。
    SELECT * from ods_raw_log_d_starrocks where dt=业务日期; 
    SELECT * from ods_user_info_d_starrocks where dt=业务日期; 

后续步骤

现在,您已经完成了同步数据,您可以继续下一个教程。在下一个教程中,您将学习将用户基本信息数据、用户网站访问日志数据在StarRocks中进行加工处理。详情请参见加工数据