Convert incremental data from Tablestore to a full data format
You can use DataWorks to run an ODPS SQL statement that converts Tablestore incremental data stored in a MaxCompute table into a full data format.
Prerequisites
- You have synchronized incremental data from Tablestore to a MaxCompute table.
- You have attached a MaxCompute compute resource to the current DataWorks workspace.
- You have downloaded the merge_udf.jar package.
Step 1: Create a JAR resource
Create a JAR resource and upload the merge_udf.jar package to DataWorks.
Data Studio (legacy)
- Go to the Data Development page.
  - Log on to the DataWorks console.
  - In the top navigation bar, select a resource group and region.
  - In the navigation pane on the left, click .
  - On the Data Development page, select the target workspace from the drop-down list and click the Go to Data Development button.
- Create a JAR resource.
  - In the DataStudio console, on the Data Studio page, click New and select .
    Alternatively, open the business flow, right-click MaxCompute, and select .
  - In the New Resource dialog box, configure the parameters and click Create.
    Important:
    - If the resource has not been uploaded using the MaxCompute client, select the Upload as ODPS Resource check box. If the resource has already been uploaded using the MaxCompute client, clear the Upload as ODPS Resource check box.
    - If you select Upload as ODPS Resource, the resource is stored in both DataWorks and MaxCompute.
    - The resource name must end with .jar.
- Commit the JAR resource.
  - Click the icon.
  - In the Commit dialog box, enter a description as needed.
  - Click Confirm.
Data Studio (new version)
- Go to the Resource Management page.
  - Log on to the DataWorks console.
  - In the top navigation bar, select a resource group and region.
  - In the left navigation pane, click .
  - On the Data Development page, select the desired workspace from the drop-down list and click Go To DataStudio.
  - In the navigation pane on the left, click the Resource Management icon. The Resource Management page appears.
- Create a JAR resource.
  - On the Resource Management page, click the icon and choose .
  - In the New Resource and Function dialog box, select a path, enter a resource name, and click Confirm.
    Important: The resource name must end with .jar.
  - On the configuration tab of the new resource, configure the upload parameters as described in the following table.
    | Parameter | Description |
    | File Source | The source of the object file. Options include Local and OSS. |
    | File Content | If you select Local, click Click to upload in the Upload File section to upload a local file. If you select OSS, select the OSS file from the Select File drop-down list. |
    | Data Source | The data source to which the uploaded MaxCompute resource belongs. |
  - To save the configuration, click Save.
- Publish the JAR resource.
  - Click Publish.
  - On the Publish tab of the resource, optionally enter a description for the release and click Start Publishing to Production.
  - Follow the process guide and click Confirm Publish.
Step 2: Create a function
Data Studio (legacy)
- Create a function.
  - On the Data Studio page in the DataStudio console, right-click the target business flow and choose .
  - In the New Function dialog box, select a path and enter a function name.
  - Click Create.
- Register the function.
  Configure the function parameters as described in the following table, and then click the icon to save the configuration.
  | Parameter | Description |
  | Function Type | Select Other Function. |
  | Class Name | Enter the class name based on the type and pattern of the Tablestore source table. The available class names are listed after this table. |
  | Resources | Select the name of the resource you created in Step 1. |
  Class names:
  - Single-version table: com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell
  - Multi-version table: com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1 or com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2
  Note: You can determine whether the Tablestore source table is a single-version or multi-version table by its MaxVersion property.
  Important: Multi-version patterns V1 and V2 are two different policies.
  - Multi-version pattern V1 manages versions at the row level. In the output, each version retains a complete snapshot of the entire row.
  - Multi-version pattern V2 manages versions at the column level. In the output, the versions of different columns are independent, and only the incremental changes of the columns are retained.
- Commit the function.
  - Click the icon.
  - In the Commit dialog box, enter an optional description for the commit.
  - Click Confirm.
Data Studio (new version)
- Create a function.
  - On the Resource Management page, click the icon, and then choose .
  - In the New Resource and Function dialog box, select a path, enter a function name, and click Confirm.
  - On the configuration tab of the new function, configure the function parameters as described in the following table.
    | Parameter | Description |
    | Function Type | Select OTHER (Other Function). |
    | Data Source | The data source to which the MaxCompute function belongs. |
    | Class Name | Enter the class name based on the type and pattern of the Tablestore source table. The available class names are listed after this table. |
    | Type | Select Resource Function. |
    | Resources | Select the name of the resource you created in Step 1. |
    Class names:
    - Single-version table: com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell
    - Multi-version table: com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1 or com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2
    Note: You can determine whether the Tablestore source table is a single-version or multi-version table by its MaxVersion property.
    Important: Multi-version patterns V1 and V2 are two different policies.
    - Multi-version pattern V1 manages versions at the row level. In the output, each version retains a complete snapshot of the entire row.
    - Multi-version pattern V2 manages versions at the column level. In the output, the versions of different columns are independent, and only the incremental changes of the columns are retained.
  - To save the configuration, click Save.
- Publish the function.
  - Click Publish.
  - On the Publish tab, optionally enter a description for the release and then click Start Publishing to Production.
  - Follow the process guide and click Confirm Publish.
Step 3: Write and run the ODPS SQL statement
Data Studio (legacy)
- Create an ODPS SQL node.
  - On the Data Studio page in the DataStudio console, click New and select .
    Alternatively, right-click the target business flow and choose .
  - In the New Node dialog box, select a path and enter a name.
  - Click Confirm.
- On the node editing page, write the ODPS SQL statement.
  The format of the SQL statement is as follows:
  SELECT function_name(para_list) AS (custom_para_list)
  FROM(
    SELECT * FROM stream_table_name
    DISTRIBUTE BY primary_keys
    SORT BY primary_keys, SequenceID
  )t;
  The following table describes the parameters.
  | Parameter | Description |
  | function_name | The function name. Enter the name of the function you created in Step 2. |
  | para_list | The parameter list. Configure the parameters for the Tablestore source table and the MaxCompute incremental table. |
  | custom_para_list | The custom parameter list. Define the field names in the result. |
  | stream_table_name | The name of the MaxCompute table that stores the incremental data from Tablestore. |
  | primary_keys | The list of primary keys for the Tablestore source table. |
  | SequenceID | The time series information. Enter `sequenceInfo`. |
  Important: For ODPS SQL examples and parameter settings, see Appendix: Pattern selection. Select the appropriate pattern (Single-version pattern, Multi-version pattern V1, or Multi-version pattern V2) based on the function that you created in Step 2.
- Run the ODPS SQL statement.
  Click the icon. The results appear on the Result tab.
Data Studio (new version)
- In the DataStudio console, navigate to the Data Studio page, click the icon, and choose .
- In the New Node dialog box, select a path, enter a name, and click Confirm.
- On the node editing page, write the ODPS SQL statement.
  The format of the SQL statement is as follows:
  SELECT function_name(para_list) AS (custom_para_list)
  FROM(
    SELECT * FROM stream_table_name
    DISTRIBUTE BY primary_keys
    SORT BY primary_keys, SequenceID
  )t;
  The following table describes the parameters.
  | Parameter | Description |
  | function_name | The function name. Enter the name of the function you created in Step 2. |
  | para_list | The parameter list. Configure the parameters for the Tablestore source table and the MaxCompute incremental table. |
  | custom_para_list | The custom parameter list. Define the field names in the result. |
  | stream_table_name | The name of the MaxCompute table that stores the incremental data from Tablestore. |
  | primary_keys | The list of primary keys for the Tablestore source table. |
  | SequenceID | The time series information. Enter `sequenceInfo`. |
  Important: For ODPS SQL examples and parameter settings, see Appendix: Pattern selection. Select the appropriate pattern (Single-version pattern, Multi-version pattern V1, or Multi-version pattern V2) based on the function that you created in Step 2.
- Run the ODPS SQL statement.
  Click Run. The results appear on the Result tab.
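The statement template in Step 3 has the same shape for every pattern, so it can be filled in mechanically. The following Python sketch shows one way to do that. The helper name `build_merge_statement` and its arguments are hypothetical and not part of DataWorks or MaxCompute; the sample values are taken from the single-version example in the appendix.

```python
def build_merge_statement(function_name, para_list, custom_para_list,
                          stream_table_name, primary_keys,
                          sequence_column="sequenceInfo"):
    """Fill in the Step 3 statement template (hypothetical helper)."""
    keys = ",".join(primary_keys)
    lines = [
        f"SELECT {function_name}({','.join(para_list)})",
        f"AS ({','.join(custom_para_list)})",
        "FROM(",
        f"  SELECT * FROM {stream_table_name}",
        # Rows of one primary key go to the same worker ...
        f"  DISTRIBUTE BY {keys}",
        # ... and are ordered by the sequence column within each key.
        f"  SORT BY {keys},{sequence_column}",
        ")t;",
    ]
    return "\n".join(lines)


# Values from the single-version example in the appendix.
example = build_merge_statement(
    function_name="mergeCell",
    para_list=["2", "3", '"col1"', '"col2"', '"col3"', "pk1", "pk2",
               "colname", "version", "colvalue", "optype", "sequenceinfo"],
    custom_para_list=["pk1", "pk2", "col1", "col1_is_deleted",
                      "col2", "col2_is_deleted", "col3", "col3_is_deleted"],
    stream_table_name="stream_table_name",
    primary_keys=["pk1", "pk2"],
)
```

The resulting string can be pasted into the ODPS SQL node. The helper does no validation, so the counts in `para_list` (`pknum`, `colnum`) must still match the names that follow them.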
Appendix: Pattern selection
The patterns include single-version, multi-version V1, and multi-version V2. You can select the appropriate pattern based on your Tablestore table type.
Single-version pattern
Class name
com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell
Parameter list
The format of the parameter list is pknum,colnum,colnames,pknames,colname,version,colvalue,optype,sequenceinfo. For more information, see the following table.
| Parameter | Type | Description |
| pknum | | The number of primary keys in the Tablestore source table. |
| colnum | | The number of Tablestore attribute columns whose incremental changes need to be merged. |
| colnames | | The names of the Tablestore attribute columns whose incremental changes need to be merged. Separate the names with commas (,). |
| pknames | | The list of primary keys for the Tablestore source table. Separate the keys with commas (,). |
| colname | | The attribute column (provided by the `colname` field of the incremental table). |
| version | | The version number (provided by the `version` field of the incremental table). |
| colvalue | | The incremental value (provided by the `colvalue` field of the incremental table). |
| optype | | The type of incremental operation (provided by the `optype` field of the incremental table). |
| sequenceinfo | | The time series information (provided by the `sequenceInfo` field of the incremental table). |
Example
Assume that the Tablestore source table and the MaxCompute incremental table have the following schemas:
- Tablestore source table schema: The primary keys are pk1 and pk2. The attribute columns are `col1`, `col2`, and `col3`.
- MaxCompute incremental table schema: pk1,pk2,colname,version,colvalue,optype,sequenceinfo.
To convert the incremental data in the MaxCompute incremental table to a full data format, set the parameter list to 2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo and the custom parameter list to pk1,pk2,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted.
The following code provides an example of the ODPS SQL statement:
SELECT mergeCell(2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo)
AS (pk1,pk2,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted)
FROM(
  SELECT * FROM stream_table_name
  DISTRIBUTE BY pk1,pk2
  SORT BY pk1,pk2,sequenceInfo
)t;
After you run the ODPS SQL statement, the output is similar to the data in the following table.
| pk1 | pk2 | col1 | col1_is_deleted | col2 | col2_is_deleted | col3 | col3_is_deleted |
| test | 0 | \N | \N | 20 | \N | \N | True |
The outputs are as follows:
- If the `col` column and the `col_is_deleted` column are both \N, no incremental operation was performed on the `col` column.
- If the `col` column has a specific value and the `col_is_deleted` column is \N, the value of the `col` column was changed to that specific value.
- If the `col` column is \N and the `col_is_deleted` column is `true`, the `col` column was deleted.
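The three cases above can be expressed as a small classifier. The following Python sketch is illustrative only. It assumes that the result rows have already been loaded into Python with \N represented as `None` and the deletion flag as a boolean; the function name `describe_column_change` is hypothetical.

```python
def describe_column_change(value, is_deleted):
    """Classify one (col, col_is_deleted) pair from the single-version output."""
    if value is None and is_deleted is None:
        return "unchanged"   # no incremental operation on the column
    if value is not None and is_deleted is None:
        return "updated"     # the column was changed to `value`
    if value is None and is_deleted is True:
        return "deleted"     # the column was deleted
    return "unexpected"      # combination not described in this topic


# The sample output row: pk1=test, pk2=0.
row = {"col1": (None, None), "col2": (20, None), "col3": (None, True)}
summary = {name: describe_column_change(*pair) for name, pair in row.items()}
```

For the sample row, `summary` reports `col1` as unchanged, `col2` as updated, and `col3` as deleted.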
Multi-version pattern V1
Class name
com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1
Parameter list
The format of the parameter list is pknum,colnum,colnames,pknames,colname,version,colvalue,optype,sequenceinfo. For more information, see the following table.
| Parameter | Type | Description |
| pknum | | The number of primary keys in the Tablestore source table. |
| colnum | | The number of Tablestore attribute columns whose incremental changes need to be merged. |
| colnames | | The names of the Tablestore attribute columns whose incremental changes need to be merged. Separate the names with commas (,). |
| pknames | | The list of primary keys for the Tablestore source table. Separate the keys with commas (,). |
| colname | | The attribute column (provided by the `colname` field of the incremental table). |
| version | | The version number (provided by the `version` field of the incremental table). |
| colvalue | | The incremental value (provided by the `colvalue` field of the incremental table). |
| optype | | The type of incremental operation (provided by the `optype` field of the incremental table). |
| sequenceinfo | | The time series information (provided by the `sequenceInfo` field of the incremental table). |
Example
Assume that the Tablestore source table and the MaxCompute incremental table have the following schemas:
- Tablestore source table schema: The primary keys are pk1 and pk2. The attribute columns are `col1`, `col2`, and `col3`.
- MaxCompute incremental table schema: pk1,pk2,colname,version,colvalue,optype,sequenceinfo.
To convert the incremental data in the MaxCompute incremental table to a full data format, set the parameter list to 2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo and the custom parameter list to pk1,pk2,version,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted.
The following code provides an example of the ODPS SQL statement:
SELECT mergeCell(2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo)
AS (pk1,pk2,version,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted)
FROM(
  SELECT * FROM stream_table_name
  DISTRIBUTE BY pk1,pk2
  SORT BY pk1,pk2,sequenceInfo
)t;
After you run the ODPS SQL statement, the output is similar to the data in the following table.
| pk1 | pk2 | version | col1 | col1_is_deleted | col2 | col2_is_deleted | col3 | col3_is_deleted |
| test | 0 | 123 | \N | \N | 20 | \N | \N | True |
The outputs are as follows:
- If the `version` column has a specific value, and the `col` and `col_is_deleted` columns are both \N, no incremental operation was performed on the corresponding version of the `col` column.
- If the `version` and `col` columns both have specific values, and the `col_is_deleted` column is \N, the value of the corresponding version of the `col` column was changed to the specific value.
- If the `version` column has a specific value, the `col` column is \N, and the `col_is_deleted` column is `true`, the corresponding version of the `col` column was deleted.
- If the `version` and `col` columns are both \N, and the `col_is_deleted` column is `true`, an operation was performed to delete all versions of a column.
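The four cases for multi-version pattern V1 differ from the single-version cases only in that the `version` column takes part in the decision. The following Python sketch is illustrative only. It assumes that \N has been loaded as `None` and the deletion flag as a boolean; the function name `describe_versioned_change` is hypothetical.

```python
def describe_versioned_change(version, value, is_deleted):
    """Classify one (version, col, col_is_deleted) triple from the V1 output."""
    if version is not None:
        if value is None and is_deleted is None:
            return "unchanged"          # no operation on this version
        if value is not None and is_deleted is None:
            return "updated"            # this version was set to `value`
        if value is None and is_deleted is True:
            return "version_deleted"    # this single version was deleted
    elif value is None and is_deleted is True:
        return "all_versions_deleted"   # every version of the column was deleted
    return "unexpected"                 # combination not described in this topic


# The sample output row: pk1=test, pk2=0, version=123.
version = 123
row = {"col1": (None, None), "col2": (20, None), "col3": (None, True)}
summary = {name: describe_versioned_change(version, *pair)
           for name, pair in row.items()}
```

For the sample row, `summary` reports `col1` as unchanged, `col2` as updated, and `col3` as having version 123 deleted.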
Multi-version pattern V2
Class name
com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2
Parameter list
The format of the parameter list is pknum,colnum,maxversion,colnames,pknames,colname,version,colvalue,optype,sequenceinfo. For more information, see the following table.
| Parameter | Type | Description |
| pknum | | The number of primary keys in the Tablestore source table. |
| colnum | | The number of Tablestore attribute columns whose incremental changes need to be merged. |
| maxversion | | The maximum number of versions for the Tablestore source table. |
| colnames | | The names of the Tablestore attribute columns whose incremental changes need to be merged. Separate the names with commas (,). |
| pknames | | The list of primary keys for the Tablestore source table. Separate the keys with commas (,). |
| colname | | The attribute column (provided by the `colname` field of the incremental table). |
| version | | The version number (provided by the `version` field of the incremental table). |
| colvalue | | The incremental value (provided by the `colvalue` field of the incremental table). |
| optype | | The type of incremental operation (provided by the `optype` field of the incremental table). |
| sequenceinfo | | The time series information (provided by the `sequenceInfo` field of the incremental table). |
Example
Assume that the Tablestore source table and the MaxCompute incremental table have the following schemas:
- Tablestore source table schema: The primary keys are pk1 and pk2. The attribute columns are `col1`, `col2`, and `col3`, and the maximum number of versions is 3.
- MaxCompute incremental table schema: pk1,pk2,colname,version,colvalue,optype,sequenceinfo.
To convert the incremental data in the MaxCompute incremental table to a full data format, set the parameter list to 2,3,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo and the custom parameter list to pk1,pk2,col1,col2,col3.
The following code provides an example of the ODPS SQL statement:
SELECT mergeCell(2,3,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo)
AS (pk1,pk2,col1,col2,col3)
FROM(
  SELECT * FROM stream_table_name
  DISTRIBUTE BY pk1,pk2
  SORT BY pk1,pk2,sequenceInfo
)t;
After you run the ODPS SQL statement, the output is similar to the data in the following table.
| pk1 | pk2 | col1 | col2 | col3 |
| test | 02 | {"data":[{"version":1621330803390,"value":"value001"},{"version":1621330795198,"value":"value002"},{"version":1621330785936,"value":"value003"}],"needDeleteAllVersionFirst":true,"deleteVersions":[]} | \N | \N |
The outputs are as follows:
- `data` indicates the list of newly written data, sorted by version number in descending order. A maximum of `maxversion` versions of data are retained.
- `needDeleteAllVersionFirst` indicates whether all existing versions of the column must be deleted. This value is `true` if a `DeleteRow` or `DeleteColumns` operation occurs. Otherwise, this value is `false`.
- `deleteVersions` indicates the list of versions to be deleted for the column, sorted by version number in descending order. A maximum of `maxversion` versions are retained.
  The version numbers in `deleteVersions` are different from the version numbers in `data`. If `needDeleteAllVersionFirst` is `true`, `deleteVersions` is an empty list.
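A downstream consumer typically needs to apply such a cell to the versions it already holds for that column. The following Python sketch shows one plausible way to do that. The order of application (delete all first, then delete the listed versions, then write the new data) is an assumption inferred from the field names and is not confirmed by this topic. The function name `apply_v2_cell` and the `existing` mapping are hypothetical.

```python
import json


def apply_v2_cell(existing, cell_json, max_version):
    """Apply one V2 output cell to a {version: value} mapping (assumed semantics)."""
    cell = json.loads(cell_json)
    # Assumption: drop every stored version first when the flag is set.
    merged = {} if cell["needDeleteAllVersionFirst"] else dict(existing)
    for version in cell["deleteVersions"]:
        merged.pop(version, None)
    for entry in cell["data"]:
        merged[entry["version"]] = entry["value"]
    # Keep only the newest `max_version` versions, newest first.
    newest = sorted(merged, reverse=True)[:max_version]
    return {version: merged[version] for version in newest}


# The col1 cell from the sample output, with maxversion = 3.
cell = (
    '{"data":[{"version":1621330803390,"value":"value001"},'
    '{"version":1621330795198,"value":"value002"},'
    '{"version":1621330785936,"value":"value003"}],'
    '"needDeleteAllVersionFirst":true,"deleteVersions":[]}'
)
result = apply_v2_cell({1: "stale"}, cell, max_version=3)
```

Because `needDeleteAllVersionFirst` is `true` in the sample cell, the stale version is discarded and `result` holds only the three versions listed in `data`.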
FAQ
Type conversion error occurs when running an SQL statement to convert the Tablestore data format
- Symptom
  When you run an SQL statement in DataWorks Data Studio to convert the Tablestore data format, the following error message is returned:
  FAILED ODPS -0010000:System internal error -fuxi job failed, causer by: Failed in UDF/UDTF/UDAF com.aliyun.otsstream.utils.mergecell.oneversion.MergeCell class, at query location of line 1, column 8
- Possible causes
  - The odps.sql.type.system.odps2 parameter is incorrectly configured. For example, it is set to odps.sql.type.system.odps2=true.
  - The field types in Tablestore and MaxCompute are inconsistent during data synchronization.
- Solution
  - Add set odps.sql.type.system.odps2=false; before the SQL statement and run them together.
  - Check for inconsistent field types in the sync task. If you find any, modify them, synchronize the data again, and then convert the incremental data to full data.
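If the conversion statement is generated by a script, the session flag from the first solution can be prepended automatically. The following Python sketch is a minimal illustration; the helper name `with_odps2_disabled` is hypothetical, and the flag text is the one given in the solution.

```python
ODPS2_OFF = "set odps.sql.type.system.odps2=false;"


def with_odps2_disabled(statement):
    """Prepend the session flag so that it runs together with the statement."""
    if statement.lstrip().lower().startswith(ODPS2_OFF):
        return statement  # the flag is already present
    return f"{ODPS2_OFF}\n{statement}"


script = with_odps2_disabled("SELECT * FROM stream_table_name;")
```

The flag and the statement must be submitted in the same run, as the solution states.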
Attribute columns cannot be mapped to values and are shown as deleted
- Symptom
  After you run the SQL statement, some attribute columns in the result cannot be mapped to their actual values, and the corresponding `is_deleted` field for these columns has a value of True.
  For example, the values in the `col1` attribute column are all \N and the values in the `col1_is_deleted` column are all True. The result indicates that the `col1` column was deleted, even though the `col1` column in the source table is not empty.
- Possible cause
  When you configure an offline sync task, setting the Export Time Series Information parameter to "isExportSequenceInfo": false causes the system to incorrectly determine that the attribute column is deleted.
  Note: The `sequenceInfo` field stores the historical version information of attribute columns. If `sequenceInfo` is not exported, the version information may not be parsed correctly when you convert incremental data to a full data format. This can prevent data from the corresponding versions from being mapped to the target rows.
- Solution
  In Step 3: Configure an offline sync task, modify the parameter settings for Export Time Series Information, resynchronize the Tablestore incremental data, and then run the SQL statement again to convert the data format.
  - Codeless UI
    In the Configure Data Source and Destination section of the Configure Task step, select the Export Time Series Information check box and then save and commit the task.
  - Code editor
    Change the value of the `isExportSequenceInfo` parameter in the Tablestore Stream Reader to true. Then, save and commit the task.
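When the sync task is maintained in the code editor, the setting can be checked before the task is rerun. The following Python sketch searches a task configuration for every `isExportSequenceInfo` key, wherever it is nested. The sample configuration layout (`steps`, `parameter`) is a hypothetical stand-in and not the actual task schema; only the key name comes from this topic.

```python
def find_export_flags(node, path=""):
    """Yield (path, value) for every isExportSequenceInfo key in a nested config."""
    if isinstance(node, dict):
        for key, value in node.items():
            child = f"{path}.{key}" if path else key
            if key == "isExportSequenceInfo":
                yield child, value
            else:
                yield from find_export_flags(value, child)
    elif isinstance(node, list):
        for index, item in enumerate(node):
            yield from find_export_flags(item, f"{path}[{index}]")


# Hypothetical configuration layout, used only to exercise the search.
task_config = {"steps": [{"parameter": {"isExportSequenceInfo": False}}]}
disabled = [path for path, value in find_export_flags(task_config)
            if value is not True]
```

Any path listed in `disabled` points at a reader whose `isExportSequenceInfo` value should be changed to true before the data is resynchronized.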