EMR-3.23.0版本开始支持SCAN语法。

背景信息

SCAN语法支持DataSource V2设计,实现在Spark SQL中的统一批流查询。例如,您可以定义一个Kafka数据源表,然后定义两个SCAN,分别对应批式读和流式读。

SCAN语法定义Table时只需要定义Table数据源的基本信息,无需定义如何读这个Table的参数。SCAN语法约束如下:
  • SCAN语法定义的视图,仅能用作数据源表,不可作为数据输出表。
  • SCAN语法定义可以直接处理原始表,但仅能进行批式读。

语法

CREATE SCAN tbName_alias
ON tbName
USING queryType
OPTIONS (propertyName=propertyValue[,propertyName=propertyValue]*)
queryType支持两种类型:
  • BATCH:将源表tbName按照批读的方式,定义一个临时视图tbName_alias。
  • STREAM:将源表tbName按照流读的方式,定义一个临时视图tbName_alias。

OPTIONS中可以定义数据源读取的运行期参数,针对不同的数据源,运行期参数不同。运行期参数主要包含如何定义流式读的行为参数。

示例

  1. 建一个LogService数据源表。
    spark-sql> CREATE TABLE loghub_table_intput_test(content string)
             > USING loghub
             > OPTIONS
             > (...)
  2. 离线处理SLS数据,统计截止当前数据条数。
    spark-sql> CREATE SCAN loghub_table_intput_test_batch
             > ON loghub_table_intput_test
             > USING BATCH;
    spark-sql> SELECT COUNT(*) FROM loghub_table_intput_test_batch;
  3. 流式处理SLS数据。
    spark-sql> CREATE TABLE loghub_table_output_test(content string)
             > USING loghub
             > OPTIONS
             > (...);
    
    spark-sql> CREATE SCAN loghub_table_intput_test_stream
             > ON loghub_table_intput_test
             > USING STREAM;
    测试非法操作:例如对流表进行Select。
    spark-sql> SELECT COUNT(*) FROM loghub_table_test_stream;
    报错如下所示。
    Error in query: Queries with streaming sources must be executed with writeStream.start();
    正确对流表进行Select语句如下。
    spark-sql> INSERT INTO loghub_table_output_test SELECT content FROM loghub_table_intput_test_stream;