本文为您介绍如何使用云原生多模数据库Lindorm连接器。

背景信息

Lindorm是面向物联网、互联网、车联网等设计和优化的云原生多模超融合数据库,是日志、监控、账单、广告、社交、出行、风控等场景首选数据库,也是为阿里巴巴核心业务提供支撑的数据库之一。详情请参见什么是云原生多模数据库Lindorm

Lindorm具备以下特性:
  • 支持宽表、时序、文本、对象、流、空间等多种数据的统一访问和融合处理。
  • 兼容SQL、HBase/Cassandra/S3、TSDB、HDFS、Solr、Kafka等多种标准接口和无缝集成三方生态工具。
Lindorm连接器支持的信息如下。
类别详情
支持类型维表和结果表
运行模式仅支持流模式
数据格式暂不适用
特有监控指标
  • 维表:无
  • 结果表:
    • numBytesOut
    • numBytesOutPerSecond
    • numRecordsOut
    • numRecordsOutPerSecond
说明 指标的含义及如何查看监控指标,请参见查看监控指标
API种类SQL
支持的Lindorm引擎宽表引擎
是否支持更新或删除结果表数据

前提条件

  • 已经创建了Lindorm宽表引擎以及数据表,详情请参见创建实例
  • Lindorm集群与Flink全托管集群处于网络连通的环境下,例如在同一个VPC下。

使用限制

仅Flink计算引擎VVR 4.0.8及以上版本支持Lindorm。

语法结构

CREATE TABLE white_list (
 id varchar,
 name varchar,
 age int,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'lindorm',
 'seedserver' = '<yourSeedServer>',
 'namespace' = '<yourNamespace>',
 'username' = '<yourUsername>',
 'password' = '<yourPassword>',
 'tableName' = '<yourTableName>',
 'columnFamily' = '<yourColumnFamily>'
);

WITH参数

类型参数说明数据类型是否必填默认值备注
通用connector表类型。String固定值为lindorm。
seedserverLindorm服务器的连接地址。StringFlink全托管使用Java API的方式访问Lindorm。Lindorm服务器的连接地址的格式为host:port。详情请参见通过HBase Java API连接并使用宽表引擎
namespaceLindorm的命名空间。String无。
username连接Lindorm所用到的用户名。String无。
password连接Lindorm所用到的密码。String无。
tableNameLindorm表名。String无。
columnFamilyLindorm表的列族名。String如果创建Lindorm表时未指定列族名,则填写缺省列族名f。
retryIntervalMs读取或写入失败时,再次重试读取的时间间隔。Integer1000单位为毫秒。
maxRetryTimes最大尝试次数。Integer5无。
结果表独有bufferSize一次批量写入数据的条数。Integer500无。
flushIntervalMs当数据量比较少时,多长时间写入一次。Integer2000单位为毫秒。
ignoreDelete是否忽略Delete操作。Booleanfalse参数取值如下:
  • true:忽略。
  • false(默认):不忽略。
isDynamicTable是否开启动态表模式。关于动态表模式的介绍,请参见动态表模式Booleanfalse参数取值如下:
  • true:开启动态表模式。
  • false(默认):不开启动态表模式。
说明 实时计算引擎VVR 6.0.1及以上版本支持该参数。
维表独有partitionedJoin是否额外使用JoinKey进行分区。Booleanfalse参数取值如下:
  • true:用JoinKey进行分区,将数据分发到Join节点,提高缓存命中率。
  • false(默认值):不使用JoinKey进行分区。
shuffleEmptyKey遇到空Key时,是否将Key为空的记录随机向下游Shuffle。Booleanfalse参数取值如下:
  • true:随机往下游做Shuffle。
  • false(默认值):从下游中编号为0的并发开始做Shuffle,即从第一个并发开始。
cache缓存策略。StringNone目前Lindorm支持以下两种缓存策略:
  • None(默认值):无缓存。
  • LRU:只保留最近使用的数据。
