Supported table types: sink (streaming mode) and lookup source (synchronous or asynchronous mode).
The Lindorm connector lets Flink streaming jobs write data to and look up data from Lindorm wide tables using the SQL API.
Prerequisites
Before you begin, ensure that you have:
- A Lindorm wide table engine and a Lindorm table. For more information, see Create an instance.
- Network connectivity between the Lindorm cluster and the Flink workspace, for example by placing both in the same virtual private cloud (VPC).
Lindorm HBase tables are not supported. Only LindormTable is supported.
Quick start
The following example generates 10 rows of data, looks up matching rows in a Lindorm dimension table, and writes the result to a Lindorm sink table.
-- Source: generate 10 rows with sequential IDs 0-9
CREATE TEMPORARY TABLE example_source (
id INT,
proc_time AS PROCTIME()
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.id.kind' = 'sequence',
'fields.id.start' = '0',
'fields.id.end' = '9'
);
-- Dimension table: look up user details from Lindorm
CREATE TEMPORARY TABLE lindorm_hbase_dim (
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'tablename' = '${lindorm_dim_table}',
'seedserver' = '${lindorm_seed_server}',
'namespace' = 'default',
'username' = '${lindorm_username}',
'password' = '${lindorm_password}'
);
-- Sink table: write enriched records to Lindorm
CREATE TEMPORARY TABLE lindorm_hbase_sink (
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'tablename' = '${lindorm_sink_table}',
'seedserver' = '${lindorm_seed_server}',
'namespace' = 'default',
'username' = '${lindorm_username}',
'password' = '${lindorm_password}'
);
-- Temporal join: enrich source data with the dimension table and write to sink
INSERT INTO lindorm_hbase_sink
SELECT
s.id,
d.name,
d.birth
FROM example_source AS s
JOIN lindorm_hbase_dim AS d FOR SYSTEM_TIME AS OF s.proc_time
ON s.id = d.id;
Replace the placeholders before running:
| Placeholder | Description |
|---|---|
| ${lindorm_dim_table} | Name of the Lindorm dimension table |
| ${lindorm_sink_table} | Name of the Lindorm sink table |
| ${lindorm_seed_server} | Lindorm server endpoint in host:port format |
| ${lindorm_username} | Lindorm username |
| ${lindorm_password} | Lindorm password |
Syntax
CREATE TABLE white_list (
id VARCHAR,
name VARCHAR,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<host:port>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);
Connector options
General
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | String | Yes | N/A | Must be lindorm. |
| seedserver | String | Yes | N/A | Lindorm server endpoint in host:port format. Realtime Compute for Apache Flink uses the ApsaraDB for HBase API for Java to connect. For details, see Use Flink to connect to and use LindormTable. |
| namespace | String | Yes | N/A | Lindorm database namespace. |
| username | String | Yes | N/A | Lindorm database username. |
| password | String | Yes | N/A | Lindorm database password. |
| tableName | String | Yes | N/A | Name of the Lindorm table. |
| columnFamily | String | Yes | N/A | Column family name. If no column family was specified when the table was created, enter f. |
| retryIntervalMs | Integer | No | 1000 | Retry interval for failed read operations, in milliseconds. |
| maxRetryTimes | Integer | No | 5 | Maximum number of retries for read or write operations. |
Sink-specific
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| bufferSize | Integer | No | 500 | Number of records to buffer before flushing to Lindorm. |
| flushIntervalMs | Integer | No | 2000 | Maximum time between flushes when the buffer is not full, in milliseconds. |
| ignoreDelete | Boolean | No | false | When true, delete operations are skipped. |
| dynamicColumnSink | Boolean | No | false | When true, enables the dynamic table feature. See Dynamic table. |
| excludeUpdateColumns | String | No | N/A | Comma-separated list of columns to exclude from updates. For example, a,b,c skips updates to columns a, b, and c. Requires VVR 8.0.9 or later. |
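As a sketch of how the sink options combine, the following DDL adds illustrative tuning values to a sink table shaped like the quick-start sink. The table name lindorm_tuned_sink and every option value shown are assumptions for illustration, not recommendations; excludeUpdateColumns requires VVR 8.0.9 or later.

```sql
-- Sketch: Lindorm sink with explicit buffering and update options.
-- Table name and option values are illustrative assumptions.
CREATE TEMPORARY TABLE lindorm_tuned_sink (
  `id` INT,
  `name` VARCHAR,
  `birth` VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<host:port>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'bufferSize' = '1000',             -- flush once 1,000 records are buffered...
  'flushIntervalMs' = '1000',        -- ...or after 1 second if the buffer is not full
  'ignoreDelete' = 'true',           -- skip delete operations
  'excludeUpdateColumns' = 'birth'   -- skip updates to the birth column (VVR 8.0.9+)
);
```

With these values, the sink flushes when 1,000 records are buffered or 1 second has elapsed since the last flush, whichever comes first. A larger buffer means fewer, bigger writes; a shorter interval bounds how long a record can wait before reaching Lindorm.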
Dimension table-specific
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| partitionedJoin | Boolean | No | false | When true, partitions upstream data by the join key to improve the cache hit rate. |
| shuffleEmptyKey | Boolean | No | false | When true, records with an empty join key are distributed randomly to downstream subtasks. When false, they are all sent to the first parallel subtask (index 0). |
| cache | String | No | None | Cache policy. Valid values: None (no caching) and LRU (cache recently accessed rows). |
| cacheSize | Integer | No | 1000 | Maximum number of rows to cache. Applies when cache is LRU. |
| cacheTTLMs | Integer | No | N/A | Cache entry expiration time, in milliseconds. Applies when cache is LRU. By default, entries do not expire. |
| cacheEmpty | Boolean | No | true | When true, caches lookup results that returned no rows. |
| async | Boolean | No | false | When true, enables asynchronous lookup mode. |
| asyncLindormRpcTimeoutMs | Integer | No | 300000 | RPC timeout for asynchronous lookups, in milliseconds. The default of 300000 ms is 5 minutes. |
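The following sketch enables asynchronous lookups on a dimension table shaped like the quick-start dimension table. The table name lindorm_async_dim and the 60000 ms timeout are illustrative assumptions. The join query itself does not change: it still uses FOR SYSTEM_TIME AS OF, as in the quick start.

```sql
-- Sketch: asynchronous lookup against a Lindorm dimension table.
-- Table name and timeout value are illustrative assumptions.
CREATE TEMPORARY TABLE lindorm_async_dim (
  `id` INT,
  `name` VARCHAR,
  `birth` VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<host:port>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'async' = 'true',                      -- issue lookups asynchronously
  'asyncLindormRpcTimeoutMs' = '60000',  -- time out async lookup RPCs after 60 s
  'partitionedJoin' = 'true',            -- partition upstream data by join key
  'shuffleEmptyKey' = 'true'             -- spread empty join keys randomly
);
```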
Lookup cache
By default, the connector fetches every lookup directly from Lindorm (cache = None). Enable LRU caching to reduce Lindorm read pressure for high-throughput joins.
With cache = LRU, the connector stores recently accessed rows in memory. When the join key matches a cached row, the connector returns the cached value without querying Lindorm. A cached entry is evicted when:
- The cache reaches cacheSize rows (the least recently used rows are evicted first).
- The entry has been in the cache for longer than cacheTTLMs milliseconds.
There is a tradeoff: a longer TTL or larger cache reduces Lindorm read traffic but increases the risk of serving stale data. Tune both values based on your throughput requirements and how quickly the underlying data changes.
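A minimal LRU configuration might look like the following sketch. The table name and values are assumptions chosen to illustrate the tradeoff: a 600000 ms (10-minute) TTL bounds how long a cached row can be served after it was cached, and setting cacheEmpty to false means keys missing from Lindorm are queried again on every lookup, so rows added later become visible sooner at the cost of more reads for keys that are often missing.

```sql
-- Sketch: LRU lookup cache on a Lindorm dimension table.
-- Table name and cache values are illustrative assumptions.
CREATE TEMPORARY TABLE lindorm_cached_dim (
  `id` INT,
  `name` VARCHAR,
  `birth` VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<host:port>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'cache' = 'LRU',          -- keep recently accessed rows in memory
  'cacheSize' = '10000',    -- evict least recently used rows beyond 10,000
  'cacheTTLMs' = '600000',  -- expire entries 10 minutes after they are cached
  'cacheEmpty' = 'false'    -- do not cache lookups that returned no rows
);
```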
The Lindorm connector supports one-to-many lookup joins. When a join can match multiple dimension-table rows per upstream event, pay close attention to cache settings and throughput.
Idempotent writes
When a Lindorm table has a primary key, all writes use upsert semantics: each incoming record either inserts a new row or updates the matching existing row. This makes writes idempotent.
Idempotent writes are important for fault tolerance. If a Flink job restarts from a checkpoint, it replays messages from the last successful checkpoint. Because Lindorm upserts are idempotent, replayed records produce the same result as the original writes, with no duplicate rows or constraint violations.
Define a primary key in your DDL to take advantage of idempotent writes.
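As an illustration, the following sketch writes a running count per id to a table keyed by id. It reuses example_source from the quick start (in the same session); the sink table lindorm_id_counts and its cnt column are hypothetical, and the sketch assumes the connector applies the aggregation's updating results as upserts keyed by the primary key.

```sql
-- Hypothetical sink table: one Lindorm row per id.
CREATE TEMPORARY TABLE lindorm_id_counts (
  `id` INT,
  `cnt` BIGINT,
  PRIMARY KEY (id) NOT ENFORCED  -- writes with the same id upsert the same Lindorm row
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<host:port>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>'
);

-- Running count per id. Each new count overwrites the previous value for that id,
-- so replaying records after a restart still leaves exactly one row per id.
INSERT INTO lindorm_id_counts
SELECT id, COUNT(*) AS cnt
FROM example_source
GROUP BY id;
```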
Dynamic table
Use the dynamic table feature when your schema evolves at runtime: columns are created dynamically from values in the data rather than fixed at DDL time. A typical use case is tracking hourly metrics per day, where hours are column names and days are primary keys:
| Primary key | 00:00 | 01:00 |
|---|---|---|
| 2025-06-01 | 45 | 32 |
| 2025-06-02 | 76 | 34 |
DDL rules for dynamic tables:
- The first N columns form the primary key.
- The last two columns must be VARCHAR.
- The second-to-last column (c1) holds the dynamic column name.
- The last column (c2) holds the value for that column.
- No non-primary-key columns other than c1 and c2 are allowed.
CREATE TABLE lindorm_dynamic_output (
pk1 VARCHAR,
pk2 VARCHAR,
pk3 VARCHAR,
c1 VARCHAR, -- column name written to Lindorm
c2 VARCHAR, -- column value written to Lindorm
PRIMARY KEY (pk1, pk2, pk3) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<host:port>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>',
'dynamicColumnSink' = 'true'
);
Each time a record is written, the connector adds or updates one column in the Lindorm row identified by <pk1, pk2, pk3>. Other columns in that row remain unchanged.
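The following sketch shows how rows like those in the hourly-metrics table could be produced. It assumes a hypothetical source table page_views with pre-aggregated rows containing a VARCHAR `day` (for example 2025-06-01), a VARCHAR `hour` (for example 01:00), and a BIGINT view_count, plus a hypothetical dynamic table lindorm_hourly_metrics with a single primary-key column. Because c2 must be VARCHAR, the count is cast to a string.

```sql
-- Hypothetical dynamic table: one Lindorm row per day, one column per hour.
CREATE TEMPORARY TABLE lindorm_hourly_metrics (
  `day` VARCHAR,  -- primary key, e.g. 2025-06-01
  c1 VARCHAR,     -- dynamic column name, e.g. 01:00
  c2 VARCHAR,     -- value written to that column
  PRIMARY KEY (`day`) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<host:port>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'dynamicColumnSink' = 'true'
);

-- Each output record sets one hour column in the row for that day.
INSERT INTO lindorm_hourly_metrics
SELECT `day`, `hour`, CAST(view_count AS VARCHAR)
FROM page_views;
```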
Data type mappings
All Lindorm data is stored in binary format. The following table shows how the connector converts between Flink SQL types and Lindorm binary representations.
| Flink SQL type | Write to Lindorm | Read from Lindorm |
|---|---|---|
| CHAR / VARCHAR | StringData::toBytes | StringData::fromBytes |
| BOOLEAN | Bytes::toBytes(boolean) | Bytes::toBoolean |
| BINARY / VARBINARY | Direct bytes | Direct bytes |
| DECIMAL | Bytes::toBytes(BigDecimal) | Bytes::toBigDecimal |
| TINYINT | Single-byte byte[] holding the value | bytes[0] |
| SMALLINT | Bytes::toBytes(short) | Bytes::toShort |
| INT | Bytes::toBytes(int) | Bytes::toInt |
| BIGINT | Bytes::toBytes(long) | Bytes::toLong |
| FLOAT | Bytes::toBytes(float) | Bytes::toFloat |
| DOUBLE | Bytes::toBytes(double) | Bytes::toDouble |
| DATE | Bytes::toBytes(int) with days since 1970-01-01 | Bytes::toInt → days since 1970-01-01 |
| TIME | Bytes::toBytes(int) with milliseconds since 00:00:00 | Bytes::toInt → milliseconds since 00:00:00 |
| TIMESTAMP | Bytes::toBytes(long) with milliseconds since 1970-01-01 00:00:00 | Bytes::toLong → milliseconds since 1970-01-01 00:00:00 |
Bytes methods are in the com.alibaba.lindorm.client.core.utils.Bytes class. StringData methods (used for CHAR and VARCHAR) are in the org.apache.flink.table.data.StringData class.
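As a worked example of the temporal encodings, the values below use the dates and hours from the dynamic-table example. The integer and long values follow from the definitions in the table, treating the TIMESTAMP as having no time-zone offset. The byte layout in the last line is an assumption: it presumes Bytes::toBytes(int) writes a 4-byte big-endian value, as the HBase Bytes utility does, which this document does not state for the Lindorm class.

```latex
\begin{aligned}
\text{DATE } 2025\text{-}06\text{-}01 &: \underbrace{55 \cdot 365 + 14}_{\text{1970-01-01 to 2025-01-01}} + \underbrace{31+28+31+30+31}_{\text{Jan to May}} = 20089 + 151 = 20240 \text{ days} \\
\text{TIME } 01{:}00{:}00 &: 1 \cdot 60 \cdot 60 \cdot 1000 = 3\,600\,000 \text{ ms} \\
\text{TIMESTAMP } 2025\text{-}06\text{-}01\ 00{:}00{:}00 &: 20240 \cdot 86\,400\,000 = 1\,748\,736\,000\,000 \text{ ms} \\
\texttt{Bytes::toBytes}(20240) &= [\texttt{0x00}, \texttt{0x00}, \texttt{0x4F}, \texttt{0x10}] \quad (\text{assuming big-endian})
\end{aligned}
```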
Metrics
The following sink metrics are available. For details, see Metrics.
| Metric | Description |
|---|---|
| numBytesOut | Total bytes written to the sink |
| numBytesOutPerSecond | Bytes written per second |
| numRecordsOut | Total records written to the sink |
| numRecordsOutPerSecond | Records written per second |