Tablestore Stream
DataWorks data integration uses the Tablestore Stream Reader to read incremental data from Tablestore. This topic describes the data reading capabilities of the Tablestore Stream Reader.
Prepare the Tablestore Stream environment
Before you use the Tablestore Stream plugin, you must enable the Stream feature for the Tablestore table. The Stream feature is enabled by default for time series tables. For other tables, you can enable the feature when you create the table, or later by calling the UpdateTable API of the Tablestore software development kit (SDK). The following code shows both methods.
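import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.*;

// Arguments: endpoint, AccessKey ID, AccessKey secret, instance name. Replace the placeholders with your own values.
SyncClient client = new SyncClient("<endpoint>", "<accessKeyId>", "<accessKeySecret>", "<instanceName>");
// Method 1: Enable the feature when you create a table. The table name and primary key are examples.
TableMeta tableMeta = new TableMeta("tableName");
tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("pk", PrimaryKeyType.STRING));
TableOptions tableOptions = new TableOptions(-1, 1); // Data never expires (TTL -1); keep one version.
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta, tableOptions);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // Retain incremental data for 24 hours.
client.createTable(createTableRequest);
// Method 2: If the feature was not enabled when the table was created, enable it by using UpdateTable.
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);
client.shutdown(); // Release the client's resources.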
Enabling the Stream feature with a retention period, or time-to-live (TTL), lets Tablestore export incremental data. In the code above, the TTL is the second argument of `StreamSpecification`, in hours. After the Stream feature is enabled, the Tablestore server saves your operation logs. Each partition has its own ordered queue of operation logs, and each log is removed by garbage collection once it is older than the TTL you set.
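The retention rule can be pictured with a small in-memory model of one partition's log queue. This is a sketch under stated assumptions: `PartitionLogQueue` and its methods are hypothetical, each log is reduced to its write time, and a log is treated as expired once it is strictly older than the TTL. It does not describe how the Tablestore server actually stores or collects logs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;

// Hypothetical model of one partition's ordered operation log queue.
// Not Tablestore server code: it only illustrates TTL-based garbage collection.
class PartitionLogQueue {
    private final long ttlMillis;
    // Logs are appended in write order, so the oldest log is always at the head.
    private final Deque<Long> writeTimesMillis = new ArrayDeque<>();

    PartitionLogQueue(int ttlHours) {
        this.ttlMillis = TimeUnit.HOURS.toMillis(ttlHours);
    }

    void append(long writeTimeMillis) {
        writeTimesMillis.addLast(writeTimeMillis);
    }

    // Removes every log strictly older than the TTL and returns how many were removed.
    int collectGarbage(long nowMillis) {
        int removed = 0;
        while (!writeTimesMillis.isEmpty()
                && nowMillis - writeTimesMillis.peekFirst() > ttlMillis) {
            writeTimesMillis.removeFirst();
            removed++;
        }
        return removed;
    }

    int size() {
        return writeTimesMillis.size();
    }
}
```

Because each queue is ordered by write time, the collector only needs to inspect the head of the queue and can stop at the first log that is still within the TTL.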
The Tablestore SDK provides several Stream-related APIs to read these operation logs, and the Reader plugin uses these APIs to retrieve incremental data. In column mode, incremental data is converted into multiple 6-tuples: `pk`, `colName`, `version`, `colValue`, `opType`, and `sequenceInfo`. In row mode, incremental data is exported as regular rows.
Supported synchronization modes and field types
The Tablestore Stream Reader plugin supports synchronizing incremental data from Tablestore in column mode or row mode. The following sections describe the synchronization process and field type requirements for each mode.
Column mode
In the multi-version model of Tablestore, data is organized in a row > column > version hierarchy. A row can have any number of columns, and column names are not fixed. Each column can have multiple versions, and each version is identified by a timestamp, which serves as its version number.
You can perform read and write operations using the Tablestore API. Tablestore records your recent write operations, or data changes, as incremental data. You can think of incremental data as a batch of operation records.
Tablestore supports `PutRow`, `UpdateRow`, and `DeleteRow` operations:
`PutRow`: Writes a row. If the row already exists, it overwrites the row.
`UpdateRow`: Updates a row without changing the rest of the original row. An update can add or overwrite column values, delete all versions of a column, or delete a specific version of a column. Writing a version that already exists for a column overwrites that version.
`DeleteRow`: Deletes a row.
Tablestore generates corresponding incremental data records for each operation. The Reader plugin reads these records and exports them in the data integration data format.
Because Tablestore features dynamic columns and multiple versions, a row exported by the Reader plugin corresponds to a single version of a column in Tablestore, not an entire Tablestore row. This means one row in Tablestore can be exported as multiple rows. Each exported row contains the primary key value, the column name, the timestamp (version number) for that version, the value of that version, and the operation type. If you set `isExportSequenceInfo` to `true`, the timing information is also included.
After conversion to the data integration data format, the following four operation types are defined:
`U` (UPDATE): Writes one version of a column.
`DO` (DELETE_ONE_VERSION): Deletes a specific version of a column.
`DA` (DELETE_ALL_VERSION): Deletes all versions of a column. Use the primary key and column name to delete all versions of the corresponding column.
`DR` (DELETE_ROW): Deletes a row. Use the primary key to delete the row.
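The four types differ in which fields identify their target. The following sketch encodes that difference as flags, assuming the field usage shown in the example that follows: `U` and `DO` carry a timestamp, only `U` carries a value, `DA` needs the primary key and column name, and `DR` needs only the primary key. The `OpType` enum and `fromCode` helper are hypothetical and are not part of the plugin or the Tablestore SDK.

```java
import java.util.Arrays;

// Hypothetical sketch of the four column-mode operation types and the fields each one uses.
enum OpType {
    UPDATE("U", true, true, true),                // Writes one version of a column.
    DELETE_ONE_VERSION("DO", true, true, false),  // Deletes one version of a column.
    DELETE_ALL_VERSION("DA", true, false, false), // Deletes all versions of a column.
    DELETE_ROW("DR", false, false, false);        // Deletes the entire row.

    final String code;
    final boolean hasColumnName;
    final boolean hasTimestamp;
    final boolean hasValue;

    OpType(String code, boolean hasColumnName, boolean hasTimestamp, boolean hasValue) {
        this.code = code;
        this.hasColumnName = hasColumnName;
        this.hasTimestamp = hasTimestamp;
        this.hasValue = hasValue;
    }

    // Parses the code found in the opType column of an exported row.
    static OpType fromCode(String code) {
        return Arrays.stream(values())
                .filter(t -> t.code.equals(code))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown opType: " + code));
    }
}
```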
Assume the table has two primary key columns named `pkName1` and `pkName2`. An example is as follows:
pkName1 | pkName2 | columnName | timestamp | columnValue | opType |
pk1_V1 | pk2_V1 | col_a | 1441803688001 | col_val1 | U |
pk1_V1 | pk2_V1 | col_a | 1441803688002 | col_val2 | U |
pk1_V1 | pk2_V1 | col_b | 1441803688003 | col_val3 | U |
pk1_V2 | pk2_V2 | col_a | 1441803688000 | — | DO |
pk1_V2 | pk2_V2 | col_b | — | — | DA |
pk1_V3 | pk2_V3 | — | — | — | DR |
pk1_V3 | pk2_V3 | col_a | 1441803688005 | col_val1 | U |
The example table shows 7 exported rows. These rows correspond to 3 rows in the Tablestore table with the primary keys (`pk1_V1`, `pk2_V1`), (`pk1_V2`, `pk2_V2`), and (`pk1_V3`, `pk2_V3`):
For the row with the primary key (`pk1_V1`, `pk2_V1`), operations include writing two versions of the `col_a` column and one version of the `col_b` column.
For the row with the primary key (`pk1_V2`, `pk2_V2`), operations include deleting one version of the `col_a` column and deleting all versions of the `col_b` column.
For the row with the primary key (`pk1_V3`, `pk2_V3`), operations include deleting the entire row and writing one version of the `col_a` column.
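To make the one-to-many mapping concrete, the following sketch rebuilds the example table from four change records on three rows. It assumes, for illustration only, that the deletion and the later write of (`pk1_V3`, `pk2_V3`) arrive as two separate records. `ColumnModeFlattener`, its nested types, and the record layout are hypothetical; they show the flattening idea, not the plugin's internal data structures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of column-mode flattening: each column-version change becomes
// one exported row, and a row deletion becomes a single DR row.
// Output layout: primary key values, columnName, timestamp, columnValue, opType.
class ColumnModeFlattener {

    static final class ColumnChange {
        final String column;
        final Long timestamp; // null when the operation targets no single version (DA)
        final String value;   // null for delete operations
        final String opType;  // "U", "DO", or "DA"

        ColumnChange(String column, Long timestamp, String value, String opType) {
            this.column = column;
            this.timestamp = timestamp;
            this.value = value;
            this.opType = opType;
        }
    }

    static final class ChangeRecord {
        final List<String> primaryKey;
        final boolean isRowDeletion;
        final List<ColumnChange> changes;

        private ChangeRecord(List<String> primaryKey, boolean isRowDeletion, List<ColumnChange> changes) {
            this.primaryKey = primaryKey;
            this.isRowDeletion = isRowDeletion;
            this.changes = changes;
        }

        static ChangeRecord update(List<String> primaryKey, List<ColumnChange> changes) {
            return new ChangeRecord(primaryKey, false, changes);
        }

        static ChangeRecord deleteRow(List<String> primaryKey) {
            return new ChangeRecord(primaryKey, true, Collections.<ColumnChange>emptyList());
        }
    }

    static List<List<Object>> flatten(List<ChangeRecord> records) {
        List<List<Object>> rows = new ArrayList<>();
        for (ChangeRecord record : records) {
            if (record.isRowDeletion) {
                rows.add(row(record.primaryKey, null, null, null, "DR"));
            } else {
                for (ColumnChange c : record.changes) {
                    rows.add(row(record.primaryKey, c.column, c.timestamp, c.value, c.opType));
                }
            }
        }
        return rows;
    }

    private static List<Object> row(List<String> pk, String column, Long ts, String value, String opType) {
        List<Object> row = new ArrayList<Object>(pk);
        row.addAll(Arrays.asList(column, ts, value, opType));
        return row;
    }

    // The change records behind the example table.
    static List<ChangeRecord> exampleRecords() {
        return Arrays.asList(
            ChangeRecord.update(Arrays.asList("pk1_V1", "pk2_V1"), Arrays.asList(
                new ColumnChange("col_a", 1441803688001L, "col_val1", "U"),
                new ColumnChange("col_a", 1441803688002L, "col_val2", "U"),
                new ColumnChange("col_b", 1441803688003L, "col_val3", "U"))),
            ChangeRecord.update(Arrays.asList("pk1_V2", "pk2_V2"), Arrays.asList(
                new ColumnChange("col_a", 1441803688000L, null, "DO"),
                new ColumnChange("col_b", null, null, "DA"))),
            ChangeRecord.deleteRow(Arrays.asList("pk1_V3", "pk2_V3")),
            ChangeRecord.update(Arrays.asList("pk1_V3", "pk2_V3"), Arrays.asList(
                new ColumnChange("col_a", 1441803688005L, "col_val1", "U"))));
    }
}
```

Calling `flatten(exampleRecords())` yields the seven rows of the example table in the same order.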
Row mode
Wide table
In row mode, each update record is extracted and exported as one row. To use row mode, set the `mode` parameter and configure the column names:
"parameter": { // Configure the following three items in the parameter object. Configure other items, such as datasource and dataTable, as usual.
    "mode": "single_version_and_update_only", // The export mode.
    "column": [ // The Tablestore columns to export. You can add as many as you need.
        {
            "name": "uid" // Example column name. It can be a primary key column or an attribute column.
        },
        {
            "name": "name" // Example column name. It can be a primary key column or an attribute column.
        }
    ],
    "isExportSequenceInfo": false // Must be false in single_version_and_update_only mode.
}
Time series table
The Stream feature is automatically enabled when a time series table is created. You do not need to enable it manually.
The Tablestore Stream Reader supports exporting incremental data from time series tables. For a time series table, configure the following parameters:
"parameter": { // Configure the following four items in the parameter object. Configure other items, such as datasource and dataTable, as usual.
    "mode": "single_version_and_update_only", // The export mode.
    "isTimeseriesTable": "true", // Export from a time series table.
    "column": [ // The Tablestore columns to export. You can add as many as you need.
        {
            "name": "_m_name" // The measurement name field.
        },
        {
            "name": "_data_source" // The data source field.
        },
        {
            "name": "_tags" // The tags field, converted to the string type.
        },
        {
            "name": "tag1", // The key of a tag inside the tags field.
            "is_timeseries_tag": "true" // Marks this field as a key inside the tags field.
        },
        {
            "name": "time" // The timestamp field.
        },
        {
            "name": "name" // An attribute column name.
        }
    ],
    "isExportSequenceInfo": false // Must be false in single_version_and_update_only mode.
}
Data exported in row mode is closer to the original rows and easier to process. However, note the following:
Each exported row is extracted from an update record and corresponds to a single write or update operation. If an operation updates only some columns, the exported record contains values only for those columns, and the other columns are empty.
Row mode does not export data version numbers (the timestamp for each column). Delete operations are also not supported.
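The two notes above can be illustrated with a small projection sketch. It assumes that each update record carries the primary key plus the columns the operation wrote, projects that record onto the configured `column` list, and leaves untouched columns empty (modeled as `null`). Because the text states that row mode does not support deletes, the sketch returns no row for a delete record; how the real plugin handles them is not described here. `RowModeProjector` and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of row-mode export; not the plugin's implementation.
class RowModeProjector {

    // Projects one change record onto the configured "column" list.
    // Assumption: primary key values are always present in the record, while
    // attribute columns the operation did not write are left empty (null).
    // A delete record yields no output (an empty list).
    static List<Object> project(List<String> configuredColumns,
                                Map<String, Object> primaryKey,
                                Map<String, Object> writtenColumns,
                                boolean isDelete) {
        if (isDelete) {
            return Collections.emptyList();
        }
        List<Object> row = new ArrayList<>();
        for (String name : configuredColumns) {
            row.add(primaryKey.containsKey(name) ? primaryKey.get(name) : writtenColumns.get(name));
        }
        return row;
    }
}
```

For example, if only `name` is updated for the row whose primary key `uid` is 1, projecting onto the columns `uid`, `name`, and `age` gives `1`, the new name, and an empty `age`.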
Data type conversion list
The Tablestore Stream Reader supports all Tablestore data types. The following table lists the data type conversions.
Type classification | Tablestore Stream data type |
Integer | INTEGER |
Floating-point | DOUBLE |
String | STRING |
Boolean | BOOLEAN |
Binary | BINARY |
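For readers who handle exported values in Java, the sketch below pairs each Tablestore type in the table with the Java type commonly used for it. Tablestore INTEGER values are 64-bit signed integers, so they map to `Long`. The `TablestoreType` enum and its `classify` helper are hypothetical conveniences, not part of the Tablestore SDK or the Reader plugin.

```java
// Hypothetical mapping from the Tablestore types in the table to Java types.
enum TablestoreType {
    INTEGER(Long.class),     // 64-bit signed integer
    DOUBLE(Double.class),    // floating-point
    STRING(String.class),
    BOOLEAN(Boolean.class),
    BINARY(byte[].class);

    final Class<?> javaType;

    TablestoreType(Class<?> javaType) {
        this.javaType = javaType;
    }

    // Returns the Tablestore type whose Java representation matches the value.
    static TablestoreType classify(Object value) {
        for (TablestoreType t : values()) {
            if (t.javaType.isInstance(value)) {
                return t;
            }
        }
        throw new IllegalArgumentException(
                "Unsupported value type: " + (value == null ? "null" : value.getClass().getName()));
    }
}
```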
Develop a data synchronization task
For the entry point and the procedure for configuring a synchronization task, see Codeless UI configuration and Code editor configuration.
For a complete list of parameters and a script example for the code editor, see Appendix: Script demo and parameter description.
Appendix: Script demo and parameter description
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a batch synchronization node by using the Code Editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.
Column mode
{
    "type": "job",
    "version": "2.0", // Version number.
    "steps": [
        {
            "stepType": "otsstream", // Plugin name.
            "parameter": {
                "datasource": "$srcDatasource", // Data source.
                "dataTable": "", // Table name.
                "statusTable": "TableStoreStreamReaderStatusTable", // The name of the table used to record status.
                "maxRetries": 30, // The maximum number of retries for each request when reading incremental data from Tablestore. Default: 30.
                "isExportSequenceInfo": false, // Specifies whether to export timing information.
                "startTimeString": "${startTime}${hh}", // The left boundary of the incremental data time range (left-closed, right-open). In the parameter settings, configure startTime=${yyyymmdd} and hh=$[hh24miss] so that reading starts at the scheduled time of the data timestamp.
                "endTimeString": "${endTime}${hh}" // The right boundary of the incremental data time range (left-closed, right-open). In the parameter settings, configure endTime=${yyyymmdd} and hh=$[hh24miss] so that reading ends at the scheduled time of the data timestamp.
            },
            "name": "Reader",
            "category": "reader"
        },
        { "stepType": "stream", "parameter": {}, "name": "Writer", "category": "writer" }
    ],
    "setting": {
        "errorLimit": {
            "record": "0" // Number of error records.
        },
        "speed": {
            "throttle": true, // If false, the mbps parameter has no effect and throttling is disabled. If true, throttling is enabled.
            "concurrent": 1, // Job concurrency.
            "mbps": "12" // Throttling rate. 1 mbps = 1 MB/s.
        }
    },
    "order": {
        "hops": [
            { "from": "Reader", "to": "Writer" }
        ]
    }
}
Row mode for reading a wide table
{
    "type": "job",
    "version": "2.0", // Version number.
    "steps": [
        {
            "stepType": "otsstream", // Plugin name.
            "parameter": {
                "datasource": "$srcDatasource", // Data source.
                "dataTable": "", // Table name.
                "statusTable": "TableStoreStreamReaderStatusTable", // The name of the table used to record status.
                "maxRetries": 30, // The maximum number of retries for each request when reading incremental data from Tablestore. Default: 30.
                "isExportSequenceInfo": false, // Specifies whether to export timing information. Must be false in row mode.
                "startTimeString": "${startTime}${hh}", // The left boundary of the incremental data time range (left-closed, right-open). In the parameter settings, configure startTime=${yyyymmdd} and hh=$[hh24miss] so that reading starts at the scheduled time of the data timestamp.
                "endTimeString": "${endTime}${hh}", // The right boundary of the incremental data time range (left-closed, right-open). In the parameter settings, configure endTime=${yyyymmdd} and hh=$[hh24miss] so that reading ends at the scheduled time of the data timestamp.
                "mode": "single_version_and_update_only", // Row mode.
                "column": [
                    { "name": "pId" },
                    { "name": "uId" },
                    { "name": "col0" },
                    { "name": "col1" }
                ]
            },
            "name": "Reader",
            "category": "reader"
        },
        { "stepType": "stream", "parameter": {}, "name": "Writer", "category": "writer" }
    ],
    "setting": {
        "errorLimit": {
            "record": "0" // Number of error records.
        },
        "speed": {
            "throttle": true, // If false, the mbps parameter has no effect and throttling is disabled. If true, throttling is enabled.
            "concurrent": 1, // Job concurrency.
            "mbps": "12" // Throttling rate. 1 mbps = 1 MB/s.
        }
    },
    "order": {
        "hops": [
            { "from": "Reader", "to": "Writer" }
        ]
    }
}
Row mode for reading a time series table
{
    "type": "job",
    "version": "2.0", // Version number.
    "steps": [
        {
            "stepType": "otsstream", // Plugin name.
            "parameter": {
                "datasource": "$srcDatasource", // Data source.
                "dataTable": "", // Table name.
                "statusTable": "TableStoreStreamReaderStatusTable", // The name of the table that the Reader uses to record status.
                "maxRetries": 30, // The maximum number of retries for each request when reading incremental data from Tablestore. Default: 30.
                "isExportSequenceInfo": false, // Specifies whether to export timing information. Must be false in row mode.
                "startTimeString": "${startTime}${hh}", // The left boundary of the incremental data time range (left-closed, right-open). In the parameter settings, startTime=${yyyymmdd} and hh=$[hh24miss] make reading start at the scheduled time of the data timestamp.
                "endTimeString": "${endTime}${hh}", // The right boundary of the incremental data time range (left-closed, right-open). In the parameter settings, endTime=${yyyymmdd} and hh=$[hh24miss] make reading end at the scheduled time of the data timestamp.
                "mode": "single_version_and_update_only", // Row mode.
                "isTimeseriesTable": "true", // Read from a time series table.
                "column": [
                    { "name": "_m_name" },
                    { "name": "_data_source" },
                    { "name": "_tags" },
                    { "name": "string_column" }
                ]
            },
            "name": "Reader",
            "category": "reader"
        },
        { "stepType": "stream", "parameter": {}, "name": "Writer", "category": "writer" }
    ],
    "setting": {
        "errorLimit": {
            "record": "0" // Number of error records.
        },
        "speed": {
            "throttle": true, // Specifies whether to enable bandwidth throttling. The mbps parameter takes effect only when throttle is true.
            "concurrent": 1, // Job concurrency.
            "mbps": "12" // The maximum transmission rate. Unit: MB/s.
        }
    },
    "order": {
        "hops": [
            { "from": "Reader", "to": "Writer" }
        ]
    }
}
Reader script parameters
Parameter | Description | Required | Default value |
datasource | The name of the data source. The code editor supports adding data sources. The value of this parameter must be the same as the name of the added data source. | Yes | None |
dataTable | The name of the table from which to export incremental data. The Stream feature must be enabled for this table, either when you create the table or later by using the `UpdateTable` API. | Yes | None |
statusTable | The name of the table that the Reader plugin uses to record its status. The status information reduces scans of data outside the target range and speeds up the export. If the table does not exist, the Reader creates it automatically. You do not need to delete the table after an offline export task completes, because the next export task can reuse the recorded status. Use a name such as `TableStoreStreamReaderStatusTable` that does not conflict with any of your business tables. | Yes | None |
startTimestampMillis | The left boundary of the incremental data time range (left-closed, right-open), in milliseconds. | No | None |
endTimestampMillis | The right boundary of the incremental data time range (left-closed, right-open), in milliseconds. | No | None |
date | The date to export, in `yyyyMMdd` format. For example, 20151111 exports the data for that day. You must specify either `date` or a time range (`startTimestampMillis` and `endTimestampMillis`, or `startTimeString` and `endTimeString`). This parameter exists because some schedulers, such as Data Process Center scheduling, support only day-level granularity; it works like the time-range parameters set to cover that day. | No | None |
isExportSequenceInfo | Specifies whether to export timing information, which includes details such as the data write time. The default value is `false`, which means this information is not exported. | No | false |
maxRetries | The maximum number of retries for each request when reading incremental data from Tablestore. The default is 30. Retries are spaced out, and 30 retries take about 5 minutes in total. You usually do not need to change this value. | No | 30 |
startTimeString | The start time of the task, which is the left boundary of the incremental data time range (left-closed, right-open). The format is `yyyymmddhh24miss`. | No | None |
endTimeString | The end time of the task, which is the right boundary of the incremental data time range (left-closed, right-open). The format is `yyyymmddhh24miss`. | No | None |
enableSeekIterator | Before it can pull data, the Reader plugin must locate the incremental offset. For tasks that run frequently, the plugin locates it from the offset recorded by the previous scan. If the plugin has not run before, it scans from the beginning of the available incremental data, which is retained for 7 days by default. As a result, an export task may export nothing for a while at the start, until the scan reaches the specified start time. To speed up offset positioning, set `enableSeekIterator` to `true`. | No | false |
mode | The export mode. Set to `single_version_and_update_only` for row mode. If not set, it defaults to column mode. | No | None |
isTimeseriesTable | Specifies whether it is a time series table. This configuration only takes effect in row mode, that is, when `mode` is `single_version_and_update_only`. | No | false |
column | The data columns to export when `mode` is set to `single_version_and_update_only` (row mode). Note: This parameter is required in row mode. If it is not configured, no data is exported. | Required in row mode | None |
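The time-range parameters above describe one left-closed, right-open interval. The following sketch resolves them into a `[start, end)` pair of epoch milliseconds. It rests on several assumptions that are not confirmed by this topic: `TimeRangeResolver` is a hypothetical class, exactly one way of specifying the range is accepted (the sketch rejects mixed input), the string format is `yyyyMMddHHmmss` (the Java pattern for `yyyymmddhh24miss`, as used by `${startTime}${hh}` in the script examples), and the caller supplies the time zone, because the plugin's own time-zone handling is not documented here.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical resolver for the Reader's time-range parameters; not the plugin's code.
// Returns {startMillis, endMillis}, interpreted as the left-closed, right-open range [start, end).
class TimeRangeResolver {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter SECOND = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    static long[] resolve(String date, Long startMillis, Long endMillis,
                          String startString, String endString, ZoneId zone) {
        boolean hasDate = date != null;
        boolean hasMillis = startMillis != null || endMillis != null;
        boolean hasStrings = startString != null || endString != null;
        int options = (hasDate ? 1 : 0) + (hasMillis ? 1 : 0) + (hasStrings ? 1 : 0);
        // Assumption: exactly one way of specifying the range is allowed.
        if (options != 1) {
            throw new IllegalArgumentException("Specify date, the millisecond pair, or the string pair");
        }
        long start;
        long end;
        if (hasDate) {
            // A whole day: [00:00 of the date, 00:00 of the next day).
            LocalDate day = LocalDate.parse(date, DAY);
            start = day.atStartOfDay(zone).toInstant().toEpochMilli();
            end = day.plusDays(1).atStartOfDay(zone).toInstant().toEpochMilli();
        } else if (hasMillis) {
            if (startMillis == null || endMillis == null) {
                throw new IllegalArgumentException("Both startTimestampMillis and endTimestampMillis are required");
            }
            start = startMillis;
            end = endMillis;
        } else {
            if (startString == null || endString == null) {
                throw new IllegalArgumentException("Both startTimeString and endTimeString are required");
            }
            start = LocalDateTime.parse(startString, SECOND).atZone(zone).toInstant().toEpochMilli();
            end = LocalDateTime.parse(endString, SECOND).atZone(zone).toInstant().toEpochMilli();
        }
        if (start >= end) {
            throw new IllegalArgumentException("The start of the range must be earlier than the end");
        }
        return new long[] {start, end};
    }

    // True if a change written at timestampMillis falls in [start, end).
    static boolean contains(long[] range, long timestampMillis) {
        return timestampMillis >= range[0] && timestampMillis < range[1];
    }
}
```

The right-open boundary means that consecutive daily or hourly runs, each ending where the next begins, export every change exactly once.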