Kudu连接器
使用Kudu连接器可以查询、插入和删除存储在Kudu里的数据。
背景信息
本文为您介绍Kudu连接器相关的内容和操作,具体如下:
前提条件
已创建Trino集群和Hadoop集群,且Hadoop集群选择了Kudu服务,详情请参见创建集群。
使用限制
Kudu版本需要为1.10及以上。
Trino集群和Hadoop集群网络互通。
Kudu表名称和列名称仅支持小写字母。
修改Kudu连接器配置
修改Kudu连接器配置,详情请参见配置连接器。
进入EMR控制台的Trino服务的配置页面,在服务配置区域,单击kudu.properties页签。您可以看到以下参数,参数值请根据您实际情况修改。
参数 | 描述 |
kudu.client.master-addresses | Kudu主地址列表,多个地址时使用逗号(,)分隔。 支持以下格式:example.com、example.com:7051、192.0.2.1、192.0.2.1:7051、[2001:db8::1]、[2001:db8::1]:7051和2001:db8::1。 默认值为localhost。 说明 为了确保能够对Kudu表进行写入和查询操作,请将默认的localhost更改为Kudu master节点的IP地址或主机名,例如:master-1-1。 |
kudu.schema-emulation.enabled | 是否开启Schema模拟功能。 取值如下:
重要 您可以在kudu.properties页签,单击新增配置项添加该配置项,添加配置项详情请参见添加配置项。 |
kudu.schema-emulation.prefix | Schema模拟功能的前缀。 重要 当 标准前缀为 |
kudu.client.default-admin-operation-timeout | 管理操作(例如,Create Table、Delete Table等)的默认超时。 默认值为30s。 |
kudu.client.default-operation-timeout | 用户操作的默认超时。 默认值为30s。 |
kudu.client.default-socket-read-timeout | 等待来自Socket的数据时使用的默认超时。 默认值为10s。 |
kudu.client.disable-statistics | 是否启用Kudu客户端的统计信息收集功能。取值如下:
|
数据查询
Apache Kudu不支持Schema,但是通过配置Kudu Connector可以支持Schema功能。
不开启Schema模拟功能(默认行为)
Schema模拟默认是关闭的,此时kudu的表都在default
的Schema下。
例如,您可以通过执行SELECT * FROM kudu.default.orders
来查询表orders
,如果Catalog和 Schema分别指定为kudu
和default
,则查询语句可以简化为SELECT * FROM orders
。
Kudu的表名称可以包含任意字符,因此需要用双引号(")引用表名称。例如,查询表special.table!
,执行语句为SELECT * FROM kudu.default."special.table!"
。
示例如下:
在
default
的Schema下创建表users
。CREATE TABLE kudu.default.users ( user_id int WITH (primary_key = true), first_name varchar, last_name varchar ) WITH ( partition_by_hash_columns = ARRAY['user_id'], partition_by_hash_buckets = 2 );
说明创建表时必须指定必要的表信息,例如,主键、列的编码格式或压缩格式、Hash分区或Range分区等。
查看表信息。
DESCRIBE kudu.default.users;
返回如下类似信息。
Column | Type | Extra | Comment ------------+---------+-------------------------------------------------+--------- user_id | integer | primary_key, encoding=auto, compression=default | first_name | varchar | nullable, encoding=auto, compression=default | last_name | varchar | nullable, encoding=auto, compression=default | (3 rows)
插入数据。
INSERT INTO kudu.default.users VALUES (1, 'Donald', 'Duck'), (2, 'Mickey', 'Mouse');
查询数据。
SELECT * FROM kudu.default.users;
开启Schema模拟功能
如果在连接器的配置文件etc/catalog/kudu.properties里设置了开启Schema模拟功能,则表会根据命名约定被映射到对应的Schema里。
如果设置了
kudu.schema-emulation.enabled=true
和kudu.schema-emulation.prefix=
,则映射关系如下表。Kudu表名
Presto表名
orders
kudu.default.orders
part1.part2
kudu.part1.part2
x.y.z
kudu.x."y.z"
说明由于Kudu不能直接支持Schema ,Presto会创建一个特殊表
$schemas
用来管理Schema。如果设置了
kudu.schema-emulation.enabled=true
和kudu.schema-emulation.prefix=presto::
,则映射关系如下表。Kudu表名
Presto表名
orders
kudu.default.orders
part1.part2
kudu.default."part1.part2"
x.y.z
kudu.default."x.y.z"
presto::part1.part2
kudu.part1.part2
presto::x.y.z
kudu.x."y.z"
说明因为Kudu不能直接支持Schema ,所以Presto会创建一个特殊表
presto::$schemas
用来管理Schema。
数据类型映射
下表为您介绍Presto数据类型和Kudu数据类型的对应情况。
Presto数据类型 | Kudu数据类型 | 备注 |
BOOLEAN | BOOL | 无 |
TINYINT | INT8 | |
SMALLINT | INT16 | |
INTEGER | INT32 | |
BIGINT | INT64 | |
REAL | FLOAT | |
DOUBLE | DOUBLE | |
VARCHAR | STRING | 从Presto表创建Kudu表执行 |
VARBINARY | BINARY | |
TIMESTAMP | UNIXTIME_MICROS | kudu列的µs精度降低到ms精度。 |
DECIMAL | DECIMAL | 仅支持Kudu server的1.7.0及后续版本。 |
DATE | 无 | 不支持 从Presto表创建Kudu表执行 |
CHAR | 无 | 不支持 |
TIME | ||
JSON | ||
TIME WITH TIMEZONE | ||
TIMESTAMP WITH TIME ZONE | ||
INTERVAL YEAR TO MO NTH | ||
INTERVAL DAY TO SEC OND | ||
ARRAY | ||
MAP | ||
IPADDRESS |
支持的Presto SQL语法
不支持ALTER SCHEMA ... RENAME TO ...
。
SQL语法 | 备注 |
| 无 |
| 无 |
| 无 |
| 无 |
| 仅在Schema启用时可用。 |
| 仅在Schema启用时可用。 |
| 创建表,详情请参见创建表。 |
| 无 |
| 无 |
| 无 |
| 增加列,详情请参见增加列。 |
| 仅对非主键可用。 |
| |
| 无 |
| 无 |
| 无 |
| 无 |
| 作用同 |
| 增加Range分区,详情请参见Range分区。 |
| 删除Range分区,详情请参见Range分区。 |
创建表
创建表需要指定列、数据类型和分区信息,也可以根据您实际的情况指定列编码格式或压缩格式。创建表示例如下。
CREATE TABLE user_events (
user_id int WITH (primary_key = true),
event_name varchar WITH (primary_key = true),
message varchar,
details varchar WITH (nullable = true, encoding = 'plain')
) WITH (
partition_by_hash_columns = ARRAY['user_id'],
partition_by_hash_buckets = 5,
number_of_replicas = 3
);
该示例中主键为user_id
和event_name
,通过user_id
列的HASH值划分为5个分区,number_of_replicas
设置为3。
创建表时相关参数描述如下:
主键列必须放在前面,分区列必须选取自主键列。
number_of_replicas
:可选项,该值指定了tablet的副本数,且必须为奇数。如果没有指定该配置,则使用Kudu Master 默认配置的副本数。Kudu支持Hash和Range两种类型的分区。Hash分区根据Hash值分发数据行到多个桶中的一个桶。Range分区使用有序的Range分区键分发数据行,具体的Range分区必须被显式创建。Kudu支持多级分区,一个表至少要有一个Hash或Range分区,但最多有一个Range分区,可以有多个Hash分区。
列属性
除了指定列名称和类型,还可以指定其他列属性。
列属性名 | 类型 | 描述 |
primary_key | BOOLEAN | 设置为true,则表示使用该列作为主键。 Kudu主键需要满足唯一性约束。当待插入数据行的主键已经存在,再插入与已有相同主键值的行,则会导致更新已有的数据行,详情请参见Primary Key Design。 |
nullable | BOOLEAN | 设置为true,则表示该列可以取null。 重要 主键列不可为null。 |
encoding | VARCHAR | 指定列编码格式以节省存储空间和提高查询性能。 如果没有指定该属性,则Kudu根据列数据类型自动编码。取值为auto、plain、bitshuffle、runlength、prefix、dictionary和group_varint,详情信息请参见Column Encoding。 |
compression | VARCHAR | 指定列压缩格式。 如果没有指定该属性,Kudu会使用默认压缩格式。取值为default、no、lz4、snappy和zlib,详情信息请参见Column compression。 |
示例如下。
CREATE TABLE mytable (
name varchar WITH (primary_key = true, encoding = 'dictionary', compression = 'snappy'),
index bigint WITH (nullable = true, encoding = 'runlength', compression = 'lz4'),
comment varchar WITH (nullable = true, encoding = 'plain', compression = 'default'),
...
) WITH (...);
分区设计
一个表至少要有一个Hash或Range分区,但最多只能有一个Range分区,可以有多个Hash分区。分区信息如下:
Hash分区
定义一组分区
您可以使用表属性partition_by_hash_columns配置分区列,表属性
partition_by_hash_buckets
配置分区个数,所有的分区列必须是主键列的子集。示例如下。CREATE TABLE mytable ( col1 varchar WITH (primary_key=true), col2 varchar WITH (primary_key=true), ... ) WITH ( partition_by_hash_columns = ARRAY['col1', 'col2'], partition_by_hash_buckets = 4 )
说明该示例定义了(col1,col2)的Hash分区,数据分布到4个分区。
定义两组分区
如果需要定义两个独立的Hash分区组,可以再设置表属性
partition_by_second_hash_columns
和partition_by_second_hash_buckets
。示例如下。CREATE TABLE mytable ( col1 varchar WITH (primary_key=true), col2 varchar WITH (primary_key=true), ... ) WITH ( partition_by_hash_columns = ARRAY['col1'], partition_by_hash_buckets = 2, partition_by_second_hash_columns = ARRAY['col2'], partition_by_second_hash_buckets = 3 )
说明该示例定义了两组Hash分区,第一组Hash分区按照列col1对数据行分布到2个分区,第二组Hash分区按照列col2对数据行分布到3个分区,因此该表会有共计2*3=6个分区。
Range分区
kudu表最多可以有一个Range分区,可以使用表属性
partition_by_range_columns
定义。在创建表的时候,可以使用表属性range_partitions
定义分区范围,表属性kudu.system.add_range_partition
和kudu.system.drop_range_partition
可以对已有的表进行Range分区管理。示例如下。CREATE TABLE events ( rack varchar WITH (primary_key=true), machine varchar WITH (primary_key=true), event_time timestamp WITH (primary_key=true), ... ) WITH ( partition_by_hash_columns = ARRAY['rack'], partition_by_hash_buckets = 2, partition_by_second_hash_columns = ARRAY['machine'], partition_by_second_hash_buckets = 3, partition_by_range_columns = ARRAY['event_time'], range_partitions = '[{"lower": null, "upper": "2018-01-01T00:00:00"}, {"lower": "2018-01-01T00:00:00", "upper": null}]' )
说明该示例定义了二组Hash分区和一个对
event_time
列进行Range分区的表,Range分区范围由2018-01-01T00:00:00
进行分割。管理Range分区
对于已经存在的表,有2个存储过程可以增加和删除Range分区。
分区操作示例如下:
增加一个Range分区
CALL kudu.system.add_range_partition(<YOUR_SCHEMA_NAME>, <YOUR_TABLE_NAME>, <range_partition_as_json_string>)
删除一个Range分区
CALL kudu.system.drop_range_partition(<YOUR_SCHEMA_NAME>, <YOUR_TABLE_NAME>, <range_partition_as_json_string>)
参数
描述
<YOUR_SCHEMA_NAME>
表所在的Schema。
<YOUR_TABLE_NAME>
表名称。
<range_partition_as_json_string>
JSON格式表示的Range分区的上下边界,格式为
'{"lower": <value>, "upper": <value>}'
,如果分区有多个列,则格式为'{"lower": [<value_col1>,...], "upper": [<value_col1>,...]}'
,上下边界具体的取值形式由列数据类型决定。数据类型和JSON字符串类型对应关系如下:BIGINT:
‘{“lower”: 0, “upper”: 1000000}’
SMALLINT:
‘{“lower”: 10, “upper”: null}’
VARCHAR:
‘{“lower”: “A”, “upper”: “M”}’
TIMESTAMP:
‘{“lower”: “2018-02-01T00:00:00.000”, “upper”: “2018-02-01T12:00:00.000”}’
BOOLEAN:
‘{“lower”: false, “upper”: true}’
VARBINARY:Base64编码的字符串值
说明设置为null时,则表示无限边界。
示例如下。
CALL kudu.system.add_range_partition('myschema', 'events', '{"lower": "2018-01-01", "upper": "2018-06-01"}')
说明该示例
myschema
里的表events
增加了一个Range分区,该分区的下界是2018-01-01
,即精确值是2018-01-01T00:00:00.000
,分区的上界是2018-06-01
。您可以使用SQL语句
SHOW CREATE TABLE
查询已经存在的Range分区,分区信息通过表属性range_partitions
展示。
增加列
您可以使用SQL语句ALTER TABLE ... ADD COLUMN ...
为已经存在的表增加数据列,还可以使用列属性来增加数据列。列属性的详细信息,可以在创建表模块中查看。
ALTER TABLE mytable ADD COLUMN extraInfo varchar WITH (nullable = true, encoding = 'plain')