Synchronize data

This tutorial shows you how to use a single-table Data Integration batch task in DataWorks to synchronize basic user information from the MySQL table ods_user_info_d and website access logs from the Object Storage Service (OSS) file user_log.txt to the StarRocks tables ods_user_info_d_starrocks and ods_raw_log_d_starrocks, respectively. This process synchronizes data between heterogeneous data sources to populate a data warehouse.

Prerequisites

Make sure that the working environment is ready. For detailed steps, see Prepare the environment.

1. Add data sources

To ensure a smooth data processing workflow, create the following data sources in your DataWorks workspace to obtain the initial data provided by the platform.

  • MySQL data source: This data source is named user_behavior_analysis_mysql and retrieves basic user information from the ods_user_info_d table in MySQL.

  • HttpFile data source: This data source is named user_behavior_analysis_httpfile and retrieves user website access logs from the user_log.txt file in OSS.

Add MySQL data source (user_behavior_analysis_mysql)

The basic user information for this tutorial is stored in a MySQL database. You need to create a MySQL data source to synchronize the basic user information data (ods_user_info_d) to StarRocks.

  1. Go to the Data Sources page.

    1. Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose More > Management Center. On the page that appears, select the desired workspace from the drop-down list and click Go to Management Center.

    2. In the left-side navigation pane, click Data Source.

  2. Click Add Connection, and then search for and select MySQL.

  3. On the Add MySQL Data Source page, use the following example values for both the development and production environments.

    The following table describes the key parameters. You can use the default values for other parameters.

    | Parameter | Description |
    | --- | --- |
    | Data Source Name | Enter user_behavior_analysis_mysql. |
    | Description | A dedicated data source for DataWorks tutorials. This allows you to access the provided test data when you configure a single-table offline data synchronization task. This data source is read-only and available only for data integration. |
    | Configuration Mode | Select User-created Data Store with Public IP Addresses. |
    | Connection Address | Host IP: rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com; Port: 3306 |
    | Database Name | Enter workshop. |
    | Username | Enter workshop. |
    | Password | Enter workshop#2017. |
    | Authentication Method | None. |

  4. In the Connection Configuration section, click Test Connectivity for both the development environment and production environment. Make sure that the connectivity status is Connectable.

    Important
    • Ensure the resource group is bound to the workspace and public network access is enabled. Otherwise, subsequent data synchronization will fail. For configuration steps, see Prepare the environment.

    • If you do not have an available resource group, follow the on-screen guidance in the Go to Buy section and click Go to Buy and Associate Purchased Resource Group.

  5. Click Complete Creation.
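
The Test Connectivity check runs from the DataWorks resource group, so it cannot be reproduced exactly outside the console. As a rough local sanity check, the following Python sketch tests only whether a TCP connection to a host and port can be opened. The function name is_reachable and the 3-second default timeout are illustrative assumptions, not part of DataWorks.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host name and connects in one call.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


# Example call with the Connection Address values from the table above:
# is_reachable("rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com", 3306)
```

A result of True means only that the port accepts TCP connections from your machine. It does not validate the database name, username, or password, and it says nothing about connectivity from the resource group.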

Add HttpFile data source (user_behavior_analysis_httpfile)

In this tutorial, the user website access log data is stored in OSS. You need to create an HttpFile data source to synchronize the user website access logs (user_log.txt) from OSS to StarRocks.

  1. On the Management Center page, click Data Source in the left-side navigation pane.

  2. Click Add Connection. In the Add Connection dialog box, search for and select HttpFile.

  3. On the Add HttpFile Data Source page, use the following example values for both the development and production environments.

    The following table describes the key parameters. You can use the default values for other parameters.

    | Parameter | Description |
    | --- | --- |
    | Data Source Name | Enter user_behavior_analysis_httpfile. |
    | Description | A dedicated data source for DataWorks tutorials. This allows you to access the provided test data when you configure a single-table offline data synchronization task. This data source is read-only and available only for data integration. |
    | URL | Set the URL to https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com for both the development and production environments. |

  4. In the Connection Configuration section, click Test Connectivity for both the development environment and production environment. Make sure that the connectivity status is Connectable.

    Important
    • Ensure the resource group is bound to the workspace and public network access is enabled. Otherwise, subsequent data synchronization will fail. For configuration steps, see Prepare the environment.

    • If you do not have an available resource group, follow the on-screen guidance in the Go to Buy section and click Go to Buy and Associate Purchased Resource Group.

  5. Click Complete Creation.
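
The HttpFile data source stores only a base URL, while the file itself (user_log.txt) is named separately. Assuming the file is fetched by appending its path to the base URL (an assumption about the reader's behavior, not something this tutorial states), the resulting address can be sketched in Python as follows. The helper name resolve_file_url is hypothetical.

```python
from urllib.parse import urljoin

# Base URL configured on the HttpFile data source.
BASE_URL = "https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com"


def resolve_file_url(base_url: str, file_path: str) -> str:
    """Join a base URL and a file path with exactly one slash between them."""
    return urljoin(base_url.rstrip("/") + "/", file_path.lstrip("/"))


LOG_URL = resolve_file_url(BASE_URL, "/user_log.txt")
print(LOG_URL)
```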

2. Build a synchronization workflow

  1. Click the icon in the upper-left corner, choose All Products > Data Development and O&M > DataStudio, and then switch to the workspace you created for this tutorial.

  2. In the left-side navigation pane, click the icon. In the Project Directory section, click the icon and select Create Workflow. Set a name for the workflow. For this tutorial, set the name to user_profile_analysis_starrocks.

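  3. On the workflow orchestration page, drag a Zero-Load Node, two StarRocks nodes, and two Data Integration nodes to the canvas on the right. Configure each Data Integration node as follows, and name the nodes as listed in the table below:

    • Data Source Type: MySQL for the user information node, HttpFile for the access log node.

    • Data destination type: StarRocks.

    • Specific Type: Single Table Offline.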

    | Node type | Node name | Description |
    | --- | --- | --- |
    | Zero-load node | workshop_start_starrocks | Manages the user profile analysis workflow to clarify the data flow path. This is a dry-run node and requires no code editing. |
    | StarRocks node | ddl_ods_user_info_d_starrocks | Runs before the synchronization task. This node creates the StarRocks table ods_user_info_d_starrocks to store basic user information from the MySQL source. |
    | StarRocks node | ddl_ods_raw_log_d_starrocks | Runs before the synchronization task. This node creates the StarRocks table ods_raw_log_d_starrocks to store user website access logs from the OSS source. |
    | Single Table Offline node | ods_user_info_d_starrocks | Synchronizes basic user information from MySQL to the StarRocks table ods_user_info_d_starrocks. |
    | Single Table Offline node | ods_raw_log_d_starrocks | Synchronizes user website access logs from OSS to the StarRocks table ods_raw_log_d_starrocks. |

  4. Manually connect the nodes by dragging lines between them. Set the workshop_start_starrocks node as the upstream node for the two Single Table Offline nodes. The final workflow is shown in the following diagram:

    [Figure: the completed workflow diagram]

  5. Configure the workflow scheduling.

    On the right side of the workflow orchestration page, click Scheduling Configuration and set the parameters. The following table describes the key parameters for this tutorial. You can keep the default values for other parameters.

    | Parameter | Description |
    | --- | --- |
    | Scheduling Parameters | Set scheduling parameters for the entire workflow, which can be used directly by its internal nodes. For this tutorial, set the parameter to bizdate=$[yyyymmdd-1] to get the previous day's date. |
    | Scheduling period | For this tutorial, set this to Daily. |
    | Scheduling time | For this tutorial, set the Scheduling time to 00:30. The workflow starts at 00:30 daily. |
    | Scheduling dependencies | This workflow has no upstream dependencies. For easier management, you can select Using the Workspace Root Node to attach it to the workspace root node. Workspace root nodes use the format workspace-name_root. |
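
To make the effect of bizdate=$[yyyymmdd-1] concrete, the following Python sketch computes the same kind of value: the day before a given run date, formatted as yyyymmdd. It only imitates the date arithmetic and is not how DataWorks evaluates the expression; the function name bizdate_for is an illustrative assumption.

```python
from datetime import date, timedelta


def bizdate_for(run_date: date) -> str:
    """Return the day before run_date, formatted as yyyymmdd."""
    previous_day = run_date - timedelta(days=1)
    return previous_day.strftime("%Y%m%d")


# A run on 2025-02-23 would process data for 2025-02-22.
print(bizdate_for(date(2025, 2, 23)))  # 20250222
```

Month and year boundaries are handled by the date subtraction, so a run on the first day of a month yields the last day of the previous month.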

3. Configure synchronization tasks

Configure the initial node

  1. On the workflow orchestration page, hover over the workshop_start_starrocks node and click Open Node.

  2. On the right side of the workshop_start_starrocks node configuration page, click Scheduling Configuration to configure the scheduling properties. The following table describes the key parameters for this tutorial. You can keep the default values for other parameters.

    | Parameter | Description |
    | --- | --- |
    | Scheduling Type | For this tutorial, set this parameter to dry run. |
    | Scheduling Resource Groups | Select the serverless resource group that you created in the Prepare the environment step. |
    | Node dependency configuration | Since workshop_start_starrocks is the initial node and has no upstream dependencies, click Using the Workspace Root Node to trigger the workflow from the workspace root node. The workspace root node is named in the format WorkspaceName_root. |

Create the user table (ddl_ods_user_info_d_starrocks)

Before you synchronize data to StarRocks, create the ods_user_info_d_starrocks table to store basic user information from the MySQL data source. You can create the table in the ddl_ods_user_info_d_starrocks node as described below, or you can manually create it in Data Catalog.

  1. On the workflow orchestration page, hover over the ddl_ods_user_info_d_starrocks node and click Open Node.

  2. Edit the CREATE TABLE statement.

    CREATE TABLE IF NOT EXISTS ods_user_info_d_starrocks (
        uid STRING COMMENT 'User ID',
        gender STRING COMMENT 'Gender',
        age_range STRING COMMENT 'Age range',
        zodiac STRING COMMENT 'Zodiac sign',
        dt STRING not null COMMENT 'Date'
    ) 
    DUPLICATE KEY(uid)
    COMMENT 'User behavior analysis - basic user information table'
    PARTITION BY(dt) 
    PROPERTIES("replication_num" = "1");

  3. Configure the debug settings.

    On the right side of the StarRocks node configuration page, click Run Configuration and configure the following parameters. The debug run in Step 4 uses these parameters.

    | Parameter | Description |
    | --- | --- |
    | Computing Resources | Select the StarRocks computing resource that you bound in the Prepare the environment step. |
    | Resource Group | Select the serverless resource group that you created in the Prepare the environment step. |

  4. (Optional) Configure scheduling properties.

    On the right side of the node configuration page, click Scheduling Configuration to view the scheduling properties. For this tutorial, you can keep the default values. For more information about these parameters, see Node scheduling configuration.

    • Scheduling Parameters: You configure these at the workflow level, so no node-level configuration is needed. You can reference them directly in your tasks or code.

    • Scheduling Policy: You can use the Delayed execution time parameter to specify a delay for running a child node after the workflow starts. This tutorial does not use this setting.

  5. In the top toolbar, click Save.

Create the log table (ddl_ods_raw_log_d_starrocks)

Before you synchronize data to StarRocks, create the ods_raw_log_d_starrocks table to store user website access logs from the HttpFile data source. You can create the table in the ddl_ods_raw_log_d_starrocks node as described below, or you can manually create it in Data Catalog.

  1. On the workflow orchestration page, hover over the ddl_ods_raw_log_d_starrocks node and click Open Node.

  2. Edit the CREATE TABLE statement.

    CREATE TABLE IF NOT EXISTS ods_raw_log_d_starrocks (
        col STRING COMMENT 'Log entry',
        dt DATE  not null COMMENT 'Date'
    ) DUPLICATE KEY(col) 
    COMMENT 'User behavior analysis - raw website access log table' 
    PARTITION BY(dt) 
    PROPERTIES ("replication_num" = "1");

  3. Configure the debug settings.

    On the right side of the StarRocks node configuration page, click Run Configuration and configure the following parameters. The debug run in Step 4 uses these parameters.

    | Parameter | Description |
    | --- | --- |
    | Computing Resources | Select the StarRocks computing resource that you bound in the Prepare the environment step. |
    | Resource Group | Select the serverless resource group that you created in the Prepare the environment step. |

  4. (Optional) Configure scheduling properties.

    On the right side of the node configuration page, click Scheduling Configuration to view the scheduling properties. For this tutorial, you can keep the default values. For more information about these parameters, see Node scheduling configuration.

    • Scheduling Parameters: You configure these at the workflow level, so no node-level configuration is needed. You can reference them directly in your tasks or code.

    • Scheduling Policy: You can use the Delayed execution time parameter to specify a delay for running a child node after the workflow starts. This tutorial does not use this setting.

  5. In the top toolbar, click Save.

Configure user data pipeline (ods_user_info_d_starrocks)

  1. On the workflow orchestration page, hover over the ods_user_info_d_starrocks node and click Open Node.

  2. Configure the network and resources for the synchronization pipeline.

    | Parameter | Description |
    | --- | --- |
    | Data source | Data source: MySQL; Data Source Name: user_behavior_analysis_mysql |
    | My Resource Group | Select the serverless resource group that you created in the Prepare the environment step. |
    | Data going | Data going: StarRocks; Data Source Name: doc_starrocks_storage_compute_tightly_01 |

  3. Configure the task.

    • Configure Source and Destination.

      Section

      Parameter

      Configuration

      Data source

      Table

      Select the MySQL table ods_user_info_d.

      Shard Key

      Use a primary key or an indexed column as the split key. Only integer data types are supported.

      For this tutorial, set the split key to the uid field.

      Data going

      Table

      Select the StarRocks table ods_user_info_d_starrocks.

      Statement Run Before Writing

      This tutorial uses dynamic partitioning based on the dt field. To prevent duplicate data when a node is rerun, the following SQL statement deletes the target partition before each synchronization.

      The ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE statement uses a parameter, ${var}, which will be assigned a value during scheduling configuration. This enables dynamic parameter input in scheduling scenarios. For more information, see Scheduling settings.

      Streamload Request Parameters

      The request parameters for StreamLoad. The value must be in JSON format.

      {
        "row_delimiter": "\\x02",
        "column_separator": "\\x01"
      }
    • Field Mapping.

      Use field mapping to define the relationship between source and destination fields. By assigning a scheduling parameter to a variable, you can dynamically populate the StarRocks partition field to ensure that daily data is written to the correct partition.

      1. Click Map Fields with Same Name. The system automatically maps the source MySQL fields to destination fields with the same names.

      2. Click Add, enter '${var}', and manually map this field to the dt field in the StarRocks table.

    • Channel.

      For this tutorial, set the Policy for Dirty Data Records to Disallow Dirty Data Records and keep the default values for other settings. For more information, see Configure a batch synchronization node in the codeless UI.

  4. Configure the debug settings.

    On the right side of the batch synchronization node configuration page, click Run Configuration and configure the following parameters. The debug run in Step 4 uses these parameters.

    | Parameter | Description |
    | --- | --- |
    | Resource Group | Select the serverless resource group that you created in the Prepare the environment step. |
    | Script Parameters | Click Add parameter and set it to a constant in the var=yyyymmdd format, such as var=20250223. During debugging, Data Studio replaces the task variable with this constant. |

  5. (Optional) Configure scheduling properties.

    On the right side of the node configuration page, click Scheduling Configuration to view the scheduling properties. For this tutorial, you can keep the default values. For more information about these parameters, see Node scheduling configuration.

    • Scheduling Parameters: You configure these at the workflow level, so no node-level configuration is needed. You can reference them directly in your tasks or code.

    • Scheduling Policy: You can use the Delayed execution time parameter to specify a delay for running a child node after the workflow starts. This tutorial does not use this setting.

  6. In the top toolbar, click Save.
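
To make the ${var} substitution in the Statement Run Before Writing concrete, the following Python sketch renders the statement for a given value. It uses string.Template, whose ${name} placeholder syntax happens to match. DataWorks performs its own substitution, so this is only an illustration; the render_pre_sql helper and its eight-digit check are assumptions.

```python
from string import Template

# Statement Run Before Writing for the user information table.
PRE_SQL = Template(
    "ALTER TABLE ods_user_info_d_starrocks "
    "DROP PARTITION IF EXISTS p${var} FORCE"
)


def render_pre_sql(var: str) -> str:
    """Fill in ${var} after checking that it looks like a yyyymmdd value."""
    if not (len(var) == 8 and var.isdigit()):
        raise ValueError(f"expected a yyyymmdd value, got {var!r}")
    return PRE_SQL.substitute(var=var)


print(render_pre_sql("20250223"))
```

With var=20250223, the statement targets the partition p20250223, so rerunning the node for the same date drops and reloads only that day's data.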

Configure log data pipeline (ods_raw_log_d_starrocks)

  1. On the workflow orchestration page, hover over the ods_raw_log_d_starrocks node and click Open Node.

  2. Configure the network and resources for the synchronization pipeline.

    Configure the Data source, My Resource Group, and Data going as detailed below. Then, click The next Step and follow the on-screen instructions to complete the connectivity test.

    | Parameter | Configuration |
    | --- | --- |
    | Data source | Data source: HttpFile; Data Source Name: user_behavior_analysis_httpfile |
    | My Resource Group | Select the serverless resource group that you created in the Prepare the environment step. |
    | Data going | Data going: StarRocks; Data Source Name: doc_starrocks_storage_compute_tightly_01 |

  3. Click The next Step to configure the synchronization task.

    • Configure Source and Destination.

      Section

      Parameter

      Configuration

      Data source

      File Path

      /user_log.txt

      File Type

      text

      Field Delimiter

      |

      Advanced Settings > Skip Header

      No

      After you configure the source, click Confirm Data Structure.

      Data going

      Table

      Select the StarRocks table ods_raw_log_d_starrocks.

      Statement Run Before Writing

      This tutorial uses dynamic partitioning based on the dt field. To prevent duplicate data when a node is rerun, the following SQL statement deletes the target partition before each synchronization.

      The ALTER TABLE ods_raw_log_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE statement uses a parameter, ${var}, which will be assigned a value during scheduling configuration. This enables dynamic parameter input in scheduling scenarios. For more information, see Scheduling settings.

      Streamload Request Parameters

      The request parameters for StreamLoad. The value must be in JSON format.

      {
        "row_delimiter": "\\x02",
        "column_separator": "\\x01"
      }
    • Channel.

      For this tutorial, set the Policy for Dirty Data Records to Disallow Dirty Data Records and keep the default values for other settings. For more information, see Configure a batch synchronization node in the codeless UI.

    • Field Mapping.

      In the node's toolbar, click the icon that switches the configuration mode from Wizard Mode to Script Mode. Script Mode lets you map the fields from the HttpFile data source and dynamically assign a value to the StarRocks partition field dt.

      • In the column configuration for the source, add the following code:

        {
            "type": "STRING",
            "value": "${var}"
        }

      • The following is the complete script for the ods_raw_log_d_starrocks node:

        {
            "type": "job",
            "version": "2.0",
            "steps": [
                {
                    "stepType": "httpfile",
                    "parameter": {
                        "fileName": "/user_log.txt",
                        "nullFormat": "",
                        "compress": "",
                        "requestMethod": "GET",
                        "connectTimeoutSeconds": 60,
                        "column": [
                            {
                                "index": 0,
                                "type": "STRING"
                            },
                            {
                                "type": "STRING",
                                "value": "${var}"
                            }
                        ],
                        "skipHeader": "false",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "|",
                        "fieldDelimiterOrigin": "|",
                        "socketTimeoutSeconds": 3600,
                        "envType": 0,
                        "datasource": "user_behavior_analysis_httpfile",
                        "bufferByteSizeInKB": 1024,
                        "fileFormat": "text"
                    },
                    "name": "Reader",
                    "category": "reader"
                },
                {
                    "stepType": "starrocks",
                    "parameter": {
                        "loadProps": {
                            "row_delimiter": "\\x02",
                            "column_separator": "\\x01"
                        },
                        "envType": 0,
                        "datasource": "doc_starrocks_storage_compute_tightly_01",
                        "column": [
                            "col",
                            "dt"
                        ],
                        "tableComment": "",
                        "table": "ods_raw_log_d_starrocks",
                        "preSql": "ALTER TABLE ods_raw_log_d_starrocks DROP PARTITION IF EXISTS  p${var} FORCE;"
                    },
                    "name": "Writer",
                    "category": "writer"
                },
                {
                    "copies": 1,
                    "parameter": {
                        "nodes": [],
                        "edges": [],
                        "groups": [],
                        "version": "2.0"
                    },
                    "name": "Processor",
                    "category": "processor"
                }
            ],
            "setting": {
                "errorLimit": {
                    "record": "0"
                },
                "locale": "en",
                "speed": {
                    "throttle": false,
                    "concurrent": 2
                }
            },
            "order": {
                "hops": [
                    {
                        "from": "Reader",
                        "to": "Writer"
                    }
                ]
            }
        }

  4. Configure the debug settings.

    On the right side of the batch synchronization node configuration page, click Run Configuration and configure the following parameters. The debug run in Step 4 uses these parameters.

    | Parameter | Description |
    | --- | --- |
    | Resource Group | Select the serverless resource group that you created in the Prepare the environment step. |
    | Script Parameters | Click Add parameter and set it to a constant in the var=yyyymmdd format, such as var=20250223. During debugging, Data Studio replaces the task variable with this constant. |

  5. (Optional) Configure scheduling properties.

    On the right side of the node configuration page, click Scheduling Configuration to view the scheduling properties. For this tutorial, you can keep the default values. For more information about these parameters, see Node scheduling configuration.

    • Scheduling Parameters: You configure these at the workflow level, so no node-level configuration is needed. You can reference them directly in your tasks or code.

    • Scheduling Policy: You can use the Delayed execution time parameter to specify a delay for running a child node after the workflow starts. This tutorial does not use this setting.

  6. In the top toolbar, click Save.
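
In Script Mode, the reader's column list and the writer's column list must line up. The following Python sketch loads an abbreviated copy of the script above and pairs each reader entry with the writer column at the same position. It assumes that columns are matched by position, which is how the script above is laid out; the helper names step and column_pairs are hypothetical.

```python
import json

# Abbreviated copy of the node script: only the column settings are kept.
JOB = json.loads("""
{
  "steps": [
    {"category": "reader",
     "parameter": {"column": [{"index": 0, "type": "STRING"},
                              {"type": "STRING", "value": "${var}"}]}},
    {"category": "writer",
     "parameter": {"column": ["col", "dt"]}}
  ]
}
""")


def step(job: dict, category: str) -> dict:
    """Return the first step with the given category."""
    return next(s for s in job["steps"] if s["category"] == category)


def column_pairs(job: dict) -> list:
    """Pair reader columns with writer columns by position."""
    reader = step(job, "reader")["parameter"]["column"]
    writer = step(job, "writer")["parameter"]["column"]
    if len(reader) != len(writer):
        raise ValueError("reader and writer column counts differ")
    pairs = []
    for src, dst in zip(reader, writer):
        # A reader entry is either a constant ("value") or a source field ("index").
        source = f"constant {src['value']}" if "value" in src else f"field {src['index']}"
        pairs.append((source, dst))
    return pairs


for source, target in column_pairs(JOB):
    print(f"{source} -> {target}")
```

Under these assumptions, the first field of each log line feeds col, and the constant ${var} feeds the partition column dt.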

4. Run the task

  1. Synchronize the data.

    In the workflow toolbar, click Run. Set the values of the parameter variables defined in each node for this run (this tutorial uses 20250223; change the value as needed), click OK, and wait for the run to complete.

  2. Check the results.

    1. Go to the SQL Query page.

      Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Analysis and Service > DataAnalysis. On the page that appears, click Go to DataAnalysis. In the left-side navigation pane of the page that appears, click SQL Query.

    2. Configure an SQL query file.

      1. Click the icon next to Show My Nodes Only to create a new file, and then name the SQL query file.

      2. Click the new file to open the editor.

      3. In the upper-right corner of the file editor, click the icon to configure the workspace and data source for the SQL query. The following table describes the parameters.

        | Parameter | Description |
        | --- | --- |
        | Workspace | Select the workspace where the user_profile_analysis_starrocks workflow is located. |
        | Data Source Type | From the drop-down list, select StarRocks. |
        | Data Source Name | Select the StarRocks development environment that you bound in the Prepare the environment step. |

      4. Click OK to save the data source configuration.

    3. Edit the SQL query.

      After all nodes in this section have run successfully, run the following SQL queries to check that the data was synchronized to the StarRocks tables.

      -- Replace your_data_timestamp with the partition value to query. For scheduled runs, the data timestamp is the day before the task's run date. For example, if a task runs on 20250223, the data timestamp is 20250222.
      -- For the debug run in this tutorial, use the constant that you entered for var, such as 20250223.
      SELECT * FROM ods_raw_log_d_starrocks WHERE dt=your_data_timestamp; 
      SELECT * FROM ods_user_info_d_starrocks WHERE dt=your_data_timestamp; 

Next steps

Now that you have synchronized your data, in the next tutorial you'll learn how to compute and analyze it. For more information, see Process data.