文档

云原生多模数据库Lindorm

更新时间:

本文为您介绍如何使用云原生多模数据库Lindorm连接器。

背景信息

Lindorm是面向物联网、互联网、车联网等设计和优化的云原生多模超融合数据库,是日志、监控、账单、广告、社交、出行、风控等场景首选数据库,也是为阿里巴巴核心业务提供支撑的数据库之一。详情请参见什么是云原生多模数据库Lindorm

具备以下特性:

  • 支持宽表、时序、文本、对象、流、空间等多种数据的统一访问和融合处理。

  • 兼容SQL、HBase/Cassandra/S3、TSDB、HDFS、Solr、Kafka等多种标准接口和无缝集成三方生态工具。

Lindorm连接器支持的信息如下。

类别

详情

支持类型

维表和结果表

运行模式

仅支持流模式

数据格式

暂不适用

特有监控指标

  • 维表:无

  • 结果表:

    • numBytesOut

    • numBytesOutPerSecond

    • numRecordsOut

    • numRecordsOutPerSecond

说明

指标含义详情,请参见监控指标说明

API种类

SQL

支持的Lindorm引擎

宽表引擎

是否支持更新或删除结果表数据

前提条件

  • 已经创建了Lindorm宽表引擎以及数据表,详情请参见创建实例

  • Lindorm集群与Flink全托管集群处于网络连通的环境下,例如在同一个VPC下。

使用限制

仅Flink计算引擎VVR 4.0.8及以上版本支持Lindorm。

语法结构

CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);

WITH参数

类型

参数

说明

数据类型

是否必填

默认值

备注

通用

connector

表类型。

String

固定值为lindorm。

seedserver

Lindorm服务器的连接地址。

String

Flink全托管使用Java API的方式访问Lindorm。Lindorm服务器的连接地址的格式为host:port。详情请参见基于HBase Java API的应用开发

namespace

Lindorm的命名空间。

String

无。

username

连接Lindorm所用到的用户名。

String

无。

password

连接Lindorm所用到的密码。

String

无。

tableName

Lindorm表名。

String

无。

columnFamily

Lindorm表的列族名。

String

如果创建Lindorm表时未指定列族名,则填写默认列族名f。

retryIntervalMs

读取或写入失败时,再次重试读取的时间间隔。

Integer

1000

单位为毫秒。

maxRetryTimes

最大尝试次数。

Integer

5

无。

结果表独有

bufferSize

一次批量写入数据的条数。

Integer

500

无。

flushIntervalMs

当数据量比较少时,多长时间写入一次。

Integer

2000

单位为毫秒。

ignoreDelete

是否忽略Delete操作。

Boolean

false

参数取值如下:

  • true:忽略。

  • false(默认):不忽略。

dynamicColumnSink

是否开启动态表模式。关于动态表模式的介绍,请参见动态表模式

Boolean

false

参数取值如下:

  • true:开启动态表模式。

  • false(默认):不开启动态表模式。

说明

实时计算引擎VVR 6.0.2及以上版本支持该参数。

维表独有

partitionedJoin

是否额外使用JoinKey进行分区。

Boolean

false

参数取值如下:

  • true:用JoinKey进行分区,将数据分发到Join节点,提高缓存命中率。

  • false(默认值):不使用JoinKey进行分区。

shuffleEmptyKey

遇到空Key时,是否将Key为空的记录随机向下游Shuffle。

Boolean

false

参数取值如下:

  • true:随机往下游做Shuffle。

  • false(默认值):从下游中编号为0的并发开始做Shuffle,即从第一个并发开始。

cache

缓存策略。

String

None

目前Lindorm支持以下两种缓存策略:

  • None(默认值):无缓存。

  • LRU:只保留最近使用的数据。

需要配置相关参数:缓存大小(cacheSize)和缓存失效超时时间(cacheTTLMs)。

cacheSize

缓存数据的行数。

