Convert incremental data from Tablestore to a full data format

Last updated: 2026-04-05 17:27:54

After incremental data from Tablestore is synchronized to a MaxCompute table, you can run an ODPS SQL statement in DataWorks to convert the incremental data to a full data format.

Prerequisites

Step 1: Create a JAR resource

Create a JAR resource and upload the merge_udf.jar package to DataWorks.

Data Studio (legacy)

  1. Go to the Data Development page.

    1. Log on to the DataWorks console.

    2. In the top navigation bar, select a resource group and region.

    3. In the navigation pane on the left, click Data Development and O&M > Data Development.

    4. On the Data Development page, select the target workspace from the drop-down list and click the Go to Data Development button.

  2. Create a JAR resource.

    1. In the DataStudio console, on the Data Studio page, click New and select New Resource > MaxCompute > JAR.

      Alternatively, open the business flow, right-click MaxCompute, and select New Resource > JAR.

    2. In the New Resource dialog box, configure the parameters and click Create.

      Important
      • If the resource has not been uploaded using the MaxCompute client, select the Upload as ODPS Resource check box. If the resource has already been uploaded using the MaxCompute client, clear the Upload as ODPS Resource check box.

      • If you select Upload as ODPS Resource, the resource is stored in both DataWorks and MaxCompute.

      • The resource name must end with .jar.

  3. Commit the JAR resource.

    1. Click the Commit icon.

    2. In the Commit dialog box, enter a description as needed.

    3. Click Confirm.

Data Studio (new version)

  1. Go to the Resource Management page.

    1. Log on to the DataWorks console.

    2. In the top navigation bar, select a resource group and region.

    3. In the left navigation pane, click Data Development and O&M > Data Studio.

    4. On the Data Development page, select the desired workspace from the drop-down list and click Go To DataStudio.

    5. In the navigation pane on the left, click the Resource Management icon. The Resource Management page appears.

  2. Create a JAR resource.

    1. On the Resource Management page, click the create icon and choose New Resource > MaxCompute Jar.

    2. In the New Resource and Function dialog box, select a path, enter a resource name, and click Confirm.

      Important

      The resource name must end with .jar.

    3. On the configuration tab of the new resource, configure the upload parameters as described in the following table.

      | Parameter | Description |
      | --- | --- |
      | File Source | The source of the object file. Options include Local and OSS. |
      | File Content | If you select Local, click Click to upload in the Upload File section to upload a local file. If you select OSS, select the OSS file from the Select File drop-down list. |
      | Data Source | The data source to which the uploaded MaxCompute resource belongs. |

    4. To save the configuration, click Save.

  3. Publish the JAR resource.

    1. Click Publish.

    2. On the Publish tab of the resource, optionally enter a description for the release and click Start Publishing to Production.

    3. Follow the process guide and click Confirm Publish.

Step 2: Create a function

Data Studio (legacy)

  1. Create a function.

    1. On the Data Studio page in the DataStudio console, right-click the target business flow and choose New Function > MaxCompute > Function.

    2. In the New Function dialog box, select a path and enter a function name.

    3. Click Create.

  2. Register the function.

    Configure the function parameters as described in the following table, and then click the Save icon to save the configuration.

    Parameter

    Description

    Function Type

    Select Other Function.

    Class Name

    Enter the class name based on the type and pattern of the Tablestore source table.

    Note

    You can determine whether the Tablestore source table is a single-version or multi-version table by its MaxVersion property.

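    • Single-version table: com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell

    • Multi-version table: com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1 (pattern V1) or com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2 (pattern V2)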

      Important

      Multi-version pattern V1 and V2 are two different policies.

      Multi-version pattern V1 manages versions at the row level. In the output, each version retains a complete snapshot of the entire row.

      Multi-version pattern V2 manages versions at the column level. In the output, the versions of different columns are independent, and only the incremental changes of the columns are retained.

    Resources

    Select the name of the resource you created in Step 1.

  3. Commit the function.

    1. Click the Commit icon.

    2. In the Commit dialog box, enter an optional description for the commit.

    3. Click Confirm.

Data Studio (new version)

  1. Create a function.

    1. On the Resource Management page, click the create icon, and then choose New Function > MaxCompute Function.

    2. In the New Resource and Function dialog box, select a path, enter a function name, and click Confirm.

    3. On the configuration tab of the new function, configure the function parameters as described in the following table.

      Parameter

      Description

      Function Type

      Select OTHER (Other Function).

      Data Source

      The data source to which the MaxCompute function belongs.

      Class Name

      Enter the class name based on the type and pattern of the Tablestore source table.

      Note

      You can determine whether the Tablestore source table is a single-version or multi-version table by its MaxVersion property.

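      • Single-version table: com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell

      • Multi-version table: com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1 (pattern V1) or com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2 (pattern V2)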

        Important

        Multi-version pattern V1 and V2 are two different policies.

        Multi-version pattern V1 manages versions at the row level. In the output, each version retains a complete snapshot of the entire row.

        Multi-version pattern V2 manages versions at the column level. In the output, the versions of different columns are independent, and only the incremental changes of the columns are retained.

      Type

      Select Resource Function.

      Resources

      Select the name of the resource you created in Step 1.

    4. To save the configuration, click Save.

  2. Publish the function.

    1. Click Publish.

    2. On the Publish tab, optionally enter a description for the release and click Start Publishing to Production.

    3. Follow the process guide and click Confirm Publish.

Step 3: Write and run the ODPS SQL statement

Data Studio (legacy)

  1. Create an ODPS SQL node.

    1. On the Data Studio page in the DataStudio console, click New and select New Node > MaxCompute > ODPS SQL.

      Alternatively, you can right-click the target business flow and choose New Node > MaxCompute > ODPS SQL.

    2. In the New Node dialog box, select a path and enter a name.

    3. Click Confirm.

  2. On the node editing page, write the ODPS SQL statement.

    The format of the SQL statement is as follows:

    SELECT function_name(para_list) 
    AS (custom_para_list)
    FROM(
        SELECT * FROM stream_table_name 
        DISTRIBUTE BY primary_keys 
        SORT by primary_keys, SequenceID
    )t;

    The following table describes the parameters.

    | Parameter | Description |
    | --- | --- |
    | function_name | The function name. Enter the name of the function you created in Step 2. |
    | para_list | The parameter list. Configure the parameters for the Tablestore source table and the MaxCompute incremental table. |
    | custom_para_list | The custom parameter list. Define the field names in the result. |
    | stream_table_name | The name of the MaxCompute table that stores the incremental data from Tablestore. |
    | primary_keys | The list of primary keys for the Tablestore source table. |
    | SequenceID | The time series information. Enter `sequenceInfo`. |

    Important

    For ODPS SQL examples and parameter settings, see Appendix: Pattern selection. Select the appropriate pattern (Single-version pattern, Multi-version pattern V1, or Multi-version pattern V2) based on the function that you created in Step 2.

  3. Run the ODPS SQL statement.

    Click the Run icon. The results appear on the Result tab.

Data Studio (new version)

  1. In the DataStudio console, navigate to the Data Studio page, click the create icon, and choose New Node > MaxCompute > MaxCompute SQL.

  2. In the New Node dialog box, select a path, enter a name, and click Confirm.

  3. On the node editing page, write the ODPS SQL statement.

    The format of the SQL statement is as follows:

    SELECT function_name(para_list) 
    AS (custom_para_list)
    FROM(
        SELECT * FROM stream_table_name 
        DISTRIBUTE BY primary_keys 
        SORT by primary_keys, SequenceID
    )t;

    The following table describes the parameters.

    | Parameter | Description |
    | --- | --- |
    | function_name | The function name. Enter the name of the function you created in Step 2. |
    | para_list | The parameter list. Configure the parameters for the Tablestore source table and the MaxCompute incremental table. |
    | custom_para_list | The custom parameter list. Define the field names in the result. |
    | stream_table_name | The name of the MaxCompute table that stores the incremental data from Tablestore. |
    | primary_keys | The list of primary keys for the Tablestore source table. |
    | SequenceID | The time series information. Enter `sequenceInfo`. |

    Important

    For ODPS SQL examples and parameter settings, see Appendix: Pattern selection. Select the appropriate pattern (Single-version pattern, Multi-version pattern V1, or Multi-version pattern V2) based on the function that you created in Step 2.

  4. Run the ODPS SQL statement.

    Click Run. The results appear on the Result tab.

Appendix: Pattern selection

The patterns include single-version, multi-version V1, and multi-version V2. You can select the appropriate pattern based on your Tablestore table type.

Single-version pattern

Class name

com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell

Parameter list

The format of the parameter list is pknum,colnum,colnames,pknames,colname,version,colvalue,optype,sequenceinfo. For more information, see the following table.

| Parameter | Type | Description |
| --- | --- | --- |
| pknum | INT (constant) | The number of primary keys in the Tablestore source table. |
| colnum | INT (constant) | The number of Tablestore attribute columns whose incremental changes need to be merged. |
| colnames | List<String> (constant) | The names of the Tablestore attribute columns whose incremental changes need to be merged. Separate the names with commas (,). |
| pknames | List<String> (constant) | The list of primary keys for the Tablestore source table. Separate the keys with commas (,). |
| colname | STRING (variable) | The attribute column (provided by the `colname` field of the incremental table). |
| version | BIGINT (variable) | The version number (provided by the `version` field of the incremental table). |
| colvalue | STRING (variable) | The incremental value (provided by the `colvalue` field of the incremental table). |
| optype | STRING (variable) | The type of incremental operation (provided by the `optype` field of the incremental table). |
| sequenceinfo | STRING (variable) | The time series information (provided by the `sequenceInfo` field of the incremental table). |

Example

Assume that the Tablestore source table and the MaxCompute incremental table have the following schemas:

  • Tablestore source table schema: The primary keys are pk1 and pk2. The attribute columns are `col1`, `col2`, and `col3`.

  • MaxCompute incremental table schema: pk1,pk2,colname,version,colvalue,optype,sequenceinfo.

To convert the incremental data in the MaxCompute incremental table to a full data format, set the parameter list to 2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo and the custom parameter list to pk1,pk2,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted.

The following code provides an example of the ODPS SQL statement:

SELECT mergeCell(2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo)
AS (pk1,pk2,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted)
FROM(
    SELECT * FROM stream_table_name 
    DISTRIBUTE BY pk1,pk2 
    SORT by pk1,pk2,sequenceInfo
)t;
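
The parameter list and the custom parameter list follow mechanically from the primary key names and the attribute column names, so they can be generated instead of typed by hand. The following Python sketch illustrates this for the single-version pattern. The helper name `build_merge_sql` is hypothetical and is not part of DataWorks or of merge_udf.jar; the sketch only assembles the statement text.

```python
def build_merge_sql(function_name, table, pk_names, col_names):
    """Assemble the single-version conversion statement as a string.

    Hypothetical helper: it only builds text and does not contact MaxCompute.
    """
    # Constants come first: pknum, colnum, then the quoted attribute column names.
    constants = [str(len(pk_names)), str(len(col_names))]
    constants += ['"{}"'.format(c) for c in col_names]
    # Variables follow: the primary key columns, then the fixed incremental-table fields.
    variables = list(pk_names) + ["colname", "version", "colvalue", "optype", "sequenceinfo"]
    para_list = ",".join(constants + variables)
    # Each attribute column yields a value column and a <col>_is_deleted column.
    outputs = list(pk_names)
    for c in col_names:
        outputs += [c, c + "_is_deleted"]
    keys = ",".join(pk_names)
    return (
        "SELECT {fn}({args})\n"
        "AS ({out})\n"
        "FROM(\n"
        "    SELECT * FROM {tbl}\n"
        "    DISTRIBUTE BY {keys}\n"
        "    SORT BY {keys},sequenceinfo\n"
        ")t;"
    ).format(fn=function_name, args=para_list, out=",".join(outputs), tbl=table, keys=keys)


sql = build_merge_sql("mergeCell", "stream_table_name", ["pk1", "pk2"], ["col1", "col2", "col3"])
print(sql)
```

For the example schema, the generated parameter list and custom parameter list match the ones in the statement above.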

After you run the ODPS SQL statement, the output is similar to the data in the following table.

| pk1 | pk2 | col1 | col1_is_deleted | col2 | col2_is_deleted | col3 | col3_is_deleted |
| --- | --- | --- | --- | --- | --- | --- | --- |
| test | 0 | \N | \N | 20 | \N | \N | True |

Interpret the output as follows, where `col` stands for any attribute column:

  • If the `col` column and the `col_is_deleted` column are both \N, no incremental operation was performed on the `col` column.

  • If the `col` column has a specific value and the `col_is_deleted` column is \N, the value of the `col` column was changed to that specific value.

  • If the `col` column is \N and the `col_is_deleted` column is `true`, the `col` column was deleted.
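
The three cases above can be written as a small decision function. The following Python sketch is illustrative only: `describe_cell` is a hypothetical helper, None stands for \N, and the sketch assumes that the deletion flag arrives either as a boolean or as the text "True"/"true".

```python
def describe_cell(value, is_deleted):
    """Classify one (col, col_is_deleted) pair from a single-version result row."""
    # None represents \N (NULL) in the query output.
    deleted = str(is_deleted).lower() == "true"
    if value is None and is_deleted is None:
        return "unchanged"   # no incremental operation on this column
    if value is not None and is_deleted is None:
        return "updated"     # the column was changed to this value
    if value is None and deleted:
        return "deleted"     # the column was deleted
    return "unexpected"      # combination not described in the list above


# The sample output row: col1 untouched, col2 set to 20, col3 deleted.
row = {"col1": (None, None), "col2": ("20", None), "col3": (None, True)}
summary = {name: describe_cell(v, d) for name, (v, d) in row.items()}
print(summary)
```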

Multi-version pattern V1

Class name

com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1

Parameter list

The format of the parameter list is pknum,colnum,colnames,pknames,colname,version,colvalue,optype,sequenceinfo. For more information, see the following table.

| Parameter | Type | Description |
| --- | --- | --- |
| pknum | INT (constant) | The number of primary keys in the Tablestore source table. |
| colnum | INT (constant) | The number of Tablestore attribute columns whose incremental changes need to be merged. |
| colnames | List<String> (constant) | The names of the Tablestore attribute columns whose incremental changes need to be merged. Separate the names with commas (,). |
| pknames | List<String> (constant) | The list of primary keys for the Tablestore source table. Separate the keys with commas (,). |
| colname | STRING (variable) | The attribute column (provided by the `colname` field of the incremental table). |
| version | BIGINT (variable) | The version number (provided by the `version` field of the incremental table). |
| colvalue | STRING (variable) | The incremental value (provided by the `colvalue` field of the incremental table). |
| optype | STRING (variable) | The type of incremental operation (provided by the `optype` field of the incremental table). |
| sequenceinfo | STRING (variable) | The time series information (provided by the `sequenceInfo` field of the incremental table). |

Example

Assume that the Tablestore source table and the MaxCompute incremental table have the following schemas:

  • Tablestore source table schema: The primary keys are pk1 and pk2. The attribute columns are `col1`, `col2`, and `col3`.

  • MaxCompute incremental table schema: pk1,pk2,colname,version,colvalue,optype,sequenceinfo.

To convert the incremental data in the MaxCompute incremental table to a full data format, set the parameter list to 2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo and the custom parameter list to pk1,pk2,version,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted.

The following code provides an example of the ODPS SQL statement:

SELECT mergeCell(2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo)
AS (pk1,pk2,version,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted)
FROM(
    SELECT * FROM stream_table_name 
    DISTRIBUTE BY pk1,pk2 
    SORT by pk1,pk2,sequenceInfo
)t;

After you run the ODPS SQL statement, the output is similar to the data in the following table.

| pk1 | pk2 | version | col1 | col1_is_deleted | col2 | col2_is_deleted | col3 | col3_is_deleted |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| test | 0 | 123 | \N | \N | 20 | \N | \N | True |

Interpret the output as follows, where `col` stands for any attribute column:

  • If the `version` column has a specific value, and the `col` and `col_is_deleted` columns are both \N, no incremental operation was performed on the corresponding version of the `col` column.

  • If the `version` and `col` columns both have specific values, and the `col_is_deleted` column is \N, the value of the corresponding version of the `col` column was changed to the specific value.

  • If the `version` column has a specific value, the `col` column is \N, and the `col_is_deleted` column is `true`, the corresponding version of the `col` column was deleted.

  • If the `version` and `col` columns are both \N, and the `col_is_deleted` column is `true`, an operation was performed to delete all versions of a column.
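
Compared with the single-version pattern, the `version` column adds one more case: a \N version combined with a set deletion flag means that all versions of the column were deleted. The following Python sketch encodes the four cases. `describe_versioned_cell` is a hypothetical helper, None stands for \N, and the flag is assumed to arrive as a boolean or as the text "True"/"true".

```python
def describe_versioned_cell(version, value, is_deleted):
    """Classify one (version, col, col_is_deleted) triple from a V1 result row."""
    # None represents \N (NULL) in the query output.
    deleted = str(is_deleted).lower() == "true"
    if version is None:
        # A NULL version with the flag set means every version of the column was deleted.
        if value is None and deleted:
            return "all versions deleted"
        return "unexpected"
    if value is None and is_deleted is None:
        return "version unchanged"
    if value is not None and is_deleted is None:
        return "version updated"
    if value is None and deleted:
        return "version deleted"
    return "unexpected"


# The sample output row at version 123: col1 untouched, col2 set to 20, col3 deleted.
sample = [
    describe_versioned_cell(123, None, None),
    describe_versioned_cell(123, "20", None),
    describe_versioned_cell(123, None, True),
]
print(sample)
```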

Multi-version pattern V2

Class name

com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2

Parameter list

The format of the parameter list is pknum,colnum,maxversion,colnames,pknames,colname,version,colvalue,optype,sequenceinfo. For more information, see the following table.

| Parameter | Type | Description |
| --- | --- | --- |
| pknum | INT (constant) | The number of primary keys in the Tablestore source table. |
| colnum | INT (constant) | The number of Tablestore attribute columns whose incremental changes need to be merged. |
| maxversion | BIGINT (constant) | The maximum number of versions for the Tablestore source table. |
| colnames | List<String> (constant) | The names of the Tablestore attribute columns whose incremental changes need to be merged. Separate the names with commas (,). |
| pknames | List<String> (constant) | The list of primary keys for the Tablestore source table. Separate the keys with commas (,). |
| colname | STRING (variable) | The attribute column (provided by the `colname` field of the incremental table). |
| version | BIGINT (variable) | The version number (provided by the `version` field of the incremental table). |
| colvalue | STRING (variable) | The incremental value (provided by the `colvalue` field of the incremental table). |
| optype | STRING (variable) | The type of incremental operation (provided by the `optype` field of the incremental table). |
| sequenceinfo | STRING (variable) | The time series information (provided by the `sequenceInfo` field of the incremental table). |

Example

Assume that the Tablestore source table and the MaxCompute incremental table have the following schemas:

  • Tablestore source table schema: The primary keys are pk1 and pk2. The attribute columns are `col1`, `col2`, and `col3`, and the maximum number of versions is 3.

  • MaxCompute incremental table schema: pk1,pk2,colname,version,colvalue,optype,sequenceinfo.

To convert the incremental data in the MaxCompute incremental table to a full data format, set the parameter list to 2,3,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo and the custom parameter list to pk1,pk2,col1,col2,col3.

The following code provides an example of the ODPS SQL statement:

SELECT mergeCell(2,3,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo)
AS (pk1,pk2,col1,col2,col3)
FROM(
    SELECT * FROM stream_table_name 
    DISTRIBUTE BY pk1,pk2 
    SORT by pk1,pk2,sequenceInfo
)t;

After you run the ODPS SQL statement, the output is similar to the data in the following table.

| pk1 | pk2 | col1 | col2 | col3 |
| --- | --- | --- | --- | --- |
| test | 02 | {"data":[{"version":1621330803390,"value":"value001"},{"version":1621330795198,"value":"value002"},{"version":1621330785936,"value":"value003"}],"needDeleteAllVersionFirst":true,"deleteVersions":[]} | \N | \N |

Each non-empty attribute column value is a JSON object with the following fields:

  • `data` indicates the list of newly written data, sorted by version number in descending order. A maximum of `maxversion` versions of data are retained.

  • `needDeleteAllVersionFirst` indicates whether all existing versions of the column must be deleted. This value is `true` if a `DeleteRow` or `DeleteColumns` operation occurs. Otherwise, this value is `false`.

  • `deleteVersions` indicates the list of versions to be deleted for the column, sorted by version number in descending order. A maximum of `maxversion` versions are retained.

    The version numbers in `deleteVersions` are different from the version numbers in `data`. If `needDeleteAllVersionFirst` is `true`, `deleteVersions` is an empty list.
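
To make the three fields concrete, the following Python sketch shows one way a consumer might apply such a cell to the versions it already holds for a column. This is an assumption about downstream use rather than behavior of merge_udf.jar: `apply_merged_cell` is a hypothetical helper, and the order of operations (clear, delete, write, trim) is inferred from the field descriptions above.

```python
import json


def apply_merged_cell(existing, cell_json, max_versions):
    """Apply one V2 output cell to a {version: value} dict and return the new dict."""
    cell = json.loads(cell_json)
    # Start from an empty column when all existing versions must be deleted first.
    versions = {} if cell["needDeleteAllVersionFirst"] else dict(existing)
    # Remove the individually deleted versions.
    for v in cell["deleteVersions"]:
        versions.pop(v, None)
    # Write the new versions.
    for item in cell["data"]:
        versions[item["version"]] = item["value"]
    # Keep only the newest max_versions entries, newest first.
    newest = sorted(versions, reverse=True)[:max_versions]
    return {v: versions[v] for v in newest}


# The col1 cell from the sample output, applied to a column that held one older version.
cell = (
    '{"data":[{"version":1621330803390,"value":"value001"},'
    '{"version":1621330795198,"value":"value002"},'
    '{"version":1621330785936,"value":"value003"}],'
    '"needDeleteAllVersionFirst":true,"deleteVersions":[]}'
)
result = apply_merged_cell({1621330000000: "old"}, cell, max_versions=3)
print(result)
```

Because `needDeleteAllVersionFirst` is true in the sample, the older version is dropped and only the three newly written versions remain.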

FAQ

A type conversion error occurs when you run an SQL statement to convert the Tablestore data format

  • Symptom

    When you run an SQL statement in DataWorks Data Studio to convert the Tablestore data format, the following error message is returned:

    FAILED ODPS -0010000:System internal error -fuxi job failed, caused by: Failed in UDF/UDTF/UDAF  com.aliyun.otsstream.utils.mergecell.oneversion.MergeCell class, at query location of line 1, column 8

  • Possible causes

    • The odps.sql.type.system.odps2 parameter is incorrectly configured. For example, it is set to true.

    • The field types in Tablestore and MaxCompute are inconsistent during data synchronization.

  • Solution

    • Add set odps.sql.type.system.odps2=false; before the SQL statement and run them together.

    • Check for inconsistent field types in the sync task. If you find any, modify them, synchronize the data again, and then convert the incremental data to full data.
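
"Run them together" means that the flag and the query are submitted as one script, with the flag on the first line. As a minimal sketch, the following hypothetical Python helper `with_odps2_disabled` prepends the flag to a statement unless it is already there. It only manipulates text and does not call any DataWorks or MaxCompute API.

```python
ODPS2_OFF = "set odps.sql.type.system.odps2=false;"


def with_odps2_disabled(sql):
    """Return the statement with the odps2 flag line in front of it."""
    stripped = sql.lstrip()
    # Avoid adding the flag twice if the script already starts with it.
    if stripped.lower().startswith(ODPS2_OFF):
        return stripped
    return ODPS2_OFF + "\n" + stripped


script = with_odps2_disabled("SELECT * FROM stream_table_name;")
print(script)
```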

Attribute columns cannot be mapped to values and are shown as deleted

  • Symptom

    After you run the SQL statement, some attribute columns in the result cannot be mapped to their actual values, and the corresponding `is_deleted` field for these columns has a value of True.

    For example, all values in the `col1` attribute column are \N and all values in the `col1_is_deleted` column are True. The result indicates that `col1` was deleted, even though `col1` is not empty in the source table.

  • Possible cause

    In the offline sync task, the Export Time Series Information parameter is disabled ("isExportSequenceInfo": false). Without the time series information, the system incorrectly determines that the attribute column is deleted.

    Note

    The `sequenceInfo` field stores the historical version information of attribute columns. If `sequenceInfo` is not exported, the version information may not be parsed correctly when you convert incremental data to a full data format. This can prevent data from the corresponding versions from being mapped to the target rows.

  • Solution

    In Step 3: Configure an offline sync task, modify the parameter settings for Export Time Series Information, resynchronize the Tablestore incremental data, and execute an SQL statement to convert the data format.

    • Codeless UI

      In the Configure Data Source and Destination section of the Configure Task step, select the Export Time Series Information check box and then save and commit the task.

    • Code editor

      Change the value of the `isExportSequenceInfo` parameter in the Tablestore Stream Reader to true. Then, save and commit the task.
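
If the task is maintained as a JSON script, the change can also be checked programmatically. The following Python sketch walks a parsed configuration and sets every `isExportSequenceInfo` key to true. The helper `enable_sequence_info` and the sample configuration layout are hypothetical. A real task script contains more fields, and the search is recursive precisely so that it does not depend on the exact nesting.

```python
import json


def enable_sequence_info(node):
    """Set every isExportSequenceInfo key in a parsed JSON config to True.

    Returns the number of keys whose value was changed.
    """
    changed = 0
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "isExportSequenceInfo":
                if value is not True:
                    node[key] = True
                    changed += 1
            else:
                changed += enable_sequence_info(value)
    elif isinstance(node, list):
        for item in node:
            changed += enable_sequence_info(item)
    return changed


# Hypothetical, heavily trimmed task script used only for illustration.
task = json.loads('{"steps": [{"category": "reader", "parameter": {"isExportSequenceInfo": false}}]}')
changed = enable_sequence_info(task)
print(changed, json.dumps(task))
```

After the change, save and commit the task, and then resynchronize the incremental data as described in the solution above.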
