使用Kudu连接器可以查询、插入和删除存储在Kudu里的数据。
背景信息
前提条件
已创建Presto集群和Hadoop集群,且Hadoop集群选择了Kudu服务,详情请参见创建集群。
使用限制
- Kudu版本需要为1.10及以上。
- Presto集群和Hadoop集群网络互通。
- Kudu表名称和列名称仅支持小写字母。
修改Kudu连接器配置
修改Kudu连接器配置,详情请参见配置连接器。
参数 | 描述 |
---|---|
kudu.client.master-addresses | Kudu主地址列表,多个地址时使用逗号(,)分隔。
支持以下格式:example.com、example.com:7051、192.0.2.1、192.0.2.1:7051、[2001:db8::1]、[2001:db8::1]:7051和2001:db8::1。 默认值为localhost。 |
kudu.schema-emulation.enabled | 是否开启Schema模拟功能。 取值如下:
注意 您可以在kudu.properties页签,单击右上角的自定义配置添加该配置项,添加配置项详情请参见添加组件参数。
|
kudu.schema-emulation.prefix | Schema模拟功能的前缀。
注意 当
kudu.schema emulation.enabled=true 时,需要设置此参数。
标准前缀为 |
kudu.client.default-admin-operation-timeout | 管理操作(例如,Create Table、Delete Table等)的默认超时。
默认值为30s。 |
kudu.client.default-operation-timeout | 用户操作的默认超时。
默认值为30s。 |
kudu.client.default-socket-read-timeout | 等待来自Socket的数据时使用的默认超时。
默认值为10s。 |
kudu.client.disable-statistics | 是否启用Kudu客户端的统计信息收集功能。取值如下:
|
数据查询
Apache Kudu不支持Schema,但是通过配置Kudu Connector可以支持Schema功能。
不开启Schema模拟功能(默认行为)
Schema模拟默认是关闭的,此时kudu的表都在default
的Schema下。
例如,您可以通过执行SELECT * FROM kudu.default.orders
来查询表orders
,如果Catalog和 Schema分别指定为kudu
和default
,则查询语句可以简化为SELECT * FROM orders
。
Kudu的表名称可以包含任意字符,因此需要用双引号(")引用表名称。例如,查询表special.table!
,执行语句为SELECT * FROM kudu.default."special.table!"
。
- 在
default
的Schema下创建表users
。CREATE TABLE kudu.default.users ( user_id int WITH (primary_key = true), first_name varchar, last_name varchar ) WITH ( partition_by_hash_columns = ARRAY['user_id'], partition_by_hash_buckets = 2 );
说明 创建表时必须指定必要的表信息,例如,主键、列的编码格式或压缩格式、Hash分区或Range分区等。 - 查看表信息。
DESCRIBE kudu.default.users;
返回如下类似信息。Column | Type | Extra | Comment ------------+---------+-------------------------------------------------+--------- user_id | integer | primary_key, encoding=auto, compression=default | first_name | varchar | nullable, encoding=auto, compression=default | last_name | varchar | nullable, encoding=auto, compression=default | (3 rows)
- 插入数据。
INSERT INTO kudu.default.users VALUES (1, 'Donald', 'Duck'), (2, 'Mickey', 'Mouse');
- 查询数据。
SELECT * FROM kudu.default.users;
开启Schema模拟功能
- 如果设置了
kudu.schema-emulation.enabled=true
和kudu.schema-emulation.prefix=
,则映射关系如下表。Kudu表名 Presto表名 orders
kudu.default.orders
part1.part2
kudu.part1.part2
x.y.z
kudu.x."y.z"
说明 由于Kudu不能直接支持Schema ,Presto会创建一个特殊表$schemas
用来管理Schema。 - 如果设置了
kudu.schema-emulation.enabled=true
和kudu.schema-emulation.prefix=presto::
,则映射关系如下表。Kudu表名 Presto表名 orders
kudu.default.orders
part1.part2
kudu.default."part1.part2"
x.y.z
kudu.default."x.y.z"
presto::part1.part2
kudu.part1.part2
presto::x.y.z
kudu.x."y.z"
说明 因为Kudu不能直接支持Schema ,所以Presto会创建一个特殊表presto::$schemas
用来管理Schema。
数据类型映射
Presto数据类型 | Kudu数据类型 | 备注 |
---|---|---|
BOOLEAN | BOOL | 无 |
TINYINT | INT8 | |
SMALLINT | INT16 | |
INTEGER | INT32 | |
BIGINT | INT64 | |
REAL | FLOAT | |
DOUBLE | DOUBLE | |
VARCHAR | STRING | 从Presto表创建Kudu表执行CREATE TABLE ... AS ... 时,会损失可选的VARCHAR最大长度。
|
VARBINARY | BINARY | |
TIMESTAMP | UNIXTIME_MICROS | kudu列的µs精度降低到ms精度。 |
DECIMAL | DECIMAL | 仅支持Kudu server的1.7.0及后续版本。 |
DATE | 无 | 不支持
从Presto表创建Kudu表执行 |
CHAR | 无 | 不支持 |
TIME | ||
JSON | ||
TIME WITH TIMEZONE | ||
TIMESTAMP WITH TIME ZONE | ||
INTERVAL YEAR TO MO NTH | ||
INTERVAL DAY TO SEC OND | ||
ARRAY | ||
MAP | ||
IPADDRESS |
支持的Presto SQL语法
ALTER SCHEMA ... RENAME TO ...
。
SQL语法 | 备注 |
---|---|
SELECT |
无 |
INSERT INTO ... VALUES |
无 |
INSERT INTO ... SELECT ... |
无 |
DELETE |
无 |
DROP SCHEMA |
仅在Schema启用时可用。 |
CREATE SCHEMA |
仅在Schema启用时可用。 |
CREATE TABLE |
创建表,详情请参见创建表。 |
CREATE TABLE ... AS |
无 |
DROP TABLE |
无 |
ALTER TABLE ... RENAME TO ... |
无 |
ALTER TABLE ... ADD COLUMN ... |
增加列,详情请参见增加列。 |
ALTER TABLE ... RENAME COLUMN ... |
仅对非主键可用。 |
ALTER TABLE ... DROP COLUMN ... |
|
SHOW SCHEMAS |
无 |
SHOW TABLES |
无 |
SHOW CREATE TABLE |
无 |
SHOW COLUMNS FROM |
无 |
DESCRIBE |
作用同SHOW COLUMNS FROM 。
|
CALL kudu.system.add_range_partition |
增加Range分区,详情请参见Range分区。 |
CALL kudu.system.drop_range_partition |
删除Range分区,详情请参见Range分区。 |
创建表
CREATE TABLE user_events (
user_id int WITH (primary_key = true),
event_name varchar WITH (primary_key = true),
message varchar,
details varchar WITH (nullable = true, encoding = 'plain')
) WITH (
partition_by_hash_columns = ARRAY['user_id'],
partition_by_hash_buckets = 5,
number_of_replicas = 3
);
该示例中主键为user_id
和event_name
,通过user_id
列的HASH值划分为5个分区,number_of_replicas
设置为3。
- 主键列必须放在前面,分区列必须选取自主键列。
number_of_replicas
:可选项,该值指定了tablet的副本数,且必须为奇数。如果没有指定该配置,则使用Kudu Master 默认配置的副本数。- Kudu支持Hash和Range两种类型的分区。Hash分区根据Hash值分发数据行到多个桶中的一个桶。Range分区使用有序的Range分区键分发数据行,具体的Range分区必须被显式创建。Kudu支持多级分区,一个表至少要有一个Hash或Range分区,但最多有一个Range分区,可以有多个Hash分区。
列属性
列属性名 | 类型 | 描述 |
---|---|---|
primary_key | BOOLEAN | 设置为true,则表示使用该列作为主键。
Kudu主键需要满足唯一性约束。当待插入数据行的主键已经存在,再插入与已有相同主键值的行,则会导致更新已有的数据行,详情请参见Primary Key Design。 |
nullable | BOOLEAN | 设置为true,则表示该列可以取null。
注意 主键列不可为null。
|
encoding | VARCHAR | 指定列编码格式以节省存储空间和提高查询性能。
如果没有指定该属性,则Kudu根据列数据类型自动编码。取值为auto、plain、bitshuffle、runlength、prefix、dictionary和group_varint,详情信息请参见Column Encoding。 |
compression | VARCHAR | 指定列压缩格式。
如果没有指定该属性,Kudu会使用默认压缩格式。取值为default、no、lz4、snappy和zlib,详情信息请参见Column compression。 |
CREATE TABLE mytable (
name varchar WITH (primary_key = true, encoding = 'dictionary', compression = 'snappy'),
index bigint WITH (nullable = true, encoding = 'runlength', compression = 'lz4'),
comment varchar WITH (nullable = true, encoding = 'plain', compression = 'default'),
...
) WITH (...);
分区设计
- Hash分区
- 定义一组分区
您可以使用表属性partition_by_hash_columns配置分区列,表属性
partition_by_hash_buckets
配置分区个数,所有的分区列必须是主键列的子集。示例如下。CREATE TABLE mytable ( col1 varchar WITH (primary_key=true), col2 varchar WITH (primary_key=true), ... ) WITH ( partition_by_hash_columns = ARRAY['col1', 'col2'], partition_by_hash_buckets = 4 )
说明 该示例定义了(col1,col2)的Hash分区,数据分布到4个分区。 - 定义两组分区
如果需要定义两个独立的Hash分区组,可以再设置表属性
partition_by_second_hash_columns
和partition_by_second_hash_buckets
。示例如下。CREATE TABLE mytable ( col1 varchar WITH (primary_key=true), col2 varchar WITH (primary_key=true), ... ) WITH ( partition_by_hash_columns = ARRAY['col1'], partition_by_hash_buckets = 2, partition_by_second_hash_columns = ARRAY['col2'], partition_by_second_hash_buckets = 3 )
说明 该示例定义了两组Hash分区,第一组Hash分区按照列col1对数据行分布到2个分区,第二组Hash分区按照列col2对数据行分布到3个分区,因此该表会有共计2*3=6个分区。
- 定义一组分区
- Range分区
kudu表最多可以有一个Range分区,可以使用表属性
partition_by_range_columns
定义。在创建表的时候,可以使用表属性range_partitions
定义分区范围,表属性kudu.system.add_range_partition
和kudu.system.drop_range_partition
可以对已有的表进行Range分区管理。示例如下。CREATE TABLE events ( rack varchar WITH (primary_key=true), machine varchar WITH (primary_key=true), event_time timestamp WITH (primary_key=true), ... ) WITH ( partition_by_hash_columns = ARRAY['rack'], partition_by_hash_buckets = 2, partition_by_second_hash_columns = ARRAY['machine'], partition_by_second_hash_buckets = 3, partition_by_range_columns = ARRAY['event_time'], range_partitions = '[{"lower": null, "upper": "2018-01-01T00:00:00"}, {"lower": "2018-01-01T00:00:00", "upper": null}]' )
说明 该示例定义了二组Hash分区和一个对event_time
列进行Range分区的表,Range分区范围由2018-01-01T00:00:00
进行分割。 - 管理Range分区
对于已经存在的表,有2个存储过程可以增加和删除Range分区。
分区操作示例如下:- 增加一个Range分区
CALL kudu.system.add_range_partition(<your_schema_name>, <your_table_name>, <range_partition_as_json_string>)
- 删除一个Range分区
CALL kudu.system.drop_range_partition(<your_schema_name>, <your_table_name>, <range_partition_as_json_string>)
参数 描述 <your_schema_name>
表所在的Schema。 <your_table_name>
表名称。 <range_partition_as_json_string>
JSON格式表示的Range分区的上下边界,格式为 '{"lower": <value>, "upper": <value>}'
,如果分区有多个列,则格式为'{"lower": [<value_col1>,...], "upper": [<value_col1>,...]}'
,上下边界具体的取值形式由列数据类型决定。数据类型和JSON字符串类型对应关系如下:- BIGINT:
‘{“lower”: 0, “upper”: 1000000}’
- SMALLINT:
‘{“lower”: 10, “upper”: null}’
- VARCHAR:
‘{“lower”: “A”, “upper”: “M”}’
- TIMESTAMP:
‘{“lower”: “2018-02-01T00:00:00.000”, “upper”: “2018-02-01T12:00:00.000”}’
- BOOLEAN:
‘{“lower”: false, “upper”: true}’
- VARBINARY:Base64编码的字符串值
说明 设置为null时,则表示无限边界。CALL kudu.system.add_range_partition('myschema', 'events', '{"lower": "2018-01-01", "upper": "2018-06-01"}')
说明 该示例myschema
里的表events
增加了一个Range分区,该分区的下界是2018-01-01
,即精确值是2018-01-01T00:00:00.000
,分区的上界是2018-06-01
。SHOW CREATE TABLE
查询已经存在的Range分区,分区信息通过表属性range_partitions
展示。 - 增加一个Range分区
增加列
ALTER TABLE ... ADD COLUMN ...
为已经存在的表增加数据列,还可以使用列属性来增加数据列。列属性的详细信息,可以在创建表模块中查看。ALTER TABLE mytable ADD COLUMN extraInfo varchar WITH (nullable = true, encoding = 'plain')