Sync incremental data to MaxCompute

Last updated: 2026-04-25 03:15:53

To periodically synchronize new or updated data from Tablestore to MaxCompute for backups or business processing, you can configure an offline synchronization task in the DataWorks data integration platform.

Prerequisites

Important

If the MaxCompute instance and the Tablestore instance are in different regions, follow these steps to create a VPC peering connection to enable cross-region network connectivity.

Create a VPC peering connection to enable cross-region network connectivity

This example assumes the DataWorks workspace and the MaxCompute instance are in the same region, China (Hangzhou), while the Tablestore instance is in China (Shanghai).

  1. Attach a VPC to the Tablestore instance.

    1. Log on to the Tablestore console and select a region at the top of the page.

    2. Click the instance alias to open the Instance Management page.

    3. On the Network Management tab, click Attach VPC. Select a VPC and a vSwitch, enter a VPC name, and click OK.

    4. After the VPC is attached, the page automatically refreshes. You can view the attached VPC ID and VPC Endpoint in the VPC list.

      Note

      You will use this VPC endpoint when you add the Tablestore data source in the DataWorks console later.


  2. Retrieve the VPC information for the DataWorks workspace resource group.

    1. Log on to the DataWorks console. In the top navigation bar, select the region where your workspace is located. In the navigation pane on the left, click Workspaces to go to the Workspace page.

    2. Click the workspace name to go to the Workspace Details page. In the left navigation pane, click the Resource Groups menu to view the resource groups attached to the workspace.

    3. To the right of the target resource group, click Network Settings. In the Resource Scheduling & Data Integration area, the VPC ID of the attached virtual private cloud is displayed.

  3. Create a VPC peering connection and configure a route.

    1. Log on to the VPC console. In the navigation pane on the left, click the VPCs menu. Select the regions of the Tablestore instance and the DataWorks workspace, and record the CIDR blocks of the corresponding VPCs.


    2. In the navigation pane on the left, click the VPC Peering Connections menu. On the VPC Peering Connections page, click Create Peering Connection.

    3. On the Create Peering Connection page, enter a name for the peering connection and select the requester VPC, accepter account type, accepter region, and accepter VPC. Then, click OK.

    4. On the VPC Peering Connections page, find the VPC peering connection that you created. Click Configure Route Entry in both the Requester VPC and Accepter VPC columns.

      The destination CIDR block must be the CIDR block of the peer VPC. When you configure a route entry for the requester VPC, enter the CIDR block of the accepter VPC. When you configure a route entry for the accepter VPC, enter the CIDR block of the requester VPC.
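The route rule above can be sketched in a few lines of Python. This is a minimal illustration, not part of the console workflow: the function name plan_peering_routes and the CIDR blocks are hypothetical, and the overlap check reflects the general assumption that peered VPCs need non-overlapping address ranges.

```python
import ipaddress


def plan_peering_routes(requester_cidr: str, accepter_cidr: str) -> dict:
    """Return the destination CIDR block to enter in each VPC's route entry."""
    requester = ipaddress.ip_network(requester_cidr)
    accepter = ipaddress.ip_network(accepter_cidr)
    # Assumption: overlapping ranges cannot be routed unambiguously, so reject them early.
    if requester.overlaps(accepter):
        raise ValueError(f"CIDR blocks overlap: {requester} and {accepter}")
    return {
        # The requester VPC routes traffic to the accepter VPC's CIDR block.
        "requester_route_destination": str(accepter),
        # The accepter VPC routes traffic back to the requester VPC's CIDR block.
        "accepter_route_destination": str(requester),
    }


# Hypothetical CIDR blocks recorded from the VPC console.
print(plan_peering_routes("192.168.0.0/16", "172.16.0.0/12"))
```

Replace the two sample CIDR blocks with the values you recorded for the DataWorks resource group VPC and the Tablestore VPC.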

Procedure

Step 1: Add Tablestore data source

  1. Go to the Data Integration page.

    Log on to the DataWorks console. After switching to the destination region, click Data Integration > Data Integration in the navigation pane on the left. In the drop-down list, select the corresponding workspace and click Go to Data Integration.

  2. In the navigation pane on the left, click Data Source.

  3. On the Data Sources page, click Add Data Source.

  4. In the Add Data Source dialog box, select Tablestore as the data source type.

  5. In the Add Tablestore Data Source dialog box, set the data source parameters as outlined in the table below.

    Parameter

    Description

    Data Source Name

    The name of the data source. The name can contain only letters, digits, and underscores (_), and must start with a letter.

    Data Source Description

    The description of the data source. The description cannot exceed 80 characters in length.

    Region

    Select the region where the Tablestore instance is located.

    Tablestore Instance Name

    The name of the Tablestore instance.

    Endpoint

    The endpoint of the Tablestore instance. We recommend that you use the VPC address.

    AccessKey ID and AccessKey Secret

    The AccessKey ID and AccessKey secret of an Alibaba Cloud account or a Resource Access Management (RAM) user.

  6. Test the connectivity of the resource group. You must perform this test when you create a data source to ensure that the resource group used by the sync task can connect to the data source. Otherwise, the data synchronization task cannot run properly.

    1. In the Connection Configuration section, for the target resource group, click Test Network Connectivity in the Connection Status column.

    2. After the connectivity test passes, the Connectivity Status changes to Connected. Click Complete. The new data source then appears in the data source list.

      Note

      If the connectivity test fails and the status is Failed, use the connectivity diagnostic tool to troubleshoot the issue. If the resource group still cannot connect to the data source, submit a ticket.
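The naming rules in the parameter table can be checked before you open the dialog box. The following sketch is only an illustration: the function name check_data_source_fields is hypothetical, and it assumes that "letters" means ASCII letters.

```python
import re

# Assumption: "letters" means ASCII letters A-Z and a-z.
NAME_PATTERN = re.compile(r"[A-Za-z][A-Za-z0-9_]*")
MAX_DESCRIPTION_LENGTH = 80


def check_data_source_fields(name: str, description: str = "") -> list[str]:
    """Return a list of rule violations; an empty list means both fields pass."""
    problems = []
    if not NAME_PATTERN.fullmatch(name):
        problems.append(
            "name must start with a letter and contain only letters, digits, and underscores"
        )
    if len(description) > MAX_DESCRIPTION_LENGTH:
        problems.append("description exceeds 80 characters")
    return problems


print(check_data_source_fields("MyTablestoreDataSource"))  # []
print(check_data_source_fields("1_source"))
```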

Step 2: Add MaxCompute data source

The procedure is similar to Step 1. In the Add Data Source dialog box, search for and select MaxCompute, and then configure the data source parameters.

Step 3: Configure offline synchronization

Data Studio (Legacy)

Step 1: Create a task node

  1. Go to the Data Development page.

    1. Log on to the DataWorks console.

    2. In the top navigation bar, select a resource group and region.

    3. In the navigation pane on the left, click Data Development and O&M > Data Development.

    4. On the Data Development page, select the target workspace from the drop-down list and click the Go to Data Development button.

  2. On the Data Development page of the Data Studio console, under the Business Flow node, click the target business flow.

    For more information, see Create a business flow.

  3. Right-click the Data Integration node and choose Create Node > Batch Synchronization.

  4. In the Create Node dialog box, select a path, enter a name, and click OK.

    The new batch synchronization node is displayed under the Data Integration node.

Step 2: Configure the synchronization task

  1. Under the Data Integration node, double-click the new batch synchronization node.

  2. Configure the network and resources.

    Select the data source, data destination, and the resource group for executing the synchronization task, and then test the connectivity.

    1. In the Network and Resource Configuration step, set Data Source to Tablestore Stream and select your new Tablestore data source for Data Source Name.

    2. Select a resource group.

      After you select a resource group, the system displays its details, such as region and specifications. The system automatically tests the connectivity between the resource group and the selected data source.

      Note

      For a Serverless resource group, you can specify the maximum number of compute units (CUs) for the synchronization task. If the task fails with an Out of Memory (OOM) error due to insufficient resources, increase the CU value for the resource group.

    3. Set Data Destination to MaxCompute and select your new MaxCompute data source for Data Source Name.

      The system automatically tests the connectivity between the resource group and the selected data source.

    4. After the connectivity test is successful, click Next.

  3. Configure and save the task.

    Wizard mode

    1. In the Configure Task step, configure the data source and destination in the Configure Source and Destination section based on your requirements.

      Data source

      Parameter

      Description

      Data source

      The Tablestore data source that you selected in the previous step is displayed by default.

      Table

      The source data table.

      Start Time and End Time

      The start and end times for reading incremental data. Set Start Time to the variable ${startTime} and End Time to the variable ${endTime}. The time format is configured later in the scheduling properties. The time range is left-inclusive and right-exclusive: data written at the start time is read, and data written at the end time is not.

      Status Table

      The name of the table used to record status. The default value is TableStoreStreamReaderStatusTable.

      Maximum Retries

      The maximum number of retries for each request when reading incremental data from Tablestore.

      Export Sequence Information

      Specifies whether to export sequence information, which includes data write times.

      Important

      Select the Export Sequence Information check box. This information is required to convert incremental data into a full data format in a later step.

      Data destination

      Parameter

      Description

      Data source

      The MaxCompute data source that you selected in the previous step is displayed by default.

      Tunnel Resource Group

      This parameter specifies the Tunnel Quota. The default value is the public transport resource, which is the free quota provided by MaxCompute.

      For information about data transport resources for MaxCompute, see Purchase and use exclusive resource groups for data transport services.

      Note

      If an exclusive tunnel quota becomes unavailable due to an overdue payment or expiration, the task automatically switches to public transport resources.

      Table

      The destination MaxCompute table.

      Partition Information

      To store each day's incremental data in its own date partition, specify a partition value. For example, set the partition key pt to ${bizdate}.

      Write Mode

      The mode for writing data to the table. The following modes are supported:

      • Append (Insert Into): Inserts data directly into the table or a static partition.

        Important

        Because this offline synchronization task will run periodically, select Append (Insert Into).

      • Overwrite (Insert Overwrite): Clears all existing data from the table or a static partition before inserting new data.

      Convert Empty String to Null

      Specifies whether to convert empty strings from the source to null values when writing to a destination MaxCompute column. The default value is No.

      Visible upon Completion

      This parameter appears only after you click Advanced Settings.

      Specifies whether the data synchronized to MaxCompute can be queried only after the synchronization is complete. The default value is No.

    2. Configure field mapping.

      In the field mapping section, the system automatically maps fields with the same name. You can keep the default settings. For more information, see Configure field mappings.

    3. Configure channel control.

      You can configure channel settings to control properties related to the data synchronization process. For more information about the parameters, see Relationship between concurrency and rate limiting for offline synchronization.

    4. Click the Save icon to save the configuration.

    Script mode

    1. In the Configure Task step, click the Script Mode icon, and then click OK in the dialog box that appears.

    2. On the script configuration page, edit the script.

      The following code provides an example of a script configuration. Replace the parameters in the configuration file with your own information based on your synchronization requirements.

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "otsstream",
            "parameter": {
              "statusTable": "TableStoreStreamReaderStatusTable",
              "maxRetries": 30,
              "isExportSequenceInfo": true,
              "datasource": "MyTablestoreDataSource",
              "dataTable": "target_table",
              "newVersion": "true",
              "column": [
                "pk",
                "colName",
                "version",
                "colValue",
                "opType",
                "sequenceInfo"
              ],
              "startTimeString": "${startTime}",
              "endTimeString": "${endTime}"
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "odps",
            "parameter": {
              "partition": "",
              "truncate": false,
              "datasource": "MyMaxComputeDataSource",
              "table": "target_table",
              "column": [
                "pk",
                "colname",
                "version",
                "colvalue",
                "optype",
                "sequenceinfo"
              ],
              "emptyAsNull": false,
              "compress": false
            },
            "name": "Writer",
            "category": "writer"
          }
        ],
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        },
        "setting": {
          "errorLimit": {
            "record": "0"
          },
          "speed": {
            "throttle": false,
            "concurrent": 2
          }
        }
      }
      • The following table describes the parameters to replace for Tablestore Stream Reader.

        Parameter

        Description

        datasource

        The name of the Tablestore data source for the source table.

        dataTable

        The name of the source table.

        column

        The primary keys and incremental change information of the source table.

      • The following table describes the parameters to replace for MaxCompute Writer.

        Parameter

        Description

        datasource

        The name of the MaxCompute data source for the destination table.

        table

        The name of the destination table.

        column

        The columns to which you write data.

    3. Click the Save icon to save the configuration.
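Before you save an edited script, a quick structural check can catch common mistakes. The sketch below is an illustration only: the function name check_sync_script is hypothetical, and it assumes that the reader and writer columns are mapped by position, so both lists should have the same length.

```python
import json


def check_sync_script(job: dict) -> list[str]:
    """Return structural problems found in a batch synchronization script."""
    problems = []
    steps = {step.get("name"): step for step in job.get("steps", [])}
    readers = [s for s in steps.values() if s.get("category") == "reader"]
    writers = [s for s in steps.values() if s.get("category") == "writer"]
    if len(readers) != 1 or len(writers) != 1:
        return ["expected exactly one reader step and one writer step"]
    reader_columns = readers[0]["parameter"]["column"]
    writer_columns = writers[0]["parameter"]["column"]
    # Assumption: columns are mapped by position, so the counts must match.
    if len(reader_columns) != len(writer_columns):
        problems.append(
            f"reader has {len(reader_columns)} columns, writer has {len(writer_columns)}"
        )
    # Every hop must point at step names that exist in "steps".
    for hop in job.get("order", {}).get("hops", []):
        for end in ("from", "to"):
            if hop.get(end) not in steps:
                problems.append(f"hop references unknown step: {hop.get(end)}")
    return problems


# A trimmed copy of the example script, keeping only the checked fields.
script = json.loads("""
{
  "steps": [
    {"name": "Reader", "category": "reader",
     "parameter": {"column": ["pk", "colName", "version", "colValue", "opType", "sequenceInfo"]}},
    {"name": "Writer", "category": "writer",
     "parameter": {"column": ["pk", "colname", "version", "colvalue", "optype", "sequenceinfo"]}}
  ],
  "order": {"hops": [{"from": "Reader", "to": "Writer"}]}
}
""")
print(check_sync_script(script))  # []
```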

Step 3: Configure scheduling properties

  1. On the right side of the task, click Scheduling configuration.

  2. In the Scheduling configuration panel, in the Scheduling Parameters section, click Add Parameter and add the parameters listed in the following table. For more information, see Supported formats for scheduling parameters.

    Parameter

    Value

    startTime

    $[yyyymmddhh24-2/24]$[miss-10/24/60]

    endTime

    $[yyyymmddhh24-1/24]$[miss-10/24/60]

    The following example describes the configuration.

    If the task runs at 19:00:00 on April 23, 2023, the value of startTime is 20230423175000 and the value of endTime is 20230423185000. The task synchronizes data added between 17:50 and 18:50.

  3. In the Time Properties section, configure the time properties. For more information, see Time Property Configuration.

    This example shows how to configure the task to run automatically every hour.

    Configure the parameters as follows: For Instance Generation Mode, select Generate Instance upon Deployment. For Scheduling Type, select Normal Scheduling. For Scheduling Calendar, select Default. Set Scheduling Cycle to Hour, Start Time to 00:00, Interval to 1 hour, and End Time to 23:59. For Timeout, select System Default. For Rerun Property, select Rerunnable on success or failure. For Effective Date, select Permanent.

  4. In the Dependencies section, click Use Workspace Root Node. The system automatically generates the upstream dependency.

    Using the workspace root node indicates that the task has no upstream dependencies.

    After you select this option, a dependency record containing the upstream node's output name, node ID, and workspace is automatically generated in the table below.

  5. After completing the configuration, close the Scheduling configuration panel.

  6. Click the Save icon to save the configuration.
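To see how the two scheduling parameter expressions resolve, the following Python sketch reproduces the arithmetic. It is an approximation written for illustration, not the DataWorks implementation: the function name sync_window is hypothetical, and it assumes that the expressions are evaluated against the scheduled time of the instance.

```python
from datetime import datetime, timedelta


def sync_window(scheduled_time: datetime) -> tuple[str, str]:
    """Approximate the startTime and endTime values for one scheduled run."""
    # $[miss-10/24/60]: minutes and seconds of the scheduled time minus 10 minutes.
    minute_second = (scheduled_time - timedelta(minutes=10)).strftime("%M%S")
    # $[yyyymmddhh24-2/24]: date and hour of the scheduled time minus 2 hours.
    start_hour = (scheduled_time - timedelta(hours=2)).strftime("%Y%m%d%H")
    # $[yyyymmddhh24-1/24]: date and hour of the scheduled time minus 1 hour.
    end_hour = (scheduled_time - timedelta(hours=1)).strftime("%Y%m%d%H")
    return start_hour + minute_second, end_hour + minute_second


start, end = sync_window(datetime(2023, 4, 23, 19, 0, 0))
print(start, end)  # 20230423175000 20230423185000
```

The printed values match the example above for a run at 19:00:00 on April 23, 2023.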

Step 4: (Optional) Debug the script

Debug the script to ensure that the synchronization task synchronizes incremental data from Tablestore to MaxCompute.

Note

When you debug the script, data within the specified time range may be imported into MaxCompute multiple times. Duplicate data rows will overwrite existing rows in MaxCompute.

  1. Click the icon that runs the task with parameters.

  2. In the Parameters dialog box, select the resource group to run the task and configure the custom parameters.

    The format for the custom parameter is yyyyMMddHHmmss, for example, 20250630170000.

    For example, set Resource Group Name to Public Scheduling Resource Group, enter 20250701154000 for startTime, and enter 20250701164000 for endTime.

  3. Click Run.

    After the task finishes running, you can view the synchronization result in the destination table. For more information, see MaxCompute Table Data.
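Because a malformed timestamp is easy to type in the Parameters dialog box, you may want to validate the values first. This is a minimal sketch under stated assumptions: the function name parse_debug_window is hypothetical, and the check that startTime is earlier than endTime is an added sanity check, not a documented rule.

```python
from datetime import datetime

TIME_FORMAT = "%Y%m%d%H%M%S"  # Python equivalent of yyyyMMddHHmmss


def parse_debug_window(start_time: str, end_time: str) -> tuple[datetime, datetime]:
    """Parse both custom parameters and reject malformed or reversed values."""
    for value in (start_time, end_time):
        # strptime tolerates some shorter inputs, so enforce 14 digits explicitly.
        if len(value) != 14 or not value.isdigit():
            raise ValueError(f"expected 14 digits in yyyyMMddHHmmss format: {value!r}")
    # strptime raises ValueError for calendar dates that do not exist.
    start = datetime.strptime(start_time, TIME_FORMAT)
    end = datetime.strptime(end_time, TIME_FORMAT)
    if start >= end:
        raise ValueError("startTime must be earlier than endTime")
    return start, end


start, end = parse_debug_window("20250701154000", "20250701164000")
print(end - start)  # 1:00:00
```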

Step 5: Deploy the synchronization task

After you submit the synchronization task, it runs based on the configured scheduling properties.

  1. Click the Submit icon.

  2. In the Submit dialog box, enter a change description if needed.

  3. Click Confirm.

Data Studio (New)

Step 1: Create a task node

  1. Go to the Data Development page.

    1. Log on to the DataWorks console.

    2. In the top navigation bar, select a resource group and region.

    3. In the navigation pane on the left, click Data Development and O&M > Data Development.

    4. On the Data Development page, select your target workspace from the drop-down list and click Go to Data Studio.

  2. On the Data Development page of the Data Studio console, click the icon to the right of Workspace Directories, and select Create Node > Data Integration > Batch Synchronization.

    Note

    If this is your first time using Workspace Directories, you can also click the Create Node button.

  3. In the Create Node dialog box, select a path, enter a name, and click OK.

    The new batch synchronization node appears in the Workspace Directories.

Step 2: Configure the synchronization task

  1. Under Workspace Directories, click the new batch synchronization node to open it.

  2. Configure the network and resources.

    Select the data source, data destination, and the resource group for executing the synchronization task, and then test the connectivity.

    1. In the Network and Resource Configuration step, set Data Source to Tablestore Stream and select your new Tablestore data source for Data Source Name.

    2. Select a resource group.

      After you select a resource group, the system displays its details, such as region and specifications. The system automatically tests the connectivity between the resource group and the selected data source.

      Note

      For a Serverless resource group, you can specify the maximum number of compute units (CUs) for the synchronization task. If the task fails with an Out of Memory (OOM) error due to insufficient resources, increase the CU value for the resource group.

    3. Set Data Destination to MaxCompute and select your new MaxCompute data source for Data Source Name.

      The system automatically tests the connectivity between the resource group and the selected data source.

    4. After the connectivity test is successful, click Next.

  3. Configure and save the task.

    Wizard mode

    1. In the Configure Task step, configure the data source and destination in the Configure Source and Destination section based on your requirements.

      Data source

      Parameter

      Description

      Data source

      The Tablestore data source that you selected in the previous step is displayed by default.

      Table

      The source data table.

      Start Time and End Time

      The start and end times for reading incremental data. Set Start Time to the variable ${startTime} and End Time to the variable ${endTime}. The time format is configured later in the scheduling properties. The time range is left-inclusive and right-exclusive: data written at the start time is read, and data written at the end time is not.

      Status Table

      The name of the table used to record status. The default value is TableStoreStreamReaderStatusTable.

      Maximum Retries

      The maximum number of retries for each request when reading incremental data from Tablestore.

      Export Sequence Information

      Specifies whether to export sequence information, which includes data write times.

      Important

      Select the Export Sequence Information check box. This information is required to convert incremental data into a full data format in a later step.

      Data destination

      Parameter

      Description

      Data source

      The MaxCompute data source that you selected in the previous step is displayed by default.

      Tunnel Resource Group

      This parameter specifies the Tunnel Quota. The default value is the public transport resource, which is the free quota provided by MaxCompute.

      For information about data transport resources for MaxCompute, see Purchase and use exclusive resource groups for data transport services.

      Note

      If an exclusive tunnel quota becomes unavailable due to an overdue payment or expiration, the task automatically switches to public transport resources.

      Table

      The destination MaxCompute table.

      Partition Information

      To store each day's incremental data in its own date partition, specify a partition value. For example, set the partition key pt to ${bizdate}.

      Write Mode

      The mode for writing data to the table. The following modes are supported:

      • Append (Insert Into): Inserts data directly into the table or a static partition.

        Important

        Because this offline synchronization task will run periodically, select Append (Insert Into).

      • Overwrite (Insert Overwrite): Clears all existing data from the table or a static partition before inserting new data.

      Convert Empty String to Null

      Specifies whether to convert empty strings from the source to null values when writing to a destination MaxCompute column. The default value is No.

      Visible upon Completion

      This parameter appears only after you click Advanced Settings.

      Specifies whether the data synchronized to MaxCompute can be queried only after the synchronization is complete. The default value is No.

    2. Configure field mapping.

      In the field mapping section, the system automatically maps fields with the same name. You can keep the default settings. For more information, see Configure field mappings.

    3. Configure channel control.

      You can configure channel settings to control properties related to the data synchronization process. For more information about the parameters, see Relationship between concurrency and rate limiting for offline synchronization.

    4. Click Save to save the configuration.

    Script mode

    1. In the Configure Task step, click Script Mode, and then click OK in the dialog box that appears.

    2. On the script configuration page, edit the script.

      The following code provides an example of a script configuration. Replace the parameters in the configuration file with your own information based on your synchronization requirements.

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "otsstream",
            "parameter": {
              "statusTable": "TableStoreStreamReaderStatusTable",
              "maxRetries": 30,
              "isExportSequenceInfo": true,
              "datasource": "MyTablestoreDataSource",
              "dataTable": "target_table",
              "newVersion": "true",
              "column": [
                "pk",
                "colName",
                "version",
                "colValue",
                "opType",
                "sequenceInfo"
              ],
              "startTimeString": "${startTime}",
              "endTimeString": "${endTime}"
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "odps",
            "parameter": {
              "partition": "",
              "truncate": false,
              "datasource": "MyMaxComputeDataSource",
              "table": "target_table",
              "column": [
                "pk",
                "colname",
                "version",
                "colvalue",
                "optype",
                "sequenceinfo"
              ],
              "emptyAsNull": false,
              "compress": false
            },
            "name": "Writer",
            "category": "writer"
          }
        ],
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        },
        "setting": {
          "errorLimit": {
            "record": "0"
          },
          "speed": {
            "throttle": false,
            "concurrent": 2
          }
        }
      }
      • The following table describes the parameters to replace for Tablestore Stream Reader.

        Parameter

        Description

        datasource

        The name of the Tablestore data source for the source table.

        dataTable

        The name of the source table.

        column

        The primary keys and incremental change information of the source table.

      • The following table describes the parameters to replace for MaxCompute Writer.

        Parameter

        Description

        datasource

        The name of the MaxCompute data source for the destination table.

        table

        The name of the destination table.

        column

        The columns to which you write data.

    3. Click Save to save the configuration.
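The difference between the two write modes matters for a task that runs every hour. The toy simulation below illustrates it in plain Python without touching MaxCompute. It assumes that "truncate": false in the writer step corresponds to Append (Insert Into) and "truncate": true to Overwrite (Insert Overwrite). The function name run_sync and the sample rows are hypothetical.

```python
def run_sync(existing_rows: list[str], batch: list[str], truncate: bool) -> list[str]:
    """Return the table or partition contents after one synchronization run."""
    if truncate:
        # Overwrite: existing rows are cleared before the new batch is written.
        return list(batch)
    # Append: the new batch is added after the existing rows.
    return existing_rows + list(batch)


# Three hourly runs, each delivering one hypothetical change record.
hourly_batches = [["change_0900"], ["change_1000"], ["change_1100"]]

appended: list[str] = []
overwritten: list[str] = []
for batch in hourly_batches:
    appended = run_sync(appended, batch, truncate=False)
    overwritten = run_sync(overwritten, batch, truncate=True)

print(appended)     # ['change_0900', 'change_1000', 'change_1100']
print(overwritten)  # ['change_1100']
```

With Overwrite, each run discards the changes written by earlier runs to the same table or static partition, which is why Append is the mode to select for a periodic incremental task.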

Step 3: Configure scheduling properties

  1. In the right-side pane of the task, click Scheduling Configuration.

  2. In the Scheduling Configuration panel, click Add Parameter in the Scheduling Parameters section and add parameters as described in the following table. For more information, see Sources and expressions of scheduling parameters.

    Parameter

    Value

    startTime

    $[yyyymmddhh24-2/24]$[miss-10/24/60]

    endTime

    $[yyyymmddhh24-1/24]$[miss-10/24/60]

    For example, if the task runs at 19:00:00 on April 23, 2023, startTime is 20230423175000 and endTime is 20230423185000. The task then synchronizes data added between 17:50 and 18:50.

  3. In the Scheduling Policy section, configure the scheduling policy. For more information, see Instance generation mode: Generate upon deployment.

    For Instance Generation Mode, select Generate upon deployment. For Scheduling Type, select Normal Scheduling. For Scheduling Calendar, select Default. For Timeout Definition, select System Default. For Rerun Properties, select Rerunnable on success or failure, and select the Auto-rerun on Failure check box. Set Number of Retries to 3 and Retry Interval to 2 minutes. For Scheduling Resource Group, select public scheduling resource group.

  4. In the Scheduling Time section, configure the scheduling time. For more information, see Scheduling time.

    The following example shows how to configure the task to run automatically every hour on the hour.

    Set Scheduling Cycle to Hour, select the Hourly Range mode, set Start Time to 00:00, set Interval to 1 hour, and set End Time to 23:59. For Effective Date, select Permanent.

  5. In the Scheduling Dependencies section, click Use Workspace Root Node. The system automatically generates the upstream node dependency information.

    Note

    Using the workspace root node indicates that the task has no upstream dependencies.

  6. After completing the configuration, close the Scheduling Configuration panel.

  7. Click Save to save the configuration.
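With an hourly cycle from 00:00 to 23:59, the time windows of consecutive runs should line up without gaps or overlap. The sketch below checks this for one day. It is an illustration under assumptions: the function names hourly_runs and window_for are hypothetical, and the window arithmetic assumes that each run starts on the hour.

```python
from datetime import datetime, timedelta


def hourly_runs(day: datetime) -> list[datetime]:
    """Scheduled times for Start Time 00:00, Interval 1 hour, End Time 23:59."""
    midnight = day.replace(hour=0, minute=0, second=0, microsecond=0)
    return [midnight + timedelta(hours=h) for h in range(24)]


def window_for(run: datetime) -> tuple[datetime, datetime]:
    """Half-open window [start, end) read by a run that starts on the hour."""
    end = run - timedelta(minutes=10)
    return end - timedelta(hours=1), end


runs = hourly_runs(datetime(2023, 4, 23))
windows = [window_for(run) for run in runs]
# Each window ends exactly where the next one starts, so no data is skipped
# and, because the end is exclusive, no data is read twice.
gaps = [nxt[0] - cur[1] for cur, nxt in zip(windows, windows[1:])]
print(len(runs), all(gap == timedelta(0) for gap in gaps))  # 24 True
```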

Step 4: (Optional) Debug the script

Debug the script to ensure that the synchronization task can successfully synchronize incremental data from Tablestore to MaxCompute.

Note

When you debug the script, data within the specified time range may be imported into MaxCompute multiple times. Duplicate data rows will overwrite existing ones in MaxCompute.

  1. In the right-side pane of the task, click Debugging Configuration. Select the resource group for the run and configure the script parameters.

    The custom parameter must be in the yyyyMMddHHmmss format, such as 20250528160000.

  2. After completing the configuration, close the Debugging Configuration panel.

  3. Click Run.

    After the task finishes running, you can view the synchronization result in the destination table. For more information, see MaxCompute table data.

Step 5: Publish the synchronization task

Once published, the task runs according to its scheduling properties.

  1. Click Publish.

  2. In the Publish tab of the synchronization task, enter an optional description, and then click Publish to Production.

  3. Follow the steps in the publishing workflow and click Confirm Publish.

Step 4: Check synchronization results

  1. Check the task run status in the DataWorks console.

    Data Studio (Legacy)

    1. Click Operation Center on the right side of the synchronization task toolbar.

    2. On the cycle instance page, on the instance view tab, view the instance's run details. For details, see Cycle instance view.

    Data Studio (New)

    1. Click Go to Operation Center to view cycle instances.

    2. On the cycle instance page, on the instance view tab, view the instance's run details. For details, see Cycle instance view.

  2. Check the synchronization results in the destination table.

    In the Data Map module of the DataWorks console, view the details of the destination MaxCompute table. For details, see MaxCompute table data.

Next steps

The incremental data synchronized to MaxCompute contains column-level change records from the Tablestore table. You can then convert this incremental data into a full data format.

MaxCompute table creation

This example shows how to create a destination MaxCompute table to store column-level data change records from a Tablestore table.

Important

The primary key structure of the destination MaxCompute table, including column names, data types, and order, must match that of the source Tablestore table.

CREATE TABLE IF NOT EXISTS maxcompute_target_table(
    `pk` STRING COMMENT 'Primary key field', -- Adjust this based on your actual primary key.
    `colname` STRING COMMENT 'Column name',
    `version` BIGINT COMMENT 'Version',
    `colvalue` STRING COMMENT 'Column value',
    `optype` STRING COMMENT 'Operation type',
    `sequenceinfo` STRING COMMENT 'Sequence information'
)
COMMENT 'Stores incremental data from Tablestore';
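
To illustrate what converting these change records into a full data format involves, the following Python sketch folds column-level records into complete rows in memory. It is a rough illustration, not the conversion procedure itself. The function name apply_changes is hypothetical, the records are assumed to be already sorted in write order, and the operation codes U (update), DO and DA (delete a column version or all versions), and DR (delete the row) are assumptions that you should verify against the values in your optype column.

```python
def apply_changes(records: list[dict]) -> dict:
    """Fold ordered column-level change records into {pk: {colname: colvalue}}."""
    rows: dict = {}
    for record in records:
        pk, op = record["pk"], record["optype"]
        if op == "U":
            # Assumed code: write or update one column value.
            rows.setdefault(pk, {})[record["colname"]] = record["colvalue"]
        elif op in ("DO", "DA"):
            # Assumed codes: delete a column. Version handling is simplified here.
            rows.get(pk, {}).pop(record["colname"], None)
        elif op == "DR":
            # Assumed code: delete the whole row.
            rows.pop(pk, None)
    return rows


# Hypothetical change records using the column names of the table above.
changes = [
    {"pk": "1", "colname": "name", "colvalue": "alice", "optype": "U"},
    {"pk": "1", "colname": "age", "colvalue": "30", "optype": "U"},
    {"pk": "2", "colname": "name", "colvalue": "bob", "optype": "U"},
    {"pk": "1", "colname": "age", "colvalue": None, "optype": "DA"},
    {"pk": "2", "colname": None, "colvalue": None, "optype": "DR"},
]
print(apply_changes(changes))  # {'1': {'name': 'alice'}}
```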