首页 Tablestore Stream

Tablestore Stream

更新时间: 2026-03-26 05:35:33

DataWorks data integration uses the Tablestore Stream Reader to read incremental data from Tablestore. This topic describes the data reading capabilities of the Tablestore Stream Reader.

Prepare the Tablestore Stream environment

Before you use the Tablestore Stream plugin, you must enable the Stream feature for the Tablestore table. The Stream feature is enabled by default for time series tables. You can enable the feature when you create a table or using the UpdateTable API of the software development kit (SDK). The following code shows how to enable the Stream feature.

SyncClient client = new SyncClient("", "", "", "");
// Method 1: Enable when creating a table:
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24 indicates that incremental data is retained for 24 hours.
client.createTable(createTableRequest);
// Method 2: If the feature was not enabled during table creation, you can enable it using UpdateTable:
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);

When you use the `UpdateTable` API of the SDK to enable the feature:

  • Enabling the Stream feature and setting a time-to-live (TTL) enables incremental data exporting for Tablestore. After you enable the Stream feature, the Tablestore server saves your operation logs. Each partition has an ordered operation log queue. Each operation log is removed by garbage collection after a specified period. This period is the TTL you set.

  • The Tablestore SDK provides several Stream-related APIs to read these operation logs. The incremental plugin uses these APIs to retrieve the incremental data. In column mode, incremental data is converted into multiple 6-tuples: `pk`, `colName`, `version`, `colValue`, `opType`, and `sequenceInfo`. In row mode, incremental data is exported as regular rows.

Supported synchronization modes and field types

The Tablestore Stream Reader plugin supports synchronizing incremental data from Tablestore in column mode or row mode. The following sections describe the synchronization process and field type requirements for each mode.

Column mode

In the multi-version mode of Tablestore, data is organized in a row > column > version hierarchy. A row can have any number of columns, and the column names are not fixed. Each column can have multiple versions, and each version has a specific timestamp, or version number.

You can perform read and write operations using the Tablestore API. Tablestore records your recent write operations, or data changes, as incremental data. You can think of incremental data as a batch of operation records.

Tablestore supports `PutRow`, `UpdateRow`, and `DeleteRow` operations:

  • `PutRow`: Writes a row. If the row already exists, it overwrites the row.

  • `UpdateRow`: Updates a row without changing other data in the original row. Updates include adding or overwriting some column values, deleting all versions of a column, or deleting a specific version of a column. An overwrite occurs if the corresponding version of the corresponding column already exists.

  • `DeleteRow`: Deletes a row.

Tablestore generates corresponding incremental data records for each operation. The Reader plugin reads these records and exports them in the data integration data format.

Because Tablestore features dynamic columns and multiple versions, a row exported by the Reader plugin corresponds to a single version of a column in Tablestore, not an entire Tablestore row. This means one row in Tablestore can be exported as multiple rows. Each exported row contains the primary key value, the column name, the timestamp (version number) for that version, the value of that version, and the operation type. If you set `isExportSequenceInfo` to `true`, the timing information is also included.

After conversion to the data integration data format, the following four operation types are defined:

  • `U` (UPDATE): Writes one version of a column.

  • `DO` (DELETE_ONE_VERSION): Deletes a specific version of a column.

  • `DA` (DELETE_ALL_VERSION): Deletes all versions of a column. Use the primary key and column name to delete all versions of the corresponding column.

  • `DR` (DELETE_ROW): Deletes a row. Use the primary key to delete the row.

Assume the table has two primary key columns named `pkName1` and `pkName2`. An example is as follows:

pkName1

pkName2

columnName

timestamp

columnValue

opType

pk1_V1

pk2_V1

col_a

1441803688001

col_val1

U

pk1_V1

pk2_V1

col_a

1441803688002

col_val2

U

pk1_V1

pk2_V1

col_b

1441803688003

col_val3

U

pk1_V2

pk2_V2

col_a

1441803688000

DO

pk1_V2

pk2_V2

col_b

DA

pk1_V3

pk2_V3

DR

pk1_V3

pk2_V3

col_a

1441803688005

col_val1

U

