本文为您介绍如何使用Spark SQL开发一个流式处理作业。

说明 :EMR-3.23.0(含)后续版本已不建议使用这个模板,但仍然会支持。

查询语句块

类似streaming.query.name等作业参数无法通过SQL表达,因此需要在SQL查询语句前使用SET进行必要的参数配置。

合法的查询语句块如下。
SET streaming.query.name=${queryName};
queryStatement

作业模板

-- 创建数据库。
-- dbName:数据库名。
CREATE DATABASE IF NOT EXISTS ${dbName};
USE ${dbName};

-- 创建Log Service数据表。
-- slsTableName:Log Service表的名称。
-- logProjectName:LogService的项目名。
-- logStoreName:LogService的LogStore名。
-- accessKeyId:阿里云账号的AccessKey ID。
-- accessKeySecret:阿里云账号的AccessKey Secret。
-- endpoint:LogService的LogStore所在Endpoint。
-- 当显式指定LogStore的各个字段时,必须定义为STRING类型。
-- 保留6个系统字段:(`__logProject__` STRING, `__logStore__` STRING, `__shard__` INT, `__time__` TIMESTAMP, `__topic__` STRING, `__source__` STRING)
CREATE TABLE IF NOT EXISTS ${slsTableName} (col1 dataType[, col2 dataType])
USING loghub
OPTIONS (
sls.project = '${logProjectName}',
sls.store = '${logStoreName}',
access.key.id = '${accessKeyId}',
access.key.secret = '${accessKeySecret}',
endpoint = '${endpoint}');

-- 创建HDFS数据表,需要完成表的列字段定义。
-- hdfsTableName:HDFS表的名称。
-- location: 存储数据路径,支持HDFS和OSS路径。
-- 数据格式支持:delta, csv, json, orc, parquet等,默认为delta。
CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType])
USING delta
LOCATION '${location}';

-- 需要为每个流式查询定义一些运行参数。
-- streaming.query.name: 流式查询作业名称。
-- spark.sql.streaming.checkpointLocation.${queryName}:本次流式查询作业的Checkpoint路径。
SET streaming.query.name=${queryName};
SET spark.sql.streaming.query.options.${queryName}.checkpointLocation=${checkpointLocation};
-- 以下为可选参数。
-- outputMode:查询结果输出方式,默认为append。
-- trigger:查询执行模式,可选ProcessingTime,默认为ProcessingTime。
-- trigger.intervalMs:批次间隔,单位毫秒,默认为0。
-- SET spark.sql.streaming.query.outputMode.${queryName}=${outputMode};
SET spark.sql.streaming.query.trigger.${queryName}=ProcessingTime;
SET spark.sql.streaming.query.trigger.intervalMs.${queryName}=30;

INSERT INTO ${hdfsTableName}
SELECT col1, col2
FROM ${slsTableName}
WHERE ${condition};
说明 Endpoint请参见服务入口

参数

以下列出几个关键性参数。
参数 说明 默认值
streaming.query.name 查询名。 必须显式配置。
spark.sql.streaming.query.options.${queryName}.checkpointLocation 流式作业的Checkpoint路径。 必须显式配置。
spark.sql.streaming.query.outputMode.${queryName} 查询的Output模式。 append
spark.sql.streaming.query.trigger.${queryName} 查询执行模式,当前只支持ProcessingTime模式。 ProcessingTime
spark.sql.streaming.query.trigger.intervalMs.${queryName} 查询批次间隔时间,单位毫秒。 0