Integer

1000

当选择LRU缓存策略后,使用本参数可以设置缓存大小。

cacheTTLMs

缓存失效超时时间。

Integer

单位为毫秒。当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。

cacheEmpty

是否缓存join结果为空的数据。

Boolean

true

无。

async

是否异步返回数据。

Boolean

false

参数取值如下:

  • true:表示异步返回数据。

  • false(默认值):表示不进行异步返回数据。

asyncLindormRpcTimeoutMs

在异步请求数据时的超时时间。

Integer

300000

单位毫秒。

动态表模式

动态表模式适用于在表定义中并未指定列名的情况,根据作业运行情况动态创建数据列并插入的场景。例如统计每天每小时的交易量,以天作为主键,小时作为列,每个小时的数据都是动态生成的,示例如下。

主键

列名:0点

列名:1点

2025-06-01

45

32

2025-06-02

76

34

动态表需要遵循特殊的DDL定义。其主键需要定义为前若干列,最后两列中前一列的值作为列名变量,最后一列的值作为该列对应的值,且要求最后两列的类型均为varchar。代码示例如下。

CREATE TABLE lindorm_dynamic_output(
pk1varchar,
pk2varchar,
pk3varchar,
c1varchar,
c2varchar,
PRIMARYKEY(pk1,pk2,pk3)notenforced
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);

上述定义中,pk1、pk2、pk3为主键,c1、c2为动态表模式所必须的两列且一定为最后两列,不可存在其他的非主键的列。每次写入数据时,会在主键<pk1, pk2, pk3>对应的条目中添加或更改一列,列名为c1的值,该列的值为c2的值。每次一条数据来临时,只会添加或更改一列对应的值,其他列的值不会改变。

类型映射

Lindorm中数据均为二进制形式,通过Flink某个字段类型来转换或解析二进制的Bytes方法如下。

Flink SQL类型

转换为写入的Bytes使用的方法

从Lindorm读取Bytes之后的解析

CHAR

org.apache.flink.table.data.StringData::toBytes

org.apache.flink.table.data.StringData::fromBytes

VARCHAR

BOOLEAN

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean)

com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal

BINARY

直接为bytes的形式。

直接返回bytes。

VARBINARY

DECIMAL

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal)

com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal

TINYINT

直接将数据封装成byte[]的第一个byte。

直接返回bytes[0]。

SMALLINT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short)

com.alibaba.lindorm.client.core.utils.Bytes::toShort

INT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)

com.alibaba.lindorm.client.core.utils.Bytes::toInt

BIGINT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)

com.alibaba.lindorm.client.core.utils.Bytes::toLong

FLOAT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float)

com.alibaba.lindorm.client.core.utils.Bytes::toFloat

DOUBLE

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double)

com.alibaba.lindorm.client.core.utils.Bytes::toDouble

DATE

获取自1970.01.01以来的天数后,调用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。

com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自1970.01.01以来的天数。

TIME

获取自当天00:00:00以来的毫秒数后,调用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。

com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自当天00:00:00以来的毫秒数。

TIMESTAMP

获取自1970.01.01 00:00:00以来的毫秒数后,调用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)。

com.alibaba.lindorm.client.core.utils.Bytes::toLong得到自1970.01.01 00:00:00以来的毫秒数。

代码示例

CREATE TEMPORARY TABLE example_source(
 id INT,
 proc_time AS PROCTIME()
) WITH (
 'connector' = 'datagen',
 'number-of-rows' = '10',
 'fields.id.kind' = 'sequence',
 'fields.id.start' = '0',
 'fields.id.end' = '9'
);

CREATE TEMPORARY TABLE lindorm_hbase_dim(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_dim_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

CREATE TEMPORARY TABLE lindorm_hbase_sink(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_sink_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

INSERT INTO lindorm_hbase_sink
SELECT example_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON example_source.id = lindorm_hbase_dim.id;