本文为您介绍如何创建实时计算云数据库HBase版维表。

注意
  • Blink-3.3.0以下版本仅支持HBase企业标准版。
  • Blink-3.3.0及以上版本同时支持HBase企业标准版和HBase性能增强版。
  • Blink-3.5.0及以上版本支持HBase写入主备切换。
  • 云数据库HBase版维表的JOIN语法详情请参见维表JOIN语句
  • 实时计算HBase维表不支持自建的开源HBase。

示例

  • HBase企业标准版示例代码如下。
    CREATE TABLE hbase (
       `key` varchar, 
       `name` varchar,
        PRIMARY KEY (`key`), --HBase中的rowkey字段。
        PERIOD FOR SYSTEM_TIME --维表标识。
       ) with (
        TYPE = 'cloudhbase',
        zkQuorum = '<yourzkQuorum>',
        columnFamily = '<yourColumnFamilyName>',
        tableName = '<yourTableName>'
    );
  • HBase性能增强版示例代码如下。
    CREATE TABLE hbase (
       `key` varchar, 
       `name` varchar,
        PRIMARY KEY (`key`), --HBase中的rowkey字段。
        PERIOD FOR SYSTEM_TIME --维表标识。
       ) with (
        TYPE = 'cloudhbase',
        endPoint = '<host:port>',--HBase增强版的Java API访问地址。
        userName  = 'root', --HBase用户名。
        password = 'root', --HBase密码。
        columnFamily = '<yourColumnFamilyName>',
        tableName = '<yourTableName>'
    );
  • Blink-3.5.0以上HBase性能增强版示例代码如下。
    create table liuxd_user_behavior_test_front (
        row_key varchar,
        from_topic varchar,
        origin_data varchar,
        record_create_time varchar,
        primary key (row_key)
    ) with (
        type = 'cloudhbase',
        zkQuorum = '<host:port>', --HBase增强版的Java API访问地址。
        userName  = 'root', --用户名。
        password = 'root', --密码。
        columnFamily = '<yourColumnFamily>',
        tableName = '<yourTableName>',
        batchSize = '500'
    )
  • Blink-3.5.0以上支持HBase写入主备切换示例代码如下。
    create table liuxd_user_behavior_test_front (
        row_key varchar,
        from_topic varchar,
        origin_data varchar,
        record_create_time varchar,
        primary key (row_key)
    ) with (
        type = 'cloudhbase',
        zkQuorum = '<host:port>', --HBase高可用访问地址。
        haClusterID = 'ha-xxx', --HBase高可用实例ID。
        userName  = 'root', --用户名。
        password = 'root', --密码。
        columnFamily = '<yourColumnFamily>',
        tableName = '<yourTableName>',
        batchSize = '500' 
    )
说明
  • 在声明维表时,必须要指名主键。
  • 在维表进行JOIN时,ON的条件必须包含所有主键的等值条件。示例中HBase中的主键是row_key
  • HBase企业标准版和HBase性能增强版DDL的区别为连接参数不同:
    • HBase企业标准版使用连接参数zkQuorum
    • HBase性能增强版使用连接参数endPoint
    • Blink-3.5.0以上标准版和增强版使用连接参数统一为zkQuorum

参数

参数 注释说明 是否必填 备注
zkQuorum HBase集群配置的zk地址,是以逗号(,)分隔的主机列表。 可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。
说明 仅在HBase企业标准版中生效。
zkNodeParent 集群配置在zk上的路径。 可以在hbase-site.xm文件中查看hbase.zookeeper.quorum相关配置。
说明 仅在HBase企业标准版中生效。
endPoint HBase地域名称。 可在购买的HBase实例控制台中获取。
说明 仅在HBase性能增强版中生效。
userName 用户名。 可选。
说明 仅在HBase性能增强版中生效。
password 密码。 可选。
说明 仅在HBase性能增强版中生效。
tableName HBase表名。 无。
columnFamily 列族名。 目前只支持插入同一列族。
maxRetryTimes 最大尝试次数。 默认10次。
partitionedJoin 是否使用joinKey进行分区。 可选,默认为false。在设置partitionedJoin为true时,则使用joinKey进行分区,将数据分发到各JOIN节点,提高缓存命中率。
shuffleEmptyKey 是否将上游EMPTY KEY随机发送到下游节点。 可选,默认为true。参数取值如下:
  • true:如果上游有多个EMPTY KEY,会将所有EMPTY KEY随机发送到各个JOIN节点。
  • false:如果上游有多个EMPTY KEY,会将所有EMPTY KEY发送至一个JOIN节点。
说明 shuffleEmptyKeypartitionedJoin生效后才能使用。

CACHE参数

参数 注释说明 是否必填 备注
cache 缓存策略。 默认NoneLRUALL可选。
cacheSize 缓存大小。 当选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs 缓存超时时间,单位为毫秒。 当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。当选择ALL策略,则为缓存加载的间隔时间,默认不重新加载。
cacheReloadTimeBlackList 缓存策略选择ALL时启用。更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 可选,默认为空。自定义输入格式如下。
2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00
使用逗号(,)分隔多个黑名单,使用箭头(->)分割黑名单的起始和结束时间。
cacheScanLimit 缓存策略选择ALL时启用。读取全量HBase数据,服务端一次RPC返回给客户端的行数。 可选,默认为100条。
目前RDS和DRDS提供如下三种缓存策略:
  • None:无缓存。
  • LRU:最近使用策略缓存。需要配置参数:缓存大小(cacheSize)和缓存超时时间(cacheTTLMs)。
  • ALL:全量缓存策略。在Job运行前会将远程表中所有数据加载到内存中,之后所有的维表查询都会通过Cache进行。Cache无法命中则KEY不存在,并在缓存过期后重新加载一遍全量缓存。全量缓存策略适合远程表数据量小且miss key(源表数据和维表join时,on里面的条件关联不上)特别多的场景。全量缓存相关配置:缓存更新间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。
    说明 因为系统会异步加载维表数据,所以在使用CACHE ALL的时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

代码示例

包含HBase维表的实时计算作业代码示例如下。
create table source (
   id   TINYINT,
   name BIGINT
) with (
   type = 'random'
);

create table dim (
  id   TINYINT,
score BIGINT
  primary key(id),
  PERIOD FOR SYSTEM_TIME
)with(
   type = 'cloudhbase',
   zkQuorum = '<yourzkQuorum>',
   columnFamily = '<yourColumnFamilyName>',
   tableName = '<yourTableName>'
);

CREATE table result_infor(
  id BIGINT,
  score BIGINT
)with(
   type='rds'
);

INSERT INTO result_infor
SELECT
  t.id,
  w.score
FROM source as t
JOIN dim FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.id = w.id;