使用Kudu连接器可以查询、插入和删除存储在Kudu里的数据。

背景信息

本文为您介绍Kudu连接器相关的内容和操作,具体如下:

前提条件

已创建Presto集群和Hadoop集群,且Hadoop集群选择了Kudu服务,详情请参见创建集群

使用限制

  • Kudu版本需要为1.10及以上。
  • Presto集群和Hadoop集群网络互通。
  • Kudu表名称和列名称仅支持小写字母。

修改Kudu连接器配置

修改Kudu连接器配置,详情请参见配置连接器

进入EMR控制台的Presto服务的配置页面,在服务配置区域,单击kudu.properties页签。您可以看到以下参数,参数值请根据您实际情况修改。
参数 描述
kudu.client.master-addresses Kudu主地址列表,多个地址时使用逗号(,)分隔。

支持以下格式:example.com、example.com:7051、192.0.2.1、192.0.2.1:7051、[2001:db8::1]、[2001:db8::1]:7051和2001:db8::1。

默认值为localhost。

kudu.schema-emulation.enabled 是否开启Schema模拟功能。 取值如下:
  • false(默认值):不开启Schema模拟功能。
  • true:开启Schema模拟功能。
注意 您可以在kudu.properties页签,单击右上角的自定义配置添加该配置项,添加配置项详情请参见添加组件参数
kudu.schema-emulation.prefix Schema模拟功能的前缀。
注意kudu.schema emulation.enabled=true时,需要设置此参数。

标准前缀为'presto::`,也支持空前缀。

kudu.client.default-admin-operation-timeout 管理操作(例如,Create Table、Delete Table等)的默认超时。

默认值为30s。

kudu.client.default-operation-timeout 用户操作的默认超时。

默认值为30s。

kudu.client.default-socket-read-timeout 等待来自Socket的数据时使用的默认超时。

默认值为10s。

kudu.client.disable-statistics 是否启用Kudu客户端的统计信息收集功能。取值如下:
  • false(默认值):禁用。
  • true:启用。

数据查询

Apache Kudu不支持Schema,但是通过配置Kudu Connector可以支持Schema功能。

不开启Schema模拟功能(默认行为)

Schema模拟默认是关闭的,此时kudu的表都在default的Schema下。

例如,您可以通过执行SELECT * FROM kudu.default.orders来查询表orders,如果Catalog和 Schema分别指定为kududefault,则查询语句可以简化为SELECT * FROM orders