The example table shows 7 exported rows. These rows correspond to 3 rows in the Tablestore table with the primary keys (`pk1_V1`, `pk2_V1`), (`pk1_V2`, `pk2_V2`), and (`pk1_V3`, `pk2_V3`):

  • For the row with the primary key (`pk1_V1`, `pk2_V1`), operations include writing two versions of the `col_a` column and one version of the `col_b` column.

  • For the row with the primary key (`pk1_V2`, `pk2_V2`), operations include deleting one version of the `col_a` column and deleting all versions of the `col_b` column.

  • For the row with the primary key (`pk1_V3`, `pk2_V3`), operations include deleting the entire row and writing one version of the `col_a` column.

Row mode

  • Wide table

    You can export data in row mode. This mode extracts each update record and exports it as a row. To do this, set the `mode` property and configure the column names.

    "parameter": {
      // Configure the following three items in the parameter object. Other configuration items such as datasource and table should be configured as usual.
      "mode": "single_version_and_update_only", // Configure the export mode.
      "column":[  // Add the columns from Tablestore that you want to export. You can add as many as you need.
              {
                 "name": "uid"  // Example column name. It can be a primary key or attribute column.
              },
              {
                 "name": "name"  // Example column name. It can be a primary key or attribute column.
              },
      ],
      "isExportSequenceInfo": false, // This must be false in single_version_and_update_only mode.
    }
  • Time series table

    The Stream feature is automatically enabled when a time series table is created. You do not need to enable it manually.

    The Tablestore Stream Reader supports exporting incremental data from time series tables. If the table is a time series table, configure the following information:

    "parameter": {
      // Configure the following four items in the parameter object. Other configuration items such as datasource and table should be configured as usual.
      "mode": "single_version_and_update_only", // Configure the export mode.
      "isTimeseriesTable":"true",  // Configure the export for a time series table.
      "column":[  // Add the columns from Tablestore that you want to export. You can add as many as you need.
              {
                "name": "_m_name"       // The measurement name field.
              },
              {
                "name": "_data_source"  // The data source field.
              },
              {
                "name": "_tags"         // The tag field. Converts tags to the string type.
              },
              {
                "name": "tag1",       // The key name of an internal tag field.
                "is_timeseries_tag":"true"  // Indicates that this field is an internal field of tags.
              },
              {
                "name": "time"          // The timestamp field.
              },
              {
                 "name": "name"         // The attribute column name.
              },
      ],
      "isExportSequenceInfo": false, // This must be false in single_version_and_update_only mode.
    }

    Data exported in row mode is more similar to the original rows and easier to process. However, note the following:

    • Each exported row is extracted from an update record and corresponds to a single write or update operation. If a user updates only specific columns, some exported records contain values only for the updated columns. The other columns are empty.

    • Row mode does not export data version numbers (the timestamp for each column). Delete operations are also not supported.

Data type conversion list

The Tablestore Stream Reader supports all Tablestore data types. The following table lists the data type conversions.

Type classification

Tablestore Stream data type

Integer

INTEGER

Floating-point

DOUBLE

String

STRING

Boolean

BOOLEAN

Binary

BINARY

Develop a data synchronization task

For information about the entry point for and the procedure of configuring a synchronization task, see the following configuration guides.

Appendix: Script demo and parameter description

Configure a batch synchronization task by using the code editor

