本文为您介绍Spark SQL中的SCAN语法。请注意,这个从EMR-3.23.0开始新增的语法。

为什么需要SCAN语法

SCAN语法定义了如何去读一个Table,它本质上也是一张逻辑表。为什么需要在表定义之外需要再定义SCAN呢?根据Spark DataSource V2的数据模型设计,首先Table不再局限于批数据源或者流数据源,其次出于Multi-Catalog支持的设计,Spark将直接通过External Catalog来解析各种数据源数据。所以:

  • 当我们定义Table时(本质上还是DataSource V1的用法):我们只需要定义Table数据源的基本信息,无需也不该定义如何读这个Table的参数。例如Kafka数据源表,我们只需要定义Kafka连接地址,需要定义的topic即可。无需在Table定义中指定Kafka的读取速率等信息,例如“maxOffsetsPerTrigger”,这些是运行期的参数。
  • 当我们使用External Catalog来提供数据源元信息时,无需显式地在Spark Session中建表。这时,很显然地需要一种语法来定义如何读数据源,这里就是SCAN。

需要注意的是,本语法是支持DataSource V2设计的一种尝试,同时也实现了在Spark SQL中的批流查询统一。例如,我们定义了一个Kafka数据源表,然后定义两个SCAN,分别对应于批读和流读。

最后强调的是:

  • SCAN语法定义了如何去读一个Table,它本质上也是定义了一张逻辑表,在Spark内部为临时视图实例。退出Spark Session后SCAN定义自动清理。
  • SCAN定义的视图,只能用作数据源表,不能作为数据输出表。
  • 可以直接处理原始表,但只能进行批读。当原始表没有批读接口实现,则出错。

语法

CREATE SCAN tbName_alias
ON tbName
USING queryType
OPTIONS (propertyName=propertyValue[,propertyName=propertyValue]*)

queryType支持两种类型:

  • BATCH:将源表tbName按照批读的方式,定义一个临时视图tbName_alias。
  • STREAM:将源表tbName按照流读的方式,定义一个临时视图tbName_alias。

OPTIONS中可以定义数据源读取的运行期参数。运行期参数主要包含那么些如何定义读的行为参数。不同的数据源实现,运行期参数可能是不同的。例如Kafka数据源,“maxOffsetsPerTrigger”就是一种运行期参数。详细地运行期参数可以参考“数据源”一节。

除了数据源特定的参数,还有一些数据源公用的参数可以配置:

参数名 说明 默认值
watermark.column 指定表中哪个字段为事件时间
watermark.delayThreshold 相对于最新数据记录,延迟数据的最小等待时间,例如“1 minute”或者“5 hours”。

示例

  • 首先建一个LogService数据源表
    spark-sql> CREATE TABLE loghub_table_intput_test(content string)
             > USING loghub
             > OPTIONS
             > (...)
  • 离线处理SLS数据,统计截止当前数据条数
    spark-sql> CREATE SCAN loghub_table_intput_test_batch
             > ON loghub_table_intput_test
             > USING BATCH;
    spark-sql> SELECT COUNT(*) FROM loghub_table_intput_test_batch;
  • 流式处理SLS数据
    spark-sql> CREATE TABLE loghub_table_output_test(content string)
             > USING loghub
             > OPTIONS
             > (...)
    
    spark-sql> CREATE SCAN loghub_table_intput_test_stream
             > ON loghub_table_intput_test
             > USING STREAM
             > OPTIONS(
             > "watermark.column"="data_time",
             > "watermark.delayThreshold"="2 minutes")
    
    
    -- 测试一种非法操作:对流表进行select,报错。
    spark-sql> SELECT COUNT(*) FROM loghub_table_test_stream;
    Error in query: Queries with streaming sources must be executed with writeStream.start();; 
    spark-sql> INSERT INTO loghub_table_output_test SELECT content FROM loghub_table_intput_test_stream;