The Tablestore connector lets you use Tablestore tables as source tables, dimension tables, and sink tables in Flink SQL jobs running in streaming mode.
Connector capabilities
| Item | Description |
|---|---|
| Running mode | Streaming mode |
| API type | SQL API |
| Table type | Source table, dimension table, and sink table |
| Data format | N/A |
| Sink table metrics | numBytesOut, numBytesOutPerSecond, numRecordsOut, numRecordsOutPerSecond, currentSendTime |
| Data update or deletion in a sink table | Supported |
For details on sink metrics, see Monitoring metrics.
Prerequisites
Before you begin, ensure that you have:
- A Tablestore instance and a Tablestore table. See Use Tablestore.
Usage limits
Cross-account access to Tablestore instances is supported. To access an instance that belongs to another account, set accessId and accessKey to the AccessKey pair of the account that owns the Tablestore instance. When you use a VPC endpoint, the Tablestore instance must be in the same region as Flink.
Syntax
All three table types use 'connector'='ots' in the WITH clause, with type-specific options.
Sink table
CREATE TABLE ots_sink (
name VARCHAR,
age BIGINT,
birthday BIGINT,
PRIMARY KEY (name, age) NOT ENFORCED
) WITH (
'connector'='ots',
'instanceName'='<yourInstanceName>',
'tableName'='<yourTableName>',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'endPoint'='<yourEndpoint>',
'valueColumns'='birthday'
);
A Tablestore sink table requires a primary key. Output records are written to the table by primary key, so a record whose primary key already exists updates that row instead of adding a new one.
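To see the update behavior, the following sketch writes an aggregation result to the ots_sink table defined above. The person_events source is hypothetical and is generated with Flink's built-in datagen connector; its small key space (one-character names and three possible ages) is chosen so that the same primary key is written repeatedly.

```sql
-- Hypothetical source generated by Flink's built-in datagen connector.
CREATE TEMPORARY TABLE person_events (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen',
  'rows-per-second'='5',
  'fields.name.length'='1',
  'fields.age.min'='20',
  'fields.age.max'='22',
  'fields.birthday.min'='19900101',
  'fields.birthday.max'='20001231'
);

-- GROUP BY emits a new result every time a key receives an event.
-- Because (name, age) is the sink's primary key, each new result
-- updates the existing Tablestore row rather than adding a duplicate.
INSERT INTO ots_sink
SELECT name, age, MAX(birthday) AS birthday
FROM person_events
GROUP BY name, age;
```

The GROUP BY key matches the declared primary key, which is what lets each updated aggregate land on a single row.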
Dimension table
CREATE TABLE ots_dim (
id INT,
len INT,
content STRING
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='<yourInstanceName>',
'tableName'='<yourTableName>',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}'
);
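A dimension table is queried through a lookup join. The following sketch assumes that id is the primary key of the Tablestore table behind ots_dim. The orders source (built with Flink's datagen connector) and the enriched_orders print sink are hypothetical tables added only for illustration.

```sql
-- Hypothetical probe-side stream; proc_time supplies the lookup time.
CREATE TEMPORARY TABLE orders (
  order_id INT,
  proc_time AS PROCTIME()
) WITH (
  'connector'='datagen',
  'rows-per-second'='1',
  'fields.order_id.min'='1',
  'fields.order_id.max'='100'
);

-- Hypothetical sink that prints enriched rows to the TaskManager log.
CREATE TEMPORARY TABLE enriched_orders (
  order_id INT,
  len INT,
  content STRING
) WITH (
  'connector'='print'
);

-- For each order, look up the matching Tablestore row by id
-- as of the order's processing time.
INSERT INTO enriched_orders
SELECT o.order_id, d.len, d.content
FROM orders AS o
JOIN ots_dim FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.order_id = d.id;
```

An inner lookup join drops orders that have no matching row in Tablestore; use LEFT JOIN instead to keep them with NULL dimension columns.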
Source table
CREATE TABLE tablestore_stream (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-source',
'tableName'='flink_source_table',
'tunnelName'='flinksourcestream',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'ignoreDelete'='false'
);
Available metadata
The Tablestore source table exposes two metadata fields via the METADATA keyword. Use these fields to track the operation type and timing of each change event.
| Metadata key | Flink data type | Description |
|---|---|---|
| type | STRING | The data operation type (maps to OtsRecordType). |
| timestamp | BIGINT | The data operation time in microseconds (maps to OtsRecordTimestamp). Set to 0 for full data reads. |
To read metadata fields, declare them with the METADATA FROM syntax:
CREATE TABLE tablestore_stream (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
record_type STRING METADATA FROM 'type',
record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
...
);
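As a sketch of how the metadata fields might be consumed, the query below assumes tablestore_stream is declared with record_type and record_timestamp as shown above, and writes them to a hypothetical print_changes sink. It converts the microsecond timestamp to milliseconds for TO_TIMESTAMP_LTZ and leaves the 0 reported for full data reads as NULL.

```sql
-- Hypothetical sink for inspecting change events.
CREATE TEMPORARY TABLE print_changes (
  orderid VARCHAR,
  record_type STRING,
  change_time TIMESTAMP_LTZ(3)
) WITH (
  'connector'='print'
);

INSERT INTO print_changes
SELECT
  orderid,
  record_type,
  -- record_timestamp is in microseconds; divide by 1000 for milliseconds.
  -- Full data reads report 0, so CASE without ELSE yields NULL for them.
  CASE
    WHEN record_timestamp > 0 THEN TO_TIMESTAMP_LTZ(record_timestamp / 1000, 3)
  END AS change_time
FROM tablestore_stream;
```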
Connector options
General options
All table types share the following options.
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | String | Yes | N/A | Set to ots. |
| instanceName | String | Yes | N/A | Name of the Tablestore instance. |
| endPoint | String | Yes | N/A | Endpoint of the Tablestore instance. See Endpoints. |
| tableName | String | Yes | N/A | Name of the table. |
| accessId | String | Yes | N/A | AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user. See How do I view the AccessKey ID and AccessKey secret? |
| accessKey | String | Yes | N/A | AccessKey secret of your Alibaba Cloud account or a RAM user. |
| connectTimeout | Integer | No | 30000 | Connection timeout in milliseconds. |
| socketTimeout | Integer | No | 30000 | Socket timeout in milliseconds. |
| ioThreadCount | Integer | No | 4 | Number of I/O threads. |
| callbackThreadPoolSize | Integer | No | 4 | Size of the callback thread pool. |
Reference your AccessKey pair through variables, as the ${ak_id} and ${ak_secret} placeholders in the examples on this page do, instead of hardcoding it.
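The shared client options go in the same WITH clause as the required ones. The sketch below reuses the dimension table schema from the Syntax section under a hypothetical table name; the timeout and thread values are illustrative assumptions, not tuning recommendations.

```sql
CREATE TEMPORARY TABLE ots_dim_tuned (
  id INT,
  len INT,
  content STRING
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  -- Illustrative values: fail connections faster, allow slower reads.
  'connectTimeout'='10000',
  'socketTimeout'='60000',
  -- Illustrative values: more client threads than the default of 4.
  'ioThreadCount'='8',
  'callbackThreadPoolSize'='8'
);
```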
Source table options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| tunnelName | String | Yes | N/A | Name of the Tablestore tunnel. Create the tunnel in the Tablestore console before using this option. Supported tunnel types: Incremental, Full, and Differential. See the "Create a tunnel" section in Quick start. |
| ignoreDelete | Boolean | No | false | Whether to skip delete operations. true: skip; false: process delete operations. |
| skipInvalidData | Boolean | No | false | Whether to skip dirty data. true: skip dirty data; false: report an error. Requires Ververica Runtime (VVR) 8.0.4 or later. |
| retryStrategy | Enum | No | TIME | Retry policy. TIME: retry until retryTimeoutMs elapses; COUNT: retry until retryCount is reached. |
| retryCount | Integer | No | 3 | Maximum number of retries. Applies when retryStrategy is COUNT. |
| retryTimeoutMs | Integer | No | 180000 | Retry timeout in milliseconds. Applies when retryStrategy is TIME. |
| streamOriginColumnMapping | String | No | N/A | Mapping from original column names to actual column names. Format: origin_col1:col1,origin_col2:col2. |
| outputSpecificRowType | Boolean | No | false | Whether to pass through the specific row type. false: all rows are treated as INSERT; true: rows can be INSERT, DELETE, or UPDATE_AFTER. |
| dataFetchTimeoutMs | Integer | No | 10000 | Maximum time in milliseconds to fetch data from a single partition. Reduce this value to lower overall synchronization latency when syncing many partitions. Requires VVR 8.0.10 or later. |
| enableRequestCompression | Boolean | No | false | Whether to enable request compression. Reduces bandwidth usage at the cost of higher CPU load. Requires VVR 8.0.10 or later. |
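The following sketch combines several of these source options, reusing the connection settings of the tablestore_stream example. The Tablestore column names customer_id and customer_name are hypothetical, and reading the mapping as Tablestore column name on the left and Flink column name on the right is an assumption based on the option's description.

```sql
CREATE TEMPORARY TABLE tablestore_changelog (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-source',
  'tableName'='flink_source_table',
  'tunnelName'='flinksourcestream',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  -- Emit INSERT, UPDATE_AFTER, and DELETE rows instead of treating all rows as INSERT.
  'outputSpecificRowType'='true',
  -- Stop after 5 retries instead of retrying until retryTimeoutMs elapses.
  'retryStrategy'='COUNT',
  'retryCount'='5',
  -- Assumed direction: Tablestore column name on the left, Flink column on the right.
  'streamOriginColumnMapping'='customer_id:customerid,customer_name:customername'
);
```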
Sink table options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| valueColumns | String | Yes | N/A | Names of the columns to write. Separate multiple column names with commas (,). |
| retryIntervalMs | Integer | No | 1000 | Retry interval in milliseconds. |
| maxRetryTimes | Integer | No | 10 | Maximum number of retries. |
| bufferSize | Integer | No | 5000 | Maximum number of records buffered before a write is triggered. |
| batchWriteTimeoutMs | Integer | No | 5000 | Write timeout in milliseconds. If buffered records don't reach bufferSize within this period, all buffered records are written. |
| batchSize | Integer | No | 100 | Number of records written per batch. Maximum: 200. |
| ignoreDelete | Boolean | No | false | Whether to skip delete operations. |
| autoIncrementKey | String | No | N/A | Name of the auto-increment primary key column. Configure only if the sink table has an auto-increment primary key column. Requires VVR 8.0.4 or later. |
| overwriteMode | Enum | No | PUT | Write mode. PUT: overwrite the entire row; UPDATE: update only the columns being written and keep the row's other columns. Dynamic column mode requires UPDATE. |
| defaultTimestampInMillisecond | Long | No | -1 | Default timestamp for writes. If not set, the current system time is used. |
| dynamicColumnSink | Boolean | No | false | Whether to enable dynamic column mode. In this mode, no columns are pre-defined; columns are inserted based on runtime values. The first N columns define the primary key. The second-to-last column holds the column name and the last column holds its value; both must be STRING. If enabled, overwriteMode must be UPDATE and auto-increment primary keys are not supported. |
| checkSinkTableMeta | Boolean | No | true | Whether to verify that the Tablestore table's primary key matches the primary key declared in the CREATE TABLE statement. |
| enableRequestCompression | Boolean | No | false | Whether to enable request compression during writes. |
| maxColumnsCount | Integer | No | 128 | Maximum number of columns written to the sink table. If you set a value above 128, the error "The count of attribute columns exceeds the maximum" occurs. Requires VVR 8.0.10 or later. |
| storageType | String | No | WIDE_COLUMN | Sink table type. WIDE_COLUMN: wide-column table; TIMESERIES: time series table. |
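In dynamic column mode, the sink schema is reduced to the primary key columns followed by a column-name column and a value column. The sketch below assumes a Tablestore table whose only primary key column is user_id; the column names attr_name and attr_value are hypothetical. Because valueColumns is required but its role in dynamic mode is not described above, listing the two trailing columns there is an assumption.

```sql
-- Layout: primary key column(s), then column name, then column value.
CREATE TEMPORARY TABLE ots_dynamic_sink (
  user_id STRING,
  attr_name STRING,
  attr_value STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  -- Assumption: list the two trailing columns to satisfy the required option.
  'valueColumns'='attr_name,attr_value',
  'dynamicColumnSink'='true',
  -- Dynamic column mode requires UPDATE mode.
  'overwriteMode'='UPDATE'
);

-- Each record writes one attribute column, named by attr_name and holding
-- attr_value, into the Tablestore row identified by user_id.
INSERT INTO ots_dynamic_sink VALUES
  ('u001', 'city', 'Hangzhou'),
  ('u002', 'plan', 'pro');
```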
Dimension table options
How the cache works
The dimension table cache reduces repeated lookups against Tablestore. Choose a cache policy based on your table size and query patterns:
- None: No caching. Every lookup hits Tablestore directly. Use when data changes frequently and freshness is critical.
- LRU: Caches a fixed number of recently accessed records. When a lookup misses the cache, the connector queries Tablestore and updates the cache with the result. Set cacheSize and cacheTTLMs when using this policy.
- ALL (default): Loads the entire dimension table into the cache before the job starts, and serves all subsequent lookups from the cache. When the cache expires (cacheTTLMs), the connector reloads all data. Use ALL when the table is small and you expect many lookups for keys that do not exist. When using ALL, increase the memory of the join node, because the cache requires approximately twice the size of the remote table.
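A minimal sketch of the LRU policy, assuming the ots_dim schema from the Syntax section under a hypothetical table name. The cache size and TTL are placeholder values; size them against your own key distribution and memory budget.

```sql
CREATE TEMPORARY TABLE ots_dim_lru (
  id INT,
  len INT,
  content STRING
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'cache'='LRU',
  -- Keep at most 10,000 recently accessed records in memory.
  'cacheSize'='10000',
  -- Expire each cached entry after 10 minutes (600,000 ms).
  'cacheTTLMs'='600000'
);
```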
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| retryIntervalMs | Integer | No | 1000 | Retry interval in milliseconds. |
| maxRetryTimes | Integer | No | 10 | Maximum number of retries. |
| cache | String | No | ALL | Cache policy: None, LRU, or ALL. |
| cacheSize | Integer | No | N/A | Maximum number of cached records. Applies when cache is LRU. |
| cacheTTLMs | Integer | No | N/A | Cache TTL in milliseconds. For LRU: timeout per entry. For ALL: full-cache refresh interval. Leave unset to disable expiration. |
| cacheEmpty | Boolean | No | N/A | Whether to cache empty (no-match) results. true: cache; false: do not cache. |
| cacheReloadTimeBlackList | String | No | N/A | Time windows during which the ALL cache is not refreshed. Format: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Separate multiple windows with commas; use -> between start and end times. |
| async | Boolean | No | false | Whether to enable asynchronous lookup. true: async lookups (results are not ordered); false: synchronous lookups. |
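For the ALL policy, cacheTTLMs sets how often the full table is reloaded, and cacheReloadTimeBlackList suppresses reloads during the listed windows. In this sketch the table name and both windows are hypothetical; the window string follows the format given in the table above.

```sql
CREATE TEMPORARY TABLE ots_dim_all (
  id INT,
  len INT,
  content STRING
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'cache'='ALL',
  -- Reload the whole table every hour (3,600,000 ms).
  'cacheTTLMs'='3600000',
  -- Also cache lookups that find no row.
  'cacheEmpty'='true',
  -- Hypothetical peak windows during which no reload happens.
  'cacheReloadTimeBlackList'='2024-11-10 23:30 -> 2024-11-11 08:00, 2024-12-12 00:00 -> 2024-12-12 02:00'
);
```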
Data type mappings
Source table
| Tablestore type | Flink SQL type |
|---|---|
| INTEGER | BIGINT |
| STRING | STRING |
| BOOLEAN | BOOLEAN |
| DOUBLE | DOUBLE |
| BINARY | BINARY |
Sink table
| Flink SQL type | Tablestore type |
|---|---|
| BINARY | BINARY |
| VARBINARY | BINARY |
| CHAR | STRING |
| VARCHAR | STRING |
| TINYINT | INTEGER |
| SMALLINT | INTEGER |
| INTEGER | INTEGER |
| BIGINT | INTEGER |
| FLOAT | DOUBLE |
| DOUBLE | DOUBLE |
| BOOLEAN | BOOLEAN |
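The sink mapping above lists no DECIMAL or TIMESTAMP type. Assuming such columns are not accepted directly, one cautious approach is to cast them to a listed type before writing. The payments source (generated with datagen), the ots_payments sink, and all column names in this sketch are hypothetical.

```sql
-- Hypothetical source with types absent from the sink mapping table.
CREATE TEMPORARY TABLE payments (
  payment_id STRING,
  amount DECIMAL(10, 2),
  paid_at TIMESTAMP(3)
) WITH (
  'connector'='datagen',
  'rows-per-second'='1'
);

CREATE TEMPORARY TABLE ots_payments (
  payment_id STRING,
  amount DOUBLE,
  paid_at STRING,
  PRIMARY KEY (payment_id) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'valueColumns'='amount,paid_at'
);

INSERT INTO ots_payments
SELECT
  payment_id,
  CAST(amount AS DOUBLE) AS amount,   -- DECIMAL to DOUBLE (Tablestore DOUBLE)
  CAST(paid_at AS STRING) AS paid_at  -- TIMESTAMP to STRING (Tablestore STRING)
FROM payments;
```

Casting DECIMAL to DOUBLE can lose precision; if exact amounts matter, casting to STRING instead preserves the value.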
Examples
Read from Tablestore and write to Tablestore
This example reads order data from a Tablestore source table via Tunnel Service and writes it to a Tablestore sink table. The sink table uses an auto-increment primary key column.
CREATE TEMPORARY TABLE tablestore_stream (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-source',
'tableName'='flink_source_table',
'tunnelName'='flinksourcestream',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'ignoreDelete'='false',
'skipInvalidData'='false'
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`, orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'valueColumns'='customerid,customername',
'autoIncrementKey'='${auto_increment_primary_key_name}'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
Synchronize a wide-column table to a time series table
This example reads from a wide-column source table and writes to a time series sink table. The sink table's tags column uses MAP<STRING, STRING> to hold tag key-value pairs, and storageType is set to TIMESERIES.
CREATE TEMPORARY TABLE timeseries_source (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
) WITH (
'connector'='ots',
'endPoint'='https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName'='iotstore-test',
'tableName'='test_ots_timeseries_2',
'tunnelName'='timeseries_source_tunnel_2',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'ignoreDelete'='true'
);
CREATE TEMPORARY TABLE timeseries_sink (
measurement STRING,
datasource STRING,
tags MAP<STRING, STRING>,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING,
PRIMARY KEY (measurement, datasource, tags, `time`) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName'='iotstore-test',
'tableName'='test_timeseries_sink_table_2',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'storageType'='TIMESERIES'
);
-- Build the tags map from individual tag columns and insert into the time series sink table
INSERT INTO timeseries_sink
SELECT
measurement,
datasource,
MAP['tag_a', tag_a, 'tag_b', tag_b, 'tag_c', tag_c, 'tag_d', tag_d, 'tag_e', tag_e, 'tag_f', tag_f] AS tags,
`time`,
binary_value,
bool_value,
double_value,
long_value,
string_value,
tag_b,
tag_c,
tag_d,
tag_e,
tag_f
FROM timeseries_source;