本文为您介绍如何 Spark SQL 开发一个流式处理作业。

说明 :EMR-3.23.0(含)后续版本已不建议使用这个模板,但仍然会支持。

查询语句块

由于某些作业相关的参数无法在 SQL 查询语句中很好的表达出来,我们需要在具体的 SQL 查询语句前使用 “SET” 语句进行必要性的参数配置。其中一个重要参数是 streaming.query.name,每个 SQL 查询必须关联一个唯一性的 streaming.query.name 。基于这个 “queryStatement”, 我们可以为每个 SQL 查询配置其他参数,例如 checkpoint 等。我们约定,每个 SQL 查询必须前置 “SET streaming.query.name”,否则会查询出错。所以,一个合法的查询语句块是:
SET streaming.query.name=${queryName};

queryStatement

作业模板

-- dbName:需要将表建在哪个数据库下
CREATE DATABASE IF NOT EXISTS ${dbName};
USE ${dbName};

-- 创建SLS数据表
-- slsTableName:SLS表的名称
-- logProjectName:LogService的项目名
-- logStoreName:LogService的logstore名
-- accessKeyId:阿里云accessKeyId
-- accessKeySecret:阿里云accessKeySecret
-- endpoint:LogService的logstore所在endpoint,可以参考:服务入口
CREATE TABLE IF NOT EXISTS ${slsTableName}
USING loghub
OPTIONS (
sls.project = '${logProjectName}',
sls.store = '${logStoreName}',
access.key.id = '${accessKeyId}',
access.key.secret = '${accessKeySecret}',
endpoint = '${endpoint}');

-- 创建HDFS数据表,需要完成表的列字段定义
-- hdfsTableName:HDFS表的名称
-- location: 存储数据路劲,支持HDFS和OSS路径
-- 数据格式支持:csv, json, orc, parquet等,默认为parquet
CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType])
USING PARQUET
LOCATION '${location}';

-- 需要为每个流式查询定义一些运行参数,包括:
-- streaming.query.name: 流式查询作业名称
-- spark.sql.streaming.checkpointLocation.${queryName}:本次流式查询作业的checkpoint路径
SET streaming.query.name=${queryName};
SET spark.sql.streaming.query.options.${queryName}.checkpointLocation=${checkpointLocation};
-- 以下为可选参数,按需打开:
-- outputMode:查询结果输出默认,默认为append
-- trigger:查询执行模式,可选ProcessingTime,默认为ProcessingTime
-- trigger.intervalMs:批次间隔,单位毫秒,默认为0
-- SET spark.sql.streaming.query.outputMode.${queryName}=${outputMode};
SET spark.sql.streaming.query.trigger.${queryName}=ProcessingTime;
SET spark.sql.streaming.query.trigger.intervalMs.${queryName}=30;

INSERT INTO ${hdfsTableName}
SELECT col1, col2
FROM ${slsTableName}
WHERE ${condition}

参数

以下列出几个关键性参数:
参数名 说明 默认值
streaming.query.name 查询名 必须显式配置
spark.sql.streaming.query.options.${queryName}.checkpointLocation 流式作业的 checkpoint 路径 必须显式配置
spark.sql.streaming.query.outputMode.${queryName} 查询的 Output 模式 append
spark.sql.streaming.query.trigger.${queryName} 查询执行模式,当前只支持ProcessingTime模式 ProcessingTime
spark.sql.streaming.query.trigger.intervalMs.${queryName} 查询批次间隔时间,单位毫秒 0