本文为您介绍如何创建实时计算MaxCompute维表。

说明
  • 实时计算 2.1.1及以上版本支持MaxCompute维表功能。
  • 维表的Query语法参见: 维表JOIN语句
  • 使用MaxCompute表作为维表,需要先赋权读权限给MaxCompute账号。

语法示例

实时计算支持使用MaxCompute作为维表。MaxCompute维表声明示例如下。
CREATE TABLE white_list (
  id varchar,
  name varchar,
  age int,
  PRIMARY KEY (id), 
  PERIOD FOR SYSTEM_TIME --定义了维表的标识。
) WITH (
  type = 'odps',
  endPoint = 'http://service.cn.maxcompute.aliyun-inc.com/api',
  project = '<projectName>',
  tableName = '<tableName>',
  accessId = '<yourAccessKeyId>',
  accessKey = '<yourAccessKeySecret>',
  `partition` = 'ds=2018****',
  cache = 'ALL'
)           
说明
  • 声明维表时,必须要指名主键,维表JOIN的时候,ON的条件必须包含所有主键的等值条件。
  • MaxCompute维表主键必须有唯一性,否则会被去重。
  • parition是关键字,需要使用反引号注释`partition`
  • 如果是分区表,目前不支持将分区列写入到DDL定义中。

WITH参数

参数 参数说明 是否必选 备注
endPoint MaxCompute服务地址 参见开通MaxCompute服务的Region和服务连接对照表
tunnelEndpoint MaxCompute Tunnel服务的连接地址 可选,参见开通MaxCompute服务的Region和服务连接对照表
说明 VPC环境下为必填。
project MaxCompute项目名称 无。
tableName 表名 无。
accessId AccessKey ID 无。
accessKey AccessKey Secret 无。
partition 分区名
  • 固定分区
    • 只存在一个分区MaxCompute表

      例如,如果只存在1个分区列ds,则`partition` = 'ds=20180905' 表示读ds=20180905分区的数据。

      ,值
    • 存在多个分区的MaxCompute表

      例如,如果存在2个分区列dshh,则`partition`='ds=20180905,hh=*'表示读ds=20180905分区的数据。

      说明 分区过滤时需要声明所有分区的值。例如,上述示例中,只声明`partition` = 'ds=20180905',则不会读取任何分区。
  • 非固定分区
    • 实时计算2.2.0及以上版本支持`partition` = 'max_pt()' 功能, 即每次加载所有分区列表中字典序最大的分区。
    • 实时计算3.2.2及以上版本支持`partition` = 'max_pt_with_done()'功能,即每次加载所有分区列表中字典序最大且伴随有.done的分区。
maxRowCount 可加载的最大表格数量 默认为100000。
说明 如果您的数据超过100000,需要设置maxRowCount参数。建议设定值比实际值大。

CACHE参数

参数 参数说明 备注
cache 缓存策略。 仅支持ALL策略。
cacheSize 缓存大小。 可以设置缓存大小,MaxCompute默认缓存值为100000行。
cacheTTLMs 缓存超时时间,单位为毫秒。 当选择 ALL 策略,则为缓存Reload的间隔时间,默认为不重新加载。
cacheReloadTimeBlackList ALL Cache时启用,更新时间黑名单,防止在此时间内做cache更新(如双11场景)。 默认为空,格式为2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00。用逗号,来分隔多个黑名单,用箭头->来分割黑名单的起始结束时间。
cacheScanLimit ALL Cache 时启用,读取全量HBase数据,服务端一次RPC返回给客户端的行数。 默认为100条。
说明
  • 目前MaxCompute维表只支持ALL全量缓存策略,必须显式声明。
  • cache ALL: 全量缓存策略,即在Job运行前,会将远程表中所有数据读取至内存中,之后所有的维表查询进行缓存操作,缓存命中不到则认为不存在,并在缓存过期后重新加载一遍全量缓存。全量缓存策略适合远程表不大、miss key特别多的场景。全量缓存策略需要配置缓存更新间隔(cacheTTLMs)和更新时间黑名单(cacheReloadTimeBlackList)。
  • 使用cache ALL时,由于需要进行异步Reload,需要增大维表JOIN的节点内存(通常为MaxCompute维表实际大小的4倍,具体值与MaxCompute存储压缩算法有关)。在使用超大MaxCompute维表时,若出现频繁gc导致作业异常,且在增加维表JOIN节点内存仍无改善的情况下,建议改为支持LRU cache策略的KV型维表(如Hbase)。

