本文为您介绍云原生多模数据库Lindorm的宽表作为维表的DDL定义、WITH参数、类型映射和代码示例。
什么是云原生多模数据库Lindorm
Lindorm是面向物联网、互联网、车联网等设计和优化的云原生多模超融合数据库,是日志、监控、账单、广告、社交、出行、风控等场景首选数据库,也是为阿里巴巴核心业务提供支撑的数据库之一。详情请参见什么是云原生多模数据库Lindorm。
Lindorm具备以下特性:
- 支持宽表、时序、文本、对象、流、空间等多种数据的统一访问和融合处理。
- 兼容SQL、HBase/Cassandra/S3、TSDB、HDFS、Solr、Kafka等多种标准接口和无缝集成三方生态工具。
前提条件
- 已经创建了Lindorm宽表引擎以及数据表,详情请参见创建实例。
- Lindorm集群与Flink全托管集群处于网络连通的环境下,例如在同一个VPC下。
使用限制
仅Flink计算引擎VVR 4.0.8及以上版本支持Lindorm。
注意事项
声明维表时,必须要指名主键,维表JOIN时,ON的条件必须包含所有主键的等值条件。
DDL定义
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
seedserver | Lindorm服务器的连接地址。 | 是 | Flink全托管使用Java API的方式访问Lindorm。
Lindorm服务器的连接地址的格式为host:port。详情参见通过HBase Java API连接并使用宽表引擎。 |
namespace | Lindorm的命名空间。 | 是 | 无。 |
username | 连接Lindorm所用到的用户名。 | 是 | 无。 |
password | 连接Lindorm所用到的密码。 | 是 | 无。 |
tableName | Lindorm表名。 | 是 | 无。 |
columnFamily | Lindorm表的列族名。 | 是 | 如果创建Lindorm表时未指定列族名,则需要您填写为f。 |
retryIntervalMs | 读取失败时,再次重试读取的时间间隔。 | 否 | 单位为ms,默认值为1000ms。 |
maxRetryTimes | 最大尝试次数。 | 否 | 默认值为5次。 |
partitionedJoin | 是否额外使用JoinKey进行分区。 | 否 | 参数取值如下:
|
shuffleEmptyKey | 遇到空Key时,是否将Key为空的记录随机向下游Shuffle。 | 否 | 参数取值如下:
|
Cache参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
cache | 缓存策略。 | 否 | 目前Lindorm支持以下两种缓存策略:
|
cacheSize | 缓存数据的行数。 | 否 | 默认值为10000行。当选择LRU缓存策略后,可以设置缓存大小。 |
cacheTTLMs | 缓存失效超时时间。 | 否 | 单位为毫秒。当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。 |
类型映射
Flink会根据表定义的类型对相应字段的Lindorm数据 (byte[] bytes) 进行解析。解析的方法如下表所示。
Flink SQL类型 | 对Lindorm数据进行的操作 |
---|---|
CHAR | org.apache.flink.table.data.StringData::fromBytes |
VARCHAR | |
BOOLEAN | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
BINARY | 直接返回bytes。 |
VARBINARY | |
DECIMAL | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
TINYINT | 直接返回bytes[0]。 |
SMALLINT | com.alibaba.lindorm.client.core.utils.Bytes::toShort |
INT | com.alibaba.lindorm.client.core.utils.Bytes::toInt |
BIGINT | com.alibaba.lindorm.client.core.utils.Bytes::toLong |
FLOAT | com.alibaba.lindorm.client.core.utils.Bytes::toFloat |
DOUBLE | com.alibaba.lindorm.client.core.utils.Bytes::toDouble |
DATE | com.alibaba.lindorm.client.core.utils.Bytes::toInt 得到自1970.01.01以来的天数。 |
TIME | com.alibaba.lindorm.client.core.utils.Bytes::toInt 得到自当天00:00:00以来的毫秒数。 |
TIMESTAMP | com.alibaba.lindorm.client.core.utils.Bytes::toLong 得到自1970.01.01 00:00:00以来的毫秒数。 |
代码示例
CREATE TEMPORARY TABLE lindorm_source(
id INT,
proc_time AS PROCTIME()
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.id.kind' = 'sequence',
'fields.id.start' = '0',
'fields.id.end' = '9'
);
CREATE TEMPORARY TABLE lindorm_hbase_dim(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tablename'='${lindorm_dim_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_username}'
);
CREATE TEMPORARY TABLE lindorm_hbase_sink(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tablename'='${lindorm_sink_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_username}'
);
INSERT INTO lindorm_hbase_sink
SELECT lindorm_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM lindorm_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON lindorm_source.id = lindorm_hbase_dim.id;