本文介绍流式查询配置的相关概念及配置参数。
查询配置
在使用Spark SQL进行流式查询前,您需要了解以下两个概念:
- 数据源配置:即Table的定义。
Table的定义只包含数据源的配置,例如,Kafka数据源的连接地址和Topic名等。因为您可以对一个Table同时做多个业务无关的查询,所以Table定义中不应该包含具体的查询实例的运行配置。
- 查询实例配置:具体每个Stream Query运行时的参数配置。
每一个查询实例均需要单独配置。通过queryName,可以减少对查询SQL进行不必要的修改。查询实例参数设置使用的是
SET
语法,详情请参见配置说明。
查询实例包括:
INSERT INTO ...
CREATE TABLE ... AS SELECT ...
每个查询实例的queryName均为SQL上下文中最近的一个,查询示例说明如下:
- 情况一
SET streaming.query.name=one_test_job -- query 1 INSERT INTO tb_test_1 SELECT ... -- query 2 INSERT INTO tb_test_2 SELECT ... -- 以上query1和query2的queryName都是"one_test_job",当然这是一种非法情况,每个查询实例必须有唯一queryName。
- 情况二
SET streaming.query.name=one_test_job_1 SET streaming.query.name=one_test_job_2 -- query 1 CREATE TABLE tb_test_1 AS SELECT ... -- query1的queryName是"one_test_job_2"。
配置说明
配置类别 | 对应于DataFrame API | SQL配置格式 | 说明 | 是否必选 |
---|---|---|---|---|
queryName | writeStream.queryName(...) | SET streaming.query.name=$queryName | 每个Stream Query的名称,各个Query的配置项会根据名称来区分。 | 是 |
option | writeStream.option(...) | SET spark.sql.streaming.query.options.$queryName.$optionName=$optionValue | checkpointLocation:checkpoint目录。 | 是 |
自定义。 | 否 | |||
outputMode | writeStream.outputMode(...) | SET spark.sql.streaming.query.outputMode.$queryName=$outputMode | output模式,默认为append模式。 | 否 |
trigger | writeStream.trigger(...) | SET spark.sql.streaming.query.trigger.$queryName=$triggerType | trigger模式,默认为ProcessingTime。 | 否 |
SET spark.sql.streaming.query.trigger.intervalMs.$queryName=$intervalMs | trigger间隔,单位毫秒,默认为0。 | 否 |