类型映射

MaxCompute BLINK
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
BOOLEAN BOOLEAN
DATETIME TIMESTAMP
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
DECIMAL DECIMAL
BINARY VARBINARY
STRING VARCHAR
说明 实时计算MaxCompute维表暂时仅支持上述MaxCompute字段类型。

代码示例

包含MaxCompute维表的实时计算作业代码示例如下。

CREATE TABLE datahub_input1 (
 id            BIGINT,
 name        VARCHAR,
 age           BIGINT
) WITH (
type='datahub'
);
CREATE TABLE odps_dim (
  name VARCHAR,
  phoneNumber BIGINT, 
  PRIMARY KEY (name), 
  PERIOD FOR SYSTEM_TIME --定义了维表的标识。
) with (
  type = 'odps',
   endPoint = '<yourEndpointName>', 
   project = '<yourProjectName>',
   tableName = '<yourTableName>',
   accessId = '<yourAccessId>',
   accessKey = '<yourAccessPassword>',
  `partition` = 'ds=20180905',--动态分区、固定分区请参考WITH参数说明。
  cache = 'ALL'
)

CREATE table result_infor(
id BIGINT,
phoneNumber BIGINT,
name VARCHAR
)with(
type='print'
);

INSERT INTO result_infor
SELECT
t.id
,w.phoneNumber
,t.name
FROM datahub_input1 as t
JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() as w --维表JOIN时必须指定此声明。
ON t.name = w.name;

常见问题

  • max_pt()max_pt_with_done()的区别是什么?
    max_pt()仅仅是选取的是所有分区中字典序最大的分区。max_pt_with_done()选取的是所有分区中字典序最大,且伴随有.done分区的分区。
    分区列表
    ds=20190101
    ds=20190101.done
    ds=20190102
    ds=20190102.done
    ds=20190103
    示例中max_pt()max_pt_with_done()的区别如下:
    • `partition`='max_pt_with_done()'匹配的分区是ds=20190102
    • `partition`='max_pt()',匹配的分区是ds=20190103
    说明 仅实时计算3.3.2及以上版本支持`partition`='max_pt_with_done()'功能。
  • Q:如何查询维表JOIN的关联率、缓存命中率等Metric信息?
    A:查询步骤如下:
    1. 登录kmonitor
    2. 在左侧导航栏单击 > Dashboard
    3. Add页签,单击Graph
    4. Panel Title列表,选择Edit
    5. Metrics页签,配置参数。
      • Tenant:选择blink
      • Metric:填写Metric名称,格式如blink.<项目名>.<作业名>.dim.<对应Metric名称>。对应Metric名称参见下表。
        参数 参数说明 对应Metric名称
        fetch qps 查询维表总QPS,包括命中和不命中。 blink.projectName.jobName.dimJoin.fetchQPS
        fetchHitQPS 维表命中QPS,包括换成命中和查询物理维表命中。 blink.projectName.jobName.dimJoin.fetchHitQPS
        cacheHitQPS 维表缓存命中QPS。 blink.projectName.jobName.dimJoin.cacheHitQPS
        dimJoin.fetchHit 维表关联率。 blink.projectName.jobName.dimJoin.fetchHit
        dimJoin.cacheHit 维表缓存关联率。 blink.projectName.jobName.dimJoin.cacheHit
  • Q:如何查看MaxCompute分区名?
    A:查看MaxCompute分区名步骤如下:
    1. 数据地图,搜索表名称。
    2. 所有表区域,单击目标表名。
    3. 数据表详情页面,右侧明细信息 > 分区信息 > 分区名查看MaxCompute分区名。
  • Q:任务出现了RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTas的Failover该怎么处理?

    A:实时计算1.0版本中维表JOIN存在一定的问题,推荐升级到实时计算2.1.1及以上版本。如果需要继续使用原有版本,需要对作业进行暂停恢复操作,根据failover history中第一次出现failover的具体报错信息进行排查。

  • Q:公有云的Endpoint和Tunnel Endpoint是指什么?如果配置错误会产生什么结果?
    A:Endpoint和Tunnel Endpoint参数说明参见配置Endpoint。VPC 环境中这两个参数如果配置错误可能会导致任务异常。
    • Endpoint配置错误:任务上线停滞在91%的进度。
    • Tunnel Endpoint配置错误:任务运行失败。