本文为您介绍云原生多模数据库Lindorm的宽表作为结果表的DDL定义、WITH参数、类型映射和代码示例。

什么是云原生多模数据库Lindorm

Lindorm是面向物联网、互联网、车联网等设计和优化的云原生多模超融合数据库,适用于日志、监控、账单、广告、社交、出行、风控等场景,是为阿里巴巴核心业务提供支撑的数据库之一。详情请参见什么是云原生多模数据库Lindorm

Lindorm具备以下特性:
  • 支持宽表、时序、文本、对象、流、空间等多种数据的统一访问和融合处理。
  • 兼容SQL、HBase/Cassandra/S3、TSDB、HDFS、Solr、Kafka等多种标准接口和无缝集成三方生态工具。

前提条件

  • 已经创建了Lindorm宽表引擎以及数据表,详情请参见创建实例
  • Lindorm集群与Flink全托管集群处于网络连通的环境下,例如在同一个VPC下。

使用限制

仅Flink计算引擎VVR 4.0.8及以上版本支持Lindorm。

DDL定义

CREATE TABLE lindorm_output (
 pk1 varchar,
 pk2 varchar,
 pk3 varchar,
 c1 bigint,
 c2 double,
 c3 varchar,
 PRIMARY KEY(pk1, pk2, pk3) not enforced
) WITH (
 'connector' = 'lindorm', 
 'seedserver' = '<yourSeedServer>',
 'namespace' = '<yourNamespace>',
 'username' = '<yourUsername>',
 'password' = '<yourPassword>',
 'tableName' = '<yourTableName>',
 'columnFamily' = '<yourColumnFamily>'
);

WITH参数

参数 说明 是否必填 备注
seedserver Lindorm服务器的连接地址。 Flink全托管使用Java API的方式访问Lindorm。

Lindorm服务器的连接地址的格式为host:port。详情请参见通过HBase Java API连接并使用宽表引擎

namespace Lindorm的命名空间。 无。
username 连接Lindorm所用到的用户名。 无。
password 连接Lindorm所用到的密码。 无。
tableName Lindorm表名。 无。
columnFamily Lindorm表的列族名。 如果创建Lindorm表时未指定列族名,则填写为f。
retryIntervalMs 写入失败时,重试写入的时间间隔。 单位为ms,默认值为1000ms。
maxRetryTimes 最大尝试次数。 默认值为5次。
bufferSize 一次批量写入数据的条数。 默认值为500条。
flushIntervalMs 当数据量比较少时,多长时间写入一次。 单位为ms,默认值为2000ms。
ignoreDelete 是否忽略Delete操作。 参数取值如下:
  • true:忽略。
  • false(默认):不忽略。
isDynamicTable 是否开启动态表模式。关于动态表模式的介绍,请参见动态表模式一节。 参数取值如下:
  • true:开启动态表模式。
  • false(默认):不开启动态表模式。
说明 实时计算引擎VVR 6.0.1及以上版本支持该参数。

动态表模式

动态表模式适用于在表定义中并未指定列名的情况,根据作业运行情况动态创建数据列并插入的场景。例如统计每天每小时的交易量,以天作为主键,小时作为列,每个小时的数据都是动态生成的,如下:

主键 列名:0点 列名:1点
2025-06-01 45 32
2025-06-02 76 34

动态表需要遵循特殊的DDL定义。其主键需要定义为前若干列,最后两列中前一列的值作为列名变量,最后一列的值作为该列对应的值,且要求最后两列的类型均为varchar。具体例子如下:

CREATE TABLE lindorm_dynamic_output (
 pk1 varchar,
 pk2 varchar,
 pk3 varchar,
 c1 varchar,
 c2 varchar,
 PRIMARY KEY(pk1, pk2, pk3) not enforced
) WITH (
 'connector' = 'lindorm', 
 'seedserver' = '<yourSeedServer>',
 'namespace' = '<yourNamespace>',
 'username' = '<yourUsername>',
 'password' = '<yourPassword>',
 'tableName' = '<yourTableName>',
 'columnFamily' = '<yourColumnFamily>'
);

上述定义中,pk1、pk2、pk3 为主键,c1、c2 为动态表模式所必须的两列且一定为最后两列,不可存在其他的非主键的列。每次写入数据时,会将主键<pk1, pk2, pk3>对应的条目中添加/更改一列,列名为c1的值,该列的值为c2的值。每次一条数据来临时,只会添加/更改一列对应的值,其他列的值不会改变。

类型映射

Flink会根据表定义的类型对相应字段进行序列化,然后写入Lindorm。方法如下:
Flink SQL类型 在写入Lindorm表前对数据进行的操作
CHAR org.apache.flink.table.data.StringData::toBytes
VARCHAR
BOOLEAN com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean)
BINARY 直接为bytes的形式。
VARBINARY
DECIMAL com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal)
TINYINT 直接将数据封装成byte[]的第一个byte。
SMALLINT com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short)
INT com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)
BIGINT com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)
FLOAT com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float)
DOUBLE com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double)
DATE 获取自1970.01.01以来的天数后,调用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) 。
TIME 获取自当天00:00:00以来的毫秒数后,调用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。
TIMESTAMP 获取自1970.01.01 00:00:00以来的毫秒数后,调用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)。

代码示例

CREATE TEMPORARY TABLE datagen_source(
 id INT,
 proc_time AS PROCTIME()
) WITH (
 'connector' = 'datagen',
 'number-of-rows' = '10',
 'fields.id.kind' = 'sequence',
 'fields.id.start' = '0',
 'fields.id.end' = '9'
);

CREATE TEMPORARY TABLE lindorm_hbase_dim(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH(
 'connector'='lindorm',
 'tablename'='${lindorm_dim_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

CREATE TEMPORARY TABLE lindorm_hbase_sink(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_sink_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

INSERT INTO lindorm_hbase_sink
SELECT datagen_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM datagen_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON lindorm_source.id = lindorm_hbase_dim.id;