If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a batch synchronization node by using the Code Editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.

  • Column mode

    {
        "type":"job",
        "version":"2.0",// Version number.
        "steps":[
            {
                "stepType":"otsstream",// Plugin name.
                "parameter":{
                    "datasource":"$srcDatasource",// Data source.
                    "dataTable":"",// Table name.
                    "statusTable":"TableStoreStreamReaderStatusTable",// The name of the table used to record status.
                    "maxRetries":30,// The maximum number of retries for each request when reading incremental data from Tablestore. The default is 30.
                    "isExportSequenceInfo":false,// Specifies whether to export timing information.
                    "startTimeString":"${startTime}${hh}",// The left boundary of the incremental data time range (left-closed, right-open). In the parameter settings, configure startTime=${yyyymmdd} and hh=$[hh24miss]. This indicates that the start time for the Tablestore read operation is the scheduled time of the data timestamp.
                    "endTimeString":"${endTime}${hh}"// The right boundary of the incremental data time range (left-closed, right-open). In the parameter settings, configure endTime=${yyyymmdd} and hh=$[hh24miss]. This indicates that the end time for the Tablestore read operation is the scheduled time of the data timestamp.
                },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"// Number of error records.
            },
            "speed":{
                "throttle":true,// If throttle is false, the mbps parameter has no effect, which means no throttling. If throttle is true, throttling is enabled.
                "concurrent":1,// Job concurrency.
                "mbps":"12"// Throttling. Here, 1 mbps = 1 MB/s.
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • Row mode for reading a wide table

    {
        "type":"job",
        "version":"2.0",// Version number.
        "steps":[
            {
                "stepType":"otsstream",// Plugin name.
                "parameter":{
                    "datasource":"$srcDatasource",// Data source.
                    "dataTable":"",// Table name.
                    "statusTable":"TableStoreStreamReaderStatusTable",// The name of the table used to record status.
                    "maxRetries":30,// The maximum number of retries for each request when reading incremental data from Tablestore. The default is 30.
                    "isExportSequenceInfo":false,// Specifies whether to export timing information.
                    "startTimeString":"${startTime}${hh}",// The left boundary of the incremental data time range (left-closed, right-open). In the parameter settings, configure startTime=${yyyymmdd} and hh=$[hh24miss]. This indicates that the start time for the Tablestore read operation is the scheduled time of the data timestamp.
                    "endTimeString":"${endTime}${hh}",// The right boundary of the incremental data time range (left-closed, right-open). In the parameter settings, configure endTime=${yyyymmdd} and hh=$[hh24miss]. This indicates that the end time for the Tablestore read operation is the scheduled time of the data timestamp.
                    "mode": "single_version_and_update_only",
                    "column":[
                            {
                                "name":"pId"
                            },
                            {
                                "name": "uId"
                            },
                            {
                                "name":"col0"
                            },
                            {
                                "name": "col1"
                            }
                        ],
                        },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"// Number of error records.
            },
            "speed":{
                "throttle":true,// If throttle is false, the mbps parameter has no effect, which means no throttling. If throttle is true, throttling is enabled.
                "concurrent":1,// Job concurrency.
                "mbps":"12"// Throttling.
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • Reading a time series table using the row pattern

    {
        "type":"job",
        "version":"2.0",// The version number.
        "steps":[
            {
                "stepType":"otsstream",// The plug-in name.
                "parameter":{
                    "datasource":"$srcDatasource",// The data source.
                    "dataTable":"",// The name of the table.
                    "statusTable":"TableStoreStreamReaderStatusTable",// The name of the table that Tablestore Stream Reader uses to store status records.
                    "maxRetries":30,// The maximum number of retries for each request to read incremental data from Tablestore. Default value: 30.
                    "isExportSequenceInfo":false,// Specifies whether to read time series information.
                    "startTimeString":"${startTime}${hh}",// The left boundary of the time range for incremental data. The time range is left-closed and right-open. In the parameter settings, `startTime=${yyyymmdd}` and `hh=$[hh24miss]` specify that the start time for reading data from Tablestore is the scheduled time of the data timestamp.
                    "endTimeString":"${endTime}${hh}",// The right boundary of the time range for incremental data. The time range is left-closed and right-open. In the parameter settings, `endTime=${yyyymmdd}` and `hh=$[hh24miss]` specify that the end time for reading data from Tablestore is the scheduled time of the data timestamp.
                    "mode": "single_version_and_update_only",
                    "isTimeseriesTable":"true",
                    "column": [
                              {
                                "name": "_m_name"
                              },
                              {
                                "name": "_data_source",
                              },
                              {
                                "name": "_tags",
                              },
                              {
                                "name": "string_column",
                              }
                        ]
                        },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"// The number of error records.
            },
            "speed":{
                "throttle":true,// Specifies whether to enable bandwidth throttling. The value false indicates that bandwidth throttling is disabled, and the value true indicates that bandwidth throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
                "concurrent":1,// The job concurrency.
                "mbps":"12"// The maximum transmission rate. Unit: MB/s.
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }

Reader script parameters

Parameter

Description

Required

Default value

datasource

The name of the data source. The code editor supports adding data sources. The value of this parameter must be the same as the name of the added data source.

Yes

None

dataTable

The name of the table from which to export incremental data. The Stream feature needs to be enabled for this table. You can enable it when creating the table or using the `UpdateTable` API.

Yes

None

statusTable

The name of the table that the Reader plugin uses to record its status. This status information helps reduce scans of data outside the target range and speeds up the export process. `statusTable` is the table used by the Reader to save status. If the table does not exist, the Reader automatically creates it. After an offline export task is complete, you do not need to delete this table. The status recorded in it can be used for the next export task.

  • You do not need to create this table. Just provide a table name. The Reader plugin attempts to create the table in your instance. If the table does not exist, it is created. If the table already exists, the plugin checks whether the table's metadata matches the expected schema. If it does not match, an exception is thrown.

  • After an export is complete, you do not need to delete this table. The status of this table can be used for the next export task.

  • TTL is enabled for this table, so data expires automatically. The data volume is expected to be small.

  • You can use the same `statusTable` for Reader configurations of different `dataTable`s in the same instance. The status information for each table is recorded independently and does not interfere with others.

Configure a name such as `TableStoreStreamReaderStatusTable`. Make sure the name does not conflict with any of your business-related tables.

Yes

None

startTimestampMillis

The left boundary of the incremental data time range (left-closed, right-open), in milliseconds:

  • The Reader plugin finds the offset for `startTimestampMillis` in the `statusTable` and starts reading and exporting data from that point.

  • If the corresponding offset cannot be found in the `statusTable`, it starts reading from the first piece of incremental data retained by the system and skips data with a write time less than `startTimestampMillis`.

No

None

endTimestampMillis

The right boundary of the incremental data time range (left-closed, right-open), in milliseconds:

  • The Reader plugin starts exporting data from the `startTimestampMillis` position. When it encounters the first record with a timestamp greater than or equal to `endTimestampMillis`, the export process stops and is considered complete.

  • The read operation also ends when all available incremental data is read, even if the `endTimestampMillis` has not been reached.

No

None

date

The date format is `yyyyMMdd`, for example, 20151111, which means to export data for that day. If `date` is not specified, you need to specify `startTimestampMillis` and `endTimestampMillis` or `startTimeString` and `endTimeString`, and vice versa. For example, Data Process Center scheduling supports only day-level granularity. This parameter is provided for that purpose and functions similarly to `startTimestampMillis` and `endTimestampMillis` or `startTimeString` and `endTimeString`.

No

None

isExportSequenceInfo

Specifies whether to export timing information, which includes details such as the data write time. The default value is `false`, which means this information is not exported.

No

false

maxRetries

The maximum number of retries for each request when reading incremental data from Tablestore. The default is 30. Retries are spaced out, and 30 retries take about 5 minutes in total. You usually do not need to change this value.

No

30

startTimeString

The start time of the task, which is the left boundary of the incremental data time range (left-closed, right-open). The format is `yyyymmddhh24miss`, and the unit is seconds.

No

None

endTimeString

The end time of the task, which is the right boundary of the incremental data time range (left-closed, right-open). The format is `yyyymmddhh24miss`, and the unit is seconds.

No

None

enableSeekIterator

The Reader plugin must first determine the incremental offset before it can pull data. For frequently run tasks, the plugin determines the position based on the previously scanned offset. If the plugin has not run before, it starts scanning from the beginning of the available incremental data. By default, incremental data is retained for 7 days. This can cause a delay at the start of the export task where no data is exported until the scan reaches the specified start time. To speed up offset positioning, add "enableSeekIterator": true to the reader's parameters.

No

false

mode

The export mode. Set to `single_version_and_update_only` for row mode. If not set, it defaults to column mode.

No

None

isTimeseriesTable

Specifies whether it is a time series table. This configuration only takes effect in row mode, that is, when `mode` is `single_version_and_update_only`.

No

false

column

Configures the data columns to export when `mode` is set to `single_version_and_update_only`. The following code provides a configuration example:

"column":[
    {"name":"pk1"},
	{"name":"col1"},
	{"name":"col2","dataType":"new"},
	{"name":"col2","dataType":"old"},
	{"name":"col2","dataType":"latest"}
],
  • The `name` field specifies the name of the data column to export. This field is required.

  • The `dataType` field specifies the type of data to export. This field is optional and defaults to `new`. The `dataType` field supports three enumerated values:

    • `new`: The value of this column after the update.

    • `old`: The value of this column before the update.

    • `latest`: The current latest value of this column in the table.

Note

This parameter is required in row mode. Otherwise, no data is exported.

  • Row mode: Yes

  • Column mode: No

None

阿里云首页 大数据开发治理平台 DataWorks 相关技术圈