需要配置相关参数:缓存大小(cacheSize)和缓存失效超时时间(cacheTTLMs)。
cacheSize缓存数据的行数。Integer1000当选择LRU缓存策略后,使用本参数可以设置缓存大小。
cacheTTLMs缓存失效超时时间。Integer单位为毫秒。当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。

动态表模式

动态表模式适用于在表定义中并未指定列名的情况,根据作业运行情况动态创建数据列并插入的场景。例如统计每天每小时的交易量,以天作为主键,小时作为列,每个小时的数据都是动态生成的,示例如下。

主键列名:0点列名:1点
2025-06-014532
2025-06-027634

动态表需要遵循特殊的DDL定义。其主键需要定义为前若干列,最后两列中前一列的值作为列名变量,最后一列的值作为该列对应的值,且要求最后两列的类型均为varchar。代码示例如下。

CREATE TABLE lindorm_dynamic_output (
 pk1 varchar,
 pk2 varchar,
 pk3 varchar,
 c1 varchar,
 c2 varchar,
 PRIMARY KEY(pk1, pk2, pk3) not enforced
) WITH (
 'connector' = 'lindorm', 
 'seedserver' = '<yourSeedServer>',
 'namespace' = '<yourNamespace>',
 'username' = '<yourUsername>',
 'password' = '<yourPassword>',
 'tableName' = '<yourTableName>',
 'columnFamily' = '<yourColumnFamily>'
);

上述定义中,pk1、pk2、pk3为主键,c1、c2为动态表模式所必须的两列且一定为最后两列,不可存在其他的非主键的列。每次写入数据时,会在主键<pk1, pk2, pk3>对应的条目中添加或更改一列,列名为c1的值,该列的值为c2的值。每次一条数据来临时,只会添加或更改一列对应的值,其他列的值不会改变。

类型映射

Lindorm中数据均为二进制形式,通过Flink某个字段类型来转换或解析二进制的Bytes方法如下。
Flink SQL类型转换为写入的Bytes使用的方法从Lindorm读取Bytes之后的解析
CHARorg.apache.flink.table.data.StringData::toBytesorg.apache.flink.table.data.StringData::fromBytes
VARCHAR
BOOLEANcom.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean)com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal
BINARY直接为bytes的形式。直接返回bytes。
VARBINARY
DECIMALcom.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal)com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal
TINYINT直接将数据封装成byte[]的第一个byte。直接返回bytes[0]。
SMALLINTcom.alibaba.lindorm.client.core.utils.Bytes::toBytes(short)com.alibaba.lindorm.client.core.utils.Bytes::toShort
INTcom.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)com.alibaba.lindorm.client.core.utils.Bytes::toInt
BIGINTcom.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)com.alibaba.lindorm.client.core.utils.Bytes::toLong
FLOATcom.alibaba.lindorm.client.core.utils.Bytes::toBytes(float)com.alibaba.lindorm.client.core.utils.Bytes::toFloat
DOUBLEcom.alibaba.lindorm.client.core.utils.Bytes::toBytes(double)com.alibaba.lindorm.client.core.utils.Bytes::toDouble
DATE获取自1970.01.01以来的天数后,调用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自1970.01.01以来的天数。
TIME获取自当天00:00:00以来的毫秒数后,调用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自当天00:00:00以来的毫秒数。
TIMESTAMP获取自1970.01.01 00:00:00以来的毫秒数后,调用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)。com.alibaba.lindorm.client.core.utils.Bytes::toLong得到自1970.01.01 00:00:00以来的毫秒数。

代码示例

CREATE TEMPORARY TABLE example_source(
 id INT,
 proc_time AS PROCTIME()
) WITH (
 'connector' = 'datagen',
 'number-of-rows' = '10',
 'fields.id.kind' = 'sequence',
 'fields.id.start' = '0',
 'fields.id.end' = '9'
);

CREATE TEMPORARY TABLE lindorm_hbase_dim(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_dim_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

CREATE TEMPORARY TABLE lindorm_hbase_sink(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_sink_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

INSERT INTO lindorm_hbase_sink
SELECT example_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON example_source.id = lindorm_hbase_dim.id;