本节介绍流式查询配置的相关概念及配置参数。

查询配置

在使用Spark SQL进行流式查询前,您需要了解以下两个概念:

  • 数据源配置:即Table的定义。
  • 查询实例配置:具体每个Stream Query运行时的参数配置。

Table的定义只包含数据源的配置,例如,Kafka数据源的连接地址、Topic名等。由于我们可以对一个Table同时做多个业务无关的查询,所以Table定义中不应该包含具体的查询实例的运行配置。

每一个查询实例均需要单独配置。为了减少对查询SQL进行不必要的修改,我们给查询实例设置了一个queryName。通过queryName,我们可配置各个查询实例的运行参数。查询实例参数设置使用的是SET语法,详情请参见配置说明

查询配置约定:每个查询实例的queryName均为SQL上下文中最近的一个,示例说明如下:

  • 情况一
    SET streaming.query.name=one_test_job
    
    -- query 1
    INSERT INO tb_test_1 SELECT ...
    
    -- query 2
    INSERT INO tb_test_2 SELECT ...
    
    -- 以上query1和query2的queryName都是"one_test_job",当然这是一种非法情况,每个查询实例必须有唯一queryName
  • 情况二
    SET streaming.query.name=one_test_job_1
    
    SET streaming.query.name=one_test_job_2
    
    -- query 1
    CREATE TABLE tb_test_1 AS SELECT ...
    
    -- query1的queryName是"one_test_job_2"

查询实例包括:

  • INSERT INTO ...
  • CREATE TABLE ... AS SELECT ...

配置说明

配置类别 对应于DataFrame API SQL配置格式 说明 是否必选
queryName writeStream.queryName(...) SET streaming.query.name=$queryName 每个Stream Query的名称,各个Query的配置项会根据名称来区别。
option writeStream.option(...) SET spark.sql.streaming.query.options.$queryName.$optionName=$optionValue checkpointLocation:checkpoint目录。
自定义。
outputMode writeStream.outputMode(...) SET spark.sql.streaming.query.outputMode.$queryName=$outputMode output模式,默认为append模式。
trigger writeStream.trigger(...) SET spark.sql.streaming.query.trigger.$queryName=$triggerType trigger模式,默认为ProcessingTime
说明 当前只支持ProcessingTime模式。
SET spark.sql.streaming.query.trigger.intervalMs.$queryName=$intervalMs trigger间隔,单位毫秒,默认为0。