Kudu的表名称可以包含任意字符,因此需要用双引号(")引用表名称。例如,查询表special.table!,执行语句为SELECT * FROM kudu.default."special.table!"

示例如下:
  1. default的Schema下创建表users
    CREATE TABLE kudu.default.users (
      user_id int WITH (primary_key = true),
      first_name varchar,
      last_name varchar
    ) WITH (
      partition_by_hash_columns = ARRAY['user_id'],
      partition_by_hash_buckets = 2
    );
    说明 创建表时必须指定必要的表信息,例如,主键、列的编码格式或压缩格式、Hash分区或Range分区等。
  2. 查看表信息。
    DESCRIBE kudu.default.users;
    返回如下类似信息。
       Column   |  Type   |                      Extra                      | Comment
    ------------+---------+-------------------------------------------------+---------
     user_id    | integer | primary_key, encoding=auto, compression=default |
     first_name | varchar | nullable, encoding=auto, compression=default    |
     last_name  | varchar | nullable, encoding=auto, compression=default    |
    (3 rows)
  3. 插入数据。
    INSERT INTO kudu.default.users VALUES (1, 'Donald', 'Duck'), (2, 'Mickey', 'Mouse');
  4. 查询数据。
    SELECT * FROM kudu.default.users;

开启Schema模拟功能

如果在连接器的配置文件etc/catalog/kudu.properties里设置了开启Schema模拟功能,则表会根据命名约定被映射到对应的Schema里。
  • 如果设置了kudu.schema-emulation.enabled=truekudu.schema-emulation.prefix=,则映射关系如下表。
    Kudu表名 Presto表名
    orders kudu.default.orders
    part1.part2 kudu.part1.part2
    x.y.z kudu.x."y.z"
    说明 由于Kudu不能直接支持Schema ,Presto会创建一个特殊表$schemas用来管理Schema。
  • 如果设置了kudu.schema-emulation.enabled=truekudu.schema-emulation.prefix=presto::,则映射关系如下表。
    Kudu表名 Presto表名
    orders kudu.default.orders
    part1.part2 kudu.default."part1.part2"
    x.y.z kudu.default."x.y.z"
    presto::part1.part2 kudu.part1.part2
    presto::x.y.z kudu.x."y.z"
    说明 因为Kudu不能直接支持Schema ,所以Presto会创建一个特殊表presto::$schemas用来管理Schema。

数据类型映射

下表为您介绍Presto数据类型和Kudu数据类型的对应情况。
Presto数据类型 Kudu数据类型 备注
BOOLEAN BOOL
TINYINT INT8
SMALLINT INT16
INTEGER INT32
BIGINT INT64
REAL FLOAT
DOUBLE DOUBLE
VARCHAR STRING 从Presto表创建Kudu表执行CREATE TABLE ... AS ...时,会损失可选的VARCHAR最大长度。
VARBINARY BINARY
TIMESTAMP UNIXTIME_MICROS kudu列的µs精度降低到ms精度。
DECIMAL DECIMAL 仅支持Kudu server的1.7.0及后续版本。
DATE 不支持

从Presto表创建Kudu表执行CREATE TABLE ... AS ...时,列的DATE类型会转为STRING类型。

CHAR 不支持
TIME
JSON
TIME WITH TIMEZONE
TIMESTAMP WITH TIME ZONE
INTERVAL YEAR TO MO NTH
INTERVAL DAY TO SEC OND
ARRAY
MAP
IPADDRESS

支持的Presto SQL语法

说明 不支持ALTER SCHEMA ... RENAME TO ...
SQL语法 备注
SELECT
INSERT INTO ... VALUES
INSERT INTO ... SELECT ...
DELETE
DROP SCHEMA 仅在Schema启用时可用。
CREATE SCHEMA 仅在Schema启用时可用。
CREATE TABLE 创建表,详情请参见创建表
CREATE TABLE ... AS
DROP TABLE
ALTER TABLE ... RENAME TO ...
ALTER TABLE ... ADD COLUMN ... 增加列,详情请参见增加列
ALTER TABLE ... RENAME COLUMN ... 仅对非主键可用。
ALTER TABLE ... DROP COLUMN ...
SHOW SCHEMAS
SHOW TABLES
SHOW CREATE TABLE
SHOW COLUMNS FROM
DESCRIBE 作用同SHOW COLUMNS FROM
CALL kudu.system.add_range_partition 增加Range分区,详情请参见Range分区
CALL kudu.system.drop_range_partition 删除Range分区,详情请参见Range分区

创建表

创建表需要指定列、数据类型和分区信息,也可以根据您实际的情况指定列编码格式或压缩格式。创建表示例如下。
CREATE TABLE user_events (
  user_id int WITH (primary_key = true),
  event_name varchar WITH (primary_key = true),
  message varchar,
  details varchar WITH (nullable = true, encoding = 'plain')
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 5,
  number_of_replicas = 3
);

该示例中主键为user_idevent_name,通过user_id列的HASH值划分为5个分区,number_of_replicas设置为3。

创建表时相关参数描述如下:
  • 主键列必须放在前面,分区列必须选取自主键列。
  • number_of_replicas:可选项,该值指定了tablet的副本数,且必须为奇数。如果没有指定该配置,则使用Kudu Master 默认配置的副本数。
  • Kudu支持Hash和Range两种类型的分区。Hash分区根据Hash值分发数据行到多个桶中的一个桶。Range分区使用有序的Range分区键分发数据行,具体的Range分区必须被显式创建。Kudu支持多级分区,一个表至少要有一个Hash或Range分区,但最多有一个Range分区,可以有多个Hash分区。

列属性

除了指定列名称和类型,还可以指定其他列属性。
列属性名 类型 描述
primary_key BOOLEAN 设置为true,则表示使用该列作为主键。

Kudu主键需要满足唯一性约束。当待插入数据行的主键已经存在,再插入与已有相同主键值的行,则会导致更新已有的数据行,详情请参见Primary Key Design

nullable BOOLEAN 设置为true,则表示该列可以取null。
注意 主键列不可为null。
encoding VARCHAR 指定列编码格式以节省存储空间和提高查询性能。

如果没有指定该属性,则Kudu根据列数据类型自动编码。取值为auto、plain、bitshuffle、runlength、prefix、dictionary和group_varint,详情信息请参见Column Encoding

compression VARCHAR 指定列压缩格式。

如果没有指定该属性,Kudu会使用默认压缩格式。取值为default、no、lz4、snappy和zlib,详情信息请参见Column compression

示例如下。
CREATE TABLE mytable (
  name varchar WITH (primary_key = true, encoding = 'dictionary', compression = 'snappy'),
  index bigint WITH (nullable = true, encoding = 'runlength', compression = 'lz4'),
  comment varchar WITH (nullable = true, encoding = 'plain', compression = 'default'),
   ...
) WITH (...);

分区设计

一个表至少要有一个Hash或Range分区,但最多只能有一个Range分区,可以有多个Hash分区。分区信息如下:
  • Hash分区
    • 定义一组分区
      您可以使用表属性partition_by_hash_columns配置分区列,表属性partition_by_hash_buckets配置分区个数,所有的分区列必须是主键列的子集。示例如下。
      CREATE TABLE mytable (
        col1 varchar WITH (primary_key=true),
        col2 varchar WITH (primary_key=true),
        ...
      ) WITH (
        partition_by_hash_columns = ARRAY['col1', 'col2'],
        partition_by_hash_buckets = 4
      )
      说明 该示例定义了(col1,col2)的Hash分区,数据分布到4个分区。
    • 定义两组分区
      如果需要定义两个独立的Hash分区组,可以再设置表属性partition_by_second_hash_columnspartition_by_second_hash_buckets。示例如下。
      CREATE TABLE mytable (
        col1 varchar WITH (primary_key=true),
        col2 varchar WITH (primary_key=true),
        ...
      ) WITH (
        partition_by_hash_columns = ARRAY['col1'],
        partition_by_hash_buckets = 2,
        partition_by_second_hash_columns = ARRAY['col2'],
        partition_by_second_hash_buckets = 3
      )
      说明 该示例定义了两组Hash分区,第一组Hash分区按照列col1对数据行分布到2个分区,第二组Hash分区按照列col2对数据行分布到3个分区,因此该表会有共计2*3=6个分区。
  • Range分区
    kudu表最多可以有一个Range分区,可以使用表属性partition_by_range_columns定义。在创建表的时候,可以使用表属性range_partitions定义分区范围,表属性kudu.system.add_range_partitionkudu.system.drop_range_partition可以对已有的表进行Range分区管理。示例如下。
    CREATE TABLE events (
      rack varchar WITH (primary_key=true),
      machine varchar WITH (primary_key=true),
      event_time timestamp WITH (primary_key=true),
      ...
    ) WITH (
      partition_by_hash_columns = ARRAY['rack'],
      partition_by_hash_buckets = 2,
      partition_by_second_hash_columns = ARRAY['machine'],
      partition_by_second_hash_buckets = 3,
      partition_by_range_columns = ARRAY['event_time'],
      range_partitions = '[{"lower": null, "upper": "2018-01-01T00:00:00"},
                           {"lower": "2018-01-01T00:00:00", "upper": null}]'
    )
    说明 该示例定义了二组Hash分区和一个对event_time列进行Range分区的表,Range分区范围由2018-01-01T00:00:00进行分割。
  • 管理Range分区

    对于已经存在的表,有2个存储过程可以增加和删除Range分区。

    分区操作示例如下:
    • 增加一个Range分区
      CALL kudu.system.add_range_partition(<your_schema_name>, <your_table_name>, <range_partition_as_json_string>)
    • 删除一个Range分区
      CALL kudu.system.drop_range_partition(<your_schema_name>, <your_table_name>, <range_partition_as_json_string>)
    参数 描述
    <your_schema_name> 表所在的Schema。
    <your_table_name> 表名称。
    <range_partition_as_json_string> JSON格式表示的Range分区的上下边界,格式为'{"lower": <value>, "upper": <value>}',如果分区有多个列,则格式为'{"lower": [<value_col1>,...], "upper": [<value_col1>,...]}',上下边界具体的取值形式由列数据类型决定。数据类型和JSON字符串类型对应关系如下:
    • BIGINT:‘{“lower”: 0, “upper”: 1000000}’
    • SMALLINT:‘{“lower”: 10, “upper”: null}’
    • VARCHAR:‘{“lower”: “A”, “upper”: “M”}’
    • TIMESTAMP:‘{“lower”: “2018-02-01T00:00:00.000”, “upper”: “2018-02-01T12:00:00.000”}’
    • BOOLEAN:‘{“lower”: false, “upper”: true}’
    • VARBINARY:Base64编码的字符串值
    说明 设置为null时,则表示无限边界。
    示例如下。
    CALL kudu.system.add_range_partition('myschema', 'events', '{"lower": "2018-01-01", "upper": "2018-06-01"}')
    说明 该示例myschema里的表events增加了一个Range分区,该分区的下界是2018-01-01,即精确值是2018-01-01T00:00:00.000,分区的上界是2018-06-01
    您可以使用SQL语句SHOW CREATE TABLE查询已经存在的Range分区,分区信息通过表属性range_partitions展示。

增加列

您可以使用SQL语句ALTER TABLE ... ADD COLUMN ...为已经存在的表增加数据列,还可以使用列属性来增加数据列。列属性的详细信息,可以在创建表模块中查看。
ALTER TABLE mytable ADD COLUMN extraInfo varchar WITH (nullable = true, encoding = 'plain')