本文主要介绍E-MapReduce Relational Cache的创建、更新和使用等。

概述

Relational Cache是E-MapReduce-3.22.0及以上版本对应的Spark支持的高级特性,类似于传统数据仓库的物化视图,用户可以将任意Table或者View保存为Relational Cache。Relational Cache主要用于查询模式相对比较固定的业务场景,通过提前设计Relational Cache,对数据进行预计算和预组织,从而提高业务查询的速度,常见的使用场景包括MOLAP多维分析、报表生成、数据Dashboard、跨集群数据同步等。Relational Cache可以优化用户查询操作,Spark自动发现可用的Relational Cache,并优化执行计划,对用户完全透明。

Relational Cache相关操作

  • 创建Relational Cache

    CACHE TABLE table_name
      [REFRESH ON (DEMAND | COMMIT)]
      [(ENABLE | DISABLE) REWRITE]
      [USING datasource
      [OPTIONS (key1=val1, key2=val2, ...)]
      [PARTITIONED BY (col_name1, col_name2, ...)]
      [LOCATION customized_path]
      [COMMENT table_comment]
      [TBLPROPERTIES (key1=val1, key2=val2, ...)]]
      [AS select_statement]
    名称 说明
    REFRESH ON DEMAND|COMMIT
    • DEMAND:显式执行REFRESH命令时,才会更新Relational Cache数据。
    • COMMIT:当通过Spark修改原始表数据时,自动更新Relational Cache数据。
      说明 默认为ON DEMAND模式。
    ENABLE | DISABLE QUERY REWRITE
    • EANBLE:允许Relational Cache用于执行计划重写。
    • DISABLE:在任何情况下,不允许Relational Cache用于执行计划重写。
      说明 默认为ENABLE模式。
    USING <data source> 指定Cache数据的存储格式,Spark支持的data source类型包括TEXT、CSV、 JSON、JDBC、PARQUET、ORC、HIVE等。
    PARTITIONED BY 根据指定字段对Cache数据分区,为每个分区创建一个目录。
    LOCATION 用户可以通过LOCATION指定Cache数据存储的位置,表示Cache类型为EXTERNAL。未指定时,Spark将Cache数据存储在spark.sql.warehouse.dir子目录下,Cache类型为MANAGED。在执行DROP TABLE/VIEW或者UNCACHE的时候,对于MANAGED类型的Cache,会同时删除cache元信息和数据,对于EXTERNAL类型的Cache,只会删除Cache元信息,不会删除数据。
    AS select_statement 当没有AS部分时,表或视图必须已经存在,直接Cache数据。当存在AS部分时,创建视图并Cache视图数据。
    示例:
    -- CREATE AND CACHE VIEW
    CACHE TABLE employee_flat_cache
      ENABLE REWRITE
      REFERSH ON COMMIT
      USING PARQUET
      OPTIONS ('compression'='snappy')
      AS SELECT * FROM employee, dep WHERE e_depId = d_depId
    
    CACHE TABLE employee_flat_cache
      USING JSON
      LOCATION '/usr/spark/_CACHED/employee_flat_cache'
      PARTITIONED BY (d_depName)
      AS SELECT * FROM employee, dep WHERE e_depId = d_depId  
    
    -- CREATE VIEW THEN CACHE
    CREATE VIEW employee_flat_cache AS SELECT * FROM employee, dep WHERE e_depId = d_depId
    CACHE TABLE employee_flat_cache
      ENABLE REWRITE
      REFERSH ON COMMIT
      USING PARQUET
      OPTIONS ('compression'='snappy')
  • 修改Cache Refresh策略
    ALTER TABLE table_name (ENABLE | DISABLE) REWRITE
  • 是否允许Cache被用作执行计划重写。

    ALTER TABLE table_name REFRESH ON (DEMAND | COMMIT)
  • 展示所有Cache列表

    SHOW CACHES
  • 显示Cache的详细信息

    (DESC | DESCRIBE) (EXTENDED | FORMATTED) table_name

Relational Cache的更新

当Cache依赖的源表数据发生改变时,需要同步更新Cache中的数据,如果Cache依赖的源表全部是Spark管理的表(即通过Spark创建的MANAGED类型的表),可以支持ON COMMIT模式的更新,不过对于其他类型的表,例如Hive表,因为可能在Spark之外更新数据,Spark无法保证能够感知到数据的变化,所以只能支持ON DEMAND模式,通过REFRESH命令触发Cache的更新操作,REFRESH命令的语法规范如下:

REFRESH TABLE cache_name [WITH TABLE base_table_name PARTITION (pk1='', pk2='')]

根据最新原始表数据更新Cache,如果Cache支持PCT(Partition Change Tracking),可以指定更新的基表分区信息,进行增量更新。示例如下:

CREATE TABLE dep (d_depId STRING, d_name STRING) USING PARQUET;
CREATE TABLE employee (empId STRING, age INT, e_depId STRING) PARTITIONED BY(e_depId) USING PARQUET;
CACHE TABLE employee_flat_cache
  ENABLE REWRITE
  REFERSH ON DEMAND
  USING PARQUET
  AS SELECT * FROM employee, dep WHERE e_depId = d_depId;
DESC EXTENDED employee_flat_cache;
INSERT INTO TABLE employee PARTITION (e_depId = '10') VALUES ('s101', 30), ('m23', 28);
-- 全量更新
REFRESH TABLE employee_flat_cache;
-- 增量更新
REFRESH TABLE employee_flat_cache WITH TABLE employee PARTITION (e_depId = '10')
			

Relational Cache的使用

  • Relational Cache构建完成后,对用户完全透明,Spark会根据用户提交的查询自动寻找合适的Relational Cache,并改写执行计划,基于Cache中预计算的结果,大大提升查询的响应速度。

  • spark.sql.cache.queryRewrite参数用于控制开启或打开Relational Cache的执行计划重写,用户可以在spark session级别打开或关闭该功能。