本文为您介绍云原生多模数据库Lindorm的宽表作为维表的DDL定义、WITH参数、类型映射和代码示例。

什么是云原生多模数据库Lindorm

Lindorm是面向物联网、互联网、车联网等设计和优化的云原生多模超融合数据库,是日志、监控、账单、广告、社交、出行、风控等场景首选数据库,也是为阿里巴巴核心业务提供支撑的数据库之一。详情请参见什么是云原生多模数据库Lindorm

Lindorm具备以下特性:
  • 支持宽表、时序、文本、对象、流、空间等多种数据的统一访问和融合处理。
  • 兼容SQL、HBase/Cassandra/S3、TSDB、HDFS、Solr、Kafka等多种标准接口和无缝集成三方生态工具。

前提条件

  • 已经创建了Lindorm宽表引擎以及数据表,详情请参见创建实例
  • Lindorm集群与Flink全托管集群处于网络连通的环境下,例如在同一个VPC下。

使用限制

仅Flink计算引擎VVR 4.0.8及以上版本支持Lindorm。

注意事项

声明维表时,必须要指名主键,维表JOIN时,ON的条件必须包含所有主键的等值条件。

DDL定义

CREATE TABLE white_list (
 id varchar,
 name varchar,
 age int,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'lindorm', 
 'seedserver' = '<yourSeedServer>',
 'namespace' = '<yourNamespace>',
 'username' = '<yourUsername>',
 'password' = '<yourPassword>',
 'tableName' = '<yourTableName>',
 'columnFamily' = '<yourColumnFamily>'
);

WITH参数

参数 说明 是否必填 备注
seedserver Lindorm服务器的连接地址。 Flink全托管使用Java API的方式访问Lindorm。

Lindorm服务器的连接地址的格式为host:port。详情参见通过HBase Java API连接并使用宽表引擎

namespace Lindorm的命名空间。 无。
username 连接Lindorm所用到的用户名。 无。
password 连接Lindorm所用到的密码。 无。
tableName Lindorm表名。 无。
columnFamily Lindorm表的列族名。 如果创建Lindorm表时未指定列族名,则需要您填写为f。
retryIntervalMs 读取失败时,再次重试读取的时间间隔。 单位为ms,默认值为1000ms。
maxRetryTimes 最大尝试次数。 默认值为5次。
partitionedJoin 是否额外使用JoinKey进行分区。 参数取值如下:
  • true:用JoinKey进行分区,将数据分发到Join节点,提高缓存命中率。
  • false(默认值):不使用JoinKey进行分区。
shuffleEmptyKey 遇到空Key时,是否将Key为空的记录随机向下游Shuffle。 参数取值如下:
  • true:随机往下游做Shuffle。
  • false(默认值):从下游中编号为0的并发开始做Shuffle,即从第一个并发开始。

Cache参数

参数 说明 是否必填 备注
cache 缓存策略。 目前Lindorm支持以下两种缓存策略:
  • None(默认值):无缓存。
  • LRU:只保留最近使用的数据。

    需要配置相关参数:缓存大小(cacheSize)和缓存失效超时时间(cacheTTLMs)。

cacheSize 缓存数据的行数。 默认值为10000行。当选择LRU缓存策略后,可以设置缓存大小。
cacheTTLMs 缓存失效超时时间。 单位为毫秒。当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。

类型映射

Flink会根据表定义的类型对相应字段的Lindorm数据 (byte[] bytes) 进行解析。解析的方法如下表所示。
Flink SQL类型 对Lindorm数据进行的操作
CHAR org.apache.flink.table.data.StringData::fromBytes
VARCHAR
BOOLEAN com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal
BINARY 直接返回bytes。
VARBINARY
DECIMAL com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal
TINYINT 直接返回bytes[0]。
SMALLINT com.alibaba.lindorm.client.core.utils.Bytes::toShort
INT com.alibaba.lindorm.client.core.utils.Bytes::toInt
BIGINT com.alibaba.lindorm.client.core.utils.Bytes::toLong
FLOAT com.alibaba.lindorm.client.core.utils.Bytes::toFloat
DOUBLE com.alibaba.lindorm.client.core.utils.Bytes::toDouble
DATE com.alibaba.lindorm.client.core.utils.Bytes::toInt 得到自1970.01.01以来的天数。
TIME com.alibaba.lindorm.client.core.utils.Bytes::toInt 得到自当天00:00:00以来的毫秒数。
TIMESTAMP com.alibaba.lindorm.client.core.utils.Bytes::toLong 得到自1970.01.01 00:00:00以来的毫秒数。

代码示例

CREATE TEMPORARY TABLE lindorm_source(
 id INT,
 proc_time AS PROCTIME()
) WITH (
 'connector' = 'datagen',
 'number-of-rows' = '10',
 'fields.id.kind' = 'sequence',
 'fields.id.start' = '0',
 'fields.id.end' = '9'
);

CREATE TEMPORARY TABLE lindorm_hbase_dim(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_dim_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

CREATE TEMPORARY TABLE lindorm_hbase_sink(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_sink_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

INSERT INTO lindorm_hbase_sink
SELECT lindorm_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM lindorm_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON lindorm_source.id = lindorm_hbase_dim.id;