Lindorm connector

Sink: Streaming
Lookup source: Sync/Async mode

The Lindorm connector lets Flink streaming jobs write data to and look up data from Lindorm wide tables using the SQL API.

Prerequisites

Before you begin, ensure that you have:

  • A Lindorm wide table engine and a Lindorm table. For more information, see Create an instance.

  • Network connectivity between the Lindorm cluster and the Flink workspace—for example, both in the same virtual private cloud (VPC).

Lindorm HBase tables are not supported. Only LindormTable is supported.

Quick start

The following example generates 10 rows of data, looks up matching rows in a Lindorm dimension table, and writes the result to a Lindorm sink table.

-- Source: generate 10 rows with sequential IDs 0-9
CREATE TEMPORARY TABLE example_source (
  id INT,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '10',
  'fields.id.kind' = 'sequence',
  'fields.id.start' = '0',
  'fields.id.end' = '9'
);

-- Dimension table: look up user details from Lindorm
CREATE TEMPORARY TABLE lindorm_hbase_dim (
  `id`    INT,
  `name`  VARCHAR,
  `birth` VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
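) WITH (
  'connector'    = 'lindorm',
  'tableName'    = '${lindorm_dim_table}',
  'seedserver'   = '${lindorm_seed_server}',
  'namespace'    = 'default',
  'columnFamily' = 'f',  -- required option; f applies when no column family was specified at table creation
  'username'     = '${lindorm_username}',
  'password'     = '${lindorm_password}'
);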

-- Sink table: write enriched records to Lindorm
CREATE TEMPORARY TABLE lindorm_hbase_sink (
  `id`    INT,
  `name`  VARCHAR,
  `birth` VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
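) WITH (
  'connector'    = 'lindorm',
  'tableName'    = '${lindorm_sink_table}',
  'seedserver'   = '${lindorm_seed_server}',
  'namespace'    = 'default',
  'columnFamily' = 'f',  -- required option; f applies when no column family was specified at table creation
  'username'     = '${lindorm_username}',
  'password'     = '${lindorm_password}'
);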

-- Temporal join: enrich source data with the dimension table and write to sink
INSERT INTO lindorm_hbase_sink
SELECT
  s.id,
  d.name,
  d.birth
FROM example_source AS s
JOIN lindorm_hbase_dim AS d FOR SYSTEM_TIME AS OF s.proc_time
  ON s.id = d.id;

Replace the placeholders before running:

Placeholder | Description
${lindorm_dim_table} | Name of the Lindorm dimension table
${lindorm_sink_table} | Name of the Lindorm sink table
${lindorm_seed_server} | Lindorm server endpoint in host:port format
${lindorm_username} | Lindorm username
${lindorm_password} | Lindorm password

Syntax

CREATE TABLE white_list (
  id     VARCHAR,
  name   VARCHAR,
  age    INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'    = 'lindorm',
  'seedserver'   = '<host:port>',
  'namespace'    = '<yourNamespace>',
  'username'     = '<yourUsername>',
  'password'     = '<yourPassword>',
  'tableName'    = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>'
);

Connector options

General

Option | Type | Required | Default | Description
connector | String | Yes | - | Must be lindorm.
seedserver | String | Yes | - | Lindorm server endpoint in host:port format. Realtime Compute for Apache Flink uses the ApsaraDB for HBase API for Java to connect. For details, see Use Flink to connect to and use LindormTable.
namespace | String | Yes | - | Lindorm database namespace.
username | String | Yes | - | Lindorm database username.
password | String | Yes | - | Lindorm database password.
tableName | String | Yes | - | Name of the Lindorm table.
columnFamily | String | Yes | - | Column family name. If no column family was specified when the table was created, enter f.
retryIntervalMs | Integer | No | 1000 | Retry interval for failed read operations, in milliseconds.
maxRetryTimes | Integer | No | 5 | Maximum number of retries for read or write operations.

Sink-specific

Option | Type | Required | Default | Description
bufferSize | Integer | No | 500 | Number of records to buffer before flushing to Lindorm.
flushIntervalMs | Integer | No | 2000 | Maximum time between flushes when the buffer is not full, in milliseconds.
ignoreDelete | Boolean | No | false | When true, delete operations are skipped.
dynamicColumnSink | Boolean | No | false | When true, enables the dynamic table feature. See Dynamic table.
excludeUpdateColumns | String | No | - | Comma-separated list of columns to exclude from updates. For example, a,b,c skips updates to columns a, b, and c. Requires VVR 8.0.9 or later.
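
To show how the sink options combine, the following sketch defines a sink that flushes in larger batches, skips deletes, and leaves one column untouched on update. The table name, column names, and option values are assumptions made for illustration, not recommended settings. excludeUpdateColumns requires VVR 8.0.9 or later.

```sql
-- Hypothetical sink table; all names and values are illustrative only
CREATE TEMPORARY TABLE lindorm_sink_tuned (
  id         VARCHAR,
  name       VARCHAR,
  created_at VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'            = 'lindorm',
  'seedserver'           = '<host:port>',
  'namespace'            = '<yourNamespace>',
  'username'             = '<yourUsername>',
  'password'             = '<yourPassword>',
  'tableName'            = '<yourTableName>',
  'columnFamily'         = '<yourColumnFamily>',
  'bufferSize'           = '1000',       -- flush once 1000 records are buffered
  'flushIntervalMs'      = '5000',       -- flush at least every 5 seconds if the buffer is not full
  'ignoreDelete'         = 'true',       -- skip delete operations
  'excludeUpdateColumns' = 'created_at'  -- exclude this column from updates
);
```

A larger bufferSize or flushIntervalMs generally means fewer write requests at the cost of higher end-to-end latency, so choose values that match your latency requirements.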

Dimension table-specific

Option | Type | Required | Default | Description
partitionedJoin | Boolean | No | false | When true, uses the JoinKey for partitioning to improve the cache hit rate.
shuffleEmptyKey | Boolean | No | false | When true, randomly routes empty upstream keys to downstream nodes. When false, routes them to parallel thread 0.
cache | String | No | None | Cache policy. Valid values: None (no caching) and LRU (cache recently accessed rows).
cacheSize | Integer | No | 1000 | Maximum number of rows to cache. Applies when cache is LRU.
cacheTTLMs | Integer | No | - | Cache entry expiration time, in milliseconds. Applies when cache is LRU. By default, entries do not expire.
cacheEmpty | Boolean | No | true | When true, caches lookup results that returned no rows.
async | Boolean | No | false | When true, enables asynchronous lookup mode.
asyncLindormRpcTimeoutMs | Integer | No | 300000 | RPC timeout for asynchronous lookups, in milliseconds.
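
As a minimal sketch of asynchronous lookup mode, the following dimension table sets async to true and lowers the RPC timeout from its 300000 ms default. The table name, columns, and the 60000 ms value are assumptions made for illustration.

```sql
-- Hypothetical dimension table using asynchronous lookups
CREATE TEMPORARY TABLE lindorm_dim_async (
  id   INT,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'                = 'lindorm',
  'seedserver'               = '<host:port>',
  'namespace'                = '<yourNamespace>',
  'username'                 = '<yourUsername>',
  'password'                 = '<yourPassword>',
  'tableName'                = '<yourTableName>',
  'columnFamily'             = '<yourColumnFamily>',
  'async'                    = 'true',   -- enable asynchronous lookup mode
  'asyncLindormRpcTimeoutMs' = '60000'   -- illustrative value: time out async RPCs after 60 seconds
);
```

The join query itself does not change: the table is still referenced with FOR SYSTEM_TIME AS OF, as in the Quick start.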

Lookup cache

By default, the connector fetches every lookup directly from Lindorm (cache = None). Enable LRU caching to reduce Lindorm read pressure for high-throughput joins.

With cache = LRU, the connector stores recently accessed rows in memory. When the join key matches a cached row, the connector returns the cached value without querying Lindorm. A cached entry is evicted when:

  • The cache reaches cacheSize rows (the least recently used rows are evicted first).

  • The entry has been in the cache for longer than cacheTTLMs milliseconds.

There is a tradeoff: a longer TTL or larger cache reduces Lindorm read traffic but increases the risk of serving stale data. Tune both values based on your throughput requirements and how quickly the underlying data changes.
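
The cache settings described above could be declared as in the following sketch. The table name, columns, and the specific cacheSize and cacheTTLMs values are assumptions made for illustration. Size them against your own key cardinality and freshness needs.

```sql
-- Hypothetical dimension table with an LRU lookup cache
CREATE TEMPORARY TABLE lindorm_dim_cached (
  id    INT,
  name  VARCHAR,
  birth VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'       = 'lindorm',
  'seedserver'      = '<host:port>',
  'namespace'       = '<yourNamespace>',
  'username'        = '<yourUsername>',
  'password'        = '<yourPassword>',
  'tableName'       = '<yourTableName>',
  'columnFamily'    = '<yourColumnFamily>',
  'cache'           = 'LRU',     -- keep recently accessed rows in memory
  'cacheSize'       = '10000',   -- illustrative value: at most 10000 cached rows
  'cacheTTLMs'      = '60000',   -- illustrative value: entries expire after 60 seconds
  'cacheEmpty'      = 'true',    -- also cache lookups that returned no rows
  'partitionedJoin' = 'true'     -- partition by join key to improve the cache hit rate
);
```

With these values, a change to a dimension row may take up to about 60 seconds to become visible to the join, which is the staleness side of the tradeoff.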

The Lindorm connector supports one-to-many lookup joins. Pay close attention to caching strategies and throughput when rows in the dimension table can match multiple upstream events.

Idempotent writes

When a Lindorm table has a primary key, all writes use upsert semantics: each incoming record either inserts a new row or updates the matching existing row. This makes writes idempotent.

Idempotent writes are important for fault tolerance. If a Flink job restarts from a checkpoint, it replays messages from the last successful checkpoint. Because Lindorm upserts are idempotent, replayed records produce the same result as the original writes—no duplicate rows or constraint violations.

Define a primary key in your DDL to take advantage of idempotent writes.
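
As a small illustration of upsert behavior, the sketch below writes the same record twice, much as a replay after a checkpoint restore would. The table name, columns, and sample values are assumptions made for the example.

```sql
-- Hypothetical sink table with a primary key
CREATE TEMPORARY TABLE lindorm_users_sink (
  id   VARCHAR,
  name VARCHAR,
  age  INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'    = 'lindorm',
  'seedserver'   = '<host:port>',
  'namespace'    = '<yourNamespace>',
  'username'     = '<yourUsername>',
  'password'     = '<yourPassword>',
  'tableName'    = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>'
);

-- The record for u1 is emitted twice, simulating a replayed message
INSERT INTO lindorm_users_sink
SELECT id, name, age
FROM (VALUES
  ('u1', 'Alice', 30),
  ('u1', 'Alice', 30)
) AS t (id, name, age);
```

Because both writes target the same primary key with the same values, the Lindorm table should end up with a single row for u1 rather than a duplicate.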

Dynamic table

Use the dynamic table feature when your schema evolves at runtime—columns are created dynamically based on values in the data rather than fixed at DDL time. A typical use case is tracking hourly metrics per day, where hours are column names and days are primary keys:

Primary key | 00:00 | 01:00
2025-06-01 | 45 | 32
2025-06-02 | 76 | 34

DDL rules for dynamic tables:

  • The first N columns form the primary key.

  • The last two columns must be VARCHAR.

  • The second-to-last column (c1) holds the dynamic column name.

  • The last column (c2) holds the value for that column.

  • No non-primary-key columns other than c1 and c2 are allowed.

CREATE TABLE lindorm_dynamic_output (
  pk1 VARCHAR,
  pk2 VARCHAR,
  pk3 VARCHAR,
  c1  VARCHAR,  -- column name written to Lindorm
  c2  VARCHAR,  -- column value written to Lindorm
  PRIMARY KEY (pk1, pk2, pk3) NOT ENFORCED
) WITH (
  'connector'         = 'lindorm',
  'seedserver'        = '<host:port>',
  'namespace'         = '<yourNamespace>',
  'username'          = '<yourUsername>',
  'password'          = '<yourPassword>',
  'tableName'         = '<yourTableName>',
  'columnFamily'      = '<yourColumnFamily>',
  'dynamicColumnSink' = 'true'
);

Each time a record is written, the connector adds or updates one column in the Lindorm row identified by <pk1, pk2, pk3>. Other columns in that row remain unchanged.
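
To connect these rules to the hourly metrics example, the following sketch uses a single primary key column for the day and writes one record per hour. The table name, the day_key column name, and the use of literal VALUES are assumptions made for illustration.

```sql
-- Hypothetical dynamic table: one Lindorm row per day, one column per hour
CREATE TEMPORARY TABLE lindorm_hourly_metrics (
  day_key VARCHAR,
  c1      VARCHAR,  -- dynamic column name (the hour)
  c2      VARCHAR,  -- value for that column
  PRIMARY KEY (day_key) NOT ENFORCED
) WITH (
  'connector'         = 'lindorm',
  'seedserver'        = '<host:port>',
  'namespace'         = '<yourNamespace>',
  'username'          = '<yourUsername>',
  'password'          = '<yourPassword>',
  'tableName'         = '<yourTableName>',
  'columnFamily'      = '<yourColumnFamily>',
  'dynamicColumnSink' = 'true'
);

-- Each record adds or updates one hourly column in the row for its day
INSERT INTO lindorm_hourly_metrics
SELECT day_key, c1, c2
FROM (VALUES
  ('2025-06-01', '00:00', '45'),
  ('2025-06-01', '01:00', '32'),
  ('2025-06-02', '00:00', '76'),
  ('2025-06-02', '01:00', '34')
) AS t (day_key, c1, c2);
```

Because c2 must be VARCHAR, numeric metrics are written as strings and need to be cast back to a numeric type by whatever reads them.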

Data type mappings

All Lindorm data is stored in binary format. The following table shows how the connector converts between Flink SQL types and Lindorm binary representations.

Flink SQL type | Write to Lindorm | Read from Lindorm
CHAR / VARCHAR | StringData::toBytes | StringData::fromBytes
BOOLEAN | Bytes::toBytes(boolean) | Bytes::toBoolean
BINARY / VARBINARY | Direct bytes | Direct bytes
DECIMAL | Bytes::toBytes(BigDecimal) | Bytes::toBigDecimal
TINYINT | Value stored as the first byte of the byte[] | First byte of the byte[] (bytes[0])
SMALLINT | Bytes::toBytes(short) | Bytes::toShort
INT | Bytes::toBytes(int) | Bytes::toInt
BIGINT | Bytes::toBytes(long) | Bytes::toLong
FLOAT | Bytes::toBytes(float) | Bytes::toFloat
DOUBLE | Bytes::toBytes(double) | Bytes::toDouble
DATE | Bytes::toBytes(int) with days since 1970-01-01 | Bytes::toInt → days since 1970-01-01
TIME | Bytes::toBytes(int) with milliseconds since 00:00:00 | Bytes::toInt → milliseconds since 00:00:00
TIMESTAMP | Bytes::toBytes(long) with milliseconds since 1970-01-01 00:00:00 | Bytes::toLong → milliseconds since 1970-01-01 00:00:00

Bytes methods are in the com.alibaba.lindorm.client.core.utils.Bytes class. StringData methods (CHAR/VARCHAR rows) are in the org.apache.flink.table.data.StringData class.

Metrics

The following sink metrics are available. For details, see Metrics.

Metric | Description
numBytesOut | Total bytes written to the sink
numBytesOutPerSecond | Bytes written per second
numRecordsOut | Total records written to the sink
numRecordsOutPerSecond | Records written per second

FAQ

Lindorm connection errors and solutions